1 /*
2 * Copyright (c) 2019-2021 Rockchip Eletronics Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "MessageParser.hpp"
18
19 #define BUFFER_MAX_SIZE (1024 * 512)
20
21 #ifndef LOGE
22 #define LOGE printf
23 #endif
24
25 // #define AIQ_MSGPARSER_CHECK_HASH
26
27 #define CALC_32BIT_LITTLE(array) \
28 ((array[0] & 0xff) | ((array[1] & 0xff) << 8) | ((array[2] & 0xff) << 16) | \
29 ((array[3] & 0xff) << 24))
30
31 namespace RkMSG {
32
MessageParser(void * ptr)33 MessageParser::MessageParser(void* ptr) : pri(ptr), is_running(false) {
34 }
35
stop()36 int MessageParser::stop() {
37 is_running = false;
38 notify_wakeup();
39 return 0;
40 }
41
~MessageParser()42 MessageParser::~MessageParser() {
43 this->stop();
44 if (this->proc_thread && this->proc_thread->joinable()) {
45 this->proc_thread->join();
46 this->proc_thread.reset();
47 this->proc_thread = nullptr;
48 }
49 if (raw_stream.size()) {
50 raw_stream.clear();
51 }
52 }
53
notify_wakeup()54 int MessageParser::notify_wakeup() {
55 std::unique_lock<std::mutex> lck(this->proc_mutex);
56 this->proc_cond.notify_all();
57
58 return 0;
59 }
60
freePacket(void * packet,MessageType type)61 int MessageParser::freePacket(void *packet, MessageType type) {
62 if (!packet) {
63 return 0;
64 }
65
66 if (type == RKAIQ_MESSAGE_NEW) {
67 RkAiqSocketPacket_t *newpkt = (RkAiqSocketPacket_t *)packet;
68 if (newpkt->data) {
69 free(newpkt->data);
70 }
71 free(newpkt);
72 } else if (type == RKAIQ_MESSAGE_OLD) {
73 RkAiqSocketPacket *newpkt = (RkAiqSocketPacket *)packet;
74 if (newpkt->data) {
75 free(newpkt->data);
76 }
77 free(newpkt);
78 }
79
80 return 0;
81 }
82
clonePacket(void * from,MessageType type)83 void *MessageParser::clonePacket(void *from, MessageType type) {
84 if (type == RKAIQ_MESSAGE_NEW) {
85 RkAiqSocketPacket_t *temp = (RkAiqSocketPacket_t *)from;
86 RkAiqSocketPacket_t *opkt =
87 (RkAiqSocketPacket_t *)malloc(sizeof(RkAiqSocketPacket_t));
88 if (!opkt) {
89 return nullptr;
90 }
91
92 memcpy(opkt, temp, sizeof(RkAiqSocketPacket_t));
93 opkt->data = (uint8_t *)malloc(temp->payload_size + 1);
94
95 if (!opkt->data) {
96 free(opkt);
97 return nullptr;
98 }
99
100 memcpy(opkt->data, (uint8_t *)&(temp->data), temp->payload_size);
101 opkt->data[temp->payload_size] = '\0';
102
103 return opkt;
104 } else if (type == RKAIQ_MESSAGE_OLD) {
105 RkAiqSocketPacket *temp = (RkAiqSocketPacket *)from;
106 RkAiqSocketPacket *opkt =
107 (RkAiqSocketPacket *)malloc(sizeof(RkAiqSocketPacket));
108 if (!opkt) {
109 return nullptr;
110 }
111
112 memcpy(opkt, temp, sizeof(RkAiqSocketPacket));
113 opkt->data = (char *)malloc(temp->dataSize + 1);
114
115 if (!opkt->data) {
116 free(opkt);
117 return nullptr;
118 }
119
120 memcpy(opkt->data, temp->data, temp->dataSize);
121 opkt->data[temp->dataSize] = '\0';
122
123 return opkt;
124 }
125
126 return nullptr;
127 }
128
process()129 void MessageParser::process() {
130 while (is_running) {
131 std::unique_lock<std::mutex> lck(proc_mutex);
132 while (raw_stream.size() <= 0 && is_running) {
133 proc_cond.wait(lck);
134 }
135
136 if (!is_running) {
137 break;
138 }
139
140 // Found full packet: deal and call then erease
141 // Found non full packet: wait
142 // Found nothing: clear raw stream
143
144 size_t start_index = 0;
145 size_t end_index = 0;
146
147 RkAiqSocketPacket *old_pkt = nullptr;
148 RkAiqSocketPacket_t *new_pkt = nullptr;
149
150 // Check if a new packet, if so, copy it and erase all ahead data
151 new_pkt = findValidSection(&raw_stream[0], raw_stream.size(), &start_index,
152 &end_index);
153 // if found full packet
154 if (new_pkt && mCallBackFunc) {
155 mCallBackFunc(pri, new_pkt, RKAIQ_MESSAGE_NEW);
156 }
157
158 if (((ssize_t)start_index >= 0) && (end_index > 0)) {
159 raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index);
160 }
161
162 start_index = 0;
163 end_index = 0;
164
165 // Check if a new packet, if so, copy it and erase all ahead data
166 old_pkt = findValidSection2(raw_stream.size() > 0 ? &raw_stream[0] : NULL,
167 raw_stream.size(), &start_index, &end_index);
168 // if found full packet
169 if (old_pkt && mCallBackFunc) {
170 mCallBackFunc(pri, old_pkt, RKAIQ_MESSAGE_OLD);
171 freePacket(old_pkt, RKAIQ_MESSAGE_OLD);
172 }
173
174 if (((ssize_t)start_index >= 0) && (end_index > 0)) {
175 raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index);
176 }
177
178 if (!new_pkt && !old_pkt && is_running) {
179 proc_cond.wait(lck);
180 }
181 }
182 LOGE("MessageParser %s loop exit!\n", __func__);
183 }
184
pushRawData(const uint8_t * data,size_t size)185 int MessageParser::pushRawData(const uint8_t *data, size_t size) {
186 {
187 int erase_section = 0;
188 const std::lock_guard<std::mutex> lock(proc_mutex);
189
190 // reach max buffer size, erase old
191 if (size > RKAIQ_RAW_STREAM_MAX_SIZE) {
192 erase_section = RKAIQ_RAW_STREAM_MAX_SIZE;
193 } else {
194 erase_section = size;
195 }
196
197 // do need erase
198 if (raw_stream.size() >= RKAIQ_RAW_STREAM_MAX_SIZE) {
199 raw_stream.erase(raw_stream.begin(), raw_stream.begin() + erase_section);
200 }
201
202 raw_stream.insert(raw_stream.end(), data, data + size);
203 }
204
205 notify_wakeup();
206
207 return 0;
208 }
209
bit_stream_find(uint8_t * data,int size,const uint8_t * dst,int len)210 uint8_t *MessageParser::bit_stream_find(uint8_t *data, int size,
211 const uint8_t *dst, int len) {
212 int start_pos = -1;
213
214 if (!data || !size || !dst || !len) {
215 return NULL;
216 }
217
218 if (size < len) {
219 return NULL;
220 }
221
222 for (start_pos = 0; start_pos < size - len; start_pos++) {
223 if (0 == memcmp(data + start_pos, dst, len)) {
224 return data + start_pos;
225 }
226 }
227
228 return NULL;
229 }
230
231 // 1. find valid erase
232 // 2. find error packet
233 // 3. if crashed erase header then search again
234 // 4. erase ahead of first valid data
235 // 5. return null with non zero start en,then packet error
236
findValidSection(uint8_t * buffer,int len,size_t * start_of,size_t * end_of)237 RkAiqSocketPacket_t *MessageParser::findValidSection(uint8_t *buffer, int len,
238 size_t *start_of,
239 size_t *end_of) {
240 RkAiqSocketPacket_t *aiq_pkt;
241 uint8_t *start_pos = NULL;
242 size_t skip_size = 0;
243 size_t remain_size = 0;
244
245 *start_of = 0;
246 *end_of = 0;
247
248 start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_DATA_HEADER,
249 RKAIQ_SOCKET_DATA_HEADER_LEN);
250
251 // Found valid start
252 if (NULL == start_pos) {
253 return nullptr;
254 }
255
256 // Calculate data size
257 skip_size = start_pos - buffer;
258 remain_size = len - skip_size;
259
260 // Check if contains packet information
261 if (remain_size < (int)sizeof(RkAiqSocketPacket_t)) {
262 LOGE("Not a complete packet [%d], wait more...\n", len);
263 return nullptr;
264 }
265
266 /*
267 * +---------------------------------------------------------------------+
268 * |<--------------------------VALID DATA SIZE-------------------------->|
269 * +-------------------+------------------------------+------------------+
270 * |<---HEADER-SIZE--->|<--------PAYLOAD-SIZE-------->|<---HASH-SIZE---->|
271 * +-------------------+------------------------------+------------------+
272 * | HEADER DATA | REAL DATA/CMD | VERIFY DATA |
273 * +-------------------+------------------------------+------------------+
274 *
275 * */
276
277 // Found complete packet header, then parse packet info
278 aiq_pkt = (RkAiqSocketPacket_t *)start_pos;
279
280 // Assume Single packet, check if data all present
281 if (remain_size < (aiq_pkt->packet_size + RKAIQ_SOCKET_DATA_EXTRA_SIZE)) {
282 return nullptr;
283 }
284
285 *start_of = start_pos - buffer;
286 *end_of = *start_of + aiq_pkt->payload_size + RKAIQ_SOCKET_DATA_EXTRA_SIZE;
287
288 #ifdef AIQ_MSGPARSER_CHECK_HASH
289 #endif
290
291 return (RkAiqSocketPacket_t *)clonePacket(aiq_pkt, RKAIQ_MESSAGE_NEW);
292 }
293
findValidSection2(uint8_t * buffer,int len,size_t * start_of,size_t * end_of)294 RkAiqSocketPacket *MessageParser::findValidSection2(uint8_t *buffer, int len,
295 size_t *start_of,
296 size_t *end_of) {
297 RkAiqSocketPacket *aiq_pkt;
298 uint8_t *start_pos = NULL;
299 size_t skip_size = 0;
300 size_t remain_size = 0;
301 size_t pkt_size = 0;
302
303 *start_of = 0;
304 *end_of = 0;
305
306 start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_OLD_HEADER,
307 RKAIQ_SOCKET_OLD_HEADER_LEN);
308
309 // Found valid start
310 if (NULL == start_pos) {
311 return nullptr;
312 }
313
314 // Calculate data size
315 skip_size = start_pos - buffer;
316 remain_size = len - skip_size;
317
318 // Check if contains packet information
319 if (remain_size < ((int)sizeof(RkAiqSocketPacket) - sizeof(void*))) {
320 LOGE("Not a complete packet [%d], wait more...\n", len);
321 return nullptr;
322 }
323
324 // Found complete packet header, then parse packet info
325 aiq_pkt = (RkAiqSocketPacket *)start_pos;
326 pkt_size = CALC_32BIT_LITTLE(aiq_pkt->packetSize);
327
328 // Assume Single packet, check if data all present
329 if (remain_size < pkt_size) {
330 return nullptr;
331 }
332
333 char *tmpArray = (char *)start_pos;
334 int packetSize = (tmpArray[2] & 0xff) | ((tmpArray[3] & 0xff) << 8) |
335 ((tmpArray[4] & 0xff) << 16) | ((tmpArray[5] & 0xff) << 24);
336
337 if (packetSize >= 102400) {
338 LOGE("MessageParser %s: packetSize error!\n", __func__);
339 return nullptr;
340 }
341
342 char *receivedPacket = (char *)malloc(packetSize);
343 memset(receivedPacket, 0, packetSize);
344 memcpy(receivedPacket, tmpArray, packetSize);
345
346 // parse data
347 RkAiqSocketPacket receivedData;
348 int offset = 0;
349 offset += 2;
350
351 // packetSize
352 memcpy(receivedData.packetSize, receivedPacket + offset, 4);
353 offset += 4;
354 // command id
355 memcpy((void *)&(receivedData.commandID), receivedPacket + offset,
356 sizeof(int));
357 offset += sizeof(int);
358 // command id
359 memcpy((void *)&(receivedData.commandResult), receivedPacket + offset,
360 sizeof(int));
361 offset += sizeof(int);
362
363 // data size
364 memcpy((void *)&(receivedData.dataSize), receivedPacket + offset,
365 sizeof(unsigned int));
366
367 offset += sizeof(unsigned int);
368 // data
369 receivedData.data = (char *)start_pos + offset;
370 offset += receivedData.dataSize;
371 // data hash
372 memcpy((void *)&(receivedData.dataHash), receivedPacket + offset,
373 sizeof(unsigned int));
374 free(receivedPacket);
375 receivedPacket = NULL;
376
377 // hash check
378 unsigned int dataHash = MurMurHash(receivedData.data, receivedData.dataSize);
379
380 // got wrong, discard these data
381 if (dataHash != receivedData.dataHash) {
382 *start_of = 0;
383 *end_of = 0;
384 return nullptr;
385 }
386
387 *start_of = start_pos - buffer;
388 *end_of = *start_of + pkt_size;
389
390 return (RkAiqSocketPacket *)clonePacket(&receivedData, RKAIQ_MESSAGE_OLD);
391 }
392
MurMurHash(const void * key,int len)393 unsigned int MessageParser::MurMurHash(const void *key, int len) {
394 const unsigned int m = 0x5bd1e995;
395 const int r = 24;
396 const int seed = 97;
397 unsigned int h = seed ^ len;
398 // Mix 4 bytes at a time into the hash
399 const unsigned char *data = (const unsigned char *)key;
400 while (len >= 4) {
401 unsigned int k = *(unsigned int *)data;
402 k *= m;
403 k ^= k >> r;
404 k *= m;
405 h *= m;
406 h ^= k;
407 data += 4;
408 len -= 4;
409 }
410 #if defined(__GNUC__) && !defined(__clang__)
411 #pragma GCC diagnostic push
412 #pragma GCC diagnostic ignored "-Wimplicit-fallthrough"
413 #endif
414 // Handle the last few bytes of the input array
415 switch (len) {
416 case 3:
417 h ^= data[2] << 16;
418 case 2:
419 h ^= data[1] << 8;
420 case 1:
421 h ^= data[0];
422 h *= m;
423 };
424 #if defined(__GNUC__) && !defined(__clang__)
425 #pragma GCC diagnostic pop
426 #endif
427 // Do a few final mixes of the hash to ensure the last few
428 // bytes are well-incorporated.
429 h ^= h >> 13;
430 h *= m;
431 h ^= h >> 15;
432 return h;
433 }
434
start()435 int MessageParser::start() {
436 const std::lock_guard<std::mutex> lock(proc_mutex);
437 if (is_running) {
438 return -1;
439 }
440
441 is_running = true;
442 proc_thread =
443 std::make_shared<std::thread>(&RkMSG::MessageParser::process, this);
444
445 return 0;
446 }
447
448 } // namespace RkMSG
449