2 * @copyright Copyright (c) 2016-2020 TOYOTA MOTOR CORPORATION.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 * @brief RPC Library Internal Implementation--UDP Communication
22 /** @addtogroup RPClib_in */
25 #include <sys/types.h>
26 #include <sys/socket.h>
29 #include <netinet/in.h>
32 #include <unistd.h> // for usleep
37 #include <sys/inotify.h>
39 #include <other_service/rpc.h>
40 #include "rpc_internal.h"
/**
 * Hand out the next sequence number for a thread.
 *
 * The sequence_number field of th is read and then incremented between
 * RPC_THREAD_MUTEX_LOCK and RPC_THREAD_MUTEX_UNLOCK on the same thread info.
 * If the incremented value equals RPC_SEQ_NUM_INVALID, the field is reset to
 * RPC_SEQ_NUM_START.
 *
 * @param th Thread info holding the sequence_number counter.
 *
 * NOTE(review): the return statement is not shown in this excerpt; presumably
 * the value saved in ret before the increment is what gets returned - confirm.
 */
42 static /*inline*/ UINT32
43 RpcGetSequenceNumber(RpcThreadInfo *th) {
44 RPC_THREAD_MUTEX_LOCK(th);
// Remember the current value before advancing the counter.
45 UINT32 ret = th->sequence_number;
46 (th->sequence_number)++;
// Wrap-around guard: do not leave the counter at RPC_SEQ_NUM_INVALID.
47 if (th->sequence_number == RPC_SEQ_NUM_INVALID) { // LCOV_EXCL_BR_LINE 200: overflow check, but test time is too long
48 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
49 th->sequence_number = RPC_SEQ_NUM_START; // LCOV_EXCL_LINE 200: overflow check, but test time is too long
51 RPC_THREAD_MUTEX_UNLOCK(th);
/**
 * Receive one datagram from the socket of id into buf.
 *
 * recvfrom is called on RPC_my_sock(id) with flags 0 and a maximum length of
 * RPC_UDP_PACKET_SIZE, so buf must provide at least that many bytes. The
 * sender address is received into a local sockaddr_un.
 *
 * @param id  Endpoint whose socket is read; RPC_port(id) appears in the error log.
 * @param buf Destination buffer for the datagram.
 *
 * Callers in this file treat a result <= 0 as failure.
 * NOTE(review): the return type and the return statements are not shown in
 * this excerpt - confirm the exact value produced by each branch.
 */
58 RpcReadUdpPacket(const RpcIdInfo *id, UINT8 *buf) {
59 struct sockaddr_un sa;
60 socklen_t sa_len = sizeof(sa);
61 // sa passed to recvfrom does not require initialization
65 ssize_t ret = recvfrom(RPC_my_sock(id), buf, RPC_UDP_PACKET_SIZE,
66 0, (struct sockaddr *)&sa, &sa_len);
// Failure classification by errno: EAGAIN first, then EINTR; RPC_LOG_PERROR reports the port.
69 if (errno == EAGAIN) { // LCOV_EXCL_BR_LINE 5: fail safe for libc recvfrom
70 //RPC_LOG_PERROR("recvfrom port %d", RPC_port(id));
72 } else if (errno == EINTR) { // LCOV_EXCL_START 5: fail safe for libc recvfrom
73 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
76 RPC_LOG_PERROR("recvfrom port %d", RPC_port(id));
// A recvfrom result of 0 is reported through RPC_LOG_STATE.
79 } else if (ret == 0) {
80 RPC_LOG_STATE("*** recvfrom ret 0");
85 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
86 } // LCOV_EXCL_LINE 10: final line
89 /** Upper bound on ECONNREFUSED retries of sendto (EAGAIN does not consume this budget) */
90 #define RPC_SENDTO_RETRY 5
/**
 * Send one already-built packet to a UNIX-domain socket address.
 *
 * sendto is called on RPC_my_sock(id) with flags 0 and an address length of
 * RPC_SOCKET_ADDR_LEN.
 *
 * @param id       Endpoint whose socket is used for sending.
 * @param to       Destination address; sun_path + 1 is what gets logged.
 * @param do_retry Non-zero lets an ECONNREFUSED failure take the retry branch,
 *                 at most RPC_SENDTO_RETRY times. EAGAIN takes that branch
 *                 regardless of this flag.
 * @param mesg     Packet bytes to send.
 * @param bytes    Number of bytes to send; a sendto result that differs from
 *                 it is logged.
 *
 * RpcSendUdp2 in this file treats a negative result as failure.
 * NOTE(review): the return type and return statements are not shown in this
 * excerpt - confirm the value returned on a short send.
 */
92 RUNS_IN_CALLERS_THREAD
93 RUNS_IN_READING_THREAD
97 RpcSendUdpPacket(RpcIdInfo *id,
98 struct sockaddr_un *to, int do_retry,
99 void *mesg, unsigned int bytes) {
104 ret = sendto(RPC_my_sock(id), mesg, bytes, 0,
105 (struct sockaddr *)to, RPC_SOCKET_ADDR_LEN);
109 RPC_LOG_STATE("*** sendto %s ***", to->sun_path + 1);
// nretry is only incremented when do_retry is set and the error is ECONNREFUSED,
// because the && chain short-circuits before ++nretry otherwise.
110 if (myerr == EAGAIN || (do_retry && myerr==ECONNREFUSED && ++nretry <= RPC_SENDTO_RETRY)) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
112 #if defined(RPC_USE_UNIX_AUTOBIND)
113 RPC_LOG_DEBUG("*** sendto %s ***", to->sun_path + 1);
115 RPC_LOG_DEBUG("**************** sendto retry *********************");
116 #endif /* !AUTOBIND */
120 #if defined(RPC_USE_UNIX_AUTOBIND)
122 RPC_LOG_PERROR("sendto %s", to->sun_path + 1);
124 #endif /* !AUTOBIND */
// Partial send: the byte count reported by sendto differs from the request.
126 } else if ((unsigned int)ret != bytes) {
127 RPC_LOG_STATE("can't send all");
/**
 * Single-buffer convenience wrapper around RpcSendUdp2.
 *
 * Places the message in one rpc_send_buf and forwards the call with num = 1.
 * The coverage annotation on the definition marks this function as dead code.
 *
 * @param id        Sending endpoint, passed through to RpcSendUdp2.
 * @param receiver  Destination RPC_ID, passed through.
 * @param direction Send direction, passed through.
 * @param type      Packet type, passed through.
 * @param mesg      Message payload.
 * @param bytes     Payload length, stored in sendbuf.bytes.
 * @return Whatever RpcSendUdp2 returns.
 *
 * NOTE(review): the assignment of sendbuf.buf is not shown in this excerpt;
 * presumably it is set from mesg - confirm.
 */
133 RUNS_IN_CALLERS_THREAD
134 RUNS_IN_READING_THREAD
138 RpcSendUdp(RpcIdInfo *id, const RPC_ID receiver, int direction, RPC_packet_type type, const void *mesg, unsigned int bytes) { // LCOV_EXCL_START 8: dead code
139 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
140 rpc_send_buf sendbuf;
142 sendbuf.bytes = bytes;
144 return RpcSendUdp2(id, receiver, direction, type, 1, &sendbuf);
146 RUNS_IN_CALLERS_THREAD
147 RUNS_IN_READING_THREAD
148 /** UDP packet transmission process
152 +--------------+--------------+
153 |*1|*2|*3|*4|*5| PAYLOAD |
154 +--------------+--------------+
157 * *1 to *5 indicate the header. @see RPC_PACKET_HEADER
159 * - *1: [@%-2d] Type of packet
160 * - @see RPC_packet_type
161 * - *2: [@%-5x] Source RPC_ID
162 * - *3: [@%-8x] Sequence number
163 * - Set the value incremented from 1. This value must be unique for each source thread.
164 * - For response packets, this field carries the sequence number of the packet being answered.
165 * - *4: [@%-4u] Size of send data
166 * - When the send data fits in a single packet, this field contains the number of PAYLOAD bytes in that packet.
167 * - For a multipacket, the first packet contains the sum of the PAYLOAD of all packets,
168 * the last packet contains the PAYLOAD bytes of the packet.
169 * All other intermediate packets are filled with 0.(With up to two packets in the current constraint,
170 * there are no intermediate packets.)
171 * - *5: [@%1d] Position of packet
172 * - @see rpc_packet_position
 *
 * Function summary: gathers num send buffers into one packet with position
 * RPC_PACKET_POS_ONEANDONLY and hands it to RpcSendUdpPacket.
 *
 * @param id        Sending endpoint; its thread_info supplies the sequence
 *                  number and RPC_my_id(id) is written into the header.
 * @param receiver  Destination RPC_ID used to build the socket name.
 * @param direction With RPC_USE_UNIX_AUTOBIND, RPC_SEND_TO_CLIENT selects the
 *                  client socket name and any other value the server name.
 *                  RPC_SEND_TO_SERVER_NO_RETRY is tested separately;
 *                  presumably it clears do_retry - confirm.
 * @param type      Packet type, written into the header as an int.
 * @param num       Number of entries in sendbuf.
 * @param sendbuf   Buffers to send; their byte counts are summed and the
 *                  total is asserted to be at most RPC_UDP_PAYLOAD.
 *
 * NOTE(review): the return type and return statements are not shown in this
 * excerpt; the only visible check is RpcSendUdpPacket(...) < 0.
175 RpcSendUdp2(struct RpcIdInfo *id, RPC_ID receiver, int direction,
176 RPC_packet_type type, unsigned int num, rpc_send_buf *sendbuf) {
177 unsigned char buf[RPC_UDP_PACKET_SIZE];
178 UINT32 seq_num = RpcGetSequenceNumber(id->thread_info);
179 unsigned int bytes = 0;
180 rpc_send_buf *sendbufp = sendbuf;
// First pass: total the byte counts of the buffers to be sent.
182 for(i = 0 ; i < num ; i++) {
183 bytes += sendbufp->bytes;
// The combined payload has to fit into a single packet.
186 rpc_assert(bytes <= RPC_UDP_PAYLOAD); // LCOV_EXCL_BR_LINE 6: double check
188 struct sockaddr_un to;
189 memset(&to, 0, sizeof(to));
190 to.sun_family = AF_UNIX;
191 #if defined(RPC_USE_UNIX_AUTOBIND)
192 if (direction != RPC_SEND_TO_CLIENT) {
193 RpcSetServerName(to.sun_path, receiver);
194 if (direction == RPC_SEND_TO_SERVER_NO_RETRY) {
198 RpcSetClientName(to.sun_path, receiver);
200 #else /* !AUTOBIND */
201 rpc_set_socket_name(to.sun_path, rpc_get_port(receiver));
202 #endif /* !AUTOBIND */
// Header arguments in order: type, sender id, sequence number, payload size, position.
204 sprintf((char *)buf, RPC_PACKET_HEADER,
205 (int)type, RPC_my_id(id), seq_num, bytes,
206 RPC_PACKET_POS_ONEANDONLY);
// Second pass: copy each buffer payload behind the header.
// NOTE(review): this loop reads through sendbuf while the first loop uses sendbufp;
// the pointer advance is not shown in this excerpt - confirm each iteration moves to the next entry.
208 unsigned char *bufp = buf + RPC_PACKET_HEADER_LEN;
209 for(i = 0 ; i < num ; i++) {
210 memcpy(bufp, sendbuf->buf, sendbuf->bytes);
211 bufp += sendbuf->bytes;
// The packet length is the header length plus the summed payload bytes.
214 if (RpcSendUdpPacket(id, &to, do_retry,
215 buf, RPC_PACKET_HEADER_LEN + bytes) < 0) {
/**
 * Build and send a single-packet response carrying a caller-supplied
 * sequence number.
 *
 * The header is formatted with RPC_PACKET_HEADER, the message is copied
 * behind it, and the packet is sent through RpcSendUdpPacket with
 * do_retry = 0.
 *
 * @param id        Sending endpoint; RPC_my_id(id) is written into the header.
 * @param receiver  Destination RPC_ID used to build the socket name.
 * @param direction With RPC_USE_UNIX_AUTOBIND, RPC_SEND_TO_CLIENT selects the
 *                  client socket name and any other value the server name.
 * @param type      Packet type, written into the header as an int.
 * @param seq_num   Sequence number written into the header.
 * @param mesg      Response payload.
 * @param bytes     Payload length; asserted to be at most
 *                  RPC_MAX_RESPONSE_MESSAGE_SIZE.
 * @return The result of RpcSendUdpPacket.
 */
221 RUNS_IN_READING_THREAD
225 RpcSendUdpResponse(RpcIdInfo *id, const RPC_ID receiver, int direction,
226 RPC_packet_type type, UINT32 seq_num,
227 char *mesg, UINT32 bytes) {
228 rpc_assert(bytes <= RPC_MAX_RESPONSE_MESSAGE_SIZE); // LCOV_EXCL_BR_LINE 6: double check
229 char buf[RPC_PACKET_HEADER_LEN + RPC_MAX_RESPONSE_MESSAGE_SIZE];
// NOTE(review): assumes the formatted header occupies exactly RPC_PACKET_HEADER_LEN
// bytes, since the payload is copied to that offset - confirm against RPC_PACKET_HEADER.
230 sprintf(buf, RPC_PACKET_HEADER,
231 (int)type, RPC_my_id(id), seq_num, bytes,
232 (int)RPC_PACKET_POS_ONEANDONLY);
233 memcpy(buf + RPC_PACKET_HEADER_LEN, mesg, bytes);
235 struct sockaddr_un sa;
236 memset(&sa, 0, sizeof(sa));
237 sa.sun_family = AF_UNIX;
238 #if defined(RPC_USE_UNIX_AUTOBIND)
239 if (direction != RPC_SEND_TO_CLIENT) {
240 RpcSetServerName(sa.sun_path, receiver);
242 RpcSetClientName(sa.sun_path, receiver);
244 #else /* !AUTOBIND */
245 rpc_set_socket_name(sa.sun_path, rpc_get_port(receiver));
246 #endif /* !AUTOBIND */
// Responses are sent with do_retry = 0.
247 return RpcSendUdpPacket(id, &sa, 0, buf, RPC_PACKET_HEADER_LEN + bytes);
/**
 * Parse the four leading header fields of a received packet.
 *
 * sscanf is applied to str with the RPC_PACKET_HEADER_scanf format and must
 * convert exactly four fields: command, sender id, sequence number and size.
 *
 * @param str     Start of the packet text; must not be NULL.
 * @param command Receives the packet type; set to RPC_PACKET_NONE when
 *                parsing fails.
 * @param id      Receives the sender id.
 * @param seq_num Receives the sequence number.
 * @param size    Receives the size field.
 * @return RPC_ERR_Fatal when fewer than four fields are converted. Callers in
 *         this file compare the result with RPC_OK for success.
 *
 * NOTE(review): sscanf reads str as a terminated string, and callers pass a
 * raw receive buffer - confirm the buffer is terminated or the format bounds
 * the read.
 */
253 RpcParsePacketHeader(const char *str, RPC_packet_type *command,
254 RPC_ID_p id, UINT32 *seq_num, UINT32 *size) {
255 // LCOV_EXCL_BR_START 6: double check
256 rpc_assert(str != NULL && command != NULL && id != NULL
257 && seq_num != NULL && size != NULL);
// The enum pointer is passed to sscanf as an int pointer.
260 if (sscanf(str, RPC_PACKET_HEADER_scanf, (int *)command, id, seq_num, size) != 4) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
261 *command = RPC_PACKET_NONE; // LCOV_EXCL_START 5: fail safe for libc sscanf
262 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
263 BUG_ASSERT(0, "Parsing packet failed");
265 return RPC_ERR_Fatal;
270 #include <sys/time.h>
/**
 * Wait for the response packet that matches a given sequence number.
 *
 * select is used on the endpoint socket with a timeout derived from
 * timeout_msec. A readable socket is read with RpcReadUdpPacket and the
 * header is parsed with RpcParsePacketHeader. When the sequence number
 * matches and the command is RPC_RESPONSE_APICALL, the first payload byte
 * selects the value stored in *response. A mismatching sequence number is
 * logged as an unwanted response.
 *
 * @param idinfo       Endpoint whose socket is waited on.
 * @param seq_num      Sequence number of the request being waited for.
 * @param timeout_msec Timeout in milliseconds, split into seconds and
 *                     microseconds for select.
 * @param response     Output; set to RPC_RESPONSE_NONE on entry and to one of
 *                     RPC_RESPONSE_API_OK, _BUSY, _ERR, _DEADLOCK or _CERTIFY
 *                     for a matching response.
 * @return RPC_ERR_No_Response when select times out; RPC_ERR_Fatal on a read
 *         failure, a header parse failure, an illegal response byte, an
 *         unknown command, or a select error.
 *
 * NOTE(review): the loop construct, the payload byte to response mapping and
 * the success return are not shown in this excerpt - confirm them.
 */
273 RpcClientWaitResponse(RpcIdInfo *idinfo, UINT32 seq_num,
274 UINT32 timeout_msec, UINT16 *response) {
275 unsigned char readbuf[RPC_UDP_PACKET_SIZE];
277 int fd = idinfo->sock;
// Convert the millisecond timeout into the seconds and microseconds of a timeval.
279 struct timeval timeout;
280 timeout.tv_sec = (__time_t)(timeout_msec / 1000);
281 timeout.tv_usec = (__suseconds_t)((timeout_msec % 1000) * 1000);
283 *response = RPC_RESPONSE_NONE;
288 int sret = select(fd + 1, &fds, NULL, NULL, &timeout);
289 if (sret < 0 && errno == EINTR) { /* signal interrupt */ // LCOV_EXCL_START 5: fail safe for libc select
290 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
293 } else if (sret == 0) { /* timeout */
294 RPC_LOG_ERR("server response timeout");
295 return RPC_ERR_No_Response;
296 } else if (sret > 0 && FD_ISSET(fd, &fds)) { // LCOV_EXCL_BR_LINE 5: fail safe for libc select
300 RPC_packet_type command;
301 int readret = RpcReadUdpPacket(idinfo, readbuf);
302 if (readret <= 0) { // LCOV_EXCL_START 5: fail safe for libc recvfrom
303 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
304 /* debug code to see socket status */
305 /* why recvfrom() returned 0 ? */
// Non-blocking poll (timeout 0) used only to log the socket revents.
308 pfd.events = POLLIN|POLLHUP|POLLERR;
309 sret = poll(&pfd, 1, 0/* timeout 0 */);
310 RPC_LOG_STATE("** poll revents=%x", pfd.revents);
311 return RPC_ERR_Fatal;
313 if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
314 return RPC_ERR_Fatal;
// Only a packet whose sequence number matches the request is interpreted.
317 if (seq == seq_num) {
319 case RPC_RESPONSE_APICALL:
// The first byte after the header encodes the response kind.
320 c = readbuf[RPC_PACKET_HEADER_LEN];
323 *response = RPC_RESPONSE_API_OK;
327 *response = RPC_RESPONSE_API_BUSY;
331 *response = RPC_RESPONSE_API_ERR;
335 *response = RPC_RESPONSE_API_DEADLOCK;
339 *response = RPC_RESPONSE_API_CERTIFY;
343 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
344 BUG_ASSERT(0, "illegal response\n"); // LCOV_EXCL_LINE 15: macro defined in rpc_internal.h
345 return RPC_ERR_Fatal;
351 RPC_LOG_STATE("Unknown packet command=%d", command);
352 return RPC_ERR_Fatal;
355 } else { /* sequence number mismatch == response to other request */
356 RPC_LOG_DEBUG("unwanted response received(delayed response?)");
359 } else { /* select error */
360 RPC_LOG_PERROR("select in wait response");
361 return RPC_ERR_Fatal;
/**
 * Wait for the API return packet of a call, while also watching for
 * termination of the server process.
 *
 * select is called without a timeout on a set that contains the inotify
 * descriptor obtained from RPC_clnt_inotify_fd(idinfo); the endpoint socket
 * is tested with FD_ISSET as well.
 *
 * - Socket readable: the packet is read and its header parsed. For
 *   RPC_PACKET_APIRETURN the payload is handed to RpcSetAPIcallReturn and,
 *   when that returns RPC_OK, the leading hexadecimal field of the payload is
 *   parsed into result. Other packets are logged as unwanted.
 * - inotify readable: the events are read into a buffer obtained from
 *   rpc_malloc. For an event with IN_DELETE_SELF, RpcDeleteSrvrPid is called;
 *   if it returns RPC_ERR_Server_Finish, result becomes RPC_ERR_Fatal.
 * - Any other select outcome except EINTR sets result to RPC_ERR_Fatal.
 *
 * @param idinfo  Client endpoint; supplies the socket and the inotify descriptor.
 * @param srvr_id RPC_ID of the server, passed to RpcDeleteSrvrPid and logged.
 * @return RPC_ERR_Fatal directly when the API result field cannot be parsed.
 *
 * NOTE(review): the loop construct, the release of buffer and the final
 * return are not shown in this excerpt; presumably result is returned - confirm.
 */
369 RpcClientWaitResult(RpcIdInfo *idinfo, RPC_ID srvr_id) {
370 unsigned char readbuf[RPC_UDP_PACKET_SIZE];
372 int fd = idinfo->sock;
373 int inotify_fd = RPC_clnt_inotify_fd(idinfo);
375 RPC_Result result = RPC_OK;
380 FD_SET(inotify_fd, &fds);
382 /* Determine the maximum value of fd to wait */
383 if (fd > inotify_fd) {
// The NULL timeout makes select block until a descriptor is ready or a signal arrives.
389 int sret = select(maxfd + 1, &fds, NULL, NULL, NULL);
390 if (sret < 0 && errno == EINTR) { /* signal interrupt */ // LCOV_EXCL_START 5: fail safe for libc select
391 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
394 } else if (sret > 0 && FD_ISSET(fd, &fds)) { /* success */ // LCOV_EXCL_BR_LINE 5: fail safe for libc select
398 RPC_packet_type command;
399 int readret = RpcReadUdpPacket(idinfo, readbuf);
400 if (readret <= 0) { // LCOV_EXCL_START 5: fail safe for libc recvfrom
401 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
402 rpc_assert(readret >= 0);
403 result = RPC_ERR_Fatal;
406 if (RpcParsePacketHeader((const char *)readbuf, &command, &sender, &seq, &size) != RPC_OK) { // LCOV_EXCL_BR_LINE 11: Unexpected branch // NOLINT(readability/nolint)
407 // LCOV_EXCL_START 5: fail safe for libc sscanf
408 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
409 result = RPC_ERR_Fatal;
413 case RPC_PACKET_APIRETURN:
// Store the returned payload first; the API result code is parsed from the same payload afterwards.
414 result = RpcSetAPIcallReturn(idinfo,
415 (const char *)(readbuf + RPC_PACKET_HEADER_LEN),
417 if(result == RPC_OK) { // LCOV_EXCL_BR_LINE 5: fail safe for libc malloc
// NOTE(review): sscanf writes through an unsigned int pointer to result;
// this assumes RPC_Result has the size of unsigned int - confirm.
418 if (sscanf((const char *)(readbuf + RPC_PACKET_HEADER_LEN), "%08x ", (unsigned int *)&result) != 1) { // LCOV_EXCL_BR_LINE 5: fail safe for libc sscanf
419 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
420 BUG_ASSERT(0, "Parsing packet failed"); // LCOV_EXCL_LINE 15: macro defined in rpc_internal.h
421 return RPC_ERR_Fatal;
426 RPC_LOG_STATE("unwanted packet received while waiting for API return");
430 } else if (sret > 0 && FD_ISSET(inotify_fd, &fds)) { /* the server process may have terminated */
434 buffer = (char *)rpc_malloc(BUF_LEN);
435 if (NULL == buffer) { // LCOV_EXCL_START 5: fail safe for libc malloc
436 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
437 RPC_LOG_ERR("rpc_malloc() ERROR.");
440 if((length = (int)read( inotify_fd, buffer, BUF_LEN )) < 0 ) { // LCOV_EXCL_START 5: fail safe for libc read
441 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
442 RPC_LOG_PERROR("inotify read() ERROR.");
// Walk the inotify events that were read into buffer.
444 while (read_len < length) {
445 struct inotify_event *event = ( struct inotify_event * )&buffer[read_len];
447 if (event->mask & IN_DELETE_SELF) { /* Terminating a Server Process */
448 if (RPC_ERR_Server_Finish == RpcDeleteSrvrPid(idinfo, srvr_id, event->wd)) {
449 RPC_LOG_PERROR("server process is terminate. : srvr_ID = %x", srvr_id);
450 result = RPC_ERR_Fatal;
453 read_len += (UINT32)(EVENT_SIZE + event->len); /* step to the next event: EVENT_SIZE (presumably the fixed inotify_event size) plus event->len */
456 if (RPC_OK != result) {
460 } else { /* select error */ // LCOV_EXCL_START 5: fail safe for libc select
461 AGL_ASSERT_NOT_TESTED(); // LCOV_EXCL_LINE 200: test assert
462 RPC_LOG_PERROR("select in wait result");
463 result = RPC_ERR_Fatal;