xref: /OK3568_Linux_fs/external/camera_engine_rkaiq/rkaiq/ipc_server/socket_server.cpp (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1 #include "socket_server.h"
2 #include "xcam_obj_debug.h"
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/time.h>
6 
7 #ifdef __ANDROID__
8 #include <cutils/sockets.h>
9 #endif
10 
11 #ifdef __ANDROID__
12 #define UNIX_DOMAIN "/dev/socket/camera_tool"
13 #else
14 #define UNIX_DOMAIN "/tmp/UNIX.domain"
15 #endif
16 
17 #define RKAIQ_SOCKET_DATA_OFFSET 24
18 #define RKAIQ_SOCKET_OLD_HEADER_LEN 2
19 #define RKAIQ_SOCKET_DATA_HEADER_LEN 4
20 std::mutex SocketServer::send_mutex;
21 
22 typedef struct aiq_tunning_ctx_s {
23   int socketfd;
24   rk_aiq_sys_ctx_t *aiq_ctx;
25   RkAiqSocketPacket_t *aiq_data;
26 } aiq_tunning_ctx;
27 
28 int onPacketHandle(void *pri, void *packet, MessageType type);
29 
SocketServer()30 SocketServer::SocketServer()
31     : tool_mode_on(false), sockfd(-1), client_socket(-1), quit_(0),
32       serverAddress{AF_UNIX, ""}, clientAddress{AF_UNIX, ""}, aiq_ctx(nullptr),
33       accept_threads_(nullptr), tunning_thread(nullptr),
34       callback_(nullptr), _stop_fds{-1, -1} {
35   msg_parser =
36       std::unique_ptr<RkMSG::MessageParser>(new RkMSG::MessageParser(this));
37   msg_parser->setMsgCallBack(onPacketHandle);
38   msg_parser->start();
39 }
40 
~SocketServer()41 SocketServer::~SocketServer() {
42 }
43 
SaveEixt()44 void SocketServer::SaveEixt() {
45   LOGV_IPC("SocketServer::%s enter", __func__);
46   quit_ = 1;
47   if (_stop_fds[1] != -1) {
48     char buf = 0xf; // random value to write to flush fd.
49     unsigned int size = write(_stop_fds[1], &buf, sizeof(char));
50     if (size != sizeof(char)) {
51       LOGW("Flush write not completed");
52     }
53   }
54 }
55 
hexdump2(char * buf,const int num)56 void hexdump2(char *buf, const int num) {
57   int i;
58   for (i = 0; i < num; i++) {
59     LOGD_IPC("%02X ", buf[i]);
60     if ((i + 1) % 32 == 0) {
61     }
62   }
63   return;
64 }
65 
ProcessText(int client_socket,rk_aiq_sys_ctx_t * ctx,RkAiqSocketPacket * receivedData)66 int ProcessText(int client_socket, rk_aiq_sys_ctx_t *ctx,
67                 RkAiqSocketPacket *receivedData) {
68   int ret = -1;
69   RkAiqSocketPacket dataReply;
70   ret = ProcessCommand(ctx, receivedData, &dataReply);
71   if (ret != -1) {
72     const std::lock_guard<std::mutex> lock(SocketServer::send_mutex);
73     unsigned int packetSize =
74         sizeof(RkAiqSocketPacket) + dataReply.dataSize - sizeof(char *);
75     memcpy(dataReply.packetSize, &packetSize, 4);
76     char *dataToSend = (char *)malloc(packetSize);
77     int offset = 0;
78     char magic[2] = {'R', 'K'};
79     memset(dataToSend, 0, packetSize);
80     memcpy(dataToSend, magic, 2);
81     offset += 2;
82     memcpy(dataToSend + offset, dataReply.packetSize, 4);
83     offset += 4;
84     memcpy(dataToSend + offset, (void *)&dataReply.commandID,
85            sizeof(dataReply.commandID));
86     offset += sizeof(dataReply.commandID);
87     memcpy(dataToSend + offset, (void *)&dataReply.commandResult,
88            sizeof(dataReply.commandResult));
89     offset += sizeof(dataReply.commandResult);
90     memcpy(dataToSend + offset, (void *)&dataReply.dataSize,
91            sizeof(dataReply.dataSize));
92     offset += sizeof(dataReply.dataSize);
93     memcpy(dataToSend + offset, dataReply.data, dataReply.dataSize);
94     offset += dataReply.dataSize;
95     // LOGE_IPC("offset is %d,packetsize is %d",offset,packetSize);
96     memcpy(dataToSend + offset, (void *)&dataReply.dataHash,
97            sizeof(dataReply.dataHash));
98     send(client_socket, dataToSend, packetSize, 0);
99     if (dataReply.data != NULL) {
100       free(dataReply.data);
101       dataReply.data = NULL;
102     }
103     free(dataToSend);
104     dataToSend = NULL;
105   } else {
106     return -1;
107   }
108   return 0;
109 }
110 
Send(int cilent_socket,char * buff,int size)111 int SocketServer::Send(int cilent_socket, char *buff, int size) {
112   return send(cilent_socket, buff, size, 0);
113 }
114 
Recvieve()115 int SocketServer::Recvieve() { return 0; }
116 
bit_stream_find(uint8_t * data,int size,const uint8_t * dst,int len)117 uint8_t *bit_stream_find(uint8_t *data, int size, const uint8_t *dst, int len) {
118   int start_pos = -1;
119 
120   if (!data || !size || !dst || !len) {
121     return NULL;
122   }
123 
124   if (size < len) {
125     return NULL;
126   }
127 
128   for (start_pos = 0; start_pos < size - len; start_pos++) {
129     if (0 == memcmp(data + start_pos, dst, len)) {
130       return data + start_pos;
131     }
132   }
133 
134   return NULL;
135 }
136 
rkaiq_ipc_send(int sockfd,int id,int ack,int seqn,void * data,uint32_t data_len)137 int rkaiq_ipc_send(int sockfd, int id, int ack, int seqn, void *data,
138                    uint32_t data_len) {
139   uint32_t out_len = sizeof(RkAiqSocketPacket_t) - sizeof(char *) + data_len;
140   char *out_data = (char *)malloc(out_len);
141   RkAiqSocketPacket_t *out_res = (RkAiqSocketPacket_t *)out_data;
142   const std::lock_guard<std::mutex> lock(SocketServer::send_mutex);
143   int ret = 0;
144 
145   out_res->magic[0] = 'R';
146   out_res->magic[1] = 0xAA;
147   out_res->magic[2] = 0xFF;
148   out_res->magic[3] = 'K';
149 
150   out_res->cmd_id = id;
151   out_res->cmd_ret = ack;
152   out_res->payload_size = data_len;
153   out_res->sequence = seqn;
154   out_res->packet_size = data_len;
155 
156   memcpy(&out_res->data, data, data_len);
157 
158   ret = send(sockfd, out_data, out_len, 0);
159 
160   free(out_data);
161 
162   return 0;
163 }
164 
165 // return 0 if a sigle packet or payload size
rkaiq_packet_parse_old(RkAiqSocketPacket * aiq_data,uint8_t * buffer,int len)166 int rkaiq_packet_parse_old(RkAiqSocketPacket *aiq_data, uint8_t *buffer,
167                            int len) {
168   uint8_t *start_pos = NULL;
169   uint32_t packet_size = 0;
170   uint32_t valid_size = 0;
171   RkAiqSocketPacket *aiq_pkt = NULL;
172 
173   if (buffer[0] == 'R' && buffer[1] == 'K') {
174     start_pos = buffer;
175   }
176 
177   if (start_pos) {
178     if ((len - (start_pos - buffer)) < (int)sizeof(RkAiqSocketPacket)) {
179       LOGE_IPC("Not a complete packet [%d], discard!\n", len);
180       return -1;
181     }
182 
183     aiq_pkt = (RkAiqSocketPacket *)start_pos;
184     memcpy(aiq_data, aiq_pkt, sizeof(RkAiqSocketPacket));
185 
186     packet_size = (start_pos[2] & 0xff) | ((start_pos[3] & 0xff) << 8) |
187                   ((start_pos[4] & 0xff) << 16) | ((start_pos[5] & 0xff) << 24);
188 
189     // refer to the real offset of data
190     aiq_data->data = (char *)(&start_pos);
191     aiq_data->dataSize = packet_size;
192     valid_size = (buffer + len) - start_pos;
193 
194     // sigle packet : HEAD:24byte + PAYLOAD + CRC:1byte
195     if (valid_size == packet_size) {
196       return 0;
197     }
198     return packet_size;
199   } else {
200     // may be fragment packet, head already parsed just return full size
201     return -1;
202   }
203 }
204 
205 // return 0 if a sigle packet or payload size
rkaiq_packet_parse(RkAiqSocketPacket_t * aiq_data,uint8_t * buffer,int len)206 int rkaiq_packet_parse(RkAiqSocketPacket_t *aiq_data, uint8_t *buffer,
207                        int len) {
208   uint8_t *start_pos = NULL;
209   uint32_t packet_size = 0;
210   RkAiqSocketPacket_t *aiq_pkt = NULL;
211 
212   start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_DATA_HEADER,
213                               RKAIQ_SOCKET_DATA_HEADER_LEN);
214 
215   if (start_pos) {
216     if ((len - (start_pos - buffer)) < (int)sizeof(RkAiqSocketPacket_t)) {
217       LOGE_IPC("Not a complete packet [%d], discard!\n", len);
218       return -1;
219     }
220 
221     aiq_pkt = (RkAiqSocketPacket_t *)start_pos;
222     packet_size = &buffer[len - 1] - start_pos;
223 
224     memcpy(aiq_data, aiq_pkt, sizeof(RkAiqSocketPacket_t));
225 
226     // refer to the real offset of data
227     aiq_data->data = (uint8_t *)(&aiq_pkt->data);
228 
229     // sigle packet : HEAD:24byte + PAYLOAD + CRC:1byte
230     if (aiq_pkt->payload_size <= (packet_size - 1)) {
231       return 0;
232     }
233     return packet_size;
234   } else {
235     // may be fragment packet, head already parsed just return full size
236     return -1;
237   }
238 }
239 
rkaiq_is_uapi(const char * cmd_str)240 int rkaiq_is_uapi(const char *cmd_str) {
241   if (strstr(cmd_str, "uapi/0/")) {
242     return 1;
243   } else {
244     return 0;
245   }
246 }
247 
rkaiq_params_tuning(aiq_tunning_ctx * tunning_ctx)248 void rkaiq_params_tuning(aiq_tunning_ctx *tunning_ctx) {
249   LOGV_IPC("SocketServer::%s enter", __func__);
250   int sockfd = -1;
251   rk_aiq_sys_ctx_t *aiq_ctx = NULL;
252   RkAiqSocketPacket_t *aiq_data = NULL;
253 
254   if (!tunning_ctx) {
255     return;
256   }
257 
258   sockfd = tunning_ctx->socketfd;
259   aiq_ctx = tunning_ctx->aiq_ctx;
260   aiq_data = tunning_ctx->aiq_data;
261 
262   LOGI_IPC("[TCP]%d,%d,%d--->PC CMD STRING:\n%s\n", sockfd, aiq_data->cmd_id,
263          aiq_data->payload_size, aiq_data->data);
264 
265   switch (aiq_data->cmd_id) {
266   case AIQ_IPC_CMD_WRITE: {
267     if (rkaiq_is_uapi((char *)aiq_data->data)) {
268       char *ret_str_js = NULL;
269       rkaiq_uapi_unified_ctl(aiq_ctx, (char *)aiq_data->data, &ret_str_js, 0);
270     } else {
271       rk_aiq_uapi_sysctl_tuning(aiq_ctx, (char *)aiq_data->data);
272     }
273   } break;
274   case AIQ_IPC_CMD_READ: {
275     char *out_data = NULL;
276     if (rkaiq_is_uapi((char *)aiq_data->data)) {
277       rkaiq_uapi_unified_ctl(aiq_ctx, (char *)aiq_data->data, &out_data, 1);
278     } else {
279       out_data = rk_aiq_uapi_sysctl_readiq(aiq_ctx, (char *)aiq_data->data);
280     }
281 
282     if (!out_data) {
283       LOGE_IPC("[Tuning]: aiq return NULL!\n");
284       break;
285     }
286     LOGI_IPC("---> read:\n%s\n", out_data);
287     rkaiq_ipc_send(sockfd, AIQ_IPC_CMD_READ, 0, 0, out_data, strlen(out_data));
288     if (out_data)
289       free(out_data);
290   } break;
291   default:
292     break;
293   }
294 
295   if (aiq_data) {
296     RkMSG::MessageParser::freePacket(aiq_data, RKAIQ_MESSAGE_NEW);
297   }
298 
299   free(tunning_ctx);
300 }
301 
packetHandle(void * packet,MessageType type)302 int SocketServer::packetHandle(void *packet, MessageType type) {
303   LOGV_IPC("SocketServer::%s enter", __func__);
304   if (type == RKAIQ_MESSAGE_NEW) {
305     RkAiqSocketPacket_t *aiq_data = (RkAiqSocketPacket_t *)packet;
306 
307     if (this->tunning_thread && this->tunning_thread->joinable()) {
308       this->tunning_thread->join();
309       this->tunning_thread.reset();
310       this->tunning_thread = nullptr;
311     }
312 
313     aiq_tunning_ctx *tunning_ctx =
314         (aiq_tunning_ctx *)calloc(1, sizeof(aiq_tunning_ctx));
315     tunning_ctx->aiq_data = aiq_data;
316     tunning_ctx->aiq_ctx = aiq_ctx;
317     tunning_ctx->socketfd = client_socket;
318 
319     this->tunning_thread = std::make_shared<std::thread>(
320         std::thread(rkaiq_params_tuning, tunning_ctx));
321   } else {
322     RkAiqSocketPacket *aiq_data = (RkAiqSocketPacket *)packet;
323     ProcessText(client_socket, aiq_ctx, aiq_data);
324   }
325 
326   return 0;
327 }
328 
onPacketHandle(void * pri,void * packet,MessageType type)329 int onPacketHandle(void *pri, void *packet, MessageType type) {
330   LOGV_IPC("SocketServer::%s enter", __func__);
331   SocketServer *server = (SocketServer *)pri;
332   if (server) {
333     server->packetHandle(packet, type);
334   }
335 
336   return 0;
337 }
338 
Recvieve(int sync)339 int SocketServer::Recvieve(int sync) {
340   LOGV_IPC("SocketServer::%s enter", __func__);
341   uint8_t buffer[MAXPACKETSIZE];
342   struct timeval interval = {3, 0};
343 
344   setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval,
345              sizeof(struct timeval));
346   while (!quit_) {
347     int recv_len = -1;
348 
349     memset(&buffer, 0, MAXPACKETSIZE);
350 
351     // 1. recv MAX SIZE every timne.
352     recv_len = recv(client_socket, buffer, MAXPACKETSIZE, 0);
353 
354     LOGD_IPC("SocketServer::%s recv_len %d", __func__, recv_len);
355     if (recv_len == 0) {
356       break;
357     }
358 
359     if (recv_len < 0) {
360       continue;
361     }
362 
363     msg_parser->pushRawData(buffer, recv_len);
364 
365     if (sync) {
366     }
367   }
368 
369   return 0;
370 }
371 
372 #define POLL_STOP_RET (3)
373 
poll_event(int timeout_msec,int fds[])374 int SocketServer::poll_event(int timeout_msec, int fds[]) {
375   int num_fds = fds[1] == -1 ? 1 : 2;
376   struct pollfd poll_fds[num_fds];
377   int ret = 0;
378 
379   memset(poll_fds, 0, sizeof(poll_fds));
380   poll_fds[0].fd = fds[0];
381   poll_fds[0].events = (POLLIN | POLLOUT | POLLHUP);
382 
383   if (fds[1] != -1) {
384     poll_fds[1].fd = fds[1];
385     poll_fds[1].events = POLLPRI | POLLIN | POLLOUT;
386     poll_fds[1].revents = 0;
387   }
388 
389   ret = poll(poll_fds, num_fds, timeout_msec);
390   if (fds[1] != -1) {
391     if ((poll_fds[1].revents & POLLIN) || (poll_fds[1].revents & POLLPRI)) {
392       LOGD_IPC("%s: Poll returning from flush", __FUNCTION__);
393       return POLL_STOP_RET;
394     }
395   }
396 
397   if (ret > 0 && (poll_fds[0].revents & (POLLERR | POLLNVAL | POLLHUP))) {
398     LOGE_IPC("polled error");
399     return -1;
400   }
401 
402   return ret;
403 }
404 
Accepted()405 void SocketServer::Accepted() {
406   LOGV_IPC("SocketServer::%s enter", __func__);
407   struct timeval interval = {3, 0};
408   setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval,
409              sizeof(struct timeval));
410   while (!quit_) {
411     // int client_socket;
412     socklen_t sosize = sizeof(clientAddress);
413     int fds[2] = {sockfd, _stop_fds[0]};
414     int poll_ret = poll_event(-1, fds);
415     if (poll_ret == POLL_STOP_RET) {
416       LOG1("poll socket stop success !");
417       break;
418     } else if (poll_ret <= 0) {
419       LOGW("poll socket got error(0x%x) but continue\n");
420       ::usleep(10000); // 10ms
421       continue;
422     }
423     client_socket = accept(sockfd, (struct sockaddr *)&clientAddress, &sosize);
424     if (client_socket < 0) {
425       if (errno != EAGAIN)
426         LOGE_IPC("Error socket accept failed %d %d\n", client_socket, errno);
427       continue;
428     }
429     LOGD_IPC("socket accept ip %s\n", serverAddress);
430     tool_mode_set(true);
431 
432     // std::shared_ptr<std::thread> recv_thread;
433     // recv_thread = make_shared<thread>(&SocketServer::Recvieve, this,
434     // client_socket); recv_thread->join(); recv_thread = nullptr;
435     this->Recvieve(0);
436     close(client_socket);
437     LOGD_IPC("socket accept close\n");
438     tool_mode_set(false);
439   }
440   LOGD_IPC("socket accept exit\n");
441 }
442 
443 #ifdef __ANDROID__
getAndroidLocalSocket()444 int SocketServer::getAndroidLocalSocket() {
445   static const char socketName[] = "camera_tool";
446   int sock = android_get_control_socket(socketName);
447 
448   if (sock < 0) {
449     // TODO(Cody): will always failed with permission denied
450     // Should let init to create socket
451     sock = socket_local_server(socketName, ANDROID_SOCKET_NAMESPACE_RESERVED,
452                                SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK);
453   }
454 
455   return sock;
456 }
457 #endif
458 
Process(rk_aiq_sys_ctx_t * ctx,int camid)459 int SocketServer::Process(rk_aiq_sys_ctx_t *ctx, int camid) {
460   LOGV_IPC("SocketServer::%s enter", __func__);
461   int opt = 1;
462   aiq_ctx = ctx;
463 
464 #ifdef __ANDROID__
465   sockfd = getAndroidLocalSocket();
466   if (sockfd < 0) {
467     LOGE_IPC("Error get socket %s\n", strerror(errno));
468     return -1;
469   }
470   fcntl(sockfd, F_SETFD, FD_CLOEXEC);
471 #else
472   sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
473   memset(&serverAddress, 0, sizeof(serverAddress));
474 
475   serverAddress.sun_family = AF_LOCAL;
476   snprintf(serverAddress.sun_path, sizeof(serverAddress.sun_path),
477            "%s%d", UNIX_DOMAIN, camid > 0 ? camid : 0);
478   unlink(serverAddress.sun_path);
479 
480   if ((::bind(sockfd, (struct sockaddr *)&serverAddress,
481               sizeof(serverAddress))) < 0) {
482     LOGE_IPC("Error bind %s\n", strerror(errno));
483     return -1;
484   }
485 #endif
486   if (listen(sockfd, 5) < 0) {
487     LOGE_IPC("Error listen\n");
488     return -1;
489   }
490 
491   if (pipe(_stop_fds) < 0) {
492     LOGE_IPC("poll stop pipe error: %s", strerror(errno));
493   } else {
494     if (fcntl(_stop_fds[0], F_SETFL, O_NONBLOCK)) {
495       LOGE_IPC("Fail to set stop pipe flag: %s", strerror(errno));
496     }
497   }
498 
499   this->accept_threads_ = std::unique_ptr<std::thread>(
500       new std::thread(&SocketServer::Accepted, this));
501 
502   return 0;
503 }
504 
Deinit()505 void SocketServer::Deinit() {
506   LOGV_IPC("SocketServer::%s enter", __func__);
507   struct linger so_linger;
508   so_linger.l_onoff = 1;
509   so_linger.l_linger = 0;
510   this->SaveEixt();
511   // setsockopt(client_socket,SOL_SOCKET,SO_LINGER,&so_linger,sizeof(so_linger));
512   // struct timeval interval = {0, 0};
513   // setsockopt(client_socket, SOL_SOCKET, SO_RCVTIMEO, (char
514   // *)&interval,sizeof(struct timeval));
515   if (this->accept_threads_)
516     this->accept_threads_->join();
517   if (this->tunning_thread && this->tunning_thread->joinable())
518     this->tunning_thread->join();
519   // shutdown(client_socket, SHUT_RDWR);
520   // close(client_socket);
521 #ifndef __ANDROID__
522   unlink(serverAddress.sun_path);
523   close(sockfd);
524 #endif
525   this->accept_threads_ = nullptr;
526   this->tunning_thread = nullptr;
527   if (_stop_fds[0] != -1)
528     close(_stop_fds[0]);
529   if (_stop_fds[1] != -1)
530     close(_stop_fds[1]);
531   if (msg_parser) {
532     msg_parser->stop();
533   }
534 }
535