ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/cebix/BasiliskII/src/Unix/rpc_unix.cpp
Revision: 1.5
Committed: 2008-01-01T09:40:33Z (16 years, 10 months ago) by gbeauche
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +1 -1 lines
Log Message:
Happy New Year!

File Contents

# Content
1 /*
2 * rpc_unix.cpp - Remote Procedure Calls, Unix specific backend
3 *
4 * Basilisk II (C) 1997-2008 Christian Bauer
5 * Contributed by Gwenole Beauchesne
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 */
21
22 /*
23 * NOTES:
24 * - this is subject to rewrite but the API is to be kept intact
25 * - this RPC system is very minimal and only suited for 1:1 communication
26 *
27 * TODO:
28 * - better failure conditions
29 * - windows rpc
30 */
31
32 #include "sysdeps.h"
33
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <stdarg.h>
37 #include <unistd.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/un.h>
41 #include <sys/wait.h>
42 #include <netinet/in.h>
43
44 #include "rpc.h"
45
46 #define DEBUG 0
47 #include "debug.h"
48
49 #define NON_BLOCKING_IO 0
50
51 #if defined __linux__
52 #define USE_ABSTRACT_NAMESPACES 1
53 #endif
54
55
56 /* ====================================================================== */
57 /* === PThreads Glue === */
58 /* ====================================================================== */
59
60 //#define USE_THREADS
61
#ifndef USE_THREADS
// Threads are disabled: map the few pthreads names used in this file onto
// inert stand-ins so the remaining code compiles unchanged. Every expansion
// is a complete, parenthesized expression so it is safe in any context.
#define pthread_t void *
#define pthread_cancel(th) ((void)0)
#define pthread_join(th, ret) ((void)0)
#define pthread_testcancel() ((void)0)
#define pthread_create(th, attr, start, arg) dummy_thread_create()

// Stand-in for pthread_create(): always fails with errno set to ENOSYS
static inline int dummy_thread_create(void) { errno = ENOSYS; return -1; }

// The mutex stand-ins do no locking at all; lock/unlock evaluate to -1
#undef pthread_mutex_t
#define pthread_mutex_t volatile int
#undef pthread_mutex_lock
#define pthread_mutex_lock(m) (-1)
#undef pthread_mutex_unlock
#define pthread_mutex_unlock(m) (-1)
#undef PTHREAD_MUTEX_INITIALIZER
#define PTHREAD_MUTEX_INITIALIZER 0
#endif
79
80
81 /* ====================================================================== */
82 /* === RPC Connection Handling === */
83 /* ====================================================================== */
84
// Role of a connection endpoint (stored in rpc_connection_t::type)
enum {
	RPC_CONNECTION_SERVER = 0,
	RPC_CONNECTION_CLIENT = 1
};
90
// Activity state of a connection (stored in rpc_connection_t::status)
enum {
	RPC_STATUS_IDLE = 0,
	RPC_STATUS_BUSY = 1
};
96
97 // Client / Server connection
98 struct rpc_connection_t {
99 int type;
100 int status;
101 int socket;
102 char *socket_path;
103 int server_socket;
104 int server_thread_active;
105 pthread_t server_thread;
106 rpc_method_descriptor_t *callbacks;
107 int n_callbacks;
108 int send_offset;
109 char send_buffer[BUFSIZ];
110 };
111
// Store ERROR in the enclosing function's 'error' variable, then jump to
// its 'do_return' label (both must exist at the point of use)
#define return_error(ERROR) \
	do { \
		error = (ERROR); \
		goto do_return; \
	} while (0)
