xref: /OK3568_Linux_fs/external/camera_engine_rkaiq/rkaiq/ipc_server/MessageParser.cpp (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1 /*
2  * Copyright (c) 2019-2021 Rockchip Eletronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "MessageParser.hpp"
18 
19 #define BUFFER_MAX_SIZE (1024 * 512)
20 
21 #ifndef LOGE
22 #define LOGE printf
23 #endif
24 
25 // #define AIQ_MSGPARSER_CHECK_HASH
26 
27 #define CALC_32BIT_LITTLE(array)                                               \
28   ((array[0] & 0xff) | ((array[1] & 0xff) << 8) | ((array[2] & 0xff) << 16) |  \
29    ((array[3] & 0xff) << 24))
30 
31 namespace RkMSG {
32 
MessageParser(void * ptr)33 MessageParser::MessageParser(void* ptr) : pri(ptr), is_running(false) {
34 }
35 
stop()36 int MessageParser::stop() {
37   is_running = false;
38   notify_wakeup();
39   return 0;
40 }
41 
~MessageParser()42 MessageParser::~MessageParser() {
43   this->stop();
44   if (this->proc_thread && this->proc_thread->joinable()) {
45     this->proc_thread->join();
46     this->proc_thread.reset();
47     this->proc_thread = nullptr;
48   }
49   if (raw_stream.size()) {
50     raw_stream.clear();
51   }
52 }
53 
notify_wakeup()54 int MessageParser::notify_wakeup() {
55   std::unique_lock<std::mutex> lck(this->proc_mutex);
56   this->proc_cond.notify_all();
57 
58   return 0;
59 }
60 
freePacket(void * packet,MessageType type)61 int MessageParser::freePacket(void *packet, MessageType type) {
62   if (!packet) {
63     return 0;
64   }
65 
66   if (type == RKAIQ_MESSAGE_NEW) {
67     RkAiqSocketPacket_t *newpkt = (RkAiqSocketPacket_t *)packet;
68     if (newpkt->data) {
69       free(newpkt->data);
70     }
71     free(newpkt);
72   } else if (type == RKAIQ_MESSAGE_OLD) {
73     RkAiqSocketPacket *newpkt = (RkAiqSocketPacket *)packet;
74     if (newpkt->data) {
75       free(newpkt->data);
76     }
77     free(newpkt);
78   }
79 
80   return 0;
81 }
82 
clonePacket(void * from,MessageType type)83 void *MessageParser::clonePacket(void *from, MessageType type) {
84   if (type == RKAIQ_MESSAGE_NEW) {
85     RkAiqSocketPacket_t *temp = (RkAiqSocketPacket_t *)from;
86     RkAiqSocketPacket_t *opkt =
87         (RkAiqSocketPacket_t *)malloc(sizeof(RkAiqSocketPacket_t));
88     if (!opkt) {
89       return nullptr;
90     }
91 
92     memcpy(opkt, temp, sizeof(RkAiqSocketPacket_t));
93     opkt->data = (uint8_t *)malloc(temp->payload_size + 1);
94 
95     if (!opkt->data) {
96       free(opkt);
97       return nullptr;
98     }
99 
100     memcpy(opkt->data, (uint8_t *)&(temp->data), temp->payload_size);
101     opkt->data[temp->payload_size] = '\0';
102 
103     return opkt;
104   } else if (type == RKAIQ_MESSAGE_OLD) {
105     RkAiqSocketPacket *temp = (RkAiqSocketPacket *)from;
106     RkAiqSocketPacket *opkt =
107         (RkAiqSocketPacket *)malloc(sizeof(RkAiqSocketPacket));
108     if (!opkt) {
109       return nullptr;
110     }
111 
112     memcpy(opkt, temp, sizeof(RkAiqSocketPacket));
113     opkt->data = (char *)malloc(temp->dataSize + 1);
114 
115     if (!opkt->data) {
116       free(opkt);
117       return nullptr;
118     }
119 
120     memcpy(opkt->data, temp->data, temp->dataSize);
121     opkt->data[temp->dataSize] = '\0';
122 
123     return opkt;
124   }
125 
126   return nullptr;
127 }
128 
process()129 void MessageParser::process() {
130   while (is_running) {
131     std::unique_lock<std::mutex> lck(proc_mutex);
132     while (raw_stream.size() <= 0 && is_running) {
133       proc_cond.wait(lck);
134     }
135 
136     if (!is_running) {
137       break;
138     }
139 
140     // Found full packet: deal and call then erease
141     // Found non full packet: wait
142     // Found nothing: clear raw stream
143 
144     size_t start_index = 0;
145     size_t end_index = 0;
146 
147     RkAiqSocketPacket *old_pkt = nullptr;
148     RkAiqSocketPacket_t *new_pkt = nullptr;
149 
150     // Check if a new packet, if so, copy it and erase all ahead data
151     new_pkt = findValidSection(&raw_stream[0], raw_stream.size(), &start_index,
152                                &end_index);
153     // if found full packet
154     if (new_pkt && mCallBackFunc) {
155       mCallBackFunc(pri, new_pkt, RKAIQ_MESSAGE_NEW);
156     }
157 
158     if (((ssize_t)start_index >= 0) && (end_index > 0)) {
159       raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index);
160     }
161 
162     start_index = 0;
163     end_index = 0;
164 
165     // Check if a new packet, if so, copy it and erase all ahead data
166     old_pkt = findValidSection2(raw_stream.size() > 0 ? &raw_stream[0] : NULL,
167                                 raw_stream.size(), &start_index, &end_index);
168     // if found full packet
169     if (old_pkt && mCallBackFunc) {
170       mCallBackFunc(pri, old_pkt, RKAIQ_MESSAGE_OLD);
171       freePacket(old_pkt, RKAIQ_MESSAGE_OLD);
172     }
173 
174     if (((ssize_t)start_index >= 0) && (end_index > 0)) {
175       raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index);
176     }
177 
178     if (!new_pkt && !old_pkt && is_running) {
179       proc_cond.wait(lck);
180     }
181   }
182   LOGE("MessageParser %s loop exit!\n", __func__);
183 }
184 
pushRawData(const uint8_t * data,size_t size)185 int MessageParser::pushRawData(const uint8_t *data, size_t size) {
186   {
187     int erase_section = 0;
188     const std::lock_guard<std::mutex> lock(proc_mutex);
189 
190     // reach max buffer size, erase old
191     if (size > RKAIQ_RAW_STREAM_MAX_SIZE) {
192       erase_section = RKAIQ_RAW_STREAM_MAX_SIZE;
193     } else {
194       erase_section = size;
195     }
196 
197     // do need erase
198     if (raw_stream.size() >= RKAIQ_RAW_STREAM_MAX_SIZE) {
199       raw_stream.erase(raw_stream.begin(), raw_stream.begin() + erase_section);
200     }
201 
202     raw_stream.insert(raw_stream.end(), data, data + size);
203   }
204 
205   notify_wakeup();
206 
207   return 0;
208 }
209 
bit_stream_find(uint8_t * data,int size,const uint8_t * dst,int len)210 uint8_t *MessageParser::bit_stream_find(uint8_t *data, int size,
211                                         const uint8_t *dst, int len) {
212   int start_pos = -1;
213 
214   if (!data || !size || !dst || !len) {
215     return NULL;
216   }
217 
218   if (size < len) {
219     return NULL;
220   }
221 
222   for (start_pos = 0; start_pos < size - len; start_pos++) {
223     if (0 == memcmp(data + start_pos, dst, len)) {
224       return data + start_pos;
225     }
226   }
227 
228   return NULL;
229 }
230 
231 // 1. find valid erase
232 // 2. find error packet
233 // 3. if crashed erase header then search again
234 // 4. erase ahead of first valid data
235 // 5. return null with non zero start en,then packet error
236 
findValidSection(uint8_t * buffer,int len,size_t * start_of,size_t * end_of)237 RkAiqSocketPacket_t *MessageParser::findValidSection(uint8_t *buffer, int len,
238                                                      size_t *start_of,
239                                                      size_t *end_of) {
240   RkAiqSocketPacket_t *aiq_pkt;
241   uint8_t *start_pos = NULL;
242   size_t skip_size = 0;
243   size_t remain_size = 0;
244 
245   *start_of = 0;
246   *end_of = 0;
247 
248   start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_DATA_HEADER,
249                               RKAIQ_SOCKET_DATA_HEADER_LEN);
250 
251   // Found valid start
252   if (NULL == start_pos) {
253     return nullptr;
254   }
255 
256   // Calculate data size
257   skip_size = start_pos - buffer;
258   remain_size = len - skip_size;
259 
260   // Check if contains packet information
261   if (remain_size < (int)sizeof(RkAiqSocketPacket_t)) {
262     LOGE("Not a complete packet [%d], wait more...\n", len);
263     return nullptr;
264   }
265 
266   /*
267    * +---------------------------------------------------------------------+
268    * |<--------------------------VALID DATA SIZE-------------------------->|
269    * +-------------------+------------------------------+------------------+
270    * |<---HEADER-SIZE--->|<--------PAYLOAD-SIZE-------->|<---HASH-SIZE---->|
271    * +-------------------+------------------------------+------------------+
272    * |   HEADER DATA     |      REAL DATA/CMD           |   VERIFY DATA    |
273    * +-------------------+------------------------------+------------------+
274    *
275    * */
276 
277   // Found complete packet header, then parse packet info
278   aiq_pkt = (RkAiqSocketPacket_t *)start_pos;
279 
280   // Assume Single packet, check if data all present
281   if (remain_size < (aiq_pkt->packet_size + RKAIQ_SOCKET_DATA_EXTRA_SIZE)) {
282     return nullptr;
283   }
284 
285   *start_of = start_pos - buffer;
286   *end_of = *start_of + aiq_pkt->payload_size + RKAIQ_SOCKET_DATA_EXTRA_SIZE;
287 
288 #ifdef AIQ_MSGPARSER_CHECK_HASH
289 #endif
290 
291   return (RkAiqSocketPacket_t *)clonePacket(aiq_pkt, RKAIQ_MESSAGE_NEW);
292 }
293 
findValidSection2(uint8_t * buffer,int len,size_t * start_of,size_t * end_of)294 RkAiqSocketPacket *MessageParser::findValidSection2(uint8_t *buffer, int len,
295                                                     size_t *start_of,
296                                                     size_t *end_of) {
297   RkAiqSocketPacket *aiq_pkt;
298   uint8_t *start_pos = NULL;
299   size_t skip_size = 0;
300   size_t remain_size = 0;
301   size_t pkt_size = 0;
302 
303   *start_of = 0;
304   *end_of = 0;
305 
306   start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_OLD_HEADER,
307                               RKAIQ_SOCKET_OLD_HEADER_LEN);
308 
309   // Found valid start
310   if (NULL == start_pos) {
311     return nullptr;
312   }
313 
314   // Calculate data size
315   skip_size = start_pos - buffer;
316   remain_size = len - skip_size;
317 
318   // Check if contains packet information
319   if (remain_size < ((int)sizeof(RkAiqSocketPacket) - sizeof(void*))) {
320     LOGE("Not a complete packet [%d], wait more...\n", len);
321     return nullptr;
322   }
323 
324   // Found complete packet header, then parse packet info
325   aiq_pkt = (RkAiqSocketPacket *)start_pos;
326   pkt_size = CALC_32BIT_LITTLE(aiq_pkt->packetSize);
327 
328   // Assume Single packet, check if data all present
329   if (remain_size < pkt_size) {
330     return nullptr;
331   }
332 
333   char *tmpArray = (char *)start_pos;
334   int packetSize = (tmpArray[2] & 0xff) | ((tmpArray[3] & 0xff) << 8) |
335                    ((tmpArray[4] & 0xff) << 16) | ((tmpArray[5] & 0xff) << 24);
336 
337   if (packetSize >= 102400) {
338     LOGE("MessageParser %s: packetSize error!\n", __func__);
339     return nullptr;
340   }
341 
342   char *receivedPacket = (char *)malloc(packetSize);
343   memset(receivedPacket, 0, packetSize);
344   memcpy(receivedPacket, tmpArray, packetSize);
345 
346   // parse data
347   RkAiqSocketPacket receivedData;
348   int offset = 0;
349   offset += 2;
350 
351   // packetSize
352   memcpy(receivedData.packetSize, receivedPacket + offset, 4);
353   offset += 4;
354   // command id
355   memcpy((void *)&(receivedData.commandID), receivedPacket + offset,
356          sizeof(int));
357   offset += sizeof(int);
358   // command id
359   memcpy((void *)&(receivedData.commandResult), receivedPacket + offset,
360          sizeof(int));
361   offset += sizeof(int);
362 
363   // data size
364   memcpy((void *)&(receivedData.dataSize), receivedPacket + offset,
365          sizeof(unsigned int));
366 
367   offset += sizeof(unsigned int);
368   // data
369   receivedData.data = (char *)start_pos + offset;
370   offset += receivedData.dataSize;
371   // data hash
372   memcpy((void *)&(receivedData.dataHash), receivedPacket + offset,
373          sizeof(unsigned int));
374   free(receivedPacket);
375   receivedPacket = NULL;
376 
377   // hash check
378   unsigned int dataHash = MurMurHash(receivedData.data, receivedData.dataSize);
379 
380   // got wrong, discard these data
381   if (dataHash != receivedData.dataHash) {
382     *start_of = 0;
383     *end_of = 0;
384     return nullptr;
385   }
386 
387   *start_of = start_pos - buffer;
388   *end_of = *start_of + pkt_size;
389 
390   return (RkAiqSocketPacket *)clonePacket(&receivedData, RKAIQ_MESSAGE_OLD);
391 }
392 
MurMurHash(const void * key,int len)393 unsigned int MessageParser::MurMurHash(const void *key, int len) {
394   const unsigned int m = 0x5bd1e995;
395   const int r = 24;
396   const int seed = 97;
397   unsigned int h = seed ^ len;
398   // Mix 4 bytes at a time into the hash
399   const unsigned char *data = (const unsigned char *)key;
400   while (len >= 4) {
401     unsigned int k = *(unsigned int *)data;
402     k *= m;
403     k ^= k >> r;
404     k *= m;
405     h *= m;
406     h ^= k;
407     data += 4;
408     len -= 4;
409   }
410 #if defined(__GNUC__) && !defined(__clang__)
411 #pragma GCC diagnostic push
412 #pragma GCC diagnostic ignored "-Wimplicit-fallthrough"
413 #endif
414   // Handle the last few bytes of the input array
415   switch (len) {
416   case 3:
417     h ^= data[2] << 16;
418   case 2:
419     h ^= data[1] << 8;
420   case 1:
421     h ^= data[0];
422     h *= m;
423   };
424 #if defined(__GNUC__) && !defined(__clang__)
425 #pragma GCC diagnostic pop
426 #endif
427   // Do a few final mixes of the hash to ensure the last few
428   // bytes are well-incorporated.
429   h ^= h >> 13;
430   h *= m;
431   h ^= h >> 15;
432   return h;
433 }
434 
start()435 int MessageParser::start() {
436   const std::lock_guard<std::mutex> lock(proc_mutex);
437   if (is_running) {
438     return -1;
439   }
440 
441   is_running = true;
442   proc_thread =
443       std::make_shared<std::thread>(&RkMSG::MessageParser::process, this);
444 
445   return 0;
446 }
447 
448 } // namespace RkMSG
449