This post was last edited by Li Baiyi on 2021-12-22 21:12
The previous article introduced the system architecture: each task has an independent state machine. When adding an MQTT task, just add a new state machine that is mutually exclusive with other tasks.
The MQTT task state machine includes: query connection status -> whether to disconnect the previous connection -> configure MQTT access platform -> configure MQTT client parameters -> open client -> create connection -> subscribe to topic 1 -> subscribe to topic 2 -> publish topic -> exception handling, etc.
Mutual exclusion between tasks: Since the receiving buffer is cleared before each AT command is sent, the initialization task, the periodic query NB status task, and the MQTT/Socket task must be mutually exclusive. Otherwise, the command that NB replies to task A may be cleared by task B.
void NB_MQTT_Aliyun_ReportProcess(void)
{
//static U8 status = 0;
U8 res = 0;
static char ack[200] = {0};
static char cmd [100] = {0};
if((NB.CycleCheckStatus > 1) || (NB.GpsReportStatus > 1))
{
return;
}
if(NB.mqtt.err_code != 0)
{
//return;
}
switch(NB.ReportStatus)
{
case 0:
if((NB.CON == 0xFF)
&& (NB.RegStatus == 1)
&& (NB.attach == 1)
&& (NB.csq != 99)
&& (NB.csq != 0)
&& (NB.CGACT_state == 1))
{
if(NB.isConnect > 0)//已经建立连接,延长处理间隔
{
NB.ReportTime = (200*100);
}
else
{
NB.ReportTime = (200*10);
}
NB.ReportStatus = 1;
uprintf("\r\n NB.ReportTime %d", NB.ReportTime);
}
break;
case 1:
if(NB.ReportTime == 0)
{
NB.ReportStatus = 2;
memset(ack, 0, 200);
memset(cmd, 0, 100);
uprintf("\r\n NB.ReportTimeout");
}
break;
case 2:
NB.t = 23.0f;//(float)DS18B20_Get_Temp();
if(NB.t != 85.0f)
{
uprintf("\r\n NB.isConnect %d", NB.isConnect);
if(NB.isConnect <= 0)
{
NB.ReportStatus = 31;//creat connect
NB.net.socket = 0;
NB.mqtt.tcpconnectID = 0;
NB.ReportStatus = 28;//config keep_alive_time
NB.mqtt.keep_alive_time = 120;
}
else
{
NB.ReportStatus = 36;
}
}
break;
case 28://config keep_alive_time
sprintf(cmd, "AT+ECMTCFG=\"keepalive\",%d,%d\r\n",NB.mqtt.tcpconnectID, NB.mqtt.keep_alive_time);
res = NB_AT_CMD((U8 *)cmd, 1000, "OK", 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCFG OK");
NB.ReportStatus = 29;//Disable Sleep Mode
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCFG_TIMEOUT");
NB.ReportStatus = 29;
}
break;
case 29://查询连接状态
res = NB_AT_CMD("AT+ECMTCONN?\r\n", 1000, "+ECMTCONN:", 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCONN1 OK");
if(!NB_parseMqttConStatus(&EC01RxCache[0], EC01_handle.rcv_size))
{
}
if(NB.mqtt.state == 3)//3 MQTT 已经连接成功
{
NB.ReportStatus = 100;//跳转到断开连接
}
else
{
NB.ReportStatus = 31;
}
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCONN1_TIMEOUT");
NB.ReportStatus = 31;
}
break;
case 100://断开客户端和服务器连接
//+ECMTDISC: 0,0
sprintf(cmd, "AT+ECMTDISC=%d\r\n",NB.mqtt.tcpconnectID);
sprintf(ack, "+ECMTDISC: %d,0",NB.mqtt.tcpconnectID);
res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTDISC OK");
//NB.ReportStatus = 31;
NB.ReportStatus = 29;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTDISC_TIMEOUT");
NB.ReportStatus = 31;
}
break;
#if 1
case 30://关闭客户端
//+ECMTCLOSE: 0,0
sprintf(cmd, "AT+ECMTCLOSE=%d\r\n",NB.mqtt.tcpconnectID);
sprintf(ack, "+ECMTCLOSE: %d,0",NB.mqtt.tcpconnectID);
res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCLOSE OK");
NB.ReportStatus = 31;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCLOSE_TIMEOUT");
NB.ReportStatus = 31;
}
break;
#endif
case 31://配置mqtt客户端参数
//AT+QMTCFG="aliauth",0,"a1KApkvNNDZ","EVN_2021_4_17","cd9b161999b754c3613f1bc1baf78ea5"\r
sprintf(cmd, "AT+ECMTCFG=\"aliauth\",%d,"PRODUCT_KEY","DEVICE_NAME","DEVICE_SECRET"\r\n",
NB.mqtt.tcpconnectID);
res = NB_AT_CMD((U8 *)cmd, 1000, "OK", 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCFG OK");
NB.ReportStatus = 32;
//NB.isConnect = 1;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCFG_TIMEOUT");
NB.ReportStatus = 0;
}
break;
case 32://建立tcp连接mqtt服务器
//+ECMTOPEN: 0,0
sprintf(cmd, "AT+ECMTOPEN=%d,\"%s\",%d\r\n",
NB.mqtt.tcpconnectID, HOST_NAME,HOST_PORT);
sprintf(ack, "+ECMTOPEN: %d,",NB.mqtt.tcpconnectID);
res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTOPEN OK");
NB.ReportStatus = 33;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTOPEN_TIMEOUT");
NB.ReportStatus = 30;
}
break;
case 33://登陆mqtt服务器
//+ECMTCONN: 0,0,0
NB.mqtt.result = 0;//0 打开网络成功
NB.mqtt.ret_code = 0;//0 接受连接
sprintf(cmd, "AT+ECMTCONN=%d,\"12345_33\"\r\n",
NB.mqtt.tcpconnectID);
sprintf(ack, "+ECMTCONN: %d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.result, NB.mqtt.ret_code);
res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCONN OK");
NB.ReportStatus = 34;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCONN_TIMEOUT");
NB.ReportStatus = 0;
}
break;
case 34://订阅服务器设置
//+ECMTSUB: 0,1,0,1
NB.mqtt.tcpconnectID = 0;
NB.mqtt.msgID= 1;
NB.mqtt.qos = 0;
NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK
sprintf(cmd, "AT+ECMTSUB=%d,%d,%s,%d\r\n",
NB.mqtt.tcpconnectID, NB.mqtt.msgID, DEVICE_SUBSCRIBE_SET, NB.mqtt.qos);
sprintf(ack, "+ECMTSUB: %d,%d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result, 1);
res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTSUB1 OK");
NB.ReportStatus = 35;
NB.isConnect = 1;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTSUB1_TIMEOUT");
NB.ReportStatus = 0;
}
break;
case 35://订阅服务器应答
//+ECMTSUB: 0,2,0,1
NB.mqtt.tcpconnectID = 0;
NB.mqtt.msgID= 2;
NB.mqtt.qos = 1;
NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK
sprintf(cmd, "AT+ECMTSUB=%d,%d,%s,%d\r\n",
NB.mqtt.tcpconnectID, NB.mqtt.msgID, DEVICE_SUBSCRIBE_POST_RELAY, NB.mqtt.qos);
sprintf(ack, "+ECMTSUB: %d,%d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result, 1);
res = NB_AT_CMD((U8 *)cmd, 2000, ack, 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTSUB2 OK");
NB.ReportStatus = 36;
NB.isConnect = 1;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTSUB2_TIMEOUT");
NB.ReportStatus = 0;
}
break;
case 36://发布
//+ECMTPUB: 0,0,0
NB.mqtt.retain= 0;
NB.mqtt.msgID = 0;
NB.mqtt.qos = 0;
NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK
#ifdef USER_206
sprintf((char*)payload_out, PAYLOAD, Temp.temp, MQTT.Index);
#elif defined USER_174
sprintf((char*)payload_out, PAYLOAD, Temp.temp, CO2.co2,
Temp.low_threshold, Temp.high_threshold, CO2.low_threshold, CO2.high_threshold,
CO2.fanSwitch, Temp.CoolingSwitch,
GeoLocation.CoordinateSystem, GeoLocation.Latitude, GeoLocation.Longitude, GeoLocation.Altitude,
MQTT.Index);
#endif
sprintf(cmd, "AT+ECMTPUB=%d,%d,%d,%d,%s,%s\r\n",
NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.qos, NB.mqtt.retain, DEVICE_PUBLISH, payload_out);
//uprintf("\r\n payload_out:%s", payload_out);
sprintf(ack, "+ECMTPUB: %d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result);
res = NB_AT_CMD((U8 *)cmd, 3000, ack, 2);
if(res == ATCMD_REVOK)
{
uprintf("\r\n payload_out:%s", payload_out);
uprintf("\r\n ECMTPUB OK");
MQTT.Index++;
GeoLocation.Latitude += 0.00001f;
GeoLocation.Longitude += 0.00001f;
GeoLocation.Altitude++;
uprintf("\r\n Latitude %f Longitude %f", GeoLocation.Latitude, GeoLocation.Longitude );
NB.ReportStatus = 0;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTPUB_TIMEOUT");
NB.ReportStatus = 37;
}
break;
case 37://查询连接状态
/*
+QMTCONN: 0,3
OK
*/
res = NB_AT_CMD("AT+ECMTCONN?\r\n", 1000, "+ECMTCONN:", 2);//Close Socket
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCONN1 OK");
if(!NB_parseMqttConStatus(&EC01RxCache[0], EC01_handle.rcv_size))
{
NB.ReportStatus = 0;
}
else
{
NB.ReportStatus = 30;
}
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCONN1_TIMEOUT");
NB.ReportStatus = 30;
NB_ClearStatus();
NB.CON = 0;
}
break;
case 0xFE://关闭mqtt客户端
//+ECMTCLOSE: 0,0
sprintf(cmd, "AT+ECMTCLOSE=%d\r\n",NB.mqtt.tcpconnectID);
sprintf(ack, "+ECMTCLOSE: %d,0",NB.mqtt.tcpconnectID);
res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
if(res == ATCMD_REVOK)
{
uprintf("\r\n ECMTCLOSE OK");
NB.net.socket = -1;
NB_ClearStatus();
NB.ReportStatus = 0xFF;
}
else if(res == ATCMD_TIMEOUT)
{
uprintf("\r\n ECMTCLOSE_TIMEOUT");
NB.ReportStatus = 0;
}
break;
default:
break;
}
}
Add parsing MQTT server instructions for polling messages:
void NB_Polling_Msg(void)
{
EC01_handle.rcv_size = NB_CopyFIFOToParseBuf(EC01RxCache);
if((EC01_handle.rcv_size != 0)
&& (NB.CON == 0xFF))//初始化完成允许轮询消息,否则可能导致数据冲突
{
if(NB_parseDefaultMsg(&EC01RxCache[0], EC01_handle.rcv_size))
{
}
if(!NB_parseMqttRECV(&EC01RxCache[0], EC01_handle.rcv_size))//解析MQTT服务器指令
{
}
}
}
/*
+QMTRECV: 0,0,"/sys/a1KApkvNND
Z/EVN_2021_4_17/thing/service/pr
*/
u8 NB_parseMqttRECV(u8* cmd, u32 len)
{
char *ptr = NULL;
S16 uiResult;
char str[20];
U16 uiIndexStart = 0, uiAckIndexStart = 0, uiAckIndexEnd = 0;
S8 res = 0xFF;
if(cmd == NULL) return 1;
sprintf(str, "+ECMTRECV: %d,",NB.mqtt.tcpconnectID);
ptr = (char*)cmd;
if(!NB_strcmp(ptr,str,0,len))
{
if(!NB_GetHexFromStr((char*)ptr, str, ",", 0, len, &uiResult))
{
NB.mqtt.msgID = (S8)uiResult;
uprintf("\r\n NB.mqtt.msgID=%d", NB.mqtt.msgID);
res = 3;
}
sprintf(str, "+ECMTRECV: %d,%d,",NB.mqtt.tcpconnectID, NB.mqtt.msgID);
while(uiAckIndexEnd < (len - 2))
{
if(!NB_GetStrFromStr2((char*)ptr, str, "\r\n", uiIndexStart, len, (char *)NB.net.rxdata, &uiAckIndexStart, &uiAckIndexEnd))
{
if(!NB_strcmp(( char*)NB.net.rxdata, "\"fanSwitch\":1", 0, 200))
{
CO2.fanSwitch = 1;
GPIO_ResetBits(GPIOD, GPIO_Pin_2);
uprintf("\r\n fanSwitch=1");
}
if(!NB_strcmp(( char*)NB.net.rxdata, "\"fanSwitch\":0", 0, 200))
{
CO2.fanSwitch = 0;
GPIO_SetBits(GPIOD, GPIO_Pin_2);
uprintf("\r\n fanSwitch=0");
}
if(!NB_strcmp(( char*)NB.net.rxdata, "\"CoolingSwitch\":1", 0, 200))
{
Temp.CoolingSwitch = 1;
GPIO_ResetBits(GPIOD, GPIO_Pin_2);
uprintf("\r\n CoolingSwitch=1");
}
if(!NB_strcmp(( char*)NB.net.rxdata, "\"CoolingSwitch\":0", 0, 200))
{
Temp.CoolingSwitch = 0;
GPIO_SetBits(GPIOD, GPIO_Pin_2);
uprintf("\r\n CoolingSwitch=0");
}
res = 0;
}
else
{
break;
}
uiIndexStart = uiAckIndexEnd;
}
}
return res;
}
Operation effect: Mobile phone APP remote control LED:
This concludes the review.
There are still some issues to be resolved.
https://en.eeworld.com/bbs/thread-1189547-1-1.html#pid3111773