From 2117dc433abc11b526b74ffb32de851f1d82dee9 Mon Sep 17 00:00:00 2001
From: fctom1215 <fctom1215@outlook.com>
Date: 星期一, 03 二月 2020 22:09:55 +0800
Subject: [PATCH] 重写了消息事件部分,之前的缺乏线程安全可能导致崩溃。

---
 lib/src/main/cpp/native-lib.cpp             |    2 
 lib/src/main/cpp/rtk_module/virtual_rtk.cpp |    4 +
 lib/src/main/cpp/rtk_platform/platform.cpp  |  146 ++++++++++++++++++++++++++++++++----------------
 3 files changed, 101 insertions(+), 51 deletions(-)

diff --git a/lib/src/main/cpp/native-lib.cpp b/lib/src/main/cpp/native-lib.cpp
index a605f17..760751c 100644
--- a/lib/src/main/cpp/native-lib.cpp
+++ b/lib/src/main/cpp/native-lib.cpp
@@ -245,7 +245,7 @@
     InitPlatform(phone, RTK_PLATFORM_IP, RTK_PLATFORM_PORT);
     AppTimer_add(SendBootIndicate, 500);
 
-    InitVirtualDevice("192.168.16.104", 9001);
+    InitVirtualDevice("192.168.3.52", 9001);
 }
 
 static void SendBootIndicate(union sigval sig) {
diff --git a/lib/src/main/cpp/rtk_module/virtual_rtk.cpp b/lib/src/main/cpp/rtk_module/virtual_rtk.cpp
index da1baa7..903255b 100644
--- a/lib/src/main/cpp/rtk_module/virtual_rtk.cpp
+++ b/lib/src/main/cpp/rtk_module/virtual_rtk.cpp
@@ -55,6 +55,10 @@
 
     fd = ConnectTCP(vs->domain_name, vs->port);
 
+    if (fd > 0) {
+        DEBUG("铏氭嫙骞冲彴杩炴帴鎴愬姛");
+    }
+
     while (fd > 0) {
         tv.tv_sec = 5;
         tv.tv_usec = 0;
diff --git a/lib/src/main/cpp/rtk_platform/platform.cpp b/lib/src/main/cpp/rtk_platform/platform.cpp
index 833dd63..322708f 100644
--- a/lib/src/main/cpp/rtk_platform/platform.cpp
+++ b/lib/src/main/cpp/rtk_platform/platform.cpp
@@ -53,17 +53,18 @@
     uint32_t downloadRtk : 1;
 } platformStatus;
 
-static struct event_queue_t {
-    uint32_t event;
+struct event_t {
+    uint32_t id;
     int length;
     void *data;
+};
+
+static struct event_queue_t {
+    struct event_t event;
     struct event_queue_t *next;
 } *eventQueue;
 
 struct platformSocket exceptSocket, currSocket;
-static uint32_t eventMask;
-static void *eventData[32];
-static int eventDataLength[32];
 static sem_t sem_status_changed;
 
 static bool requestPlatformSendRtk = false;
@@ -71,6 +72,10 @@
 
 static pthread_mutex_t platform_tx_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t events_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void AddEvnet(uint32_t event, const uint8_t *data, int length);
+static struct event_t * FetchEvent(void);
+static void RemoveEvent(void);
 
 static void ReqRtkPlatformConfigTimeout(union sigval sig);
 
@@ -89,22 +94,71 @@
 static void TriggerHeartbeat(union sigval sig);
 static void RequestRtkNoResp(union sigval sig);
 
-void AddEvnet(uint32_t event, const uint8_t *data, int length)
+static void AddEvnet(uint32_t event, const uint8_t *data, int length)
 {
+    DEBUG("AddEvnet 0x%04X length %d", event, length);
+
+    struct event_queue_t *nw = (struct event_queue_t *)malloc(sizeof(struct event_queue_t));
+    nw->next = NULL;
+
+    nw->event.id = event;
+    nw->event.data = NULL;
+    nw->event.length = 0;
+
+    if (data != NULL && length > 0) {
+        nw->event.data = malloc(length);
+        nw->event.length = length;
+        memcpy(nw->event.data, data, length);
+    }
+
+    pthread_mutex_lock(&events_mutex);
+
     struct event_queue_t *p = eventQueue;
 
-    while (p != NULL) {
-        struct event_queue_t *nw = (struct event_queue_t *)malloc(sizeof(struct event_queue_t));
+    while (p != NULL && p->next != NULL) p = p->next;
 
-        nw->event = event;
-        nw->next = NULL;
-        nw->data = NULL;
-        nw->length = 0;
-        if (data != NULL && length > 0) {
-            nw->data = malloc(length);
-            nw->length = length;
-        }
+    if (eventQueue == NULL)
+        eventQueue = nw;
+    else
+        p->next = nw;
+
+    pthread_mutex_unlock(&events_mutex);
+}
+
+static struct event_t * FetchEvent(void)
+{
+    struct event_t *evp = NULL;
+
+    pthread_mutex_lock(&events_mutex);
+    struct event_queue_t *p = eventQueue;
+
+    if (p != NULL) {
+        evp = &p->event;
     }
+
+    pthread_mutex_unlock(&events_mutex);
+
+    return evp;
+}
+
+static void RemoveEvent(void)
+{
+    void *p1 = NULL, *p2 = NULL;
+
+    pthread_mutex_lock(&events_mutex);
+    struct event_queue_t *p = eventQueue;
+
+    if (p != NULL) {
+        eventQueue = p->next;
+        p1 = p;
+        p2 = p->event.data;
+    }
+    pthread_mutex_unlock(&events_mutex);
+
+    if (p2 != NULL)
+        free(p2);
+    if (p1 != NULL)
+        free(p1);
 }
 
 void InitPlatform(const uint8_t *phone, const char *domain_name, int port)
@@ -116,9 +170,6 @@
 
     eventQueue = NULL;
 
-    eventMask = 0;
-    memset(eventData, 0, sizeof(eventData));
-    memset(eventDataLength, 0, sizeof(eventDataLength));
     platform_tcp_fd = -1;
 
     memset(&currSocket, 0, sizeof(currSocket));
@@ -133,6 +184,26 @@
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);//detached
     pthread_create(&pid, &attr, StatusListenThread, NULL);
+
+
+    AddEvnet(100, (uint8_t *)"1234567890", 11);
+    AddEvnet(101, (uint8_t *)"abcd", 5);
+    AddEvnet(102, (uint8_t *)"xyz", 4);
+    AddEvnet(104, NULL, 0);
+    AddEvnet(105, (uint8_t *)"uio", 4);
+
+    while (true) {
+        struct event_t *evp = FetchEvent();
+
+        if (evp != NULL) {
+            DEBUG("寰楀埌event %d %d", evp->id, evp->length);
+
+            if (evp->data != NULL)
+                DEBUG("鏁版嵁 %s", evp->data);
+            RemoveEvent();
+        } else
+         break;
+    }
 }
 
 static void ReqRtkPlatformConfigTimeout(union sigval sig)
@@ -179,20 +250,7 @@
 
 void PlatformStatusChanged(uint32_t event, const uint8_t *data, int length)
 {
-    if (data != NULL && length > 0) {
-        int x = BitCount(event-1);
-
-        if (eventData[x] != NULL) free(eventData[x]);
-        eventData[x] = malloc(length);
-        memcpy(eventData[x], data, length);
-        eventDataLength[x] = length;
-
-//        DEBUG("ADD %d len %d", x, length);
-    }
-
-    pthread_mutex_lock(&events_mutex);
-    eventMask |= event;
-    pthread_mutex_unlock(&events_mutex);
+    AddEvnet(event, data, length);
 
     sem_post(&sem_status_changed);
 }
@@ -229,25 +287,13 @@
     while (true) {
         sem_wait(&sem_status_changed);
 
-        uint32_t events;
-        pthread_mutex_lock(&events_mutex);
-        events = eventMask;
-        eventMask = 0;
-        pthread_mutex_unlock(&events_mutex);
+        struct event_t *evp = FetchEvent();
 
-        while (events > 0) {
-            int x = BitCount(((events - 1) ^ events) - 1);
-
-//            DEBUG("FETCH %d len %d", x, eventDataLength[x]);
-
-            PlatformChangeEntry(BV(x), (uint8_t *)eventData[x], eventDataLength[x]);
-            if (eventData[x] != NULL) {
-                free(eventData[x]);
-                eventData[x] = NULL;
-                eventDataLength[x] = 0;
-            }
-            events &= events - 1;
+        if (evp != NULL) {
+            PlatformChangeEntry(evp->id, (uint8_t *)evp->data, evp->length);
         }
+
+        RemoveEvent();
     }
 }
 

--
Gitblit v1.8.0