ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/cebix/BasiliskII/src/Unix/rpc_unix.cpp
Revision: 1.5
Committed: 2008-01-01T09:40:33Z (16 years, 10 months ago) by gbeauche
Branch: MAIN
CVS Tags: HEAD
Changes since 1.4: +1 -1 lines
Log Message:
Happy New Year!

File Contents

# User Rev Content
1 gbeauche 1.1 /*
2     * rpc_unix.cpp - Remote Procedure Calls, Unix specific backend
3     *
4 gbeauche 1.5 * Basilisk II (C) 1997-2008 Christian Bauer
5 gbeauche 1.1 * Contributed by Gwenole Beauchesne
6     *
7     * This program is free software; you can redistribute it and/or modify
8     * it under the terms of the GNU General Public License as published by
9     * the Free Software Foundation; either version 2 of the License, or
10     * (at your option) any later version.
11     *
12     * This program is distributed in the hope that it will be useful,
13     * but WITHOUT ANY WARRANTY; without even the implied warranty of
14     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15     * GNU General Public License for more details.
16     *
17     * You should have received a copy of the GNU General Public License
18     * along with this program; if not, write to the Free Software
19     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20     */
21    
22     /*
23     * NOTES:
24     * - this is subject to rewrite but the API is to be kept intact
25     * - this RPC system is very minimal and only suited for 1:1 communication
26     *
27     * TODO:
28     * - better failure conditions
29     * - windows rpc
30     */
31    
32     #include "sysdeps.h"
33    
34     #include <errno.h>
35     #include <fcntl.h>
36     #include <stdarg.h>
37     #include <unistd.h>
38     #include <sys/types.h>
39     #include <sys/socket.h>
40     #include <sys/un.h>
41     #include <sys/wait.h>
42     #include <netinet/in.h>
43    
44     #include "rpc.h"
45    
46     #define DEBUG 0
47     #include "debug.h"
48    
49     #define NON_BLOCKING_IO 0
50    
51 gbeauche 1.2 #if defined __linux__
52     #define USE_ABSTRACT_NAMESPACES 1
53     #endif
54    
55 gbeauche 1.1
56     /* ====================================================================== */
57 gbeauche 1.4 /* === PThreads Glue === */
58     /* ====================================================================== */
59    
60     //#define USE_THREADS
61    
62     #ifndef USE_THREADS
63     #define pthread_t void *
64     #define pthread_cancel(th)
65     #define pthread_join(th, ret)
66     #define pthread_testcancel()
67     #define pthread_create(th, attr, start, arg) dummy_thread_create()
68     static inline int dummy_thread_create(void) { errno = ENOSYS; return -1; }
69    
70     #undef pthread_mutex_t
71     #define pthread_mutex_t volatile int
72     #undef pthread_mutex_lock
73     #define pthread_mutex_lock(m) -1
74     #undef pthread_mutex_unlock
75     #define pthread_mutex_unlock(m) -1
76     #undef PTHREAD_MUTEX_INITIALIZER
77     #define PTHREAD_MUTEX_INITIALIZER 0
78     #endif
79    
80    
81     /* ====================================================================== */
82 gbeauche 1.1 /* === RPC Connection Handling === */
83     /* ====================================================================== */
84    
85     // Connection type
86     enum {
87     RPC_CONNECTION_SERVER,
88     RPC_CONNECTION_CLIENT,
89     };
90    
91     // Connection status
92     enum {
93     RPC_STATUS_IDLE,
94     RPC_STATUS_BUSY,
95     };
96    
97     // Client / Server connection
98     struct rpc_connection_t {
99     int type;
100     int status;
101     int socket;
102 gbeauche 1.2 char *socket_path;
103 gbeauche 1.1 int server_socket;
104     int server_thread_active;
105     pthread_t server_thread;
106     rpc_method_descriptor_t *callbacks;
107     int n_callbacks;
108     int send_offset;
109     char send_buffer[BUFSIZ];
110     };
111    
112     #define return_error(ERROR) do { error = (ERROR); goto do_return; } while (0)
113    
114     // Set connection status (XXX protect connection with a lock?)
115     static inline void rpc_connection_set_status(rpc_connection_t *connection, int status)
116     {
117     connection->status = status;
118     }
119    
120     // Returns TRUE if the connection is busy (e.g. waiting for a reply)
121     int rpc_connection_busy(rpc_connection_t *connection)
122     {
123     return connection && connection->status == RPC_STATUS_BUSY;
124     }
125    
126 gbeauche 1.2 // Prepare socket path for addr.sun_path[]
127     static int _rpc_socket_path(char **pathp, const char *ident)
128     {
129     int i, len;
130     len = strlen(ident);
131    
132     if (pathp == NULL)
133     return 0;
134    
135     char *path;
136     #if USE_ABSTRACT_NAMESPACES
137     const int len_bias = 1;
138     if ((path = (char *)malloc(len + len_bias + 1)) == NULL)
139     return 0;
140     path[0] = 0;
141     strcpy(&path[len_bias], ident);
142     #else
143     const int len_bias = 5;
144     if ((path = (char *)malloc(len + len_bias + 1)) == NULL)
145     return 0;
146     strcpy(path, "/tmp/");
147     for (i = 0; i < len; i++) {
148     char ch = ident[i];
149     if (ch == '/')
150     ch = '_';
151     path[len_bias + i] = ch;
152     }
153     #endif
154     len += len_bias;
155     path[len] = '\0';
156     if (*pathp)
157     free(*pathp);
158     *pathp = path;
159     return len;
160     }
161    
162 gbeauche 1.1 // Initialize server-side RPC system
163     rpc_connection_t *rpc_init_server(const char *ident)
164     {
165     D(bug("rpc_init_server ident='%s'\n", ident));
166    
167     rpc_connection_t *connection;
168     struct sockaddr_un addr;
169 gbeauche 1.2 socklen_t addr_len;
170 gbeauche 1.1
171     if (ident == NULL)
172     return NULL;
173    
174     connection = (rpc_connection_t *)malloc(sizeof(*connection));
175     if (connection == NULL)
176     return NULL;
177     connection->type = RPC_CONNECTION_SERVER;
178     connection->status = RPC_STATUS_IDLE;
179     connection->socket = -1;
180     connection->server_thread_active = 0;
181     connection->callbacks = NULL;
182     connection->n_callbacks = 0;
183    
184     if ((connection->server_socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
185     perror("server socket");
186     free(connection);
187     return NULL;
188     }
189    
190     memset(&addr, 0, sizeof(addr));
191     addr.sun_family = AF_UNIX;
192 gbeauche 1.2 connection->socket_path = NULL;
193     addr_len = _rpc_socket_path(&connection->socket_path, ident);
194     memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
195     addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
196    
197     if (bind(connection->server_socket, (struct sockaddr *)&addr, addr_len) < 0) {
198 gbeauche 1.1 perror("server bind");
199     free(connection);
200     return NULL;
201     }
202    
203     if (listen(connection->server_socket, 1) < 0) {
204     perror("server listen");
205     free(connection);
206     return NULL;
207     }
208    
209     return connection;
210     }
211    
212     // Initialize client-side RPC system
213     rpc_connection_t *rpc_init_client(const char *ident)
214     {
215     D(bug("rpc_init_client ident='%s'\n", ident));
216    
217     rpc_connection_t *connection;
218     struct sockaddr_un addr;
219 gbeauche 1.2 socklen_t addr_len;
220 gbeauche 1.1
221     if (ident == NULL)
222     return NULL;
223    
224     connection = (rpc_connection_t *)malloc(sizeof(*connection));
225     if (connection == NULL)
226     return NULL;
227     connection->type = RPC_CONNECTION_CLIENT;
228     connection->status = RPC_STATUS_IDLE;
229     connection->server_socket = -1;
230     connection->callbacks = NULL;
231     connection->n_callbacks = 0;
232    
233     if ((connection->socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
234     perror("client socket");
235     free(connection);
236     return NULL;
237     }
238    
239     memset(&addr, 0, sizeof(addr));
240     addr.sun_family = AF_UNIX;
241 gbeauche 1.2 connection->socket_path = NULL;
242     addr_len = _rpc_socket_path(&connection->socket_path, ident);
243     memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
244     addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
245    
246 gbeauche 1.1 // Wait at most 5 seconds for server to initialize
247     const int N_CONNECT_WAIT_DELAY = 10;
248     int n_connect_attempts = 5000 / N_CONNECT_WAIT_DELAY;
249     if (n_connect_attempts == 0)
250     n_connect_attempts = 1;
251     while (n_connect_attempts > 0) {
252 gbeauche 1.2 if (connect(connection->socket, (struct sockaddr *)&addr, addr_len) == 0)
253 gbeauche 1.1 break;
254 gbeauche 1.2 if (n_connect_attempts > 1 && errno != ECONNREFUSED && errno != ENOENT) {
255 gbeauche 1.1 perror("client_connect");
256     free(connection);
257     return NULL;
258     }
259     n_connect_attempts--;
260     usleep(N_CONNECT_WAIT_DELAY);
261     }
262     if (n_connect_attempts == 0) {
263     free(connection);
264     return NULL;
265     }
266    
267     return connection;
268     }
269    
270     // Close RPC connection
271     int rpc_exit(rpc_connection_t *connection)
272     {
273     D(bug("rpc_exit\n"));
274    
275     if (connection == NULL)
276     return RPC_ERROR_CONNECTION_NULL;
277    
278 gbeauche 1.2 if (connection->socket_path) {
279     if (connection->socket_path[0])
280     unlink(connection->socket_path);
281     free(connection->socket_path);
282     }
283    
284 gbeauche 1.1 if (connection->type == RPC_CONNECTION_SERVER) {
285     if (connection->server_thread_active) {
286     pthread_cancel(connection->server_thread);
287     pthread_join(connection->server_thread, NULL);
288     }
289     if (connection->socket != -1)
290     close(connection->socket);
291     if (connection->server_socket != -1)
292     close(connection->server_socket);
293     }
294     else {
295     if (connection->socket != -1)
296     close(connection->socket);
297     }
298    
299     if (connection->callbacks)
300     free(connection->callbacks);
301     free(connection);
302    
303     return RPC_ERROR_NO_ERROR;
304     }
305    
306 gbeauche 1.3 // Wait for a message to arrive on the connection port
307     static inline int _rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
308     {
309     struct timeval tv;
310     tv.tv_sec = timeout / 1000000;
311     tv.tv_usec = timeout % 1000000;
312    
313     fd_set rfds;
314     FD_ZERO(&rfds);
315     FD_SET(connection->socket, &rfds);
316     return select(connection->socket + 1, &rfds, NULL, NULL, &tv);
317     }
318    
319     int rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
320     {
321     if (connection == NULL)
322     return RPC_ERROR_CONNECTION_NULL;
323     if (connection->type != RPC_CONNECTION_SERVER)
324     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
325    
326     return _rpc_wait_dispatch(connection, timeout);
327     }
328    
329     // Process incoming messages in the background
330 gbeauche 1.1 static void *rpc_server_func(void *arg)
331     {
332     rpc_connection_t *connection = (rpc_connection_t *)arg;
333    
334     int ret = rpc_listen_socket(connection);
335     if (ret < 0)
336     return NULL;
337    
338     connection->server_thread_active = 1;
339     for (;;) {
340 gbeauche 1.2 // XXX broken MacOS X doesn't implement cancellation points correctly
341     pthread_testcancel();
342    
343     // wait for data to arrive
344 gbeauche 1.3 int ret = _rpc_wait_dispatch(connection, 50000);
345 gbeauche 1.2 if (ret == 0)
346     continue;
347     if (ret < 0)
348 gbeauche 1.1 break;
349 gbeauche 1.2
350 gbeauche 1.1 rpc_dispatch(connection);
351     }
352     connection->server_thread_active = 0;
353     return NULL;
354     }
355    
356     // Return listen socket of RPC connection
357     int rpc_listen_socket(rpc_connection_t *connection)
358     {
359     D(bug("rpc_listen_socket\n"));
360    
361     if (connection == NULL)
362     return RPC_ERROR_CONNECTION_NULL;
363     if (connection->type != RPC_CONNECTION_SERVER)
364     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
365    
366     struct sockaddr_un addr;
367     socklen_t addr_len = sizeof(addr);
368     if ((connection->socket = accept(connection->server_socket, (struct sockaddr *)&addr, &addr_len)) < 0) {
369     perror("server accept");
370     return RPC_ERROR_ERRNO_SET;
371     }
372    
373     #if NON_BLOCKING_IO
374     int val = fcntl(connection->socket, F_GETFL, 0);
375     if (val < 0) {
376     perror("server fcntl F_GETFL");
377     return RPC_ERROR_ERRNO_SET;
378     }
379     if (fcntl(connection->socket, F_SETFL, val | O_NONBLOCK) < 0) {
380     perror("server fcntl F_SETFL");
381     return RPC_ERROR_ERRNO_SET;
382     }
383     #endif
384    
385     return connection->socket;
386     }
387    
388     // Listen for incoming messages on RPC connection
389 gbeauche 1.4 #ifdef USE_THREADS
390 gbeauche 1.1 int rpc_listen(rpc_connection_t *connection)
391     {
392     D(bug("rpc_listen\n"));
393    
394     if (pthread_create(&connection->server_thread, NULL, rpc_server_func, connection) != 0) {
395     perror("server thread");
396     return RPC_ERROR_ERRNO_SET;
397     }
398    
399     return RPC_ERROR_NO_ERROR;
400     }
401 gbeauche 1.4 #endif
402 gbeauche 1.1
403    
404     /* ====================================================================== */
405     /* === Message Passing === */
406     /* ====================================================================== */
407    
408     // Message markers
409     enum {
410     RPC_MESSAGE_START = -3000,
411     RPC_MESSAGE_END = -3001,
412     RPC_MESSAGE_ACK = -3002,
413     RPC_MESSAGE_REPLY = -3003,
414     RPC_MESSAGE_FAILURE = -3004,
415     };
416    
417     // Message type
418     struct rpc_message_t {
419     int socket;
420     int offset;
421     unsigned char buffer[BUFSIZ];
422     };
423    
424     // User-defined marshalers
425     static struct {
426     rpc_message_descriptor_t *descs;
427     int last;
428     int count;
429     } g_message_descriptors = { NULL, 0, 0 };
430     static pthread_mutex_t g_message_descriptors_lock = PTHREAD_MUTEX_INITIALIZER;
431    
432     // Add a user-defined marshaler
433     static int rpc_message_add_callback(const rpc_message_descriptor_t *desc)
434     {
435     D(bug("rpc_message_add_callback\n"));
436    
437     const int N_ENTRIES_ALLOC = 8;
438     int error = RPC_ERROR_NO_ERROR;
439    
440     pthread_mutex_lock(&g_message_descriptors_lock);
441     if (g_message_descriptors.descs == NULL) {
442     g_message_descriptors.count = N_ENTRIES_ALLOC;
443     if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)malloc(g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
444     pthread_mutex_unlock(&g_message_descriptors_lock);
445     return RPC_ERROR_NO_MEMORY;
446     }
447     g_message_descriptors.last = 0;
448     }
449     else if (g_message_descriptors.last >= g_message_descriptors.count) {
450     g_message_descriptors.count += N_ENTRIES_ALLOC;
451     if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)realloc(g_message_descriptors.descs, g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
452     pthread_mutex_unlock(&g_message_descriptors_lock);
453     return RPC_ERROR_NO_MEMORY;
454     }
455     }
456    
457     // XXX only one callback per ID
458     int i;
459     for (i = 0; i < g_message_descriptors.last; i++) {
460     if (g_message_descriptors.descs[i].id == desc->id) {
461     pthread_mutex_unlock(&g_message_descriptors_lock);
462     return RPC_ERROR_NO_ERROR;
463     }
464     }
465    
466     g_message_descriptors.descs[g_message_descriptors.last++] = *desc;
467     pthread_mutex_unlock(&g_message_descriptors_lock);
468     return error;
469     }
470    
471     // Add user-defined marshalers
472     int rpc_message_add_callbacks(const rpc_message_descriptor_t *descs, int n_descs)
473     {
474     D(bug("rpc_message_add_callbacks\n"));
475    
476     int i, error;
477     for (i = 0; i < n_descs; i++) {
478     if ((error = rpc_message_add_callback(&descs[i])) < 0)
479     return error;
480     }
481    
482     return RPC_ERROR_NO_ERROR;
483     }
484    
485     // Find user-defined marshaler
486     static rpc_message_descriptor_t *rpc_message_find_descriptor(int id)
487     {
488     D(bug("rpc_message_find_descriptor\n"));
489    
490     if (g_message_descriptors.descs) {
491     int i;
492     for (i = 0; i < g_message_descriptors.count; i++) {
493     if (g_message_descriptors.descs[i].id == id)
494     return &g_message_descriptors.descs[i];
495     }
496     }
497    
498     return NULL;
499     }
500    
501     // Initialize message
502     static inline void rpc_message_init(rpc_message_t *message, rpc_connection_t *connection)
503     {
504     message->socket = connection->socket;
505     message->offset = 0;
506     }
507    
508     // Send BYTES
509     static inline int _rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
510     {
511     if (send(message->socket, bytes, count, 0) != count)
512     return RPC_ERROR_ERRNO_SET;
513     return RPC_ERROR_NO_ERROR;
514     }
515    
516     // Send message on wire
517     static inline int rpc_message_flush(rpc_message_t *message)
518     {
519     int error = _rpc_message_send_bytes(message, message->buffer, message->offset);
520     message->offset = 0;
521     return error;
522     }
523    
524     // Send BYTES (public interface, may need to flush internal buffer)
525     int rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
526     {
527     if (message->offset > 0) {
528     int error = rpc_message_flush(message);
529     if (error != RPC_ERROR_NO_ERROR)
530     return error;
531     }
532     return _rpc_message_send_bytes(message, bytes, count);
533     }
534    
535     // Send BYTES (buffered)
536     static inline void _rpc_message_send_bytes_buffered(rpc_message_t *message, unsigned char *bytes, int count)
537     {
538     memcpy(&message->buffer[message->offset], bytes, count);
539     message->offset += count;
540     }
541    
542     // Send CHAR
543     int rpc_message_send_char(rpc_message_t *message, char c)
544     {
545     D(bug(" send CHAR '%c'\n", c));
546    
547     unsigned char e_value = c;
548     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
549     int error = rpc_message_flush(message);
550     if (error != RPC_ERROR_NO_ERROR)
551     return error;
552     }
553     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
554     return RPC_ERROR_NO_ERROR;
555     }
556    
557     // Send INT32
558     int rpc_message_send_int32(rpc_message_t *message, int32_t value)
559     {
560     D(bug(" send INT32 %d\n", value));
561    
562     int32_t e_value = htonl(value);
563     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
564     int error = rpc_message_flush(message);
565     if (error != RPC_ERROR_NO_ERROR)
566     return error;
567     }
568     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
569     return RPC_ERROR_NO_ERROR;
570     }
571    
572     // Send UINT32
573     int rpc_message_send_uint32(rpc_message_t *message, uint32_t value)
574     {
575     D(bug(" send UINT32 %u\n", value));
576    
577     uint32_t e_value = htonl(value);
578     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
579     int error = rpc_message_flush(message);
580     if (error != RPC_ERROR_NO_ERROR)
581     return error;
582     }
583     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
584     return RPC_ERROR_NO_ERROR;
585     }
586    
587     // Send STRING
588     int rpc_message_send_string(rpc_message_t *message, const char *str)
589     {
590     D(bug(" send STRING \"%s\"\n", str));
591    
592     int error, length = str ? strlen(str) : 0;
593     uint32_t e_value = htonl(length);
594     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
595     error = rpc_message_flush(message);
596     if (error != RPC_ERROR_NO_ERROR)
597     return error;
598     }
599     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
600     error = rpc_message_flush(message);
601     if (error != RPC_ERROR_NO_ERROR)
602     return error;
603     D(bug("str=%p\n", str));
604     return _rpc_message_send_bytes(message, (unsigned char *)str, length);
605     }
606    
607     // Send message arguments
608     static int rpc_message_send_args(rpc_message_t *message, va_list args)
609     {
610     int type;
611     rpc_message_descriptor_t *desc;
612     while ((type = va_arg(args, int)) != RPC_TYPE_INVALID) {
613     int error = rpc_message_send_int32(message, type);
614     if (error != RPC_ERROR_NO_ERROR)
615     return error;
616     switch (type) {
617     case RPC_TYPE_CHAR:
618     error = rpc_message_send_char(message, (char )va_arg(args, int));
619     break;
620     case RPC_TYPE_BOOLEAN:
621     case RPC_TYPE_INT32:
622     error = rpc_message_send_int32(message, va_arg(args, int));
623     break;
624     case RPC_TYPE_UINT32:
625     error = rpc_message_send_uint32(message, va_arg(args, unsigned int));
626     break;
627     case RPC_TYPE_STRING:
628     error = rpc_message_send_string(message, va_arg(args, char *));
629     break;
630     case RPC_TYPE_ARRAY: {
631     int i;
632     int array_type = va_arg(args, int32_t);
633     int array_size = va_arg(args, uint32_t);
634     if ((error = rpc_message_send_int32(message, array_type)) < 0)
635     return error;
636     if ((error = rpc_message_send_uint32(message, array_size)) < 0)
637     return error;
638     switch (array_type) {
639     case RPC_TYPE_CHAR: {
640     unsigned char *array = va_arg(args, unsigned char *);
641     error = rpc_message_flush(message);
642     if (error != RPC_ERROR_NO_ERROR)
643     return error;
644     error = _rpc_message_send_bytes(message, array, array_size);
645     break;
646     }
647     case RPC_TYPE_BOOLEAN:
648     case RPC_TYPE_INT32: {
649     int32_t *array = va_arg(args, int32_t *);
650     for (i = 0; i < array_size; i++) {
651     if ((error = rpc_message_send_int32(message, array[i])) < 0)
652     break;
653     }
654     break;
655     }
656     case RPC_TYPE_UINT32: {
657     uint32_t *array = va_arg(args, uint32_t *);
658     for (i = 0; i < array_size; i++) {
659     if ((error = rpc_message_send_uint32(message, array[i])) < 0)
660     break;
661     }
662     break;
663     }
664     case RPC_TYPE_STRING: {
665     char **array = va_arg(args, char **);
666     for (i = 0; i < array_size; i++) {
667     if ((error = rpc_message_send_string(message, array[i])) < 0)
668     break;
669     }
670     break;
671     }
672     default:
673     if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
674     uint8_t *array = va_arg(args, uint8_t *);
675     for (i = 0; i < array_size; i++) {
676     if ((error = desc->send_callback(message, &array[i * desc->size])) < 0)
677     break;
678     }
679     }
680     else {
681     fprintf(stderr, "unknown array arg type %d to send\n", type);
682     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
683     }
684     break;
685     }
686     break;
687     }
688     default:
689     if ((desc = rpc_message_find_descriptor(type)) != NULL)
690     error = desc->send_callback(message, va_arg(args, uint8_t *));
691     else {
692     fprintf(stderr, "unknown arg type %d to send\n", type);
693     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
694     }
695     break;
696     }
697     if (error != RPC_ERROR_NO_ERROR)
698     return error;
699     }
700     return RPC_ERROR_NO_ERROR;
701     }
702    
703     // Receive raw BYTES
704     static inline int _rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
705     {
706     do {
707     int n = recv(message->socket, bytes, count, 0);
708     if (n > 0) {
709     count -= n;
710     bytes += n;
711     }
712     else if (n == -1 && errno == EINTR)
713     continue;
714     else {
715     #if NON_BLOCKING_IO
716     if (errno == EAGAIN || errno == EWOULDBLOCK) {
717     // wait for data to arrive
718 gbeauche 1.2 fd_set rfds;
719     FD_ZERO(&rfds);
720     FD_SET(message->socket, &rfds);
721     int ret = select(message->socket + 1, &rfds, NULL, NULL, NULL);
722     if (ret > 0)
723 gbeauche 1.1 continue;
724     }
725     #endif
726     return RPC_ERROR_ERRNO_SET;
727     }
728     } while (count > 0);
729     return RPC_ERROR_NO_ERROR;
730     }
731    
732     int rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
733     {
734     return _rpc_message_recv_bytes(message, bytes, count);
735     }
736    
737     // Receive CHAR
738     int rpc_message_recv_char(rpc_message_t *message, char *ret)
739     {
740     char r_value;
741     int error;
742     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
743     return error;
744     *ret = r_value;
745     D(bug(" recv CHAR '%c'\n", *ret));
746     return RPC_ERROR_NO_ERROR;
747     }
748    
749     // Receive INT32
750     int rpc_message_recv_int32(rpc_message_t *message, int32_t *ret)
751     {
752     int32_t r_value;
753     int error;
754     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
755     return error;
756     *ret = ntohl(r_value);
757     D(bug(" recv INT32 %d\n", *ret));
758     return RPC_ERROR_NO_ERROR;
759     }
760    
761     // Receive UINT32
762     int rpc_message_recv_uint32(rpc_message_t *message, uint32_t *ret)
763     {
764     uint32_t r_value;
765     int error;
766     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
767     return error;
768     *ret = ntohl(r_value);
769     D(bug(" recv UINT32 %u\n", *ret));
770     return RPC_ERROR_NO_ERROR;
771     }
772    
773     // Receive STRING
774     int rpc_message_recv_string(rpc_message_t *message, char **ret)
775     {
776     char *str;
777     int length;
778     uint32_t r_value;
779     int error;
780     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
781     return error;
782     length = ntohl(r_value);
783     if (length == 0)
784     str = NULL;
785     else {
786     if ((str = (char *)malloc(length + 1)) == NULL)
787     return RPC_ERROR_NO_MEMORY;
788     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)str, length)) < 0)
789     return error;
790     str[length] = '\0';
791     }
792     *ret = str;
793     D(bug(" recv STRING \"%s\"\n", *ret));
794     return RPC_ERROR_NO_ERROR;
795     }
796    
797     // Receive message arguments
798     static int rpc_message_recv_args(rpc_message_t *message, va_list args)
799     {
800     int expected_type, error;
801     rpc_message_descriptor_t *desc;
802    
803     while ((expected_type = va_arg(args, int)) != RPC_TYPE_INVALID) {
804     void *p_value = va_arg(args, void *);
805     int32_t type;
806     if ((error = rpc_message_recv_int32(message, &type)) < 0)
807     return error;
808     if (type != expected_type)
809     return RPC_ERROR_MESSAGE_ARGUMENT_MISMATCH;
810     switch (type) {
811     case RPC_TYPE_CHAR:
812     error = rpc_message_recv_char(message, (char *)p_value);
813     break;
814     case RPC_TYPE_BOOLEAN:
815     case RPC_TYPE_INT32:
816     error = rpc_message_recv_int32(message, (int32_t *)p_value);
817     break;
818     case RPC_TYPE_UINT32:
819     error = rpc_message_recv_uint32(message, (uint32_t *)p_value);
820     break;
821     case RPC_TYPE_STRING:
822     error = rpc_message_recv_string(message, (char **)p_value);
823     break;
824     case RPC_TYPE_ARRAY: {
825     int i;
826     int32_t array_type;
827     uint32_t array_size;
828     if ((error = rpc_message_recv_int32(message, &array_type)) < 0)
829     return error;
830     if ((error = rpc_message_recv_uint32(message, &array_size)) < 0)
831     return error;
832     p_value = va_arg(args, void *);
833     *((uint32_t *)p_value) = array_size;
834     p_value = va_arg(args, void *);
835     switch (array_type) {
836     case RPC_TYPE_CHAR: {
837     unsigned char *array;
838     if ((array = (unsigned char *)malloc(array_size * sizeof(*array))) == NULL)
839     return RPC_ERROR_NO_MEMORY;
840     error = _rpc_message_recv_bytes(message, array, array_size);
841     if (error != RPC_ERROR_NO_ERROR)
842     return error;
843     *((void **)p_value) = (void *)array;
844     break;
845     }
846     case RPC_TYPE_BOOLEAN:
847     case RPC_TYPE_INT32: {
848     int *array;
849     if ((array = (int *)malloc(array_size * sizeof(*array))) == NULL)
850     return RPC_ERROR_NO_MEMORY;
851     for (i = 0; i < array_size; i++) {
852     int32_t value;
853     if ((error = rpc_message_recv_int32(message, &value)) < 0)
854     return error;
855     array[i] = value;
856     }
857     *((void **)p_value) = (void *)array;
858     break;
859     }
860     case RPC_TYPE_UINT32: {
861     unsigned int *array;
862     if ((array = (unsigned int *)malloc(array_size * sizeof(*array))) == NULL)
863     return RPC_ERROR_NO_MEMORY;
864     for (i = 0; i < array_size; i++) {
865     uint32_t value;
866     if ((error = rpc_message_recv_uint32(message, &value)) < 0)
867     return error;
868     array[i] = value;
869     }
870     *((void **)p_value) = (void *)array;
871     break;
872     }
873     case RPC_TYPE_STRING: {
874     char **array;
875     if ((array = (char **)malloc(array_size * sizeof(*array))) == NULL)
876     return RPC_ERROR_NO_MEMORY;
877     for (i = 0; i < array_size; i++) {
878     char *str;
879     if ((error = rpc_message_recv_string(message, &str)) < 0)
880     return error;
881     array[i] = str;
882     }
883     *((void **)p_value) = (void *)array;
884     break;
885     }
886     default:
887     if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
888     char *array;
889     if ((array = (char *)malloc(array_size * desc->size)) == NULL)
890     return RPC_ERROR_NO_MEMORY;
891     for (i = 0; i < array_size; i++) {
892     if ((error = desc->recv_callback(message, &array[i * desc->size])) < 0)
893     return error;
894     }
895     *((void **)p_value) = array;
896     }
897     else {
898     fprintf(stderr, "unknown array arg type %d to receive\n", type);
899     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
900     }
901     break;
902     }
903     break;
904     }
905     default:
906     if ((desc = rpc_message_find_descriptor(type)) != NULL)
907     error = desc->recv_callback(message, p_value);
908     else {
909     fprintf(stderr, "unknown arg type %d to send\n", type);
910     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
911     }
912     break;
913     }
914     if (error != RPC_ERROR_NO_ERROR)
915     return error;
916     }
917     return RPC_ERROR_NO_ERROR;
918     }
919    
920     // Skip message argument
921     static int rpc_message_skip_arg(rpc_message_t *message, int type)
922     {
923     unsigned char dummy[BUFSIZ];
924     int error = RPC_ERROR_GENERIC;
925     switch (type) {
926     case RPC_TYPE_CHAR:
927     error = _rpc_message_recv_bytes(message, dummy, 1);
928     break;
929     case RPC_TYPE_BOOLEAN:
930     case RPC_TYPE_INT32:
931     case RPC_TYPE_UINT32:
932     error = _rpc_message_recv_bytes(message, dummy, 4);
933     break;
934     case RPC_TYPE_STRING: {
935     int32_t length;
936     if ((error = rpc_message_recv_int32(message, &length)) < 0)
937     return error;
938     while (length >= sizeof(dummy)) {
939     if ((error = _rpc_message_recv_bytes(message, dummy, sizeof(dummy))) < 0)
940     return error;
941     length -= sizeof(dummy);
942     }
943     if (length > 0) {
944     if ((error = _rpc_message_recv_bytes(message, dummy, length)) < 0)
945     return error;
946     }
947     break;
948     }
949     default:
950     fprintf(stderr, "unknown arg type %d to receive\n", type);
951     break;
952     }
953     return error;
954     }
955    
956     // Dispatch message received in the server loop
957     int rpc_dispatch(rpc_connection_t *connection)
958     {
959     rpc_message_t message;
960     rpc_message_init(&message, connection);
961    
962     int32_t method, value, ret = RPC_MESSAGE_FAILURE;
963     if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR &&
964     value != RPC_MESSAGE_START)
965     return ret;
966    
967     D(bug("receiving message\n"));
968     if (rpc_message_recv_int32(&message, &method) == RPC_ERROR_NO_ERROR &&
969     connection->callbacks != NULL) {
970     int i;
971     for (i = 0; i < connection->n_callbacks; i++) {
972     if (connection->callbacks[i].id == method) {
973     if (connection->callbacks[i].callback &&
974     connection->callbacks[i].callback(connection) == RPC_ERROR_NO_ERROR) {
975     if (rpc_message_recv_int32(&message, &value) == RPC_ERROR_NO_ERROR && value == RPC_MESSAGE_END)
976     ret = RPC_MESSAGE_ACK;
977     else {
978     fprintf(stderr, "corrupted message handler %d\n", method);
979     for (;;) {
980     if (rpc_message_skip_arg(&message, value) != RPC_ERROR_NO_ERROR)
981     break;
982     if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR)
983     break;
984     if (value == RPC_MESSAGE_END)
985     break;
986     }
987     }
988     break;
989     }
990     }
991     }
992     }
993     rpc_message_send_int32(&message, ret);
994     rpc_message_flush(&message);
995     D(bug(" -- message received\n"));
996     return ret == RPC_MESSAGE_ACK ? method : ret;
997     }
998    
999    
1000     /* ====================================================================== */
1001     /* === Method Callbacks Handling === */
1002     /* ====================================================================== */
1003    
1004     // Add a user-defined method callback (server side)
1005     static int rpc_method_add_callback(rpc_connection_t *connection, const rpc_method_descriptor_t *desc)
1006     {
1007     const int N_ENTRIES_ALLOC = 8;
1008     int i;
1009    
1010     // pre-allocate up to N_ENTRIES_ALLOC entries
1011     if (connection->callbacks == NULL) {
1012     if ((connection->callbacks = (rpc_method_descriptor_t *)calloc(N_ENTRIES_ALLOC, sizeof(connection->callbacks[0]))) == NULL)
1013     return RPC_ERROR_NO_MEMORY;
1014     connection->n_callbacks = N_ENTRIES_ALLOC;
1015     }
1016    
1017     // look for a free slot
1018     for (i = connection->n_callbacks - 1; i >= 0; i--) {
1019     if (connection->callbacks[i].callback == NULL)
1020     break;
1021     }
1022    
1023     // none found, reallocate
1024     if (i < 0) {
1025     if ((connection->callbacks = (rpc_method_descriptor_t *)realloc(connection->callbacks, (connection->n_callbacks + N_ENTRIES_ALLOC) * sizeof(connection->callbacks[0]))) == NULL)
1026     return RPC_ERROR_NO_MEMORY;
1027     i = connection->n_callbacks;
1028     memset(&connection->callbacks[i], 0, N_ENTRIES_ALLOC * sizeof(connection->callbacks[0]));
1029     connection->n_callbacks += N_ENTRIES_ALLOC;
1030     }
1031    
1032     D(bug("rpc_method_add_callback for method %d in slot %d\n", desc->id, i));
1033     connection->callbacks[i] = *desc;
1034     return RPC_ERROR_NO_ERROR;
1035     }
1036    
1037     // Add user-defined method callbacks (server side)
1038     int rpc_method_add_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *descs, int n_descs)
1039     {
1040     D(bug("rpc_method_add_callbacks\n"));
1041    
1042     if (connection == NULL)
1043     return RPC_ERROR_CONNECTION_NULL;
1044     if (connection->type != RPC_CONNECTION_SERVER)
1045     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1046    
1047     while (--n_descs >= 0) {
1048     int error = rpc_method_add_callback(connection, &descs[n_descs]);
1049     if (error != RPC_ERROR_NO_ERROR)
1050     return error;
1051     }
1052    
1053     return RPC_ERROR_NO_ERROR;
1054     }
1055    
1056     // Remove a user-defined method callback (common code)
1057     int rpc_method_remove_callback_id(rpc_connection_t *connection, int id)
1058     {
1059     D(bug("rpc_method_remove_callback_id\n"));
1060    
1061     if (connection->callbacks) {
1062     int i;
1063     for (i = 0; i < connection->n_callbacks; i++) {
1064     if (connection->callbacks[i].id == id) {
1065     connection->callbacks[i].callback = NULL;
1066     return RPC_ERROR_NO_ERROR;
1067     }
1068     }
1069     }
1070    
1071     return RPC_ERROR_GENERIC;
1072     }
1073    
1074     // Remove user-defined method callbacks (server side)
1075     int rpc_method_remove_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *callbacks, int n_callbacks)
1076     {
1077     D(bug("rpc_method_remove_callbacks\n"));
1078    
1079     if (connection == NULL)
1080     return RPC_ERROR_CONNECTION_NULL;
1081     if (connection->type != RPC_CONNECTION_SERVER)
1082     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1083    
1084     while (--n_callbacks >= 0) {
1085     int error = rpc_method_remove_callback_id(connection, callbacks[n_callbacks].id);
1086     if (error != RPC_ERROR_NO_ERROR)
1087     return error;
1088     }
1089    
1090     return RPC_ERROR_NO_ERROR;
1091     }
1092    
1093    
1094     /* ====================================================================== */
1095     /* === Remote Procedure Call (method invocation) === */
1096     /* ====================================================================== */
1097    
1098     // Invoke remote procedure (client side)
1099     int rpc_method_invoke(rpc_connection_t *connection, int method, ...)
1100     {
1101     D(bug("rpc_method_invoke method=%d\n", method));
1102    
1103     rpc_message_t message;
1104     int error;
1105     va_list args;
1106    
1107     if (connection == NULL)
1108     return RPC_ERROR_CONNECTION_NULL;
1109     if (connection->type != RPC_CONNECTION_CLIENT)
1110     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1111    
1112     rpc_message_init(&message, connection);
1113     error = rpc_message_send_int32(&message, RPC_MESSAGE_START);
1114     if (error != RPC_ERROR_NO_ERROR)
1115     return error;
1116     error = rpc_message_send_int32(&message, method);
1117     if (error != RPC_ERROR_NO_ERROR)
1118     return error;
1119     va_start(args, method);
1120     error = rpc_message_send_args(&message, args);
1121     va_end(args);
1122     if (error != RPC_ERROR_NO_ERROR)
1123     return error;
1124     error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1125     if (error != RPC_ERROR_NO_ERROR)
1126     return error;
1127     error = rpc_message_flush(&message);
1128     if (error != RPC_ERROR_NO_ERROR)
1129     return error;
1130     return RPC_ERROR_NO_ERROR;
1131     }
1132    
1133     // Retrieve procedure arguments (server side)
1134     int rpc_method_get_args(rpc_connection_t *connection, ...)
1135     {
1136     D(bug("rpc_method_get_args\n"));
1137    
1138     int error;
1139     va_list args;
1140     rpc_message_t message;
1141    
1142     if (connection == NULL)
1143     return RPC_ERROR_CONNECTION_NULL;
1144     if (connection->type != RPC_CONNECTION_SERVER)
1145     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1146    
1147     rpc_message_init(&message, connection);
1148     va_start(args, connection);
1149     error = rpc_message_recv_args(&message, args);
1150     va_end(args);
1151    
1152     return error;
1153     }
1154    
1155     // Wait for a reply from the remote procedure (client side)
1156     int rpc_method_wait_for_reply(rpc_connection_t *connection, ...)
1157     {
1158     D(bug("rpc_method_wait_for_reply\n"));
1159    
1160     int error, type;
1161     int32_t ret;
1162     va_list args;
1163     rpc_message_t message;
1164    
1165     if (connection == NULL)
1166     return RPC_ERROR_CONNECTION_NULL;
1167     if (connection->type != RPC_CONNECTION_CLIENT)
1168     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1169    
1170     rpc_connection_set_status(connection, RPC_STATUS_BUSY);
1171    
1172     rpc_message_init(&message, connection);
1173     va_start(args, connection);
1174     type = va_arg(args, int);
1175     va_end(args);
1176    
1177     if (type != RPC_TYPE_INVALID) {
1178     error = rpc_message_recv_int32(&message, &ret);
1179     if (error != RPC_ERROR_NO_ERROR)
1180     return_error(error);
1181     if (ret != RPC_MESSAGE_REPLY) {
1182     D(bug("TRUNCATED 1 [%d]\n", ret));
1183     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1184     }
1185     va_start(args, connection);
1186     error = rpc_message_recv_args(&message, args);
1187     va_end(args);
1188     if (error != RPC_ERROR_NO_ERROR)
1189     return_error(error);
1190     error = rpc_message_recv_int32(&message, &ret);
1191     if (error != RPC_ERROR_NO_ERROR)
1192     return_error(error);
1193     if (ret != RPC_MESSAGE_END) {
1194     D(bug("TRUNCATED 2 [%d]\n", ret));
1195     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1196     }
1197     }
1198    
1199     error = rpc_message_recv_int32(&message, &ret);
1200     if (error != RPC_ERROR_NO_ERROR)
1201     return_error(error);
1202     if (ret != RPC_MESSAGE_ACK) {
1203     D(bug("TRUNCATED 3 [%d]\n", ret));
1204     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1205     }
1206    
1207     return_error(RPC_ERROR_NO_ERROR);
1208    
1209     do_return:
1210     rpc_connection_set_status(connection, RPC_STATUS_IDLE);
1211     return error;
1212     }
1213    
1214     // Send a reply to the client (server side)
1215     int rpc_method_send_reply(rpc_connection_t *connection, ...)
1216     {
1217     D(bug("rpc_method_send_reply\n"));
1218    
1219     rpc_message_t message;
1220     int error;
1221     va_list args;
1222    
1223     if (connection == NULL)
1224     return RPC_ERROR_GENERIC;
1225     if (connection->type != RPC_CONNECTION_SERVER)
1226     return RPC_ERROR_GENERIC;
1227    
1228     rpc_message_init(&message, connection);
1229     error = rpc_message_send_int32(&message, RPC_MESSAGE_REPLY);
1230     if (error != RPC_ERROR_NO_ERROR)
1231     return error;
1232     va_start(args, connection);
1233     error = rpc_message_send_args(&message, args);
1234     va_end(args);
1235     if (error != RPC_ERROR_NO_ERROR)
1236     return error;
1237     error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1238     if (error != RPC_ERROR_NO_ERROR)
1239     return error;
1240     error = rpc_message_flush(&message);
1241     if (error != RPC_ERROR_NO_ERROR)
1242     return error;
1243     return RPC_ERROR_NO_ERROR;
1244     }