## 项目背景 最近接到一个比较简单的任务,需求如下: 从MQTT服务器订阅断电报警信息然后入库到SQLServer或者MySQL数据库中 从MQTT服务器订阅到站点报警(0断电,1来电)、GPS信息(经纬度)、设备信号,然后在内存中缓存每个站点的这三种信息,再加上最新通信时间(接收到订阅的消息的最新时间), 针对每个站点(SS打头的编码)和ClientID(设备编码),做一个HTTP GET请求接口,前端可以根据站点编码和设备编码请求该站点的数据,主要是为后期做站点在线、离线状态判断、断电告警来服务的。 程序简单的思维导图如下图所示: ![程序思维导图] 本来打算使用C++写的,考虑到C++写HTTP接口相对比较麻烦,还是采用Nodejs写比较方便,因为Nodejs对于MQTT、HTTP的支持比较友好,比较适合写这种简单的后台程序。
程序大概的流程是: 从MQTT服务器上订阅如下的三种主题消息: 订阅主题. 报警, 0断电, 1来电/alarmSing 0消息主题和内容示例如下:/alarmSing/865650043997457=>0. GPS信息/lbsLocationlat=022.6315208&lng=114.0741963消息主题和内容示例如下:/lbsLocation/865650043997457=> lat=022.6315208&lng=114.0741963.设备信号/csq 18消息主题和内容示例如下:/csq/865650043997457=>27需要在config.yaml文件中配置好MQTT服务器的配置信息,示例如下:```yamlrxmqtt: host:127.0.0.1 port: 8099 user: poweralarm pwd: "poweralarm@123" id: "mqweb_20200826_nodejs_alarm" clean:true```然后先连接MQTT服务器,设置订阅的主题并针对这三个主题分别写对应的回调处理函数。
在内存中维护一张站点信息的Map缓存数据结构,这里为了方便选择了Typescript编写,```ts stationInfos: Map; ``` 其中StationInfo是一个站点信息类在接收到MQTT服务器推送的报警(/alarmSing)、GPS信息(/lbsLocation)、设备信号(/csq )这三种消息时,分别修改stationInfos这个Map缓存对象,并根据传递的DeviceId查询是否存在该站点,如果存在则更新设置对应的数据、最新通信时间、站点在线状态等。编写http接口,根据站点编码集合站点信息Map缓存stationInfos返回对应的信息当接收到站点断电消息时除了更新stationInfos缓存外,还需要将对应的断电报警信息入库。 ## 数据库结构 目前数据库操作只涉及到两张表:站点和设备ID表Breakelectric以及断电报警记录表PowerCutHistory### MySQL数据表结构```sqlDROP TABLE IF EXISTS `breakelectric`;CREATE TABLE `breakelectric` CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '站点编码', `DeviceId` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '设备Id', `SStationName` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '站点名称') ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;DROP TABLE IF EXISTS `powercuthistory`;CREATE TABLE `powercuthistory` CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, `DeviceId` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL, `SDateTime` datetime NULL DEFAULT NULL, `DevState` bit NULL DEFAULT NULL) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;```### SQLServer数据表结构```sqlDROP TABLE [dbo].[Breakelectric]GOCREATE TABLE [dbo].[Breakelectric] NOT NULL ,[DeviceId] varchar NULL ,[SStationName] varchar NOT NULL )DROP TABLE [dbo].[PowerCutHistory]GOCREATE TABLE [dbo].[PowerCutHistory] NULL ,[DeviceId] varchar NULL ,[SDateTime] datetime NULL ,[DevState] bit NULL )```## 几个关键的封装类### MQTT-Typescript封装为了简便,将MQTT客户端封装成一个类来使用,代码如下:```tsimport mqtt = requireimport moment = requireexport interface MqttConnOpt extends mqtt.IClientOptions{}export declare type onMessageFunc = => voiddeclare class Topic { public topic: string; public qos: 0|1|2;}export class MQTT { mqclient: mqtt.MqttClient; brokerHost: string; brokerPort: number; subscribeTopics: Array; subscribeCallbacks: Map; connOpt: MqttConnOpt; connected: boolean; constructor {this.brokerHost = host;this.brokerPort = port;this.subscribeTopics = new Array;this.subscribeCallbacks = new Map;this.connected = false; } public subscribe {this.subscribeTopics.push;if ){ this.mqclient.subscribe;} } public set_message_callback {this.subscribeCallbacks.set; } public is_connected {// return this.mqclient.connected == true;return this.connected == true; } public connect{// 打开重新订阅opts.resubscribe = false;this.connOpt = opts;this.mqclient = mqtt.connect;this.mqclient.on=>{ console.log; this.connected = true; for { const element = this.subscribeTopics[index]; this.mqclient.subscribe; }});this.mqclient.on=>{ console.log.format}] ${this.brokerHost} ${topic}`) this.mqclient; this.subscribeCallbacks.forEach=>{ if != -1){ val; } });});this.mqclient.on=>{ console.log});this.mqclient.on=>{ console.log}); } public publish {this.mqclient.publish }}```其中,需要注意的一点就是MQTT服务器有可能意外重启或者其他原因断开,这时需要断线重连。
在C++、C#、Java等语言中可以开启一个断线重连监测线程,每隔一段时间监测与MQTT服务器的连接情况,如果断线则重新连接。
## yaml文件配置类对象为了方便这里采用yaml文件作为配置文件,之前使用C++时也常用xml、ini、yaml作为配置文件,Java SpringBoot也常用yml或yaml作为配置文件。我的yaml配置文件如下图所示:```yamlrxmqtt: host:127.0.0.1 port: 8099 user: poweralarm pwd: "poweralarm@123" id: "mqweb_20200826_nodejs_alarm" clean:true# dbsql:# host: 127.0.0.1# port: 1433# user: sa# pwd: "123456"# database: EMCHNVideoMonitordbsql: host: 127.0.0.1 port: 3306 user: root pwd: "123456" database: EMCHNVideoMonitorredis: host: 127.0.0.1 port: 7001 pwd: 123456 index: 3http: 3000rpcUrl: 127.0.0.1:18885enableMqtt: trueenableDB: trueenableRedis: trueenableWS: trueenableRPC: trueofflineTimeout: 90000cacheInterval: 10000```针对上面的yaml配置文件,编写对应的yaml配置读取类,如下所示:```tsimport YAML = requireimport fs = requiredeclare interface MqttConnOpt{ host: string; port: number; user: string; pwd: string; clean: boolean; id: string;}declare interface DBConnOpt{ host: string; port: number; user: string; pwd: string; database: string;}declare interface RedisConnOpt{ host: string; port: number; pwd: string; db: number;}export { MqttConnOpt, DBConnOpt, RedisConnOpt, Config,}class Config { rxmqtt: MqttConnOpt; dbsql: DBConnOpt; redis: RedisConnOpt; http: number; rpcUrl: string; enableMqtt: boolean; enableDB: boolean; enableRedis: boolean; enableWS: boolean; enableRPC: boolean; offlineTimeout: number; cacheInterval: number; constructor{try{ let buffer = fs.readFileSync; let config = YAML.parse; this.rxmqtt = config['rxmqtt']; this.dbsql = config['dbsql']; this.redis = config['redis']; this.http = config['http']; this.rpcUrl = config['rpcUrl']; this.enableMqtt = config['enableMqtt']; this.enableDB = config['enableDB']; this.enableRedis = config['enableRedis']; this.enableWS = config['enableWS']; this.enableRPC = config['enableRPC']; this.offlineTimeout = config['offlineTimeout']; this.cacheInterval = config['cacheInterval'];}catch{ console.log} } public save {try{ fs.writeFileSync)}catch{ console.log} }}```其实使用yaml这个第三方库结合typescript读写yaml文件还是比较方便的。## 数据操作类的封装### mysql操作类nodejs中可以使用mariadb或者sequelize等库操作mysql数据库,这里使用mariadb这个库MariaDBClient.ts```tsimport mariadb = requireimport { StationInfo } from './StationInfo'import moment = require// 定义数据查询回调接口export declare type onQueryInfoReqCallback = => void// 定义入库回调接口export declare type onRecordReqCallback = => voidexport class MariaDBClient { dbpool: mariadb.Pool; host: string; port: number; user: string; password: string; dbName: string; connected: boolean; // 站点信息 Map public stationInfos: Map; constructor {this.host = host;this.port = port;this.user = username;this.password = password;this.dbName = dbName;this.connected = false;this.stationInfos = new Map;// 初始化mariadb数据库客户端this.initMariadb;// 加载站点信息到内存中this.getStationInfo; } public initMariadb {this.dbpool = mariadb.createPool; } public is_connected {return this.connected == true; } public async getStationInfo {let conn;try { conn = await this.dbpool.getConnection; const rows = await conn.query; for { const it = rows[i]; const SStation = it['SStation']; this.stationInfos.has if ) { let si = new StationInfo; si.SStation = it['SStation']; si.DeviceId = it['DeviceId']; si.SStationName = it['SStationName']; console.log; this.stationInfos.set; } }} catch { console.error;} finally { if conn.release; //release to pool} } public async getStationList {let conn;try { conn = await this.dbpool.getConnection; const rows = await conn.query; let stationList = new Array; for { const rowItem = rows[i]; let iitem = { 'SStation': rowItem['SStation'], 'DeviceId': rowItem['DeviceId'], 'SStationName': rowItem['SStationName'] } stationList.push; } if cb;} catch { console.error; if cb;} finally { if conn.release; //release to pool} } public async insertStationRecord {if { return;}let sql1 = "INSERT INTO `emchnvideomonitor`.`powercuthistory` VALUES";let conn: mariadb.PoolConnection;try { conn = await this.dbpool.getConnection; var sqlstr = sql1; let SStation = record.SStation;// 站点名称 let DeviceId = record.DeviceId;// 设备Id let SDateTime = record.SDateTime; // 时间 let DevState = record.DevState;// 状态(0停电,1来电) var it = ``; sqlstr += it; console.log; await conn.query; // if cb;} catch { console.error; // if cb;} finally { if conn.release; //release to pool} }}```### sqlserver操作类nodejs中可以使用tedious、mmsql、sequelize等库操作sqlserver数据库,这里采用mssql封装sqlserver操作:MariaDBClient.ts```tsimport mssql = require;// 定义数据查询回调接口export declare type onQueryCallback = => voidexport declare type onExecCallback = => voidexport class MSSQLDBClient { // 数据库连接字符串 // 连接方式:"mssql://用户名:密码@ip地址:1433/数据库名称" constr: string; constructor {this.constr = `mssql://${username}:${password}@${host}:${port}/${dbName}`;mssql.connect.then { console.log; console.log; console.log;}).catch { console.log;}) } public async query {try { await mssql.connect.then { new mssql.Request.query.then { // console.log; if cb; }).catch { console.log; if cb; }); // Stored Procedure}).catch { console.log; if cb;})} catch { console.log; if cb;} } public async exec {await mssql.connect { mssql.query { if { if cb; } else { if cb; mssql.close; } });}); } }```## 主服务类 service.ts```tsimport moment = requireimport sql = requireimport { Config } from './config'import { MQTT } from './mq'import * as http from 'http'import { StationInfo } from './StationInfo'// const mssqlDBClient = require;// import { MSSQLDBClient } from './MSSQLDBClient'import { MariaDBClient } from './MariaDBClient'export class Service { mqttList: Array; // mqtt客户端列表 stationInfos: Map; config: Config; Server: http.Server; App: any; // mssqlDBClient: MSSQLDBClient; mySQLDBClient: MariaDBClient; // 构造函数 constructor {this.Server = server;Domain im Kundenauftrag registriert = app;this.config = new Config;// 初始化配置// this.mssqlDBClient = new MSSQLDBClient; // 创建数据库客户端 this.mySQLDBClient = new MariaDBClient;this.mqttList = new Array;this.stationInfos = new Map;// 建立客户端连接if { this.connectMqtt;}// 加载缓存数据到内存中this.LoadStations;// 定时检查站点是否在线// this.taskCheckStationOnline;// 定时存储站点数据缓存this.taskStoreStationData;// 定时重载站点信息this.timerLoadStationInfo;// 初始化http请求this.initApp; } public timerLoadStationInfo {setInterval=>{ // this.getStationInfo; await this.mySQLDBClient.getStationInfo; this.stationInfos = this.mySQLDBClient.stationInfos;}, 120*1000); } public async LoadStations {// 加载最后的缓存数据// await this.LoadRedisData;// 加载站点信息// await this.getStationInfo;await this.mySQLDBClient.getStationInfo;this.stationInfos = this.mySQLDBClient.stationInfos; } public taskCheckStationOnline {setInterval=>{ this.timerCheckOnline;}, this.config.offlineTimeout); } public taskStoreStationData {setInterval=>{ // this.timerStorStationData; // this.taskStorNewData;}, this.config.cacheInterval); } public timerCheckonline {let stcodeIds = [];this.stationInfos.forEach => { let previous_online = val.Online; val.checkOnline; // 如果先前在线,现在离线 if { stcodeIds.push; // this.sendHeart2WSClient); }}) } public initApp {if { return;}// 路由接口// 获取所有的站点编码和设备ID映射列表this.App.get => { let stationList = []; this.stationInfos.forEach => { stationList.push; }); res.send});// 根据站点编码获取当前站点信息this.App.get => { let { stcode } = req.params; if ) { let item = this.stationInfos.get; return res.send } else { res.send }})// 获取所有站点断电报警、设备信号、经纬度等信息this.App.get => { let stationList = []; this.stationInfos.forEach => { stationList.push; }) res.send}) } public async getStationInfo {// SQLServer// let strSql = "SELECT SStation, DeviceId, SStationName from Breakelectric WHERE SStation != '' AND DeviceId != '';";// this.mssqlDBClient.query => {// if {// return;// }// let resultArray = result.recordsets[0];// if {// for {//// this.stationList.push;//let iitem = resultArray[i];//console.log;//// console.log;//let stcode = iitem['SStation'];//if ) {// this.stationInfos.set);//}//var si = this.stationInfos.get;//si.SStation = iitem['SStation'];//si.DeviceId = iitem['DeviceId'];//si.SStationName = iitem['SStationName'];// }// console.log);// }// });// MariaDBthis.mySQLDBClient.getStationList => { if { for { let iitem = result[i]; let SStation = iitem['SStation']; if ) { this.stationInfos.set); }var si = this.stationInfos.get; si.SStation = iitem['SStation']; si.DeviceId = iitem['DeviceId']; si.SStationName = iitem['SStationName']; console.log; } }}) } public connectMqtt {var it = new MQTT;// 订阅主题// 1. 报警, 0断电, 1来电// /alarmSing 0it.subscribe;// 2. GPS信息// /lbsLocationlat=022.6315208&lng=114.0741963it.subscribe;// 3.设备信号// /csq 18it.subscribe;it.set_message_callback);it.set_message_callback);it.set_message_callback);it.connect;this.mqttList.push; } handleAlarmSing {console.log}`);const topics = topic.split;// /alarmSing/867814045313299=>1console.log;if { let deviceId = topics[2]; console.log; let alarmDevState = parseInt); console.log; // 根据DeviceId查询对应的站点编码SStation let stcode = ''; this.stationInfos.forEach => { if { stcode = key; // 更新该站点的通信时间以及断电报警信息 let comTime = moment.format; var si = this.stationInfos.get; let strStcode = si.SStation; si.alarmSing = alarmDevState; si.CommTime = comTime;si.online = true; this.stationInfos.set; // 将断电报警信息做入库处理 // this.powerCutAlarmStore; this.mySQLDBClient.insertStationRecord } });} } handleGpsLocation {console.log}`);const topics = topic.split;// /lbsLocation/867814045313299=>lat=022.7219409&lng=114.0222168if { let deviceId = topics[2]; console.log; let strPayload = payload.toString; let strLatitude = strPayload.substring+4, strPayload.indexOf); let latitude = parseFloat; console.log; let strLongitude = strPayload.substring+4); let longitude = parseFloat); console.log; // 根据DeviceId查询对应的站点编码SStation let stcode = ''; this.stationInfos.forEach => { if { stcode = key; // 更新该站点的通信时间以及经纬度信息 let comTime = moment.format; var si = this.stationInfos.get;si.online = true; si.longitude = longitude; si.latitude = latitude; si.CommTime = comTime; this.stationInfos.set; } });} } handleCsq {console.log}`);const topics = topic.split;// /csq/867814045454838=>20if { let deviceId = topics[2]; console.log; let csq = parseInt); console.log; // 根据DeviceId查询对应的站点编码SStation let stcode = ''; this.stationInfos.forEach => {if { stcode = key; // 更新该站点的通信时间以及csq信号值 let comTime = moment.format; var si = this.stationInfos.get;si.online = true; si.csq = csq; si.CommTime = comTime; this.stationInfos.set; } }); } } public async powerCutAlarmStore {var SStation = alarmRecord.SStation;var DeviceId = alarmRecord.DeviceId;var SDateTime = alarmRecord.SDateTime;var DevState = alarmRecord.DevState;let strInsert = "INSERT INTO powercuthistory VALUES";strInsert += ``;// this.mssqlDBClient.exec => {// if {// console.log;// }// }) }}```## app.js这里为了简便,我直接使用express生成器生成了项目的基本框架,对应的app.js文件如下:```tsvar createError = require;var express = require;var app = express;var path = require;var logger = require;// var indexRouter = require;// var usersRouter = require;var app = express;// view engine setupapp.set);app.set;app.use);app.use);app.use);app.use));// app.use;// app.use;module.exports = app;```## bin/www在bin/www文件中创建了service类的实例,然后读取config配置,并启动相关服务。注意:这里需要将app和server传入到service对象中,在service对象中编写http接口,这样就能保证http接口和站点信息缓存共享同一份数据了,如果将http接口写在app.js或者routes/api.js中,创建两个service对象,就不能保证站点信息缓存信息的数据同步了。
免责声明:本平台仅供信息发布交流之途,请谨慎判断信息真伪。如遇虚假诈骗信息,请立即举报
举报