// // Created by YY on 2019/10/8. // #include #include #include #include #include #include #include "../jni_log.h" #include "../defs.h" #include "../native-lib.h" #include "parse_net.h" #include "../common/apptimer.h" #include "../common/serial_port.h" #include "../common/net.h" #include "platform.h" #include "../mcu/mcu_if.h" #include "../utils/xconvert.h" #define DEBUG(fmt, args...) LOGD(" <%s>: " fmt, __func__, ##args) #define xENABLE_DEBUG_PROTOCOL using namespace std; #define PKT_HEAD_CHAR 0x7E #define MAX_CONTENT_SIZE 1100 #define MAX_MESSAGE_DATA 1023 #define PKT_SYNC_HEAD 0x1 #define PKT_7D 0x02 #define DEVICE_SERIAL_NUM_SIZE 16 #define PHONE_NUM_SIZE 8 #define DEFAULT_MAX_RESEND_CNT 3 #define DEFAULT_SHORT_RESEND_INTERVAL D_SEC(1) #define DEFAULT_LONG_RESEND_INTERVAL D_SEC(2) typedef struct { uint16_t length:10; uint16_t encrypt:3; uint16_t is_multi_msg:1; } message_attrib_t; typedef struct { uint8_t version; uint16_t id; union { message_attrib_t a; uint16_t b; } attrib; uint8_t phone_num[PHONE_NUM_SIZE]; uint16_t seq; uint8_t reserve; uint16_t multi_msg_num; uint16_t multi_msg_seq; uint8_t resend; //消息重传次数,如果消息无需远端应答,就设置为0,否则就应为大于0的值 uint32_t resend_interval; } message_t; typedef struct { uint16_t type : 1; uint16_t need_rsp : 1; uint16_t reserve : 2; uint16_t encrypt : 4; }ex_message_attrib_t; #define EXTERNAL_ENCRYPT_SIZE 256 typedef struct { uint8_t type; uint16_t id; union { ex_message_attrib_t a; uint16_t b; }attrib; uint16_t seq; uint8_t sn[16]; uint32_t length; uint8_t encrypt[EXTERNAL_ENCRYPT_SIZE]; // 单片机和平台的扩展消息都要处理,故按较长的算 } ex_message_t; typedef struct message_tx_table_ { uint8_t access; uint16_t id; uint16_t seq; uint8_t curr_cnt; uint8_t max_try_cnt; uint32_t resend_interval; uint32_t time_out; uint32_t length; struct message_tx_table_ *prev; struct message_tx_table_ *next; uint8_t data[0]; } message_tx_table_t; #define ID_CP_COMMON_RSP 0x0001 #define ID_PC_COMMON_RSP 0x8001 #define ID_CP_HEARTBEAT 0x0002 #define ID_CP_DEVICE_REG 0x0100 #define ID_PC_DEVICE_REG_RSP 0x8100 #define ID_CP_LOGIN_REQ 0x0101 #define ID_CP_RTK_UPLOAD 0x0301 #define ID_CP_RTK_START_REQ 0x0401 #define ID_PC_RTK_DOWNLOAD 0x8401 #define ID_CP_RTK_END_REQ 0x0402 const uint8_t PKT_RESEVER = 0x3C; static message_tx_table_t * message_tx_table_head[DATA_ACCESS_END]; static uint8_t packet_parse_status[DATA_ACCESS_END]; static uint8_t packet_buffer[DATA_ACCESS_END][MAX_CONTENT_SIZE]; static uint16_t packet_buffer_length[DATA_ACCESS_END]; struct largeMessage_t{ uint16_t currSeq; uint32_t length; uint8_t *data; }largeMessage[DATA_ACCESS_END]; static uint8_t PhoneNumber[PHONE_NUM_SIZE] = {0}; static pthread_mutex_t seq_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t tx_queue_mutex = PTHREAD_MUTEX_INITIALIZER; static void ParseTimeout(union sigval sig); static uint32_t GetResendTimeout(uint8_t access, uint8_t curr_resend_cnt, uint32_t base_time); static uint16_t GetMessageSeq(uint8_t access); static void PacketEntry(uint8_t access, const uint8_t *data, uint16_t length); static void MessageEntry(uint8_t access, const message_t *msg, const uint8_t *data, uint32_t length); static void MakeMessage(message_t *msg, uint8_t version, uint16_t id, uint8_t encrypt, const uint8_t *phone_number, uint8_t reserve, uint8_t resend, uint32_t resend_intval); static void SendMessage(uint8_t access, const message_t *srcMsg, const uint8_t *data, uint32_t length); static void SendPacket(uint8_t access, const message_t *msg, const uint8_t *data, uint16_t id); static message_tx_table_t *FindTxQueue(message_tx_table_t **head, uint16_t id, uint16_t seq); static message_tx_table_t *FindTxQueue(message_tx_table_t **head, uint32_t tm); static message_tx_table_t *FindTxQueue(message_tx_table_t **head, message_tx_table_t *item); static void AddTxQueue(message_tx_table_t **head, uint8_t access, uint16_t id, uint16_t seq, const uint8_t *data, uint32_t length, uint8_t resend, uint32_t resend_intval); static void RemoveTxQueue(message_tx_table_t **head, uint16_t id, uint16_t seq); static void RemoveTxQueue(message_tx_table_t **head, uint16_t id); static void RemoveAllTxQueue(message_tx_table_t **head); static int SendQueue(message_tx_table_t *item); static void ResendItemTimeout(message_tx_table_t **head, uint32_t tm); static uint32_t GetResentTimeoutTxQueue(message_tx_table_t **head); static void *TxQueueMgrThread(void *p); static void TriggerResendTxQueue(union sigval sig); /* 7E 80 89 00 40 87 00 00 01 37 20 20 55 68 00 76 00 9A 41 11 00 22 00 77 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 2C 00 00 02 E0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 04 38 36 39 30 37 34 30 33 30 39 34 38 37 30 32 F2 0F 4B 93 32 B6 CE 5C 55 98 64 20 D1 7A C6 B7 DB E4 93 15 C9 59 5C F4 54 63 11 5C 81 EE F8 35 52 12 64 A5 51 40 B8 E5 D7 4D B8 EF 5F C2 2F D4 E5 EA 20 41 E5 C9 F6 BA 64 04 5B 5B 1D F3 9A 09 9D 7E */ void Parse(uint8_t access, const uint8_t *buff, uint16_t length) { /*{ static char buffd[16384]; buffd[0] = 0; DEBUG("Parse access = %s Length = %d", access ? "TCP" : "UART", length); int i = 0; for (i = 0; i < length; i++) { if ((i % 32) == 0) { sprintf(buffd + strlen(buffd), "\n"); } sprintf(buffd + strlen(buffd), "%02X ", buff[i]); if (strlen(buffd) > 800) { DEBUG("%s <- %s...", access ? "TCP" : "UART" , buffd); buffd[0] = 0; } } if (strlen(buffd) > 0) DEBUG("%s <- %s", access ? "TCP" : "UART" , buffd); }*/ for (uint16_t i = 0; i < length; i++) { unsigned char dat = buff[i]; if (packet_parse_status[access] & PKT_SYNC_HEAD) { if (dat == PKT_HEAD_CHAR) { packet_parse_status[access] &= ~PKT_SYNC_HEAD; packet_parse_status[access] &= ~PKT_7D; packet_buffer_length[access] = 0; AppTimer_add(ParseTimeout, D_SEC(5), access); } } else { if (dat == PKT_HEAD_CHAR) { if (packet_buffer_length[access] == 0) { /* 出现0长度载荷的情况,往往是单片机过来的数据[7E..]..7E,7E.....7E的括号内丢失了,和后一个7E头结合在一起*/ packet_parse_status[access] &= ~PKT_7D; AppTimer_add(ParseTimeout, D_SEC(5), access); } else { AppTimer_delete(ParseTimeout); packet_parse_status[access] = PKT_SYNC_HEAD; PacketEntry(access, packet_buffer[access], packet_buffer_length[access]); } continue; } else if (dat == 0x7D && !(packet_parse_status[access] & PKT_7D)) { packet_parse_status[access] |= PKT_7D; continue; } if (packet_parse_status[access] & PKT_7D) { packet_parse_status[access] &= ~PKT_7D; dat = 0x7D + dat - 1; } if (packet_buffer_length[access] < MAX_CONTENT_SIZE) { packet_buffer[access][ packet_buffer_length[access] ] = dat; packet_buffer_length[access]++; } else { // Too much, discard DEBUG("Parse error: pkt too large!"); AppTimer_delete(ParseTimeout); packet_parse_status[access] = PKT_SYNC_HEAD; } } } } static void ParseTimeout(union sigval sig) { AppTimer_delete(ParseTimeout); packet_parse_status[sig.sival_int] = PKT_SYNC_HEAD; DEBUG("ParseTimeout %d", sig.sival_int); } static uint32_t GetResendTimeout(uint8_t access, uint8_t curr_resend_cnt, uint32_t base_time) { if (access == DATA_ACCESS_MCU) { return base_time; } //T2 = T1 * (n + 1), 首次时间认为是1秒 uint32_t t = base_time; for (int i = 0; i <= curr_resend_cnt; i++) { t = t * (i + 1); } return t; } static uint16_t GetMessageSeq(uint8_t access) { static uint16_t TxSeq[DATA_ACCESS_END] = {0}; uint16_t seq; pthread_mutex_lock(&seq_mutex); seq = TxSeq[access]; TxSeq[access]++; pthread_mutex_unlock(&seq_mutex); return seq; } static void PacketEntry(uint8_t access, const uint8_t *data, uint16_t length) { uint16_t i, x = 0; uint8_t checkByte = 0; message_t theMessage; for (i = 0; i < length; i++) { checkByte ^= data[i]; } if (checkByte != 0 || length <= 16) { DEBUG("recv pkt error, checkByte: 0x%02X, length: %d", checkByte, length); return; } length--; // Ignore check byte theMessage.version = data[x++]; theMessage.id = BUILD_UINT16(data[x + 1], data[x]); x += 2; theMessage.attrib.b = BUILD_UINT16(data[x + 1], data[x]); x += 2; memcpy(theMessage.phone_num, data + x, PHONE_NUM_SIZE); x += PHONE_NUM_SIZE; theMessage.seq = BUILD_UINT16(data[x + 1], data[x]); x += 2; theMessage.reserve = data[x++]; theMessage.resend = DEFAULT_MAX_RESEND_CNT; if (theMessage.attrib.a.is_multi_msg) { if (x + 4 <= length) { theMessage.multi_msg_num = BUILD_UINT16(data[x + 1], data[x]); theMessage.multi_msg_seq = BUILD_UINT16(data[x + 3], data[x + 2]); x += 4; if (theMessage.multi_msg_seq == 1) { // First multi-msg largeMessage[access].currSeq = 1; largeMessage[access].length = 0; if (largeMessage[access].data != NULL) { largeMessage[access].data = (uint8_t *) realloc(largeMessage[access].data, theMessage.multi_msg_num * MAX_MESSAGE_DATA); } else { largeMessage[access].data = (uint8_t *) malloc( theMessage.multi_msg_num * MAX_MESSAGE_DATA); } memcpy(largeMessage[access].data, data + x, theMessage.attrib.a.length); largeMessage[access].length = theMessage.attrib.a.length; CommonRespend(access, theMessage.seq, theMessage.id, COMMON_RESP_SUCCESS); } else if (largeMessage[access].data != NULL && theMessage.multi_msg_seq != largeMessage[access].currSeq + 1) { //请求重传分包? CommonRespend(access, theMessage.seq, theMessage.id, COMMON_RESP_FAIL); return; } else if (largeMessage[access].data != NULL){ largeMessage[access].currSeq = theMessage.multi_msg_seq; memcpy(largeMessage[access].data + largeMessage[access].length, data + x, theMessage.attrib.a.length); largeMessage[access].length += theMessage.attrib.a.length; CommonRespend(access, theMessage.seq, theMessage.id, COMMON_RESP_SUCCESS); if (theMessage.multi_msg_seq == theMessage.multi_msg_num) { DEBUG("多包接收完毕 %d %d\n", theMessage.multi_msg_seq, largeMessage[access].length); MessageEntry(access, &theMessage, largeMessage[access].data, largeMessage[access].length); free(largeMessage[access].data); largeMessage[access].data = NULL; } } } else { // Length error return; } } else { MessageEntry(access, &theMessage, data + x, theMessage.attrib.a.length); } } static void MessageEntry(uint8_t access, const message_t *msg, const uint8_t *data, uint32_t length) { DEBUG("MessageEntry[%d] id = 0x%04X, seq = %d, length = %d", access, msg->id, msg->seq, length); switch (msg->id) { case ID_PC_COMMON_RSP: if (length == 5) { uint16_t seq = BUILD_UINT16(data[1], data[0]); uint16_t id = BUILD_UINT16(data[3], data[2]); uint8_t res = data[4]; DEBUG("ID_PC_COMMON_RSP seq = %d, id = 0x%04X res = %d", seq, id, res); RemoveTxQueue(&message_tx_table_head[DATA_ACCESS_PLATFORM], id); if (id == ID_CP_LOGIN_REQ) { DeviceLoginCallback(res); } } break; case ID_PC_DEVICE_REG_RSP: { DEBUG("ID_PC_DEVICE_REG_RSP"); RemoveTxQueue(&message_tx_table_head[DATA_ACCESS_PLATFORM], ID_CP_DEVICE_REG); if (length >= 3) { if (data[2] == 0 && length == 11) { DeviceRegisterCallback(0, data + 3, 8); } else { DeviceRegisterCallback(data[2], 0, 0); } } break; } case ID_PC_RTK_DOWNLOAD: DEBUG("ID_PC_RTK_DOWNLOAD"); ReceivedRtk(data, length); break; default: break; } CommonRespend(access, msg->seq, msg->id, COMMON_RESP_FAIL); } static void MakeMessage(message_t *msg, uint8_t version, uint16_t id, uint8_t encrypt, const uint8_t *phone_number, uint8_t reserve, uint8_t resend, uint32_t resend_intval) { msg->version = version; msg->id = id; msg->attrib.b = 0; msg->attrib.a.encrypt = encrypt; memcpy(msg->phone_num, phone_number, PHONE_NUM_SIZE); msg->reserve = reserve; msg->resend = resend; msg->resend_interval = resend_intval; } static void SendMessage(uint8_t access, const message_t *srcMsg, const uint8_t *data, uint32_t length) { uint16_t total_num = (length + MAX_MESSAGE_DATA - 1) / MAX_MESSAGE_DATA; uint32_t x = 0; message_t msg = *srcMsg; if (length == 0) { total_num = 1; } msg.attrib.a.is_multi_msg = (total_num > 1 ? 1 : 0); DEBUG("length = %d, total_num %d\n", length, total_num); for (uint16_t curr_num = 1; curr_num <= total_num; curr_num++) { uint32_t load = 0; DEBUG("curr_num %d\n", curr_num); msg.seq = GetMessageSeq(access); if (msg.attrib.a.is_multi_msg) { msg.multi_msg_num = total_num; msg.multi_msg_seq = curr_num; } load = (length - x > MAX_MESSAGE_DATA) ? MAX_MESSAGE_DATA : (length - x); msg.attrib.a.length = load; DEBUG("load %d\n", load); SendPacket(access, &msg, data + x, msg.id); x += load; } } static void SendPacket(uint8_t access, const message_t *msg, const uint8_t *data, uint16_t id) { uint8_t buffer[MAX_CONTENT_SIZE]; uint8_t buffer2[MAX_CONTENT_SIZE*2]; uint16_t x = 0; DEBUG("SendPacket len = %d", msg->attrib.a.length); buffer[x++] = msg->version; buffer[x++] = HI_UINT16(msg->id); buffer[x++] = LO_UINT16(msg->id); buffer[x++] = HI_UINT16(msg->attrib.b); buffer[x++] = LO_UINT16(msg->attrib.b); memcpy(buffer + x, msg->phone_num, PHONE_NUM_SIZE); x += PHONE_NUM_SIZE; buffer[x++] = HI_UINT16(msg->seq); buffer[x++] = LO_UINT16(msg->seq); buffer[x++] = msg->reserve; if (msg->attrib.a.is_multi_msg) { buffer[x++] = HI_UINT16(msg->multi_msg_num); buffer[x++] = LO_UINT16(msg->multi_msg_num); buffer[x++] = HI_UINT16(msg->multi_msg_seq); buffer[x++] = LO_UINT16(msg->multi_msg_seq); } memcpy(buffer + x, data, msg->attrib.a.length); x += msg->attrib.a.length; uint8_t checkByte = 0; for (int i = 0; i < x; i++) { checkByte ^= buffer[i]; } buffer[x++] = checkByte; uint32_t y = 0; buffer2[y++] = PKT_HEAD_CHAR; for (int i = 0; i < x; i++) { if (buffer[i] == 0x7E) { buffer2[y++] = 0x7D; buffer2[y++] = 0x02; } else if (buffer[i] == 0x7D) { buffer2[y++] = 0x7D; buffer2[y++] = 0x01; } else { buffer2[y++] = buffer[i]; } } buffer2[y++] = PKT_HEAD_CHAR; AddTxQueue(&message_tx_table_head[access], access, id, msg->seq, buffer2, y, msg->resend, msg->resend_interval); } static message_tx_table_t *FindTxQueue(message_tx_table_t **head, uint16_t id, uint16_t seq) { if (head == NULL) { return NULL; } for (message_tx_table_t *ptr = *head; ptr != NULL; ptr = ptr->next) { if (ptr->id == id && ptr->seq == seq) { return ptr; } } return NULL; } static message_tx_table_t *FindTxQueue(message_tx_table_t **head, uint32_t tm) { if (head == NULL) { return NULL; } for (message_tx_table_t *ptr = *head; ptr != NULL; ptr = ptr->next) { if (ptr->time_out <= tm) { return ptr; } } return NULL; } static message_tx_table_t *FindTxQueue(message_tx_table_t **head, message_tx_table_t *item) { if (head == NULL) { return NULL; } for (message_tx_table_t *ptr = *head; ptr != NULL; ptr = ptr->next) { if (ptr == item) { return ptr; } } return NULL; } static void AddTxQueue(message_tx_table_t **head, uint8_t access, uint16_t id, uint16_t seq, const uint8_t *data, uint32_t length, uint8_t resend, uint32_t resend_intval) { message_tx_table_t *new_item; if (length == 0 || head == NULL) { return; } DEBUG("AddTxQueue id = 0x%X, seq = %d, len = %d", id, seq, length); pthread_mutex_lock(&tx_queue_mutex); // If the item is exist, skip it if (FindTxQueue(head, id, seq) != NULL) { DEBUG("已经有了,我们不干了"); goto ATQ_END; } if ((new_item = (message_tx_table_t *)malloc(sizeof(message_tx_table_t) + length))== NULL) { // Not enough memory! LOGE("Not enough memory!"); goto ATQ_END; } new_item->next = NULL; new_item->access = access; new_item->id = id; new_item->seq = seq; new_item->length = length; new_item->curr_cnt = 0; new_item->max_try_cnt = 1 + resend; // Zero means unlimited new_item->resend_interval = resend_intval; new_item->time_out = resend_intval + AppTimer_GetTickCount(); memcpy(new_item->data, data, length); if (*head == NULL) { new_item->prev = NULL; *head = new_item; DEBUG("############### Create head node ##################"); } else { message_tx_table_t *ptr = *head; while (ptr->next != NULL) { ptr = ptr->next; } ptr->next = new_item; new_item->prev = ptr; } SendQueue(new_item); if (new_item->max_try_cnt > 0 && new_item->curr_cnt >= new_item->max_try_cnt) { if (new_item == *head) { if (new_item->next == NULL) { *head = NULL; DEBUG("****************** Delete all ******************"); } else { *head = new_item->next; new_item->next->prev = NULL; } } else { if (new_item->next != NULL) { new_item->next->prev = new_item->prev; } new_item->prev->next = new_item->next; } free(new_item); } ATQ_END: pthread_mutex_unlock(&tx_queue_mutex); uint32_t tim = GetResentTimeoutTxQueue(&message_tx_table_head[DATA_ACCESS_PLATFORM]); DEBUG("NEXT ==== %ld", tim); AppTimer_delete(TriggerResendTxQueue); if (tim != uint32_t (-1)) { AppTimer_add(TriggerResendTxQueue, tim); } } static void RemoveTxQueue(message_tx_table_t **head, uint16_t id, uint16_t seq) { if (head == NULL) return; message_tx_table_t *ptr = *head; pthread_mutex_lock(&tx_queue_mutex); while ( ptr != NULL ) { if (ptr->id == id && ptr->seq == seq) { // delete message_tx_table_t *temp = ptr; if (ptr == *head) { if (ptr->next == NULL) { DEBUG("****************** RemoveTxQueue Delete all ******************"); ptr = *head = NULL; } else { *head = ptr->next; ptr->next->prev = NULL; ptr = *head; } } else { if (ptr->next != NULL) { ptr->next->prev = ptr->prev; } ptr->prev->next = ptr->next; ptr = ptr->next; } free(temp); } else { ptr = ptr->next; } } pthread_mutex_unlock(&tx_queue_mutex); } static void RemoveTxQueue(message_tx_table_t **head, uint16_t id) { if (head == NULL) return; message_tx_table_t *ptr = *head; pthread_mutex_lock(&tx_queue_mutex); while ( ptr != NULL ) { if (ptr->id == id) { // delete message_tx_table_t *temp = ptr; DEBUG("****************** 我们找到了, 弄死它 ******************"); if (ptr == *head) { if (ptr->next == NULL) { DEBUG("****************** RemoveTxQueue Delete all ******************"); ptr = *head = NULL; } else { *head = ptr->next; ptr->next->prev = NULL; ptr = *head; } } else { if (ptr->next != NULL) { ptr->next->prev = ptr->prev; } ptr->prev->next = ptr->next; ptr = ptr->next; } free(temp); } else { ptr = ptr->next; } } pthread_mutex_unlock(&tx_queue_mutex); } static void RemoveAllTxQueue(message_tx_table_t **head) { if (head == NULL) { return; } pthread_mutex_lock(&tx_queue_mutex); for (message_tx_table_t *ptr = *head, *next; ptr != NULL; ptr = next) { next = ptr->next; free(ptr); } *head = NULL; pthread_mutex_unlock(&tx_queue_mutex); } static int SendQueue(message_tx_table_t *item) { DEBUG("SendQueue id = 0x%04X, seq = %d, length = %d", item->id, item->seq, item->length); if (item != NULL) { if (item->access == DATA_ACCESS_MCU) { if (WriteSerialPort(GetSerialPort(UART_1), item->data, item->length) != item->length) { item->time_out = 100 + AppTimer_GetTickCount(); LOGE("发往串口出错了"); // return -1; } } else if(item->access == DATA_ACCESS_PLATFORM) { if (WritePlatform(item->data, item->length) != item->length) { item->time_out = D_SEC(3) + AppTimer_GetTickCount(); LOGE("发往网络出错了 seq = %d", item->seq); // return -2; } } else { // return 0; } #ifdef ENABLE_DEBUG_PROTOCOL { static char buff[16384]; buff[0] = 0; int i = 0; for (i = 0; i < item->length; i++) { if ((i % 32) == 0) { sprintf(buff + strlen(buff), "\n"); } sprintf(buff + strlen(buff), "%02X ", item->data[i]); if (strlen(buff) > 800) { DEBUG("DATAOUT: %s -> %s...", item->access == DATA_ACCESS_MCU ? "UART" : "TCP" , buff); buff[0] = 0; } } if (strlen(buff) > 0) { DEBUG("DATAOUT: %s -> %s", item->access == DATA_ACCESS_MCU ? "UART" : "TCP", buff); } } #endif // 系统层确认发送成功 item->curr_cnt += 1; item->time_out = GetResendTimeout(item->access, item->curr_cnt - 1, item->resend_interval) + AppTimer_GetTickCount(); } return 0; } static void ResendItemTimeout(message_tx_table_t **head, uint32_t tm) { if (head == NULL) return; message_tx_table_t *ptr = *head; while ( ptr != NULL ) { if (ptr->time_out <= tm) { // send LOGE("重发 id 0x%04X seq %d", ptr->id, ptr->seq); int ret = SendQueue(ptr); if (ret < 0 && ret != -3) { break; } if (ptr->max_try_cnt > 0 && ptr->curr_cnt >= ptr->max_try_cnt) { // delete DEBUG("Delete item %d", ptr->curr_cnt); message_tx_table_t *temp = ptr; pthread_mutex_lock(&tx_queue_mutex); if (ptr == *head) { if (ptr->next == NULL) { DEBUG("****************** Delete all 2 ******************"); ptr = *head = NULL; } else { *head = ptr->next; ptr->next->prev = NULL; ptr = *head; } } else { if (ptr->next != NULL) { ptr->next->prev = ptr->prev; } ptr->prev->next = ptr->next; ptr = ptr->next; } pthread_mutex_unlock(&tx_queue_mutex); free(temp); } else { ptr = ptr->next; } } else { ptr = ptr->next; } } } static uint32_t GetResentTimeoutTxQueue(message_tx_table_t **head) { uint32_t resentTime = uint32_t (-1); uint32_t now = AppTimer_GetTickCount(); if (head == NULL) return resentTime; pthread_mutex_lock(&tx_queue_mutex); message_tx_table_t *ptr = *head; while ( ptr != NULL ) { if (ptr->time_out <= now) { resentTime = 0; break; } else if (ptr->time_out - now < resentTime) { resentTime = ptr->time_out - now; } ptr = ptr->next; } pthread_mutex_unlock(&tx_queue_mutex); return resentTime; } static sem_t sem_tx_mgr; static void *TxQueueMgrThread(void *p) { while (true) { sem_wait(&sem_tx_mgr); uint32_t tim = AppTimer_GetTickCount(); DEBUG("TxQueueMgrThread %ld", tim); // Check send queue, and resend ResendItemTimeout(&message_tx_table_head[DATA_ACCESS_PLATFORM], tim); tim = GetResentTimeoutTxQueue(&message_tx_table_head[DATA_ACCESS_PLATFORM]); DEBUG("NEXT ==== %ld", tim); AppTimer_delete(TriggerResendTxQueue); if (tim != uint32_t (-1)) { AppTimer_add(TriggerResendTxQueue, tim); } } pthread_exit(NULL); } static void TriggerResendTxQueue(union sigval sig) { AppTimer_delete(TriggerResendTxQueue); sem_post(&sem_tx_mgr); } void PlatformTxInit(void) { //2019101500000001 /* PhoneNumber[0] = 0x20; PhoneNumber[1] = 0x19; PhoneNumber[2] = 0x10; PhoneNumber[3] = 0x15; PhoneNumber[4] = 0x00; PhoneNumber[5] = 0x00; PhoneNumber[6] = 0x00; PhoneNumber[7] = 0x02;*/ pthread_mutex_init(&seq_mutex, NULL); pthread_mutex_init(&tx_queue_mutex, NULL); sem_init(&sem_tx_mgr, 0, 0); pthread_t pid; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);//detached pthread_create(&pid, &attr, TxQueueMgrThread, NULL); } void SetPlatformTxPhoneNum(const char *phone) { ConvertPhoneNum(PhoneNumber, sizeof(PhoneNumber), phone); } void CommonRespend(uint8_t access, uint16_t seq, uint16_t id, uint8_t value) { uint8_t data[5]; data[0] = HI_UINT16(seq); data[1] = LO_UINT16(seq); data[2] = HI_UINT16(id); data[3] = LO_UINT16(id); data[4] = value; message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_COMMON_RSP, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(access, &msg, data, sizeof(data)); } void SendHeartBeat(uint8_t access) { message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_HEARTBEAT, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(access, &msg, NULL, 0); } void SendDeviceRegister(uint16_t province, uint16_t city, const uint8_t *device_model, int device_model_length, const uint8_t *device_sn, const char *imei) { uint8_t data[64] = {0}; int x = 0; data[x++] = HI_UINT16(province); data[x++] = LO_UINT16(province); data[x++] = HI_UINT16(city); data[x++] = LO_UINT16(city); memcpy(data + x, device_model, device_model_length); x += 20; memcpy(data + x, device_sn, 16); x += 16; memcpy(data + x, imei, strlen(imei)); x += 15; message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_DEVICE_REG, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(DATA_ACCESS_PLATFORM, &msg, data, x); } void SendDeviceLogin(const uint8_t *data, int length) { message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_LOGIN_REQ, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(DATA_ACCESS_PLATFORM, &msg, data, length); } void SendRTKReport(uint8_t gps_status, uint32_t latitude, uint32_t longitude, uint16_t altitude, const uint8_t *bcd_time, const uint8_t *rtk, int rtk_length) { uint8_t data[MAX_CONTENT_SIZE]; int x = 0; data[x++] = 0; data[x++] = gps_status; data[x++] = BREAK_UINT32(latitude, 3); data[x++] = BREAK_UINT32(latitude, 2); data[x++] = BREAK_UINT32(latitude, 1); data[x++] = BREAK_UINT32(latitude, 0); data[x++] = BREAK_UINT32(longitude, 3); data[x++] = BREAK_UINT32(longitude, 2); data[x++] = BREAK_UINT32(longitude, 1); data[x++] = BREAK_UINT32(longitude, 0); data[x++] = HI_UINT16(altitude); data[x++] = LO_UINT16(altitude); memcpy(data + x, bcd_time, 6); x += 6; memcpy(data + x, rtk, rtk_length); x += rtk_length; message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_RTK_UPLOAD, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(DATA_ACCESS_PLATFORM, &msg, data, x); } void SendRTKStart(uint32_t latitude, uint32_t longitude, uint16_t altitude, const uint8_t *bcd_time, uint16_t rtk_pkt_interval) { uint8_t data[18]; int x = 0; data[x++] = BREAK_UINT32(latitude, 3); data[x++] = BREAK_UINT32(latitude, 2); data[x++] = BREAK_UINT32(latitude, 1); data[x++] = BREAK_UINT32(latitude, 0); data[x++] = BREAK_UINT32(longitude, 3); data[x++] = BREAK_UINT32(longitude, 2); data[x++] = BREAK_UINT32(longitude, 1); data[x++] = BREAK_UINT32(longitude, 0); data[x++] = HI_UINT16(altitude); data[x++] = LO_UINT16(altitude); memcpy(data + x, bcd_time, 6); x += 6; data[x++] = HI_UINT16(rtk_pkt_interval); data[x++] = LO_UINT16(rtk_pkt_interval); message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_RTK_START_REQ, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(DATA_ACCESS_PLATFORM, &msg, data, x); } void SendRTKStop(void) { message_t msg; MakeMessage(&msg, MESSAGE_VERSION_B2016, ID_CP_RTK_END_REQ, PROTOCOL_ENCRYPT_NONE, PhoneNumber, PKT_RESEVER, 0, DEFAULT_SHORT_RESEND_INTERVAL); SendMessage(DATA_ACCESS_PLATFORM, &msg, NULL, 0); }