Init basesystem source codes.
[staging/basesystem.git] / video_in_hal / otherservice / rpc_library / library / src / rpc_udp.c
1 /*
2  * @copyright Copyright (c) 2016-2020 TOYOTA MOTOR CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 /**
18  * @file rpc_udp.c
19  * @brief RPC Library Internal Implementation--UDP Communication
20  *
21  */
22 /** @addtogroup RPClib_in */
23 /** @{ */
24 #include <stdio.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <sys/poll.h>
28 #include <sys/time.h>
29 #include <netinet/in.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <unistd.h> // for usleep
33
34 #include <fcntl.h>
35 #include <sys/un.h>
36
37 #include <sys/inotify.h>
38
39 #include <other_service/rpc.h>
40 #include "rpc_internal.h"
41
42 static /*inline*/ UINT32
43 RpcGetSequenceNumber(RpcThreadInfo *th) {
44   RPC_THREAD_MUTEX_LOCK(th);
45   UINT32 ret = th->sequence_number;
46   (th->sequence_number)++;
47   if (th->sequence_number == RPC_SEQ_NUM_INVALID) {  // LCOV_EXCL_BR_LINE 200: overflow check, but test time is too long
48     AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
49     th->sequence_number = RPC_SEQ_NUM_START;  // LCOV_EXCL_LINE 200: overflow check, but test time is too long
50   }
51   RPC_THREAD_MUTEX_UNLOCK(th);
52   return ret;
53 }
54
55 /**
56  */
57 int
58 RpcReadUdpPacket(const RpcIdInfo *id, UINT8 *buf) {
59   struct sockaddr_un sa;
60   socklen_t sa_len = sizeof(sa);
61   // sa passed to recvfrom does not require initialization
62
63   for(;;) {
64
65     ssize_t ret = recvfrom(RPC_my_sock(id), buf, RPC_UDP_PACKET_SIZE,
66        0, (struct sockaddr *)&sa, &sa_len);
67
68     if (ret < 0) {
69       if (errno == EAGAIN) {  // LCOV_EXCL_BR_LINE 5: fail safe for libc recvfrom
70         //RPC_LOG_PERROR("recvfrom port %d", RPC_port(id));
71         return 0;
72       } else if (errno == EINTR) {  // LCOV_EXCL_START 5: fail safe for libc recvfrom
73         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
74         continue;
75       } else {
76         RPC_LOG_PERROR("recvfrom port %d", RPC_port(id));
77         return -1;
78       }  // LCOV_EXCL_STOP
79     } else if (ret == 0) {
80       RPC_LOG_STATE("*** recvfrom ret 0");
81       return 0;
82     } else {
83       return (int)ret;
84     }
85     AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
86   }  // LCOV_EXCL_LINE 10: final line
87 }
88
89 /** Number of sendto retries */
90 #define RPC_SENDTO_RETRY 5
91
92 RUNS_IN_CALLERS_THREAD
93 RUNS_IN_READING_THREAD
94 /**
95  */
96 static int
97 RpcSendUdpPacket(RpcIdInfo *id,
98         struct sockaddr_un *to, int do_retry,
99         void *mesg, unsigned int bytes) {
100   int nretry = 0;
101   ssize_t ret;
102   int myerr;
103  retry:
104   ret = sendto(RPC_my_sock(id), mesg, bytes, 0,
105          (struct sockaddr *)to, RPC_SOCKET_ADDR_LEN);
106   myerr = errno;
107
108   if (ret < 0) {
109     RPC_LOG_STATE("*** sendto %s ***", to->sun_path + 1);
110     if (myerr == EAGAIN  || (do_retry && myerr==ECONNREFUSED && ++nretry <= RPC_SENDTO_RETRY)) {  // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
111       usleep(100000);
112 #if defined(RPC_USE_UNIX_AUTOBIND)
113       RPC_LOG_DEBUG("*** sendto %s ***", to->sun_path + 1);
114 #else  /* AUTOBIND */
115       RPC_LOG_DEBUG("**************** sendto retry *********************");
116 #endif  /* !AUTOBIND */
117       goto retry;
118     }
119     errno = myerr;
120 #if defined(RPC_USE_UNIX_AUTOBIND)
121     if (do_retry) {
122       RPC_LOG_PERROR("sendto %s", to->sun_path + 1);
123     }
124 #endif  /* !AUTOBIND */
125     return -1;
126   } else if ((unsigned int)ret != bytes) {
127     RPC_LOG_STATE("can't send all");
128     return -1;
129   }
130   return 0;
131 }
132
133 RUNS_IN_CALLERS_THREAD
134 RUNS_IN_READING_THREAD
135 /**
136  */
137 int
138 RpcSendUdp(RpcIdInfo *id, const RPC_ID receiver, int direction, RPC_packet_type type, const void *mesg, unsigned int bytes) {  // LCOV_EXCL_START 8: dead code
139   AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
140   rpc_send_buf sendbuf;
141   sendbuf.buf = mesg;
142   sendbuf.bytes = bytes;
143
144   return RpcSendUdp2(id, receiver, direction, type, 1, &sendbuf);
145 }  // LCOV_EXCL_STOP
146 RUNS_IN_CALLERS_THREAD
147 RUNS_IN_READING_THREAD
148 /** UDP packet transmission process
149  *
150  * UDP packet format
151  * @verbatim
152 +--------------+--------------+
153 |*1|*2|*3|*4|*5|   PAYLOAD    |
154 +--------------+--------------+
155 @endverbatim
156  *
157  * *1 to *5 indicate the header. @see RPC_PACKET_HEADER
158  *
159  * - *1: [@%-2d] Type of packet
160  *   - @see RPC_packet_type
161  * - *2: [@%-5x] Source RPC_ID
162  * - *3: [@%-8x] Sequence number
163  *   - Set the value incremented from 1. This value must be unique for each source thread.
164  *   - In the case of response packets, this field contains the sequence number of the packet from which the response was received.
165  * - *4: [@%-4u] Size of send data
166  *   - In the case of sending data consists of single packet in PAYLOAD, this field contains number of the packet.
167  *   - For a multipacket, the first packet contains the sum of the PAYLOAD of all packets,
168  *     the last packet contains the PAYLOAD bytes of the packet.
169  *     All other intermediate packets are filled with 0.(With up to two packets in the current constraint,
170  *     there are no intermediate packets.)
171  * - *5: [@%1d]  Position of packet
172  *   - @see rpc_packet_position
173  */
174 int
175 RpcSendUdp2(struct RpcIdInfo *id, RPC_ID receiver, int direction,
176         RPC_packet_type type, unsigned int num, rpc_send_buf *sendbuf) {
177   unsigned char buf[RPC_UDP_PACKET_SIZE];
178   UINT32 seq_num = RpcGetSequenceNumber(id->thread_info);
179   unsigned int bytes = 0;
180   rpc_send_buf *sendbufp = sendbuf;
181   int i, do_retry = 1;
182   for(i = 0 ; i < num ; i++) {
183     bytes += sendbufp->bytes;
184     sendbufp++;
185   }
186   rpc_assert(bytes <= RPC_UDP_PAYLOAD);  // LCOV_EXCL_BR_LINE 6: double check
187
188   struct sockaddr_un to;
189   memset(&to, 0, sizeof(to));
190   to.sun_family = AF_UNIX;
191 #if defined(RPC_USE_UNIX_AUTOBIND)
192   if (direction != RPC_SEND_TO_CLIENT) {
193     RpcSetServerName(to.sun_path, receiver);
194     if (direction == RPC_SEND_TO_SERVER_NO_RETRY) {
195       do_retry = 0;
196     }
197   } else {
198     RpcSetClientName(to.sun_path, receiver);
199   }
200 #else  /* !AUTOBIND */
201   rpc_set_socket_name(to.sun_path, rpc_get_port(receiver));
202 #endif  /* !AUTOBIND */
203
204   sprintf((char *)buf, RPC_PACKET_HEADER,
205     (int)type, RPC_my_id(id), seq_num, bytes,
206     RPC_PACKET_POS_ONEANDONLY);
207
208   unsigned char *bufp = buf + RPC_PACKET_HEADER_LEN;
209   for(i = 0 ; i < num ; i++) {
210     memcpy(bufp, sendbuf->buf, sendbuf->bytes);
211     bufp += sendbuf->bytes;
212     sendbuf++;
213   }
214   if (RpcSendUdpPacket(id, &to, do_retry,
215         buf, RPC_PACKET_HEADER_LEN + bytes) < 0) {
216     return -1;
217   }
218   return (int)seq_num;
219 }
220
221 RUNS_IN_READING_THREAD
222 /**
223  */
224 RPC_Result
225 RpcSendUdpResponse(RpcIdInfo *id, const RPC_ID receiver, int direction,
226           RPC_packet_type type, UINT32 seq_num,
227           char *mesg, UINT32 bytes) {
228   rpc_assert(bytes <= RPC_MAX_RESPONSE_MESSAGE_SIZE);  // LCOV_EXCL_BR_LINE 6: double check
229   char buf[RPC_PACKET_HEADER_LEN + RPC_MAX_RESPONSE_MESSAGE_SIZE];
230   sprintf(buf, RPC_PACKET_HEADER,
231     (int)type, RPC_my_id(id), seq_num, bytes,
232     (int)RPC_PACKET_POS_ONEANDONLY);
233   memcpy(buf + RPC_PACKET_HEADER_LEN, mesg, bytes);
234
235   struct sockaddr_un sa;
236   memset(&sa, 0, sizeof(sa));
237   sa.sun_family = AF_UNIX;
238 #if defined(RPC_USE_UNIX_AUTOBIND)
239   if (direction != RPC_SEND_TO_CLIENT) {
240     RpcSetServerName(sa.sun_path, receiver);
241   } else {
242     RpcSetClientName(sa.sun_path, receiver);
243   }
244 #else  /* !AUTOBIND */
245   rpc_set_socket_name(sa.sun_path, rpc_get_port(receiver));
246 #endif  /* !AUTOBIND */
247   return RpcSendUdpPacket(id, &sa, 0, buf, RPC_PACKET_HEADER_LEN + bytes);
248 }
249
250 /**
251  */
252 RPC_Result
253 RpcParsePacketHeader(const char *str, RPC_packet_type *command,
254       RPC_ID_p id, UINT32 *seq_num, UINT32 *size) {
255   // LCOV_EXCL_BR_START 6: double check
256   rpc_assert(str != NULL && command != NULL && id != NULL
257        && seq_num != NULL && size != NULL);
258   // LCOV_EXCL_BR_STOP
259
260   if (sscanf(str, RPC_PACKET_HEADER_scanf, (int *)command, id, seq_num, size) != 4) {  // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
261     *command = RPC_PACKET_NONE;  // LCOV_EXCL_START 5: fail safe for libc sscanf
262     AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
263     BUG_ASSERT(0, "Parsing packet failed");
264     // LCOV_EXCL_STOP
265     return RPC_ERR_Fatal;
266   }
267   return RPC_OK;
268 }
269
270 #include <sys/time.h>
271
272 RPC_Result
273 RpcClientWaitResponse(RpcIdInfo *idinfo, UINT32 seq_num,
274        UINT32 timeout_msec, UINT16 *response) {
275   unsigned char readbuf[RPC_UDP_PACKET_SIZE];
276   fd_set fds;
277   int fd = idinfo->sock;
278
279   struct timeval timeout;
280   timeout.tv_sec = (__time_t)(timeout_msec / 1000);
281   timeout.tv_usec = (__suseconds_t)((timeout_msec % 1000) * 1000);
282
283   *response = RPC_RESPONSE_NONE;
284
285   for(;;) {
286     FD_ZERO(&fds);
287     FD_SET(fd, &fds);
288     int sret = select(fd + 1, &fds, NULL, NULL, &timeout);
289     if (sret < 0 && errno == EINTR) {  /* signal interrupt */  // LCOV_EXCL_START 5: fail safe for libc select
290       AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
291       continue;
292       // LCOV_EXCL_STOP
293     } else if (sret == 0) {  /* timeout */
294       RPC_LOG_ERR("server response timeout");
295       return RPC_ERR_No_Response;
296     } else if (sret > 0 && FD_ISSET(fd, &fds)) {  // LCOV_EXCL_BR_LINE 5: fail safe for libc select
297       RPC_ID sender;
298       UINT32 seq;
299       UINT32 size;
300       RPC_packet_type command;
301       int readret = RpcReadUdpPacket(idinfo, readbuf);
302       if (readret <= 0) {  // LCOV_EXCL_START 5: fail safe for libc recvfrom
303         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
304         /* debug code to see socket status */
305         /* why recvfrom() returned 0 ? */
306         struct pollfd pfd;
307         pfd.fd = fd;
308         pfd.events = POLLIN|POLLHUP|POLLERR;
309         sret = poll(&pfd, 1, 0/* timeout 0 */);
310         RPC_LOG_STATE("** poll revents=%x", pfd.revents);
311         return RPC_ERR_Fatal;
312       }  // LCOV_EXCL_STOP
313       if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) {  // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
314         return RPC_ERR_Fatal;
315       }
316       unsigned char c;
317       if (seq == seq_num) {
318         switch(command) {
319           case RPC_RESPONSE_APICALL:
320             c = readbuf[RPC_PACKET_HEADER_LEN];
321             switch(c) {
322               case 'O':
323                 *response = RPC_RESPONSE_API_OK;
324                 goto exit_loop_ok;
325                 break;
326               case 'B':
327                 *response = RPC_RESPONSE_API_BUSY;
328                 goto exit_loop_ok;
329                 break;
330               case 'E':
331                 *response = RPC_RESPONSE_API_ERR;
332                 goto exit_loop_ok;
333                 break;
334               case 'D':
335                 *response = RPC_RESPONSE_API_DEADLOCK;
336                 goto exit_loop_ok;
337                 break;
338               case 'C':
339                 *response = RPC_RESPONSE_API_CERTIFY;
340                 goto exit_loop_ok;
341                 break;
342               default:
343                 AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
344                 BUG_ASSERT(0, "illegal response\n");  // LCOV_EXCL_LINE 15: marco defined in rpc_internal.h
345                 return RPC_ERR_Fatal;
346                 break;
347             }
348             break;
349
350           default:
351             RPC_LOG_STATE("Unknown packet command=%d", command);
352             return RPC_ERR_Fatal;
353             break;
354         }
355       } else {  /* sequence number mismatch == response to other request */
356         RPC_LOG_DEBUG("unwanted response received(delayed response?)");
357         continue;
358       }
359     } else {  /* poll error */
360       RPC_LOG_PERROR("select in wait response");
361       return RPC_ERR_Fatal;
362     }
363   }
364   exit_loop_ok:
365   return RPC_OK;
366 }
367
368 RPC_Result
369 RpcClientWaitResult(RpcIdInfo *idinfo, RPC_ID srvr_id) {
370   unsigned char readbuf[RPC_UDP_PACKET_SIZE];
371   fd_set fds;
372   int fd = idinfo->sock;
373   int inotify_fd = RPC_clnt_inotify_fd(idinfo);
374   int maxfd;
375   RPC_Result result = RPC_OK;
376
377   for(;;) {
378     FD_ZERO(&fds);
379     FD_SET(fd, &fds);
380     FD_SET(inotify_fd, &fds);
381
382     /* Determine the maximum value of fd to wait */
383     if (fd > inotify_fd) {
384       maxfd = fd;
385     } else {
386       maxfd = inotify_fd;
387     }
388
389     int sret = select(maxfd + 1, &fds, NULL, NULL, NULL);
390     if (sret < 0 && errno == EINTR) {  /* signal interrupt */  // LCOV_EXCL_START 5: fail safe for libc select
391       AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
392       continue;
393       // LCOV_EXCL_STOP
394     }  else if (sret > 0 && FD_ISSET(fd, &fds)) {  /* success */  // LCOV_EXCL_BR_LINE 5: fail safe for libc select
395       RPC_ID sender;
396       UINT32 seq;
397       UINT32 size;
398       RPC_packet_type command;
399       int readret = RpcReadUdpPacket(idinfo, readbuf);
400       if (readret <= 0) {  // LCOV_EXCL_START 5: fail safe for libc recvfrom
401         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
402         rpc_assert(readret >= 0);
403         result = RPC_ERR_Fatal;
404         goto exit_loop_ok;
405       }  // LCOV_EXCL_STOP
406       if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) {  // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
407         // LCOV_EXCL_START 5: fail safe for libc sscanf
408         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
409         result = RPC_ERR_Fatal;
410         goto exit_loop_ok;
411       }  // LCOV_EXCL_STOP
412       switch(command) {
413         case RPC_PACKET_APIRETURN:
414           result = RpcSetAPIcallReturn(idinfo,
415               (const char *)(readbuf + RPC_PACKET_HEADER_LEN),
416               size);
417           if(result == RPC_OK) {  // LCOV_EXCL_BR_LINE 5: fail safe for libc malloc
418             if (sscanf((const char *)(readbuf + RPC_PACKET_HEADER_LEN), "%08x ", (unsigned int *)&result) != 1) {  // LCOV_EXCL_BR_LINE 5: fail safe for libc sscanf
419               AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
420               BUG_ASSERT(0, "Parsing packet failed");  // LCOV_EXCL_LINE 15: marco defined in rpc_internal.h
421               return RPC_ERR_Fatal;
422             }
423           }
424           goto exit_loop_ok;
425         default:
426           RPC_LOG_STATE("unwanted packet received while waiting for API return");
427           continue;
428           break;
429       }
430     } else if (sret > 0 && FD_ISSET(inotify_fd, &fds)) {  /* server process is terminate. */
431       UINT32 read_len = 0;
432       int length = 0;
433       char *buffer;
434       buffer = (char *)rpc_malloc(BUF_LEN);
435       if (NULL == buffer) {  // LCOV_EXCL_START 5: fail safe for libc malloc
436         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
437         RPC_LOG_ERR("rpc_malloc() ERROR.");
438       }  // LCOV_EXCL_STOP
439
440       if((length = (int)read( inotify_fd, buffer, BUF_LEN )) < 0 ) {  // LCOV_EXCL_START 5: fail safe for libc read
441         AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
442         RPC_LOG_PERROR("inotify read() ERROR.");
443       } // LCOV_EXCL_STOP
444       while (read_len < length) {
445         struct inotify_event *event = ( struct inotify_event * )&buffer[read_len];
446
447         if (event->mask & IN_DELETE_SELF) {  /* Terminating a Server Process */
448           if (RPC_ERR_Server_Finish == RpcDeleteSrvrPid(idinfo, srvr_id, event->wd)) {
449             RPC_LOG_PERROR("server process is terminate. : srvr_ID = %x", srvr_id);
450             result = RPC_ERR_Fatal;
451           }
452         }
453         read_len += (UINT32)(EVENT_SIZE + event->len);  /* Size of the inotify_event structure */
454       }
455       rpc_free(buffer);
456       if (RPC_OK != result) {
457         goto exit_loop_ok;
458       }
459
460     } else {  /* poll error */  // LCOV_EXCL_START 5: fail safe for libc select
461       AGL_ASSERT_NOT_TESTED();  // LCOV_EXCL_LINE 200: test assert
462       RPC_LOG_PERROR("select in wait result");
463       result = RPC_ERR_Fatal;
464       goto exit_loop_ok;
465       // LCOV_EXCL_STOP
466     }
467   }
468 exit_loop_ok:
469
470   return result;
471 }
472
473 /** @} */