## 项目背景  最近接到一个比较简单的任务,需求如下:  从MQTT服务器订阅断电报警信息然后入库到SQLServer或者MySQL数据库中  从MQTT服务器订阅到站点报警(0断电,1来电)、GPS信息(经纬度)、设备信号,然后在内存中缓存每个站点的这三种信息,再加上最新通信时间(接收到订阅的消息的最新时间), 针对每个站点(SS打头的编码)和ClientID(设备编码),做一个HTTP GET请求接口,前端可以根据站点编码和设备编码请求该站点的数据,主要是为后期做站点在线、离线状态判断、断电告警来服务的。 程序简单的思维导图如下图所示: ![程序思维导图] 本来打算使用C++写的,考虑到C++写HTTP接口相对比较麻烦,还是采用Nodejs写比较方便,因为Nodejs对于MQTT、HTTP的支持比较友好,比较适合写这种简单的后台程序。
 程序大概的流程是: 从MQTT服务器上订阅如下的三种主题消息: 订阅主题. 报警, 0断电, 1来电/alarmSing  0消息主题和内容示例如下:/alarmSing/865650043997457=>0. GPS信息/lbsLocationlat=022.6315208&lng=114.0741963消息主题和内容示例如下:/lbsLocation/865650043997457=> lat=022.6315208&lng=114.0741963.设备信号/csq 18消息主题和内容示例如下:/csq/865650043997457=>27需要在config.yaml文件中配置好MQTT服务器的配置信息,示例如下:```yamlrxmqtt:  host:127.0.0.1  port: 8099  user: poweralarm  pwd: "poweralarm@123"  id: "mqweb_20200826_nodejs_alarm"  clean:true```然后先连接MQTT服务器,设置订阅的主题并针对这三个主题分别写对应的回调处理函数。

在内存中维护一张站点信息的Map缓存数据结构,这里为了方便选择了Typescript编写,```ts stationInfos: Map; ``` 其中StationInfo是一个站点信息类在接收到MQTT服务器推送的报警(/alarmSing)、GPS信息(/lbsLocation)、设备信号(/csq  )这三种消息时,分别修改stationInfos这个Map缓存对象,并根据传递的DeviceId查询是否存在该站点,如果存在则更新设置对应的数据、最新通信时间、站点在线状态等。编写http接口,根据站点编码集合站点信息Map缓存stationInfos返回对应的信息当接收到站点断电消息时除了更新stationInfos缓存外,还需要将对应的断电报警信息入库。 ## 数据库结构 目前数据库操作只涉及到两张表:站点和设备ID表Breakelectric以及断电报警记录表PowerCutHistory### MySQL数据表结构```sqlDROP TABLE IF EXISTS `breakelectric`;CREATE TABLE `breakelectric`   CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '站点编码',  `DeviceId` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '设备Id',  `SStationName` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '站点名称') ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;DROP TABLE IF EXISTS `powercuthistory`;CREATE TABLE `powercuthistory`   CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,  `DeviceId` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,  `SDateTime` datetime NULL DEFAULT NULL,  `DevState` bit NULL DEFAULT NULL) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;```### SQLServer数据表结构```sqlDROP TABLE [dbo].[Breakelectric]GOCREATE TABLE [dbo].[Breakelectric]  NOT NULL ,[DeviceId] varchar NULL ,[SStationName] varchar NOT NULL )DROP TABLE [dbo].[PowerCutHistory]GOCREATE TABLE [dbo].[PowerCutHistory]  NULL ,[DeviceId] varchar NULL ,[SDateTime] datetime NULL ,[DevState] bit NULL )```## 几个关键的封装类### MQTT-Typescript封装为了简便,将MQTT客户端封装成一个类来使用,代码如下:```tsimport mqtt = requireimport moment = requireexport interface MqttConnOpt extends mqtt.IClientOptions{}export declare type onMessageFunc =  => voiddeclare class Topic {  public topic: string;  public qos: 0|1|2;}export class MQTT {  mqclient: mqtt.MqttClient;  brokerHost: string;  brokerPort: number;  subscribeTopics: Array;  subscribeCallbacks: Map;  connOpt: MqttConnOpt;    connected: boolean;  constructor {this.brokerHost = host;this.brokerPort = port;this.subscribeTopics = new Array;this.subscribeCallbacks = new Map;this.connected = false;  }    public subscribe {this.subscribeTopics.push;if ){  this.mqclient.subscribe;}  }    public set_message_callback {this.subscribeCallbacks.set;  }    public is_connected {// return this.mqclient.connected == true;return this.connected == true;  }    public connect{// 打开重新订阅opts.resubscribe = false;this.connOpt = opts;this.mqclient = mqtt.connect;this.mqclient.on=>{  console.log;  this.connected = true;  for  { const element = this.subscribeTopics[index]; this.mqclient.subscribe;  }});this.mqclient.on=>{  console.log.format}] ${this.brokerHost} ${topic}`)  this.mqclient;  this.subscribeCallbacks.forEach=>{ if  != -1){   val; }  });});this.mqclient.on=>{  console.log});this.mqclient.on=>{  console.log});  }    public publish {this.mqclient.publish  }}```其中,需要注意的一点就是MQTT服务器有可能意外重启或者其他原因断开,这时需要断线重连。

