ViewVC Help
View File | Revision Log | Show Annotations | Revision Graph | Root Listing
root/cebix/BasiliskII/src/Unix/rpc_unix.cpp
Revision: 1.3
Committed: 2006-04-18T21:24:12Z (18 years, 7 months ago) by gbeauche
Branch: MAIN
Changes since 1.2: +25 -8 lines
Log Message:
Implement rpc_wait_dispatch()

File Contents

# User Rev Content
1 gbeauche 1.1 /*
2     * rpc_unix.cpp - Remote Procedure Calls, Unix specific backend
3     *
4     * Basilisk II (C) 1997-2006 Christian Bauer
5     * Contributed by Gwenole Beauchesne
6     *
7     * This program is free software; you can redistribute it and/or modify
8     * it under the terms of the GNU General Public License as published by
9     * the Free Software Foundation; either version 2 of the License, or
10     * (at your option) any later version.
11     *
12     * This program is distributed in the hope that it will be useful,
13     * but WITHOUT ANY WARRANTY; without even the implied warranty of
14     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15     * GNU General Public License for more details.
16     *
17     * You should have received a copy of the GNU General Public License
18     * along with this program; if not, write to the Free Software
19     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20     */
21    
22     /*
23     * NOTES:
24     * - this is subject to rewrite but the API is to be kept intact
25     * - this RPC system is very minimal and only suited for 1:1 communication
26     *
27     * TODO:
28     * - better failure conditions
29     * - windows rpc
30     */
31    
32     #include "sysdeps.h"
33    
34     #include <errno.h>
35     #include <fcntl.h>
36     #include <stdarg.h>
37     #include <unistd.h>
38     #include <sys/types.h>
39     #include <sys/socket.h>
40     #include <sys/un.h>
41     #include <sys/wait.h>
42     #include <netinet/in.h>
43     #include <pthread.h>
44    
45     #include "rpc.h"
46    
47     #define DEBUG 0
48     #include "debug.h"
49    
50     #define NON_BLOCKING_IO 0
51    
52 gbeauche 1.2 #if defined __linux__
53     #define USE_ABSTRACT_NAMESPACES 1
54     #endif
55    
56 gbeauche 1.1
57     /* ====================================================================== */
58     /* === RPC Connection Handling === */
59     /* ====================================================================== */
60    
// Role played by this end of an RPC connection (stored in the `type' field
// of rpc_connection_t)
enum {
	RPC_CONNECTION_SERVER = 0,	// set by rpc_init_server()
	RPC_CONNECTION_CLIENT = 1,	// set by rpc_init_client()
};
66    
// Activity state of a connection (stored in the `status' field of
// rpc_connection_t)
enum {
	RPC_STATUS_IDLE = 0,	// initial state of a freshly created connection
	RPC_STATUS_BUSY = 1,	// the state rpc_connection_busy() tests for
};
72    
73     // Client / Server connection
74     struct rpc_connection_t {
75     int type;
76     int status;
77     int socket;
78 gbeauche 1.2 char *socket_path;
79 gbeauche 1.1 int server_socket;
80     int server_thread_active;
81     pthread_t server_thread;
82     rpc_method_descriptor_t *callbacks;
83     int n_callbacks;
84     int send_offset;
85     char send_buffer[BUFSIZ];
86     };
87    
// Store ERROR in the local variable `error' and jump to the `do_return'
// label; both must exist in the function using this macro
#define return_error(ERROR)		\
	do {						\
		error = (ERROR);		\
		goto do_return;			\
	} while (0)