113
114 // Set connection status (XXX protect connection with a lock?)
115 static inline void rpc_connection_set_status(rpc_connection_t *connection, int status)
116 {
117 connection->status = status;
118 }
119
120 // Returns TRUE if the connection is busy (e.g. waiting for a reply)
121 int rpc_connection_busy(rpc_connection_t *connection)
122 {
123 return connection && connection->status == RPC_STATUS_BUSY;
124 }
125
// Build the address bytes to place in addr.sun_path[] for 'ident'.
//
// With USE_ABSTRACT_NAMESPACES the result is a leading NUL byte followed by
// 'ident'; otherwise it is "/tmp/" followed by 'ident' with every '/'
// replaced by '_'. The new heap buffer is stored in *pathp (any previous
// buffer held there is freed) and is NUL-terminated.
//
// Returns the number of address bytes (terminator excluded), or 0 when
// 'pathp' is NULL or the allocation fails.
static int _rpc_socket_path(char **pathp, const char *ident)
{
	const int ident_len = strlen(ident);

	if (!pathp)
		return 0;

#if USE_ABSTRACT_NAMESPACES
	const int prefix_len = 1;
#else
	const int prefix_len = 5;
#endif
	const int total_len = prefix_len + ident_len;

	char *buf = (char *)malloc(total_len + 1);
	if (!buf)
		return 0;

#if USE_ABSTRACT_NAMESPACES
	// A leading NUL marks the name as living in the abstract namespace
	buf[0] = '\0';
	memcpy(buf + prefix_len, ident, ident_len);
#else
	memcpy(buf, "/tmp/", prefix_len);
	for (int k = 0; k < ident_len; k++)
		buf[prefix_len + k] = (ident[k] == '/') ? '_' : ident[k];
#endif
	buf[total_len] = '\0';

	if (*pathp)
		free(*pathp);
	*pathp = buf;
	return total_len;
}
161
162 // Initialize server-side RPC system
163 rpc_connection_t *rpc_init_server(const char *ident)
164 {
165 D(bug("rpc_init_server ident='%s'\n", ident));
166
167 rpc_connection_t *connection;
168 struct sockaddr_un addr;
169 socklen_t addr_len;
170
171 if (ident == NULL)
172 return NULL;
173
174 connection = (rpc_connection_t *)malloc(sizeof(*connection));
175 if (connection == NULL)
176 return NULL;
177 connection->type = RPC_CONNECTION_SERVER;
178 connection->status = RPC_STATUS_IDLE;
179 connection->socket = -1;
180 connection->server_thread_active = 0;
181 connection->callbacks = NULL;
182 connection->n_callbacks = 0;
183
184 if ((connection->server_socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
185 perror("server socket");
186 free(connection);
187 return NULL;
188 }
189
190 memset(&addr, 0, sizeof(addr));
191 addr.sun_family = AF_UNIX;
192 connection->socket_path = NULL;
193 addr_len = _rpc_socket_path(&connection->socket_path, ident);
194 memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
195 addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
196
197 if (bind(connection->server_socket, (struct sockaddr *)&addr, addr_len) < 0) {
198 perror("server bind");
199 free(connection);
200 return NULL;
201 }
202
203 if (listen(connection->server_socket, 1) < 0) {
204 perror("server listen");
205 free(connection);
206 return NULL;
207 }
208
209 return connection;
210 }
211
212 // Initialize client-side RPC system
213 rpc_connection_t *rpc_init_client(const char *ident)
214 {
215 D(bug("rpc_init_client ident='%s'\n", ident));
216
217 rpc_connection_t *connection;
218 struct sockaddr_un addr;
219 socklen_t addr_len;
220
221 if (ident == NULL)
222 return NULL;
223
224 connection = (rpc_connection_t *)malloc(sizeof(*connection));
225 if (connection == NULL)
226 return NULL;
227 connection->type = RPC_CONNECTION_CLIENT;
228 connection->status = RPC_STATUS_IDLE;
229 connection->server_socket = -1;
230 connection->callbacks = NULL;
231 connection->n_callbacks = 0;
232
233 if ((connection->socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
234 perror("client socket");
235 free(connection);
236 return NULL;
237 }
238
239 memset(&addr, 0, sizeof(addr));
240 addr.sun_family = AF_UNIX;
241 connection->socket_path = NULL;
242 addr_len = _rpc_socket_path(&connection->socket_path, ident);
243 memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
244 addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
245
246 // Wait at most 5 seconds for server to initialize
247 const int N_CONNECT_WAIT_DELAY = 10;
248 int n_connect_attempts = 5000 / N_CONNECT_WAIT_DELAY;
249 if (n_connect_attempts == 0)
250 n_connect_attempts = 1;
251 while (n_connect_attempts > 0) {
252 if (connect(connection->socket, (struct sockaddr *)&addr, addr_len) == 0)
253 break;
254 if (n_connect_attempts > 1 && errno != ECONNREFUSED && errno != ENOENT) {
255 perror("client_connect");
256 free(connection);
257 return NULL;
258 }
259 n_connect_attempts--;
260 usleep(N_CONNECT_WAIT_DELAY);
261 }
262 if (n_connect_attempts == 0) {
263 free(connection);
264 return NULL;
265 }
266
267 return connection;
268 }
269
270 // Close RPC connection
271 int rpc_exit(rpc_connection_t *connection)
272 {
273 D(bug("rpc_exit\n"));
274
275 if (connection == NULL)
276 return RPC_ERROR_CONNECTION_NULL;
277
278 if (connection->socket_path) {
279 if (connection->socket_path[0])
280 unlink(connection->socket_path);
281 free(connection->socket_path);
282 }
283
284 if (connection->type == RPC_CONNECTION_SERVER) {
285 if (connection->server_thread_active) {
286 pthread_cancel(connection->server_thread);
287 pthread_join(connection->server_thread, NULL);
288 }
289 if (connection->socket != -1)
290 close(connection->socket);
291 if (connection->server_socket != -1)
292 close(connection->server_socket);
293 }
294 else {
295 if (connection->socket != -1)
296 close(connection->socket);
297 }
298
299 if (connection->callbacks)
300 free(connection->callbacks);
301 free(connection);
302
303 return RPC_ERROR_NO_ERROR;
304 }
305
306 // Wait for a message to arrive on the connection port
307 static inline int _rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
308 {
309 struct timeval tv;
310 tv.tv_sec = timeout / 1000000;
311 tv.tv_usec = timeout % 1000000;
312
313 fd_set rfds;
314 FD_ZERO(&rfds);
315 FD_SET(connection->socket, &rfds);
316 return select(connection->socket + 1, &rfds, NULL, NULL, &tv);
317 }
318
319 int rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
320 {
321 if (connection == NULL)
322 return RPC_ERROR_CONNECTION_NULL;
323 if (connection->type != RPC_CONNECTION_SERVER)
324 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
325
326 return _rpc_wait_dispatch(connection, timeout);
327 }
328
329 // Process incoming messages in the background
330 static void *rpc_server_func(void *arg)
331 {
332 rpc_connection_t *connection = (rpc_connection_t *)arg;
333
334 int ret = rpc_listen_socket(connection);
335 if (ret < 0)
336 return NULL;
337
338 connection->server_thread_active = 1;
339 for (;;) {
340 // XXX broken MacOS X doesn't implement cancellation points correctly
341 pthread_testcancel();
342
343 // wait for data to arrive
344 int ret = _rpc_wait_dispatch(connection, 50000);
345 if (ret == 0)
346 continue;
347 if (ret < 0)
348 break;
349
350 rpc_dispatch(connection);
351 }
352 connection->server_thread_active = 0;
353 return NULL;
354 }
355
356 // Return listen socket of RPC connection
357 int rpc_listen_socket(rpc_connection_t *connection)
358 {
359 D(bug("rpc_listen_socket\n"));
360
361 if (connection == NULL)
362 return RPC_ERROR_CONNECTION_NULL;
363 if (connection->type != RPC_CONNECTION_SERVER)
364 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
365
366 struct sockaddr_un addr;
367 socklen_t addr_len = sizeof(addr);
368 if ((connection->socket = accept(connection->server_socket, (struct sockaddr *)&addr, &addr_len)) < 0) {
369 perror("server accept");
370 return RPC_ERROR_ERRNO_SET;
371 }
372
373 #if NON_BLOCKING_IO
374 int val = fcntl(connection->socket, F_GETFL, 0);
375 if (val < 0) {
376 perror("server fcntl F_GETFL");
377 return RPC_ERROR_ERRNO_SET;
378 }
379 if (fcntl(connection->socket, F_SETFL, val | O_NONBLOCK) < 0) {
380 perror("server fcntl F_SETFL");
381 return RPC_ERROR_ERRNO_SET;
382 }
383 #endif
384
385 return connection->socket;
386 }
387
// Listen for incoming messages on RPC connection
//
// Starts rpc_server_func() on a background thread for 'connection'.
// Only available when USE_THREADS is defined. Returns RPC_ERROR_NO_ERROR
// on success, RPC_ERROR_ERRNO_SET when the thread cannot be created.
#ifdef USE_THREADS
int rpc_listen(rpc_connection_t *connection)
{
	D(bug("rpc_listen\n"));

	const int rc = pthread_create(&connection->server_thread, NULL, rpc_server_func, connection);
	if (rc == 0)
		return RPC_ERROR_NO_ERROR;

	perror("server thread");
	return RPC_ERROR_ERRNO_SET;
}
#endif
402
403
404 /* ====================================================================== */
405 /* === Message Passing === */
406 /* ====================================================================== */
407
// Message markers: framing and reply codes exchanged on the wire as INT32
enum {
	RPC_MESSAGE_START   = -3000,  // first word of every request
	RPC_MESSAGE_END     = -3001,  // last word of every request
	RPC_MESSAGE_ACK     = -3002,  // request was handled
	RPC_MESSAGE_REPLY   = -3003,
	RPC_MESSAGE_FAILURE = -3004   // request could not be handled
};
416
// Message type: an outgoing byte buffer bound to one socket
struct rpc_message_t {
	int socket;                    // descriptor the message is sent to / read from
	int offset;                    // number of bytes currently queued in 'buffer'
	unsigned char buffer[BUFSIZ];  // pending outgoing bytes, sent on flush
};
423
424 // User-defined marshalers
425 static struct {
426 rpc_message_descriptor_t *descs;
427 int last;
428 int count;
429 } g_message_descriptors = { NULL, 0, 0 };
430 static pthread_mutex_t g_message_descriptors_lock = PTHREAD_MUTEX_INITIALIZER;
431
432 // Add a user-defined marshaler
433 static int rpc_message_add_callback(const rpc_message_descriptor_t *desc)
434 {
435 D(bug("rpc_message_add_callback\n"));
436
437 const int N_ENTRIES_ALLOC = 8;
438 int error = RPC_ERROR_NO_ERROR;
439
440 pthread_mutex_lock(&g_message_descriptors_lock);
441 if (g_message_descriptors.descs == NULL) {
442 g_message_descriptors.count = N_ENTRIES_ALLOC;
443 if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)malloc(g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
444 pthread_mutex_unlock(&g_message_descriptors_lock);
445 return RPC_ERROR_NO_MEMORY;
446 }
447 g_message_descriptors.last = 0;
448 }
449 else if (g_message_descriptors.last >= g_message_descriptors.count) {
450 g_message_descriptors.count += N_ENTRIES_ALLOC;
451 if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)realloc(g_message_descriptors.descs, g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
452 pthread_mutex_unlock(&g_message_descriptors_lock);
453 return RPC_ERROR_NO_MEMORY;
454 }
455 }
456
457 // XXX only one callback per ID
458 int i;
459 for (i = 0; i < g_message_descriptors.last; i++) {
460 if (g_message_descriptors.descs[i].id == desc->id) {
461 pthread_mutex_unlock(&g_message_descriptors_lock);
462 return RPC_ERROR_NO_ERROR;
463 }
464 }
465
466 g_message_descriptors.descs[g_message_descriptors.last++] = *desc;
467 pthread_mutex_unlock(&g_message_descriptors_lock);
468 return error;
469 }
470
471 // Add user-defined marshalers
472 int rpc_message_add_callbacks(const rpc_message_descriptor_t *descs, int n_descs)
473 {
474 D(bug("rpc_message_add_callbacks\n"));
475
476 int i, error;
477 for (i = 0; i < n_descs; i++) {
478 if ((error = rpc_message_add_callback(&descs[i])) < 0)
479 return error;
480 }
481
482 return RPC_ERROR_NO_ERROR;
483 }
484
485 // Find user-defined marshaler
486 static rpc_message_descriptor_t *rpc_message_find_descriptor(int id)
487 {
488 D(bug("rpc_message_find_descriptor\n"));
489
490 if (g_message_descriptors.descs) {
491 int i;
492 for (i = 0; i < g_message_descriptors.count; i++) {
493 if (g_message_descriptors.descs[i].id == id)
494 return &g_message_descriptors.descs[i];
495 }
496 }
497
498 return NULL;
499 }
500
501 // Initialize message
502 static inline void rpc_message_init(rpc_message_t *message, rpc_connection_t *connection)
503 {
504 message->socket = connection->socket;
505 message->offset = 0;
506 }
507
508 // Send BYTES
509 static inline int _rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
510 {
511 if (send(message->socket, bytes, count, 0) != count)
512 return RPC_ERROR_ERRNO_SET;
513 return RPC_ERROR_NO_ERROR;
514 }
515
516 // Send message on wire
517 static inline int rpc_message_flush(rpc_message_t *message)
518 {
519 int error = _rpc_message_send_bytes(message, message->buffer, message->offset);
520 message->offset = 0;
521 return error;
522 }
523
524 // Send BYTES (public interface, may need to flush internal buffer)
525 int rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
526 {
527 if (message->offset > 0) {
528 int error = rpc_message_flush(message);
529 if (error != RPC_ERROR_NO_ERROR)
530 return error;
531 }
532 return _rpc_message_send_bytes(message, bytes, count);
533 }
534
535 // Send BYTES (buffered)
536 static inline void _rpc_message_send_bytes_buffered(rpc_message_t *message, unsigned char *bytes, int count)
537 {
538 memcpy(&message->buffer[message->offset], bytes, count);
539 message->offset += count;
540 }
541
542 // Send CHAR
543 int rpc_message_send_char(rpc_message_t *message, char c)
544 {
545 D(bug(" send CHAR '%c'\n", c));
546
547 unsigned char e_value = c;
548 if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
549 int error = rpc_message_flush(message);
550 if (error != RPC_ERROR_NO_ERROR)
551 return error;
552 }
553 _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
554 return RPC_ERROR_NO_ERROR;
555 }
556
557 // Send INT32
558 int rpc_message_send_int32(rpc_message_t *message, int32_t value)
559 {
560 D(bug(" send INT32 %d\n", value));
561
562 int32_t e_value = htonl(value);
563 if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
564 int error = rpc_message_flush(message);
565 if (error != RPC_ERROR_NO_ERROR)
566 return error;
567 }
568 _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
569 return RPC_ERROR_NO_ERROR;
570 }
571
572 // Send UINT32
573 int rpc_message_send_uint32(rpc_message_t *message, uint32_t value)
574 {
575 D(bug(" send UINT32 %u\n", value));
576
577 uint32_t e_value = htonl(value);
578 if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
579 int error = rpc_message_flush(message);
580 if (error != RPC_ERROR_NO_ERROR)
581 return error;
582 }
583 _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
584 return RPC_ERROR_NO_ERROR;
585 }
586
587 // Send STRING
588 int rpc_message_send_string(rpc_message_t *message, const char *str)
589 {
590 D(bug(" send STRING \"%s\"\n", str));
591
592 int error, length = str ? strlen(str) : 0;
593 uint32_t e_value = htonl(length);
594 if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
595 error = rpc_message_flush(message);
596 if (error != RPC_ERROR_NO_ERROR)
597 return error;
598 }
599 _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
600 error = rpc_message_flush(message);
601 if (error != RPC_ERROR_NO_ERROR)
602 return error;
603 D(bug("str=%p\n", str));
604 return _rpc_message_send_bytes(message, (unsigned char *)str, length);
605 }
606
607 // Send message arguments
608 static int rpc_message_send_args(rpc_message_t *message, va_list args)
609 {
610 int type;
611 rpc_message_descriptor_t *desc;
612 while ((type = va_arg(args, int)) != RPC_TYPE_INVALID) {
613 int error = rpc_message_send_int32(message, type);
614 if (error != RPC_ERROR_NO_ERROR)
615 return error;
616 switch (type) {
617 case RPC_TYPE_CHAR:
618 error = rpc_message_send_char(message, (char )va_arg(args, int));
619 break;
620 case RPC_TYPE_BOOLEAN:
621 case RPC_TYPE_INT32:
622 error = rpc_message_send_int32(message, va_arg(args, int));
623 break;
624 case RPC_TYPE_UINT32:
625 error = rpc_message_send_uint32(message, va_arg(args, unsigned int));
626 break;
627 case RPC_TYPE_STRING:
628 error = rpc_message_send_string(message, va_arg(args, char *));
629 break;
630 case RPC_TYPE_ARRAY: {
631 int i;
632 int array_type = va_arg(args, int32_t);
633 int array_size = va_arg(args, uint32_t);
634 if ((error = rpc_message_send_int32(message, array_type)) < 0)
635 return error;
636 if ((error = rpc_message_send_uint32(message, array_size)) < 0)
637 return error;
638 switch (array_type) {
639 case RPC_TYPE_CHAR: {
640 unsigned char *array = va_arg(args, unsigned char *);
641 error = rpc_message_flush(message);
642 if (error != RPC_ERROR_NO_ERROR)
643 return error;
644 error = _rpc_message_send_bytes(message, array, array_size);
645 break;
646 }
647 case RPC_TYPE_BOOLEAN:
648 case RPC_TYPE_INT32: {
649 int32_t *array = va_arg(args, int32_t *);
650 for (i = 0; i < array_size; i++) {
651 if ((error = rpc_message_send_int32(message, array[i])) < 0)
652 break;
653 }
654 break;
655 }
656 case RPC_TYPE_UINT32: {
657 uint32_t *array = va_arg(args, uint32_t *);
658 for (i = 0; i < array_size; i++) {
659 if ((error = rpc_message_send_uint32(message, array[i])) < 0)
660 break;
661 }
662 break;
663 }
664 case RPC_TYPE_STRING: {
665 char **array = va_arg(args, char **);
666 for (i = 0; i < array_size; i++) {
667 if ((error = rpc_message_send_string(message, array[i])) < 0)
668 break;
669 }
670 break;
671 }
672 default:
673 if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
674 uint8_t *array = va_arg(args, uint8_t *);
675 for (i = 0; i < array_size; i++) {
676 if ((error = desc->send_callback(message, &array[i * desc->size])) < 0)
677 break;
678 }
679 }
680 else {
681 fprintf(stderr, "unknown array arg type %d to send\n", type);
682 error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
683 }
684 break;
685 }
686 break;
687 }
688 default:
689 if ((desc = rpc_message_find_descriptor(type)) != NULL)
690 error = desc->send_callback(message, va_arg(args, uint8_t *));
691 else {
692 fprintf(stderr, "unknown arg type %d to send\n", type);
693 error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
694 }
695 break;
696 }
697 if (error != RPC_ERROR_NO_ERROR)
698 return error;
699 }
700 return RPC_ERROR_NO_ERROR;
701 }
702
703 // Receive raw BYTES
704 static inline int _rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
705 {
706 do {
707 int n = recv(message->socket, bytes, count, 0);
708 if (n > 0) {
709 count -= n;
710 bytes += n;
711 }
712 else if (n == -1 && errno == EINTR)
713 continue;
714 else {
715 #if NON_BLOCKING_IO
716 if (errno == EAGAIN || errno == EWOULDBLOCK) {
717 // wait for data to arrive
718 fd_set rfds;
719 FD_ZERO(&rfds);
720 FD_SET(message->socket, &rfds);
721 int ret = select(message->socket + 1, &rfds, NULL, NULL, NULL);
722 if (ret > 0)
723 continue;
724 }
725 #endif
726 return RPC_ERROR_ERRNO_SET;
727 }
728 } while (count > 0);
729 return RPC_ERROR_NO_ERROR;
730 }
731
732 int rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
733 {
734 return _rpc_message_recv_bytes(message, bytes, count);
735 }
736
737 // Receive CHAR
738 int rpc_message_recv_char(rpc_message_t *message, char *ret)
739 {
740 char r_value;
741 int error;
742 if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
743 return error;
744 *ret = r_value;
745 D(bug(" recv CHAR '%c'\n", *ret));
746 return RPC_ERROR_NO_ERROR;
747 }
748
749 // Receive INT32
750 int rpc_message_recv_int32(rpc_message_t *message, int32_t *ret)
751 {
752 int32_t r_value;
753 int error;
754 if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
755 return error;
756 *ret = ntohl(r_value);
757 D(bug(" recv INT32 %d\n", *ret));
758 return RPC_ERROR_NO_ERROR;
759 }
760
761 // Receive UINT32
762 int rpc_message_recv_uint32(rpc_message_t *message, uint32_t *ret)
763 {
764 uint32_t r_value;
765 int error;
766 if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
767 return error;
768 *ret = ntohl(r_value);
769 D(bug(" recv UINT32 %u\n", *ret));
770 return RPC_ERROR_NO_ERROR;
771 }
772
773 // Receive STRING
774 int rpc_message_recv_string(rpc_message_t *message, char **ret)
775 {
776 char *str;
777 int length;
778 uint32_t r_value;
779 int error;
780 if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
781 return error;
782 length = ntohl(r_value);
783 if (length == 0)
784 str = NULL;
785 else {
786 if ((str = (char *)malloc(length + 1)) == NULL)
787 return RPC_ERROR_NO_MEMORY;
788 if ((error = _rpc_message_recv_bytes(message, (unsigned char *)str, length)) < 0)
789 return error;
790 str[length] = '\0';
791 }
792 *ret = str;
793 D(bug(" recv STRING \"%s\"\n", *ret));
794 return RPC_ERROR_NO_ERROR;
795 }
796
797 // Receive message arguments
798 static int rpc_message_recv_args(rpc_message_t *message, va_list args)
799 {
800 int expected_type, error;
801 rpc_message_descriptor_t *desc;
802
803 while ((expected_type = va_arg(args, int)) != RPC_TYPE_INVALID) {
804 void *p_value = va_arg(args, void *);
805 int32_t type;
806 if ((error = rpc_message_recv_int32(message, &type)) < 0)
807 return error;
808 if (type != expected_type)
809 return RPC_ERROR_MESSAGE_ARGUMENT_MISMATCH;
810 switch (type) {
811 case RPC_TYPE_CHAR:
812 error = rpc_message_recv_char(message, (char *)p_value);
813 break;
814 case RPC_TYPE_BOOLEAN:
815 case RPC_TYPE_INT32:
816 error = rpc_message_recv_int32(message, (int32_t *)p_value);
817 break;
818 case RPC_TYPE_UINT32:
819 error = rpc_message_recv_uint32(message, (uint32_t *)p_value);
820 break;
821 case RPC_TYPE_STRING:
822 error = rpc_message_recv_string(message, (char **)p_value);
823 break;
824 case RPC_TYPE_ARRAY: {
825 int i;
826 int32_t array_type;
827 uint32_t array_size;
828 if ((error = rpc_message_recv_int32(message, &array_type)) < 0)
829 return error;
830 if ((error = rpc_message_recv_uint32(message, &array_size)) < 0)
831 return error;
832 p_value = va_arg(args, void *);
833 *((uint32_t *)p_value) = array_size;
834 p_value = va_arg(args, void *);
835 switch (array_type) {
836 case RPC_TYPE_CHAR: {
837 unsigned char *array;
838 if ((array = (unsigned char *)malloc(array_size * sizeof(*array))) == NULL)
839 return RPC_ERROR_NO_MEMORY;
840 error = _rpc_message_recv_bytes(message, array, array_size);
841 if (error != RPC_ERROR_NO_ERROR)
842 return error;
843 *((void **)p_value) = (void *)array;
844 break;
845 }
846 case RPC_TYPE_BOOLEAN:
847 case RPC_TYPE_INT32: {
848 int *array;
849 if ((array = (int *)malloc(array_size * sizeof(*array))) == NULL)
850 return RPC_ERROR_NO_MEMORY;
851 for (i = 0; i < array_size; i++) {
852 int32_t value;
853 if ((error = rpc_message_recv_int32(message, &value)) < 0)
854 return error;
855 array[i] = value;
856 }
857 *((void **)p_value) = (void *)array;
858 break;
859 }
860 case RPC_TYPE_UINT32: {
861 unsigned int *array;
862 if ((array = (unsigned int *)malloc(array_size * sizeof(*array))) == NULL)
863 return RPC_ERROR_NO_MEMORY;
864 for (i = 0; i < array_size; i++) {
865 uint32_t value;
866 if ((error = rpc_message_recv_uint32(message, &value)) < 0)
867 return error;
868 array[i] = value;
869 }
870 *((void **)p_value) = (void *)array;
871 break;
872 }
873 case RPC_TYPE_STRING: {
874 char **array;
875 if ((array = (char **)malloc(array_size * sizeof(*array))) == NULL)
876 return RPC_ERROR_NO_MEMORY;
877 for (i = 0; i < array_size; i++) {
878 char *str;
879 if ((error = rpc_message_recv_string(message, &str)) < 0)
880 return error;
881 array[i] = str;
882 }
883 *((void **)p_value) = (void *)array;
884 break;
885 }
886 default:
887 if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
888 char *array;
889 if ((array = (char *)malloc(array_size * desc->size)) == NULL)
890 return RPC_ERROR_NO_MEMORY;
891 for (i = 0; i < array_size; i++) {
892 if ((error = desc->recv_callback(message, &array[i * desc->size])) < 0)
893 return error;
894 }
895 *((void **)p_value) = array;
896 }
897 else {
898 fprintf(stderr, "unknown array arg type %d to receive\n", type);
899 error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
900 }
901 break;
902 }
903 break;
904 }
905 default:
906 if ((desc = rpc_message_find_descriptor(type)) != NULL)
907 error = desc->recv_callback(message, p_value);
908 else {
909 fprintf(stderr, "unknown arg type %d to send\n", type);
910 error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
911 }
912 break;
913 }
914 if (error != RPC_ERROR_NO_ERROR)
915 return error;
916 }
917 return RPC_ERROR_NO_ERROR;
918 }
919
920 // Skip message argument
921 static int rpc_message_skip_arg(rpc_message_t *message, int type)
922 {
923 unsigned char dummy[BUFSIZ];
924 int error = RPC_ERROR_GENERIC;
925 switch (type) {
926 case RPC_TYPE_CHAR:
927 error = _rpc_message_recv_bytes(message, dummy, 1);
928 break;
929 case RPC_TYPE_BOOLEAN:
930 case RPC_TYPE_INT32:
931 case RPC_TYPE_UINT32:
932 error = _rpc_message_recv_bytes(message, dummy, 4);
933 break;
934 case RPC_TYPE_STRING: {
935 int32_t length;
936 if ((error = rpc_message_recv_int32(message, &length)) < 0)
937 return error;
938 while (length >= sizeof(dummy)) {
939 if ((error = _rpc_message_recv_bytes(message, dummy, sizeof(dummy))) < 0)
940 return error;
941 length -= sizeof(dummy);
942 }
943 if (length > 0) {
944 if ((error = _rpc_message_recv_bytes(message, dummy, length)) < 0)
945 return error;
946 }
947 break;
948 }
949 default:
950 fprintf(stderr, "unknown arg type %d to receive\n", type);
951 break;
952 }
953 return error;
954 }
955
956 // Dispatch message received in the server loop
957 int rpc_dispatch(rpc_connection_t *connection)
958 {
959 rpc_message_t message;
960 rpc_message_init(&message, connection);
961
962 int32_t method, value, ret = RPC_MESSAGE_FAILURE;
963 if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR &&
964 value != RPC_MESSAGE_START)
965 return ret;
966
967 D(bug("receiving message\n"));
968 if (rpc_message_recv_int32(&message, &method) == RPC_ERROR_NO_ERROR &&
969 connection->callbacks != NULL) {
970 int i;
971 for (i = 0; i < connection->n_callbacks; i++) {
972 if (connection->callbacks[i].id == method) {
973 if (connection->callbacks[i].callback &&
974 connection->callbacks[i].callback(connection) == RPC_ERROR_NO_ERROR) {
975 if (rpc_message_recv_int32(&message, &value) == RPC_ERROR_NO_ERROR && value == RPC_MESSAGE_END)
976 ret = RPC_MESSAGE_ACK;
977 else {
978 fprintf(stderr, "corrupted message handler %d\n", method);
979 for (;;) {
980 if (rpc_message_skip_arg(&message, value) != RPC_ERROR_NO_ERROR)
981 break;
982 if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR)
983 break;
984 if (value == RPC_MESSAGE_END)
985 break;
986 }
987 }
988 break;
989 }
990 }
991 }
992 }
993 rpc_message_send_int32(&message, ret);
994 rpc_message_flush(&message);
995 D(bug(" -- message received\n"));
996 return ret == RPC_MESSAGE_ACK ? method : ret;
997 }
998
999
1000 /* ====================================================================== */
1001 /* === Method Callbacks Handling === */
1002 /* ====================================================================== */
1003
1004 // Add a user-defined method callback (server side)
1005 static int rpc_method_add_callback(rpc_connection_t *connection, const rpc_method_descriptor_t *desc)
1006 {
1007 const int N_ENTRIES_ALLOC = 8;
1008 int i;
1009
1010 // pre-allocate up to N_ENTRIES_ALLOC entries
1011 if (connection->callbacks == NULL) {
1012 if ((connection->callbacks = (rpc_method_descriptor_t *)calloc(N_ENTRIES_ALLOC, sizeof(connection->callbacks[0]))) == NULL)
1013 return RPC_ERROR_NO_MEMORY;
1014 connection->n_callbacks = N_ENTRIES_ALLOC;
1015 }
1016
1017 // look for a free slot
1018 for (i = connection->n_callbacks - 1; i >= 0; i--) {
1019 if (connection->callbacks[i].callback == NULL)
1020 break;
1021 }
1022
1023 // none found, reallocate
1024 if (i < 0) {
1025 if ((connection->callbacks = (rpc_method_descriptor_t *)realloc(connection->callbacks, (connection->n_callbacks + N_ENTRIES_ALLOC) * sizeof(connection->callbacks[0]))) == NULL)
1026 return RPC_ERROR_NO_MEMORY;
1027 i = connection->n_callbacks;
1028 memset(&connection->callbacks[i], 0, N_ENTRIES_ALLOC * sizeof(connection->callbacks[0]));
1029 connection->n_callbacks += N_ENTRIES_ALLOC;
1030 }
1031
1032 D(bug("rpc_method_add_callback for method %d in slot %d\n", desc->id, i));
1033 connection->callbacks[i] = *desc;
1034 return RPC_ERROR_NO_ERROR;
1035 }
1036
1037 // Add user-defined method callbacks (server side)
1038 int rpc_method_add_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *descs, int n_descs)
1039 {
1040 D(bug("rpc_method_add_callbacks\n"));
1041
1042 if (connection == NULL)
1043 return RPC_ERROR_CONNECTION_NULL;
1044 if (connection->type != RPC_CONNECTION_SERVER)
1045 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1046
1047 while (--n_descs >= 0) {
1048 int error = rpc_method_add_callback(connection, &descs[n_descs]);
1049 if (error != RPC_ERROR_NO_ERROR)
1050 return error;
1051 }
1052
1053 return RPC_ERROR_NO_ERROR;
1054 }
1055
1056 // Remove a user-defined method callback (common code)
1057 int rpc_method_remove_callback_id(rpc_connection_t *connection, int id)
1058 {
1059 D(bug("rpc_method_remove_callback_id\n"));
1060
1061 if (connection->callbacks) {
1062 int i;
1063 for (i = 0; i < connection->n_callbacks; i++) {
1064 if (connection->callbacks[i].id == id) {
1065 connection->callbacks[i].callback = NULL;
1066 return RPC_ERROR_NO_ERROR;
1067 }
1068 }
1069 }
1070
1071 return RPC_ERROR_GENERIC;
1072 }
1073
1074 // Remove user-defined method callbacks (server side)
1075 int rpc_method_remove_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *callbacks, int n_callbacks)
1076 {
1077 D(bug("rpc_method_remove_callbacks\n"));
1078
1079 if (connection == NULL)
1080 return RPC_ERROR_CONNECTION_NULL;
1081 if (connection->type != RPC_CONNECTION_SERVER)
1082 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1083
1084 while (--n_callbacks >= 0) {
1085 int error = rpc_method_remove_callback_id(connection, callbacks[n_callbacks].id);
1086 if (error != RPC_ERROR_NO_ERROR)
1087 return error;
1088 }
1089
1090 return RPC_ERROR_NO_ERROR;
1091 }
1092
1093
1094 /* ====================================================================== */
1095 /* === Remote Procedure Call (method invocation) === */
1096 /* ====================================================================== */
1097
1098 // Invoke remote procedure (client side)
1099 int rpc_method_invoke(rpc_connection_t *connection, int method, ...)
1100 {
1101 D(bug("rpc_method_invoke method=%d\n", method));
1102
1103 rpc_message_t message;
1104 int error;
1105 va_list args;
1106
1107 if (connection == NULL)
1108 return RPC_ERROR_CONNECTION_NULL;
1109 if (connection->type != RPC_CONNECTION_CLIENT)
1110 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1111
1112 rpc_message_init(&message, connection);
1113 error = rpc_message_send_int32(&message, RPC_MESSAGE_START);
1114 if (error != RPC_ERROR_NO_ERROR)
1115 return error;
1116 error = rpc_message_send_int32(&message, method);
1117 if (error != RPC_ERROR_NO_ERROR)
1118 return error;
1119 va_start(args, method);
1120 error = rpc_message_send_args(&message, args);
1121 va_end(args);
1122 if (error != RPC_ERROR_NO_ERROR)
1123 return error;
1124 error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1125 if (error != RPC_ERROR_NO_ERROR)
1126 return error;
1127 error = rpc_message_flush(&message);
1128 if (error != RPC_ERROR_NO_ERROR)
1129 return error;
1130 return RPC_ERROR_NO_ERROR;
1131 }
1132
1133 // Retrieve procedure arguments (server side)
1134 int rpc_method_get_args(rpc_connection_t *connection, ...)
1135 {
1136 D(bug("rpc_method_get_args\n"));
1137
1138 int error;
1139 va_list args;
1140 rpc_message_t message;
1141
1142 if (connection == NULL)
1143 return RPC_ERROR_CONNECTION_NULL;
1144 if (connection->type != RPC_CONNECTION_SERVER)
1145 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1146
1147 rpc_message_init(&message, connection);
1148 va_start(args, connection);
1149 error = rpc_message_recv_args(&message, args);
1150 va_end(args);
1151
1152 return error;
1153 }
1154
1155 // Wait for a reply from the remote procedure (client side)
1156 int rpc_method_wait_for_reply(rpc_connection_t *connection, ...)
1157 {
1158 D(bug("rpc_method_wait_for_reply\n"));
1159
1160 int error, type;
1161 int32_t ret;
1162 va_list args;
1163 rpc_message_t message;
1164
1165 if (connection == NULL)
1166 return RPC_ERROR_CONNECTION_NULL;
1167 if (connection->type != RPC_CONNECTION_CLIENT)
1168 return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1169
1170 rpc_connection_set_status(connection, RPC_STATUS_BUSY);
1171
1172 rpc_message_init(&message, connection);
1173 va_start(args, connection);
1174 type = va_arg(args, int);
1175 va_end(args);
1176
1177 if (type != RPC_TYPE_INVALID) {
1178 error = rpc_message_recv_int32(&message, &ret);
1179 if (error != RPC_ERROR_NO_ERROR)
1180 return_error(error);
1181 if (ret != RPC_MESSAGE_REPLY) {
1182 D(bug("TRUNCATED 1 [%d]\n", ret));
1183 return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1184 }
1185 va_start(args, connection);
1186 error = rpc_message_recv_args(&message, args);
1187 va_end(args);
1188 if (error != RPC_ERROR_NO_ERROR)
1189 return_error(error);
1190 error = rpc_message_recv_int32(&message, &ret);
1191 if (error != RPC_ERROR_NO_ERROR)
1192 return_error(error);
1193 if (ret != RPC_MESSAGE_END) {
1194 D(bug("TRUNCATED 2 [%d]\n", ret));
1195 return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1196 }
1197 }
1198
1199 error = rpc_message_recv_int32(&message, &ret);
1200 if (error != RPC_ERROR_NO_ERROR)
1201 return_error(error);
1202 if (ret != RPC_MESSAGE_ACK) {
1203 D(bug("TRUNCATED 3 [%d]\n", ret));
1204 return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1205 }
1206
1207 return_error(RPC_ERROR_NO_ERROR);
1208
1209 do_return:
1210 rpc_connection_set_status(connection, RPC_STATUS_IDLE);
1211 return error;
1212 }
1213
1214 // Send a reply to the client (server side)
1215 int rpc_method_send_reply(rpc_connection_t *connection, ...)
1216 {
1217 D(bug("rpc_method_send_reply\n"));
1218
1219 rpc_message_t message;
1220 int error;
1221 va_list args;
1222
1223 if (connection == NULL)
1224 return RPC_ERROR_GENERIC;
1225 if (connection->type != RPC_CONNECTION_SERVER)
1226 return RPC_ERROR_GENERIC;
1227
1228 rpc_message_init(&message, connection);
1229 error = rpc_message_send_int32(&message, RPC_MESSAGE_REPLY);
1230 if (error != RPC_ERROR_NO_ERROR)
1231 return error;
1232 va_start(args, connection);
1233 error = rpc_message_send_args(&message, args);
1234 va_end(args);
1235 if (error != RPC_ERROR_NO_ERROR)
1236 return error;
1237 error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1238 if (error != RPC_ERROR_NO_ERROR)
1239 return error;
1240 error = rpc_message_flush(&message);
1241 if (error != RPC_ERROR_NO_ERROR)
1242 return error;
1243 return RPC_ERROR_NO_ERROR;
1244 }