在C++、C#、Java等语言中可以开启一个断线重连监测线程,每隔一段时间监测与MQTT服务器的连接情况,如果断线则重新连接。
## yaml文件配置类对象为了方便这里采用yaml文件作为配置文件,之前使用C++时也常用xml、ini、yaml作为配置文件,Java SpringBoot也常用yml或yaml作为配置文件。我的yaml配置文件如下图所示:```yamlrxmqtt:  host:127.0.0.1  port: 8099  user: poweralarm  pwd: "poweralarm@123"  id: "mqweb_20200826_nodejs_alarm"  clean:true# dbsql:#   host: 127.0.0.1#   port: 1433#   user: sa#   pwd: "123456"#   database: EMCHNVideoMonitordbsql:  host: 127.0.0.1  port: 3306  user: root  pwd: "123456"  database: EMCHNVideoMonitorredis:   host: 127.0.0.1  port: 7001  pwd: 123456  index: 3http: 3000rpcUrl: 127.0.0.1:18885enableMqtt: trueenableDB: trueenableRedis: trueenableWS: trueenableRPC: trueofflineTimeout: 90000cacheInterval: 10000```针对上面的yaml配置文件,编写对应的yaml配置读取类,如下所示:```tsimport YAML = requireimport fs = requiredeclare interface MqttConnOpt{  host: string;  port: number;  user: string;  pwd: string;  clean: boolean;  id: string;}declare interface DBConnOpt{  host: string;  port: number;  user: string;  pwd: string;  database: string;}declare interface RedisConnOpt{  host: string;  port: number;  pwd: string;  db: number;}export {  MqttConnOpt,  DBConnOpt,  RedisConnOpt,  Config,}class Config {  rxmqtt: MqttConnOpt;  dbsql: DBConnOpt;  redis: RedisConnOpt;    http: number;    rpcUrl: string;    enableMqtt: boolean;    enableDB: boolean;    enableRedis: boolean;    enableWS: boolean;    enableRPC: boolean;    offlineTimeout: number;    cacheInterval: number;  constructor{try{  let buffer = fs.readFileSync;  let config = YAML.parse;  this.rxmqtt = config['rxmqtt'];  this.dbsql = config['dbsql'];  this.redis = config['redis'];  this.http = config['http'];  this.rpcUrl = config['rpcUrl'];  this.enableMqtt = config['enableMqtt'];  this.enableDB = config['enableDB'];  this.enableRedis = config['enableRedis'];  this.enableWS = config['enableWS'];  this.enableRPC = config['enableRPC'];  this.offlineTimeout = config['offlineTimeout'];  this.cacheInterval = config['cacheInterval'];}catch{  console.log}  }    public save {try{  fs.writeFileSync)}catch{  console.log}  }}```其实使用yaml这个第三方库结合typescript读写yaml文件还是比较方便的。## 数据操作类的封装### mysql操作类nodejs中可以使用mariadb或者sequelize等库操作mysql数据库,这里使用mariadb这个库MariaDBClient.ts```tsimport mariadb = requireimport { StationInfo } from './StationInfo'import moment = require// 定义数据查询回调接口export declare type onQueryInfoReqCallback =  => void// 定义入库回调接口export declare type onRecordReqCallback =  => voidexport class MariaDBClient {  dbpool: mariadb.Pool;  host: string;  port: number;  user: string;  password: string;  dbName: string;  connected: boolean;   // 站点信息 Map   public stationInfos: Map;  constructor {this.host = host;this.port = port;this.user = username;this.password = password;this.dbName = dbName;this.connected = false;this.stationInfos = new Map;// 初始化mariadb数据库客户端this.initMariadb;// 加载站点信息到内存中this.getStationInfo;  }     public initMariadb {this.dbpool = mariadb.createPool;  }    public is_connected {return this.connected == true;  }     public async getStationInfo {let conn;try {  conn = await this.dbpool.getConnection;  const rows = await conn.query;  for  { const it = rows[i]; const SStation = it['SStation']; this.stationInfos.has if ) {   let si = new StationInfo;   si.SStation = it['SStation'];   si.DeviceId = it['DeviceId'];   si.SStationName = it['SStationName'];   console.log;   this.stationInfos.set; }   }} catch  {  console.error;} finally {  if  conn.release; //release to pool}  }     public async getStationList {let conn;try {  conn = await this.dbpool.getConnection;  const rows = await conn.query;  let stationList = new Array;  for  { const rowItem = rows[i]; let iitem = {   'SStation': rowItem['SStation'],   'DeviceId': rowItem['DeviceId'],   'SStationName': rowItem['SStationName'] } stationList.push;  }  if  cb;} catch  {  console.error;  if  cb;} finally {  if  conn.release; //release to pool}  }      public async insertStationRecord {if  {  return;}let sql1 = "INSERT INTO `emchnvideomonitor`.`powercuthistory`   VALUES";let conn: mariadb.PoolConnection;try {  conn = await this.dbpool.getConnection;  var sqlstr = sql1;  let SStation = record.SStation;// 站点名称  let DeviceId = record.DeviceId;// 设备Id  let SDateTime = record.SDateTime;  // 时间  let DevState = record.DevState;// 状态(0停电,1来电)  var it = ``;  sqlstr += it;  console.log;  await conn.query;  // if  cb;} catch  {  console.error;  // if  cb;} finally {  if  conn.release; //release to pool}  }}```### sqlserver操作类nodejs中可以使用tedious、mmsql、sequelize等库操作sqlserver数据库,这里采用mssql封装sqlserver操作:MariaDBClient.ts```tsimport mssql = require;// 定义数据查询回调接口export declare type onQueryCallback =  => voidexport declare type onExecCallback =  => voidexport class MSSQLDBClient {  // 数据库连接字符串  // 连接方式:"mssql://用户名:密码@ip地址:1433/数据库名称"  constr: string;  constructor {this.constr = `mssql://${username}:${password}@${host}:${port}/${dbName}`;mssql.connect.then {  console.log;  console.log;  console.log;}).catch {  console.log;})  }    public async query {try {  await mssql.connect.then { new mssql.Request.query.then {   // console.log;   if  cb; }).catch {   console.log;   if  cb; }); // Stored Procedure}).catch {  console.log;  if  cb;})} catch  {  console.log;  if  cb;}   }    public async exec {await mssql.connect {  mssql.query { if  {   if  cb; } else {   if  cb;   mssql.close; }  });});  } }```## 主服务类 service.ts```tsimport moment = requireimport sql = requireimport { Config } from './config'import { MQTT } from './mq'import * as http from 'http'import { StationInfo } from './StationInfo'// const mssqlDBClient = require;// import { MSSQLDBClient } from './MSSQLDBClient'import { MariaDBClient } from './MariaDBClient'export class Service {  mqttList: Array;  // mqtt客户端列表  stationInfos: Map;  config: Config;  Server: http.Server;  App: any;  // mssqlDBClient: MSSQLDBClient;  mySQLDBClient: MariaDBClient;  // 构造函数  constructor {this.Server = server;Domain im Kundenauftrag registriert = app;this.config = new Config;// 初始化配置// this.mssqlDBClient = new MSSQLDBClient; // 创建数据库客户端 this.mySQLDBClient = new MariaDBClient;this.mqttList = new Array;this.stationInfos = new Map;// 建立客户端连接if  {  this.connectMqtt;}// 加载缓存数据到内存中this.LoadStations;// 定时检查站点是否在线// this.taskCheckStationOnline;// 定时存储站点数据缓存this.taskStoreStationData;// 定时重载站点信息this.timerLoadStationInfo;// 初始化http请求this.initApp;  }    public timerLoadStationInfo {setInterval=>{  // this.getStationInfo;  await this.mySQLDBClient.getStationInfo;  this.stationInfos = this.mySQLDBClient.stationInfos;}, 120*1000);  }    public async LoadStations {// 加载最后的缓存数据// await this.LoadRedisData;// 加载站点信息// await this.getStationInfo;await this.mySQLDBClient.getStationInfo;this.stationInfos = this.mySQLDBClient.stationInfos;  }    public taskCheckStationOnline {setInterval=>{  this.timerCheckOnline;}, this.config.offlineTimeout);  }    public taskStoreStationData {setInterval=>{  // this.timerStorStationData;  // this.taskStorNewData;}, this.config.cacheInterval);  }    public timerCheckonline {let stcodeIds = [];this.stationInfos.forEach => {  let previous_online = val.Online;  val.checkOnline;  // 如果先前在线,现在离线  if  { stcodeIds.push; // this.sendHeart2WSClient);  }})  }    public initApp {if  {  return;}// 路由接口// 获取所有的站点编码和设备ID映射列表this.App.get => {  let stationList = [];  this.stationInfos.forEach => { stationList.push;  });  res.send});// 根据站点编码获取当前站点信息this.App.get => {  let { stcode } = req.params;  if ) { let item = this.stationInfos.get; return res.send  } else { res.send  }})// 获取所有站点断电报警、设备信号、经纬度等信息this.App.get => {  let stationList = [];  this.stationInfos.forEach => { stationList.push;  })  res.send})  }    public async getStationInfo {// SQLServer// let strSql = "SELECT SStation, DeviceId, SStationName from Breakelectric WHERE SStation != '' AND DeviceId != '';";// this.mssqlDBClient.query => {//   if  {// return;//   }//   let resultArray = result.recordsets[0];//   if  {// for  {//// this.stationList.push;//let iitem = resultArray[i];//console.log;//// console.log;//let stcode = iitem['SStation'];//if ) {//  this.stationInfos.set);//}//var si = this.stationInfos.get;//si.SStation = iitem['SStation'];//si.DeviceId = iitem['DeviceId'];//si.SStationName = iitem['SStationName'];// }// console.log);//   }// });// MariaDBthis.mySQLDBClient.getStationList => {  if  { for  {   let iitem = result[i];   let SStation = iitem['SStation'];   if ) { this.stationInfos.set);   }var si = this.stationInfos.get;   si.SStation = iitem['SStation'];   si.DeviceId = iitem['DeviceId'];   si.SStationName = iitem['SStationName'];   console.log; }  }})  }    public connectMqtt {var it = new MQTT;// 订阅主题// 1. 报警, 0断电, 1来电// /alarmSing  0it.subscribe;// 2. GPS信息// /lbsLocationlat=022.6315208&lng=114.0741963it.subscribe;// 3.设备信号// /csq 18it.subscribe;it.set_message_callback);it.set_message_callback);it.set_message_callback);it.connect;this.mqttList.push;  }    handleAlarmSing {console.log}`);const topics = topic.split;// /alarmSing/867814045313299=>1console.log;if  {  let deviceId = topics[2];  console.log;  let alarmDevState = parseInt);  console.log;  // 根据DeviceId查询对应的站点编码SStation  let stcode = '';  this.stationInfos.forEach => { if  {   stcode = key;   // 更新该站点的通信时间以及断电报警信息   let comTime = moment.format;   var si = this.stationInfos.get;   let strStcode = si.SStation;   si.alarmSing = alarmDevState;   si.CommTime = comTime;si.online = true;   this.stationInfos.set;   // 将断电报警信息做入库处理   // this.powerCutAlarmStore;   this.mySQLDBClient.insertStationRecord  }  });}  }   handleGpsLocation  {console.log}`);const topics = topic.split;// /lbsLocation/867814045313299=>lat=022.7219409&lng=114.0222168if  {  let deviceId = topics[2];  console.log;  let strPayload = payload.toString;  let strLatitude = strPayload.substring+4, strPayload.indexOf);  let latitude = parseFloat;  console.log;  let strLongitude = strPayload.substring+4);  let longitude = parseFloat);  console.log;  // 根据DeviceId查询对应的站点编码SStation  let stcode = '';  this.stationInfos.forEach => { if  {   stcode = key;   // 更新该站点的通信时间以及经纬度信息   let comTime = moment.format;   var si = this.stationInfos.get;si.online = true;   si.longitude = longitude;   si.latitude = latitude;   si.CommTime = comTime;   this.stationInfos.set;  }  });}  }   handleCsq {console.log}`);const topics = topic.split;// /csq/867814045454838=>20if  { let deviceId = topics[2]; console.log; let csq = parseInt); console.log; // 根据DeviceId查询对应的站点编码SStation let stcode = ''; this.stationInfos.forEach => {if  {  stcode = key;  // 更新该站点的通信时间以及csq信号值  let comTime = moment.format;  var si = this.stationInfos.get;si.online = true;  si.csq = csq;  si.CommTime = comTime;  this.stationInfos.set; } });   }  }    public async powerCutAlarmStore {var SStation = alarmRecord.SStation;var DeviceId = alarmRecord.DeviceId;var SDateTime = alarmRecord.SDateTime;var DevState = alarmRecord.DevState;let strInsert = "INSERT INTO powercuthistory  VALUES";strInsert += ``;// this.mssqlDBClient.exec => {//   if  {// console.log;//   }// })  }}```## app.js这里为了简便,我直接使用express生成器生成了项目的基本框架,对应的app.js文件如下:```tsvar createError = require;var express = require;var app = express;var path = require;var logger = require;// var indexRouter = require;// var usersRouter = require;var app = express;// view engine setupapp.set);app.set;app.use);app.use);app.use);app.use));// app.use;// app.use;module.exports = app;```## bin/www在bin/www文件中创建了service类的实例,然后读取config配置,并启动相关服务。注意:这里需要将app和server传入到service对象中,在service对象中编写http接口,这样就能保证http接口和站点信息缓存共享同一份数据了,如果将http接口写在app.js或者routes/api.js中,创建两个service对象,就不能保证站点信息缓存信息的数据同步了。

免责声明:本平台仅供信息发布交流之途,请谨慎判断信息真伪。如遇虚假诈骗信息,请立即举报
 举报