89    
90     // Set connection status (XXX protect connection with a lock?)
91     static inline void rpc_connection_set_status(rpc_connection_t *connection, int status)
92     {
93     connection->status = status;
94     }
95    
96     // Returns TRUE if the connection is busy (e.g. waiting for a reply)
97     int rpc_connection_busy(rpc_connection_t *connection)
98     {
99     return connection && connection->status == RPC_STATUS_BUSY;
100     }
101    
// Prepare socket path for addr.sun_path[]
//
// With USE_ABSTRACT_NAMESPACES the result is a NUL byte followed by IDENT
// unchanged; otherwise it is "/tmp/" followed by IDENT with every '/'
// replaced by '_'. On success the string previously held in *PATHP (if any)
// is freed and replaced by the new malloc()ed buffer, and the number of
// meaningful bytes (not counting the final terminator) is returned.
// Returns 0 and leaves *PATHP untouched if PATHP or IDENT is NULL or if
// memory cannot be allocated.
static int _rpc_socket_path(char **pathp, const char *ident)
{
	// validate both arguments before touching IDENT
	if (pathp == NULL || ident == NULL)
		return 0;

	int len = strlen(ident);

#if USE_ABSTRACT_NAMESPACES
	const int len_bias = 1;		// leading NUL byte
#else
	const int len_bias = 5;		// strlen("/tmp/")
#endif

	char *path = (char *)malloc(len + len_bias + 1);
	if (path == NULL)
		return 0;

#if USE_ABSTRACT_NAMESPACES
	path[0] = 0;
	memcpy(&path[len_bias], ident, len);
#else
	memcpy(path, "/tmp/", len_bias);
	for (int i = 0; i < len; i++) {
		// flatten IDENT so the socket lives directly under /tmp
		path[len_bias + i] = (ident[i] == '/') ? '_' : ident[i];
	}
#endif

	len += len_bias;
	path[len] = '\0';

	free(*pathp);
	*pathp = path;
	return len;
}
137    
138 gbeauche 1.1 // Initialize server-side RPC system
139     rpc_connection_t *rpc_init_server(const char *ident)
140     {
141     D(bug("rpc_init_server ident='%s'\n", ident));
142    
143     rpc_connection_t *connection;
144     struct sockaddr_un addr;
145 gbeauche 1.2 socklen_t addr_len;
146 gbeauche 1.1
147     if (ident == NULL)
148     return NULL;
149    
150     connection = (rpc_connection_t *)malloc(sizeof(*connection));
151     if (connection == NULL)
152     return NULL;
153     connection->type = RPC_CONNECTION_SERVER;
154     connection->status = RPC_STATUS_IDLE;
155     connection->socket = -1;
156     connection->server_thread_active = 0;
157     connection->callbacks = NULL;
158     connection->n_callbacks = 0;
159    
160     if ((connection->server_socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
161     perror("server socket");
162     free(connection);
163     return NULL;
164     }
165    
166     memset(&addr, 0, sizeof(addr));
167     addr.sun_family = AF_UNIX;
168 gbeauche 1.2 connection->socket_path = NULL;
169     addr_len = _rpc_socket_path(&connection->socket_path, ident);
170     memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
171     addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
172    
173     if (bind(connection->server_socket, (struct sockaddr *)&addr, addr_len) < 0) {
174 gbeauche 1.1 perror("server bind");
175     free(connection);
176     return NULL;
177     }
178    
179     if (listen(connection->server_socket, 1) < 0) {
180     perror("server listen");
181     free(connection);
182     return NULL;
183     }
184    
185     return connection;
186     }
187    
188     // Initialize client-side RPC system
189     rpc_connection_t *rpc_init_client(const char *ident)
190     {
191     D(bug("rpc_init_client ident='%s'\n", ident));
192    
193     rpc_connection_t *connection;
194     struct sockaddr_un addr;
195 gbeauche 1.2 socklen_t addr_len;
196 gbeauche 1.1
197     if (ident == NULL)
198     return NULL;
199    
200     connection = (rpc_connection_t *)malloc(sizeof(*connection));
201     if (connection == NULL)
202     return NULL;
203     connection->type = RPC_CONNECTION_CLIENT;
204     connection->status = RPC_STATUS_IDLE;
205     connection->server_socket = -1;
206     connection->callbacks = NULL;
207     connection->n_callbacks = 0;
208    
209     if ((connection->socket = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
210     perror("client socket");
211     free(connection);
212     return NULL;
213     }
214    
215     memset(&addr, 0, sizeof(addr));
216     addr.sun_family = AF_UNIX;
217 gbeauche 1.2 connection->socket_path = NULL;
218     addr_len = _rpc_socket_path(&connection->socket_path, ident);
219     memcpy(&addr.sun_path[0], connection->socket_path, addr_len);
220     addr_len += sizeof(struct sockaddr_un) - sizeof(addr.sun_path);
221    
222 gbeauche 1.1 // Wait at most 5 seconds for server to initialize
223     const int N_CONNECT_WAIT_DELAY = 10;
224     int n_connect_attempts = 5000 / N_CONNECT_WAIT_DELAY;
225     if (n_connect_attempts == 0)
226     n_connect_attempts = 1;
227     while (n_connect_attempts > 0) {
228 gbeauche 1.2 if (connect(connection->socket, (struct sockaddr *)&addr, addr_len) == 0)
229 gbeauche 1.1 break;
230 gbeauche 1.2 if (n_connect_attempts > 1 && errno != ECONNREFUSED && errno != ENOENT) {
231 gbeauche 1.1 perror("client_connect");
232     free(connection);
233     return NULL;
234     }
235     n_connect_attempts--;
236     usleep(N_CONNECT_WAIT_DELAY);
237     }
238     if (n_connect_attempts == 0) {
239     free(connection);
240     return NULL;
241     }
242    
243     return connection;
244     }
245    
246     // Close RPC connection
247     int rpc_exit(rpc_connection_t *connection)
248     {
249     D(bug("rpc_exit\n"));
250    
251     if (connection == NULL)
252     return RPC_ERROR_CONNECTION_NULL;
253    
254 gbeauche 1.2 if (connection->socket_path) {
255     if (connection->socket_path[0])
256     unlink(connection->socket_path);
257     free(connection->socket_path);
258     }
259    
260 gbeauche 1.1 if (connection->type == RPC_CONNECTION_SERVER) {
261     if (connection->server_thread_active) {
262     pthread_cancel(connection->server_thread);
263     pthread_join(connection->server_thread, NULL);
264     }
265     if (connection->socket != -1)
266     close(connection->socket);
267     if (connection->server_socket != -1)
268     close(connection->server_socket);
269     }
270     else {
271     if (connection->socket != -1)
272     close(connection->socket);
273     }
274    
275     if (connection->callbacks)
276     free(connection->callbacks);
277     free(connection);
278    
279     return RPC_ERROR_NO_ERROR;
280     }
281    
282 gbeauche 1.3 // Wait for a message to arrive on the connection port
283     static inline int _rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
284     {
285     struct timeval tv;
286     tv.tv_sec = timeout / 1000000;
287     tv.tv_usec = timeout % 1000000;
288    
289     fd_set rfds;
290     FD_ZERO(&rfds);
291     FD_SET(connection->socket, &rfds);
292     return select(connection->socket + 1, &rfds, NULL, NULL, &tv);
293     }
294    
295     int rpc_wait_dispatch(rpc_connection_t *connection, int timeout)
296     {
297     if (connection == NULL)
298     return RPC_ERROR_CONNECTION_NULL;
299     if (connection->type != RPC_CONNECTION_SERVER)
300     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
301    
302     return _rpc_wait_dispatch(connection, timeout);
303     }
304    
305     // Process incoming messages in the background
306 gbeauche 1.1 static void *rpc_server_func(void *arg)
307     {
308     rpc_connection_t *connection = (rpc_connection_t *)arg;
309    
310     int ret = rpc_listen_socket(connection);
311     if (ret < 0)
312     return NULL;
313    
314     connection->server_thread_active = 1;
315     for (;;) {
316 gbeauche 1.2 // XXX broken MacOS X doesn't implement cancellation points correctly
317     pthread_testcancel();
318    
319     // wait for data to arrive
320 gbeauche 1.3 int ret = _rpc_wait_dispatch(connection, 50000);
321 gbeauche 1.2 if (ret == 0)
322     continue;
323     if (ret < 0)
324 gbeauche 1.1 break;
325 gbeauche 1.2
326 gbeauche 1.1 rpc_dispatch(connection);
327     }
328     connection->server_thread_active = 0;
329     return NULL;
330     }
331    
332     // Return listen socket of RPC connection
333     int rpc_listen_socket(rpc_connection_t *connection)
334     {
335     D(bug("rpc_listen_socket\n"));
336    
337     if (connection == NULL)
338     return RPC_ERROR_CONNECTION_NULL;
339     if (connection->type != RPC_CONNECTION_SERVER)
340     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
341    
342     struct sockaddr_un addr;
343     socklen_t addr_len = sizeof(addr);
344     if ((connection->socket = accept(connection->server_socket, (struct sockaddr *)&addr, &addr_len)) < 0) {
345     perror("server accept");
346     return RPC_ERROR_ERRNO_SET;
347     }
348    
349     #if NON_BLOCKING_IO
350     int val = fcntl(connection->socket, F_GETFL, 0);
351     if (val < 0) {
352     perror("server fcntl F_GETFL");
353     return RPC_ERROR_ERRNO_SET;
354     }
355     if (fcntl(connection->socket, F_SETFL, val | O_NONBLOCK) < 0) {
356     perror("server fcntl F_SETFL");
357     return RPC_ERROR_ERRNO_SET;
358     }
359     #endif
360    
361     return connection->socket;
362     }
363    
364     // Listen for incoming messages on RPC connection
365     int rpc_listen(rpc_connection_t *connection)
366     {
367     D(bug("rpc_listen\n"));
368    
369     if (pthread_create(&connection->server_thread, NULL, rpc_server_func, connection) != 0) {
370     perror("server thread");
371     return RPC_ERROR_ERRNO_SET;
372     }
373    
374     return RPC_ERROR_NO_ERROR;
375     }
376    
377    
378     /* ====================================================================== */
379     /* === Message Passing === */
380     /* ====================================================================== */
381    
// Markers exchanged on the wire as INT32 values to frame messages
enum {
	RPC_MESSAGE_START	= -3000,	// expected by rpc_dispatch() at the start of a request
	RPC_MESSAGE_END		= -3001,	// expected by rpc_dispatch() after a handled request
	RPC_MESSAGE_ACK		= -3002,	// sent back by rpc_dispatch() on success
	RPC_MESSAGE_REPLY	= -3003,
	RPC_MESSAGE_FAILURE	= -3004,	// sent back by rpc_dispatch() otherwise
};
390    
// Message being sent to, or received from, the peer of a connection
struct rpc_message_t {
	int socket;						// descriptor copied from the connection by rpc_message_init()
	int offset;						// number of bytes currently staged in buffer[]
	unsigned char buffer[BUFSIZ];	// staging area used by the buffered send helpers
};
397    
398     // User-defined marshalers
399     static struct {
400     rpc_message_descriptor_t *descs;
401     int last;
402     int count;
403     } g_message_descriptors = { NULL, 0, 0 };
404     static pthread_mutex_t g_message_descriptors_lock = PTHREAD_MUTEX_INITIALIZER;
405    
406     // Add a user-defined marshaler
407     static int rpc_message_add_callback(const rpc_message_descriptor_t *desc)
408     {
409     D(bug("rpc_message_add_callback\n"));
410    
411     const int N_ENTRIES_ALLOC = 8;
412     int error = RPC_ERROR_NO_ERROR;
413    
414     pthread_mutex_lock(&g_message_descriptors_lock);
415     if (g_message_descriptors.descs == NULL) {
416     g_message_descriptors.count = N_ENTRIES_ALLOC;
417     if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)malloc(g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
418     pthread_mutex_unlock(&g_message_descriptors_lock);
419     return RPC_ERROR_NO_MEMORY;
420     }
421     g_message_descriptors.last = 0;
422     }
423     else if (g_message_descriptors.last >= g_message_descriptors.count) {
424     g_message_descriptors.count += N_ENTRIES_ALLOC;
425     if ((g_message_descriptors.descs = (rpc_message_descriptor_t *)realloc(g_message_descriptors.descs, g_message_descriptors.count * sizeof(g_message_descriptors.descs[0]))) == NULL) {
426     pthread_mutex_unlock(&g_message_descriptors_lock);
427     return RPC_ERROR_NO_MEMORY;
428     }
429     }
430    
431     // XXX only one callback per ID
432     int i;
433     for (i = 0; i < g_message_descriptors.last; i++) {
434     if (g_message_descriptors.descs[i].id == desc->id) {
435     pthread_mutex_unlock(&g_message_descriptors_lock);
436     return RPC_ERROR_NO_ERROR;
437     }
438     }
439    
440     g_message_descriptors.descs[g_message_descriptors.last++] = *desc;
441     pthread_mutex_unlock(&g_message_descriptors_lock);
442     return error;
443     }
444    
445     // Add user-defined marshalers
446     int rpc_message_add_callbacks(const rpc_message_descriptor_t *descs, int n_descs)
447     {
448     D(bug("rpc_message_add_callbacks\n"));
449    
450     int i, error;
451     for (i = 0; i < n_descs; i++) {
452     if ((error = rpc_message_add_callback(&descs[i])) < 0)
453     return error;
454     }
455    
456     return RPC_ERROR_NO_ERROR;
457     }
458    
459     // Find user-defined marshaler
460     static rpc_message_descriptor_t *rpc_message_find_descriptor(int id)
461     {
462     D(bug("rpc_message_find_descriptor\n"));
463    
464     if (g_message_descriptors.descs) {
465     int i;
466     for (i = 0; i < g_message_descriptors.count; i++) {
467     if (g_message_descriptors.descs[i].id == id)
468     return &g_message_descriptors.descs[i];
469     }
470     }
471    
472     return NULL;
473     }
474    
475     // Initialize message
476     static inline void rpc_message_init(rpc_message_t *message, rpc_connection_t *connection)
477     {
478     message->socket = connection->socket;
479     message->offset = 0;
480     }
481    
482     // Send BYTES
483     static inline int _rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
484     {
485     if (send(message->socket, bytes, count, 0) != count)
486     return RPC_ERROR_ERRNO_SET;
487     return RPC_ERROR_NO_ERROR;
488     }
489    
490     // Send message on wire
491     static inline int rpc_message_flush(rpc_message_t *message)
492     {
493     int error = _rpc_message_send_bytes(message, message->buffer, message->offset);
494     message->offset = 0;
495     return error;
496     }
497    
498     // Send BYTES (public interface, may need to flush internal buffer)
499     int rpc_message_send_bytes(rpc_message_t *message, unsigned char *bytes, int count)
500     {
501     if (message->offset > 0) {
502     int error = rpc_message_flush(message);
503     if (error != RPC_ERROR_NO_ERROR)
504     return error;
505     }
506     return _rpc_message_send_bytes(message, bytes, count);
507     }
508    
509     // Send BYTES (buffered)
510     static inline void _rpc_message_send_bytes_buffered(rpc_message_t *message, unsigned char *bytes, int count)
511     {
512     memcpy(&message->buffer[message->offset], bytes, count);
513     message->offset += count;
514     }
515    
516     // Send CHAR
517     int rpc_message_send_char(rpc_message_t *message, char c)
518     {
519     D(bug(" send CHAR '%c'\n", c));
520    
521     unsigned char e_value = c;
522     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
523     int error = rpc_message_flush(message);
524     if (error != RPC_ERROR_NO_ERROR)
525     return error;
526     }
527     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
528     return RPC_ERROR_NO_ERROR;
529     }
530    
531     // Send INT32
532     int rpc_message_send_int32(rpc_message_t *message, int32_t value)
533     {
534     D(bug(" send INT32 %d\n", value));
535    
536     int32_t e_value = htonl(value);
537     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
538     int error = rpc_message_flush(message);
539     if (error != RPC_ERROR_NO_ERROR)
540     return error;
541     }
542     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
543     return RPC_ERROR_NO_ERROR;
544     }
545    
546     // Send UINT32
547     int rpc_message_send_uint32(rpc_message_t *message, uint32_t value)
548     {
549     D(bug(" send UINT32 %u\n", value));
550    
551     uint32_t e_value = htonl(value);
552     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
553     int error = rpc_message_flush(message);
554     if (error != RPC_ERROR_NO_ERROR)
555     return error;
556     }
557     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
558     return RPC_ERROR_NO_ERROR;
559     }
560    
561     // Send STRING
562     int rpc_message_send_string(rpc_message_t *message, const char *str)
563     {
564     D(bug(" send STRING \"%s\"\n", str));
565    
566     int error, length = str ? strlen(str) : 0;
567     uint32_t e_value = htonl(length);
568     if (message->offset + sizeof(e_value) >= sizeof(message->buffer)) {
569     error = rpc_message_flush(message);
570     if (error != RPC_ERROR_NO_ERROR)
571     return error;
572     }
573     _rpc_message_send_bytes_buffered(message, (unsigned char *)&e_value, sizeof(e_value));
574     error = rpc_message_flush(message);
575     if (error != RPC_ERROR_NO_ERROR)
576     return error;
577     D(bug("str=%p\n", str));
578     return _rpc_message_send_bytes(message, (unsigned char *)str, length);
579     }
580    
581     // Send message arguments
582     static int rpc_message_send_args(rpc_message_t *message, va_list args)
583     {
584     int type;
585     rpc_message_descriptor_t *desc;
586     while ((type = va_arg(args, int)) != RPC_TYPE_INVALID) {
587     int error = rpc_message_send_int32(message, type);
588     if (error != RPC_ERROR_NO_ERROR)
589     return error;
590     switch (type) {
591     case RPC_TYPE_CHAR:
592     error = rpc_message_send_char(message, (char )va_arg(args, int));
593     break;
594     case RPC_TYPE_BOOLEAN:
595     case RPC_TYPE_INT32:
596     error = rpc_message_send_int32(message, va_arg(args, int));
597     break;
598     case RPC_TYPE_UINT32:
599     error = rpc_message_send_uint32(message, va_arg(args, unsigned int));
600     break;
601     case RPC_TYPE_STRING:
602     error = rpc_message_send_string(message, va_arg(args, char *));
603     break;
604     case RPC_TYPE_ARRAY: {
605     int i;
606     int array_type = va_arg(args, int32_t);
607     int array_size = va_arg(args, uint32_t);
608     if ((error = rpc_message_send_int32(message, array_type)) < 0)
609     return error;
610     if ((error = rpc_message_send_uint32(message, array_size)) < 0)
611     return error;
612     switch (array_type) {
613     case RPC_TYPE_CHAR: {
614     unsigned char *array = va_arg(args, unsigned char *);
615     error = rpc_message_flush(message);
616     if (error != RPC_ERROR_NO_ERROR)
617     return error;
618     error = _rpc_message_send_bytes(message, array, array_size);
619     break;
620     }
621     case RPC_TYPE_BOOLEAN:
622     case RPC_TYPE_INT32: {
623     int32_t *array = va_arg(args, int32_t *);
624     for (i = 0; i < array_size; i++) {
625     if ((error = rpc_message_send_int32(message, array[i])) < 0)
626     break;
627     }
628     break;
629     }
630     case RPC_TYPE_UINT32: {
631     uint32_t *array = va_arg(args, uint32_t *);
632     for (i = 0; i < array_size; i++) {
633     if ((error = rpc_message_send_uint32(message, array[i])) < 0)
634     break;
635     }
636     break;
637     }
638     case RPC_TYPE_STRING: {
639     char **array = va_arg(args, char **);
640     for (i = 0; i < array_size; i++) {
641     if ((error = rpc_message_send_string(message, array[i])) < 0)
642     break;
643     }
644     break;
645     }
646     default:
647     if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
648     uint8_t *array = va_arg(args, uint8_t *);
649     for (i = 0; i < array_size; i++) {
650     if ((error = desc->send_callback(message, &array[i * desc->size])) < 0)
651     break;
652     }
653     }
654     else {
655     fprintf(stderr, "unknown array arg type %d to send\n", type);
656     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
657     }
658     break;
659     }
660     break;
661     }
662     default:
663     if ((desc = rpc_message_find_descriptor(type)) != NULL)
664     error = desc->send_callback(message, va_arg(args, uint8_t *));
665     else {
666     fprintf(stderr, "unknown arg type %d to send\n", type);
667     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
668     }
669     break;
670     }
671     if (error != RPC_ERROR_NO_ERROR)
672     return error;
673     }
674     return RPC_ERROR_NO_ERROR;
675     }
676    
677     // Receive raw BYTES
678     static inline int _rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
679     {
680     do {
681     int n = recv(message->socket, bytes, count, 0);
682     if (n > 0) {
683     count -= n;
684     bytes += n;
685     }
686     else if (n == -1 && errno == EINTR)
687     continue;
688     else {
689     #if NON_BLOCKING_IO
690     if (errno == EAGAIN || errno == EWOULDBLOCK) {
691     // wait for data to arrive
692 gbeauche 1.2 fd_set rfds;
693     FD_ZERO(&rfds);
694     FD_SET(message->socket, &rfds);
695     int ret = select(message->socket + 1, &rfds, NULL, NULL, NULL);
696     if (ret > 0)
697 gbeauche 1.1 continue;
698     }
699     #endif
700     return RPC_ERROR_ERRNO_SET;
701     }
702     } while (count > 0);
703     return RPC_ERROR_NO_ERROR;
704     }
705    
706     int rpc_message_recv_bytes(rpc_message_t *message, unsigned char *bytes, int count)
707     {
708     return _rpc_message_recv_bytes(message, bytes, count);
709     }
710    
711     // Receive CHAR
712     int rpc_message_recv_char(rpc_message_t *message, char *ret)
713     {
714     char r_value;
715     int error;
716     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
717     return error;
718     *ret = r_value;
719     D(bug(" recv CHAR '%c'\n", *ret));
720     return RPC_ERROR_NO_ERROR;
721     }
722    
723     // Receive INT32
724     int rpc_message_recv_int32(rpc_message_t *message, int32_t *ret)
725     {
726     int32_t r_value;
727     int error;
728     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
729     return error;
730     *ret = ntohl(r_value);
731     D(bug(" recv INT32 %d\n", *ret));
732     return RPC_ERROR_NO_ERROR;
733     }
734    
735     // Receive UINT32
736     int rpc_message_recv_uint32(rpc_message_t *message, uint32_t *ret)
737     {
738     uint32_t r_value;
739     int error;
740     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
741     return error;
742     *ret = ntohl(r_value);
743     D(bug(" recv UINT32 %u\n", *ret));
744     return RPC_ERROR_NO_ERROR;
745     }
746    
747     // Receive STRING
748     int rpc_message_recv_string(rpc_message_t *message, char **ret)
749     {
750     char *str;
751     int length;
752     uint32_t r_value;
753     int error;
754     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)&r_value, sizeof(r_value))) < 0)
755     return error;
756     length = ntohl(r_value);
757     if (length == 0)
758     str = NULL;
759     else {
760     if ((str = (char *)malloc(length + 1)) == NULL)
761     return RPC_ERROR_NO_MEMORY;
762     if ((error = _rpc_message_recv_bytes(message, (unsigned char *)str, length)) < 0)
763     return error;
764     str[length] = '\0';
765     }
766     *ret = str;
767     D(bug(" recv STRING \"%s\"\n", *ret));
768     return RPC_ERROR_NO_ERROR;
769     }
770    
771     // Receive message arguments
772     static int rpc_message_recv_args(rpc_message_t *message, va_list args)
773     {
774     int expected_type, error;
775     rpc_message_descriptor_t *desc;
776    
777     while ((expected_type = va_arg(args, int)) != RPC_TYPE_INVALID) {
778     void *p_value = va_arg(args, void *);
779     int32_t type;
780     if ((error = rpc_message_recv_int32(message, &type)) < 0)
781     return error;
782     if (type != expected_type)
783     return RPC_ERROR_MESSAGE_ARGUMENT_MISMATCH;
784     switch (type) {
785     case RPC_TYPE_CHAR:
786     error = rpc_message_recv_char(message, (char *)p_value);
787     break;
788     case RPC_TYPE_BOOLEAN:
789     case RPC_TYPE_INT32:
790     error = rpc_message_recv_int32(message, (int32_t *)p_value);
791     break;
792     case RPC_TYPE_UINT32:
793     error = rpc_message_recv_uint32(message, (uint32_t *)p_value);
794     break;
795     case RPC_TYPE_STRING:
796     error = rpc_message_recv_string(message, (char **)p_value);
797     break;
798     case RPC_TYPE_ARRAY: {
799     int i;
800     int32_t array_type;
801     uint32_t array_size;
802     if ((error = rpc_message_recv_int32(message, &array_type)) < 0)
803     return error;
804     if ((error = rpc_message_recv_uint32(message, &array_size)) < 0)
805     return error;
806     p_value = va_arg(args, void *);
807     *((uint32_t *)p_value) = array_size;
808     p_value = va_arg(args, void *);
809     switch (array_type) {
810     case RPC_TYPE_CHAR: {
811     unsigned char *array;
812     if ((array = (unsigned char *)malloc(array_size * sizeof(*array))) == NULL)
813     return RPC_ERROR_NO_MEMORY;
814     error = _rpc_message_recv_bytes(message, array, array_size);
815     if (error != RPC_ERROR_NO_ERROR)
816     return error;
817     *((void **)p_value) = (void *)array;
818     break;
819     }
820     case RPC_TYPE_BOOLEAN:
821     case RPC_TYPE_INT32: {
822     int *array;
823     if ((array = (int *)malloc(array_size * sizeof(*array))) == NULL)
824     return RPC_ERROR_NO_MEMORY;
825     for (i = 0; i < array_size; i++) {
826     int32_t value;
827     if ((error = rpc_message_recv_int32(message, &value)) < 0)
828     return error;
829     array[i] = value;
830     }
831     *((void **)p_value) = (void *)array;
832     break;
833     }
834     case RPC_TYPE_UINT32: {
835     unsigned int *array;
836     if ((array = (unsigned int *)malloc(array_size * sizeof(*array))) == NULL)
837     return RPC_ERROR_NO_MEMORY;
838     for (i = 0; i < array_size; i++) {
839     uint32_t value;
840     if ((error = rpc_message_recv_uint32(message, &value)) < 0)
841     return error;
842     array[i] = value;
843     }
844     *((void **)p_value) = (void *)array;
845     break;
846     }
847     case RPC_TYPE_STRING: {
848     char **array;
849     if ((array = (char **)malloc(array_size * sizeof(*array))) == NULL)
850     return RPC_ERROR_NO_MEMORY;
851     for (i = 0; i < array_size; i++) {
852     char *str;
853     if ((error = rpc_message_recv_string(message, &str)) < 0)
854     return error;
855     array[i] = str;
856     }
857     *((void **)p_value) = (void *)array;
858     break;
859     }
860     default:
861     if ((desc = rpc_message_find_descriptor(array_type)) != NULL) {
862     char *array;
863     if ((array = (char *)malloc(array_size * desc->size)) == NULL)
864     return RPC_ERROR_NO_MEMORY;
865     for (i = 0; i < array_size; i++) {
866     if ((error = desc->recv_callback(message, &array[i * desc->size])) < 0)
867     return error;
868     }
869     *((void **)p_value) = array;
870     }
871     else {
872     fprintf(stderr, "unknown array arg type %d to receive\n", type);
873     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
874     }
875     break;
876     }
877     break;
878     }
879     default:
880     if ((desc = rpc_message_find_descriptor(type)) != NULL)
881     error = desc->recv_callback(message, p_value);
882     else {
883     fprintf(stderr, "unknown arg type %d to send\n", type);
884     error = RPC_ERROR_MESSAGE_ARGUMENT_UNKNOWN;
885     }
886     break;
887     }
888     if (error != RPC_ERROR_NO_ERROR)
889     return error;
890     }
891     return RPC_ERROR_NO_ERROR;
892     }
893    
894     // Skip message argument
895     static int rpc_message_skip_arg(rpc_message_t *message, int type)
896     {
897     unsigned char dummy[BUFSIZ];
898     int error = RPC_ERROR_GENERIC;
899     switch (type) {
900     case RPC_TYPE_CHAR:
901     error = _rpc_message_recv_bytes(message, dummy, 1);
902     break;
903     case RPC_TYPE_BOOLEAN:
904     case RPC_TYPE_INT32:
905     case RPC_TYPE_UINT32:
906     error = _rpc_message_recv_bytes(message, dummy, 4);
907     break;
908     case RPC_TYPE_STRING: {
909     int32_t length;
910     if ((error = rpc_message_recv_int32(message, &length)) < 0)
911     return error;
912     while (length >= sizeof(dummy)) {
913     if ((error = _rpc_message_recv_bytes(message, dummy, sizeof(dummy))) < 0)
914     return error;
915     length -= sizeof(dummy);
916     }
917     if (length > 0) {
918     if ((error = _rpc_message_recv_bytes(message, dummy, length)) < 0)
919     return error;
920     }
921     break;
922     }
923     default:
924     fprintf(stderr, "unknown arg type %d to receive\n", type);
925     break;
926     }
927     return error;
928     }
929    
930     // Dispatch message received in the server loop
931     int rpc_dispatch(rpc_connection_t *connection)
932     {
933     rpc_message_t message;
934     rpc_message_init(&message, connection);
935    
936     int32_t method, value, ret = RPC_MESSAGE_FAILURE;
937     if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR &&
938     value != RPC_MESSAGE_START)
939     return ret;
940    
941     D(bug("receiving message\n"));
942     if (rpc_message_recv_int32(&message, &method) == RPC_ERROR_NO_ERROR &&
943     connection->callbacks != NULL) {
944     int i;
945     for (i = 0; i < connection->n_callbacks; i++) {
946     if (connection->callbacks[i].id == method) {
947     if (connection->callbacks[i].callback &&
948     connection->callbacks[i].callback(connection) == RPC_ERROR_NO_ERROR) {
949     if (rpc_message_recv_int32(&message, &value) == RPC_ERROR_NO_ERROR && value == RPC_MESSAGE_END)
950     ret = RPC_MESSAGE_ACK;
951     else {
952     fprintf(stderr, "corrupted message handler %d\n", method);
953     for (;;) {
954     if (rpc_message_skip_arg(&message, value) != RPC_ERROR_NO_ERROR)
955     break;
956     if (rpc_message_recv_int32(&message, &value) != RPC_ERROR_NO_ERROR)
957     break;
958     if (value == RPC_MESSAGE_END)
959     break;
960     }
961     }
962     break;
963     }
964     }
965     }
966     }
967     rpc_message_send_int32(&message, ret);
968     rpc_message_flush(&message);
969     D(bug(" -- message received\n"));
970     return ret == RPC_MESSAGE_ACK ? method : ret;
971     }
972    
973    
974     /* ====================================================================== */
975     /* === Method Callbacks Handling === */
976     /* ====================================================================== */
977    
978     // Add a user-defined method callback (server side)
979     static int rpc_method_add_callback(rpc_connection_t *connection, const rpc_method_descriptor_t *desc)
980     {
981     const int N_ENTRIES_ALLOC = 8;
982     int i;
983    
984     // pre-allocate up to N_ENTRIES_ALLOC entries
985     if (connection->callbacks == NULL) {
986     if ((connection->callbacks = (rpc_method_descriptor_t *)calloc(N_ENTRIES_ALLOC, sizeof(connection->callbacks[0]))) == NULL)
987     return RPC_ERROR_NO_MEMORY;
988     connection->n_callbacks = N_ENTRIES_ALLOC;
989     }
990    
991     // look for a free slot
992     for (i = connection->n_callbacks - 1; i >= 0; i--) {
993     if (connection->callbacks[i].callback == NULL)
994     break;
995     }
996    
997     // none found, reallocate
998     if (i < 0) {
999     if ((connection->callbacks = (rpc_method_descriptor_t *)realloc(connection->callbacks, (connection->n_callbacks + N_ENTRIES_ALLOC) * sizeof(connection->callbacks[0]))) == NULL)
1000     return RPC_ERROR_NO_MEMORY;
1001     i = connection->n_callbacks;
1002     memset(&connection->callbacks[i], 0, N_ENTRIES_ALLOC * sizeof(connection->callbacks[0]));
1003     connection->n_callbacks += N_ENTRIES_ALLOC;
1004     }
1005    
1006     D(bug("rpc_method_add_callback for method %d in slot %d\n", desc->id, i));
1007     connection->callbacks[i] = *desc;
1008     return RPC_ERROR_NO_ERROR;
1009     }
1010    
1011     // Add user-defined method callbacks (server side)
1012     int rpc_method_add_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *descs, int n_descs)
1013     {
1014     D(bug("rpc_method_add_callbacks\n"));
1015    
1016     if (connection == NULL)
1017     return RPC_ERROR_CONNECTION_NULL;
1018     if (connection->type != RPC_CONNECTION_SERVER)
1019     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1020    
1021     while (--n_descs >= 0) {
1022     int error = rpc_method_add_callback(connection, &descs[n_descs]);
1023     if (error != RPC_ERROR_NO_ERROR)
1024     return error;
1025     }
1026    
1027     return RPC_ERROR_NO_ERROR;
1028     }
1029    
1030     // Remove a user-defined method callback (common code)
1031     int rpc_method_remove_callback_id(rpc_connection_t *connection, int id)
1032     {
1033     D(bug("rpc_method_remove_callback_id\n"));
1034    
1035     if (connection->callbacks) {
1036     int i;
1037     for (i = 0; i < connection->n_callbacks; i++) {
1038     if (connection->callbacks[i].id == id) {
1039     connection->callbacks[i].callback = NULL;
1040     return RPC_ERROR_NO_ERROR;
1041     }
1042     }
1043     }
1044    
1045     return RPC_ERROR_GENERIC;
1046     }
1047    
1048     // Remove user-defined method callbacks (server side)
1049     int rpc_method_remove_callbacks(rpc_connection_t *connection, const rpc_method_descriptor_t *callbacks, int n_callbacks)
1050     {
1051     D(bug("rpc_method_remove_callbacks\n"));
1052    
1053     if (connection == NULL)
1054     return RPC_ERROR_CONNECTION_NULL;
1055     if (connection->type != RPC_CONNECTION_SERVER)
1056     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1057    
1058     while (--n_callbacks >= 0) {
1059     int error = rpc_method_remove_callback_id(connection, callbacks[n_callbacks].id);
1060     if (error != RPC_ERROR_NO_ERROR)
1061     return error;
1062     }
1063    
1064     return RPC_ERROR_NO_ERROR;
1065     }
1066    
1067    
1068     /* ====================================================================== */
1069     /* === Remote Procedure Call (method invocation) === */
1070     /* ====================================================================== */
1071    
1072     // Invoke remote procedure (client side)
1073     int rpc_method_invoke(rpc_connection_t *connection, int method, ...)
1074     {
1075     D(bug("rpc_method_invoke method=%d\n", method));
1076    
1077     rpc_message_t message;
1078     int error;
1079     va_list args;
1080    
1081     if (connection == NULL)
1082     return RPC_ERROR_CONNECTION_NULL;
1083     if (connection->type != RPC_CONNECTION_CLIENT)
1084     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1085    
1086     rpc_message_init(&message, connection);
1087     error = rpc_message_send_int32(&message, RPC_MESSAGE_START);
1088     if (error != RPC_ERROR_NO_ERROR)
1089     return error;
1090     error = rpc_message_send_int32(&message, method);
1091     if (error != RPC_ERROR_NO_ERROR)
1092     return error;
1093     va_start(args, method);
1094     error = rpc_message_send_args(&message, args);
1095     va_end(args);
1096     if (error != RPC_ERROR_NO_ERROR)
1097     return error;
1098     error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1099     if (error != RPC_ERROR_NO_ERROR)
1100     return error;
1101     error = rpc_message_flush(&message);
1102     if (error != RPC_ERROR_NO_ERROR)
1103     return error;
1104     return RPC_ERROR_NO_ERROR;
1105     }
1106    
1107     // Retrieve procedure arguments (server side)
1108     int rpc_method_get_args(rpc_connection_t *connection, ...)
1109     {
1110     D(bug("rpc_method_get_args\n"));
1111    
1112     int error;
1113     va_list args;
1114     rpc_message_t message;
1115    
1116     if (connection == NULL)
1117     return RPC_ERROR_CONNECTION_NULL;
1118     if (connection->type != RPC_CONNECTION_SERVER)
1119     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1120    
1121     rpc_message_init(&message, connection);
1122     va_start(args, connection);
1123     error = rpc_message_recv_args(&message, args);
1124     va_end(args);
1125    
1126     return error;
1127     }
1128    
1129     // Wait for a reply from the remote procedure (client side)
1130     int rpc_method_wait_for_reply(rpc_connection_t *connection, ...)
1131     {
1132     D(bug("rpc_method_wait_for_reply\n"));
1133    
1134     int error, type;
1135     int32_t ret;
1136     va_list args;
1137     rpc_message_t message;
1138    
1139     if (connection == NULL)
1140     return RPC_ERROR_CONNECTION_NULL;
1141     if (connection->type != RPC_CONNECTION_CLIENT)
1142     return RPC_ERROR_CONNECTION_TYPE_MISMATCH;
1143    
1144     rpc_connection_set_status(connection, RPC_STATUS_BUSY);
1145    
1146     rpc_message_init(&message, connection);
1147     va_start(args, connection);
1148     type = va_arg(args, int);
1149     va_end(args);
1150    
1151     if (type != RPC_TYPE_INVALID) {
1152     error = rpc_message_recv_int32(&message, &ret);
1153     if (error != RPC_ERROR_NO_ERROR)
1154     return_error(error);
1155     if (ret != RPC_MESSAGE_REPLY) {
1156     D(bug("TRUNCATED 1 [%d]\n", ret));
1157     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1158     }
1159     va_start(args, connection);
1160     error = rpc_message_recv_args(&message, args);
1161     va_end(args);
1162     if (error != RPC_ERROR_NO_ERROR)
1163     return_error(error);
1164     error = rpc_message_recv_int32(&message, &ret);
1165     if (error != RPC_ERROR_NO_ERROR)
1166     return_error(error);
1167     if (ret != RPC_MESSAGE_END) {
1168     D(bug("TRUNCATED 2 [%d]\n", ret));
1169     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1170     }
1171     }
1172    
1173     error = rpc_message_recv_int32(&message, &ret);
1174     if (error != RPC_ERROR_NO_ERROR)
1175     return_error(error);
1176     if (ret != RPC_MESSAGE_ACK) {
1177     D(bug("TRUNCATED 3 [%d]\n", ret));
1178     return_error(RPC_ERROR_MESSAGE_TRUNCATED);
1179     }
1180    
1181     return_error(RPC_ERROR_NO_ERROR);
1182    
1183     do_return:
1184     rpc_connection_set_status(connection, RPC_STATUS_IDLE);
1185     return error;
1186     }
1187    
1188     // Send a reply to the client (server side)
1189     int rpc_method_send_reply(rpc_connection_t *connection, ...)
1190     {
1191     D(bug("rpc_method_send_reply\n"));
1192    
1193     rpc_message_t message;
1194     int error;
1195     va_list args;
1196    
1197     if (connection == NULL)
1198     return RPC_ERROR_GENERIC;
1199     if (connection->type != RPC_CONNECTION_SERVER)
1200     return RPC_ERROR_GENERIC;
1201    
1202     rpc_message_init(&message, connection);
1203     error = rpc_message_send_int32(&message, RPC_MESSAGE_REPLY);
1204     if (error != RPC_ERROR_NO_ERROR)
1205     return error;
1206     va_start(args, connection);
1207     error = rpc_message_send_args(&message, args);
1208     va_end(args);
1209     if (error != RPC_ERROR_NO_ERROR)
1210     return error;
1211     error = rpc_message_send_int32(&message, RPC_MESSAGE_END);
1212     if (error != RPC_ERROR_NO_ERROR)
1213     return error;
1214     error = rpc_message_flush(&message);
1215     if (error != RPC_ERROR_NO_ERROR)
1216     return error;
1217     return RPC_ERROR_NO_ERROR;
1218     }