1 // ROMP - The Ruby Object Message Proxy
  2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
  3 // 
  4 // ROMP is a set of classes for providing distributed object support to a
  5 // Ruby program.  You may distribute and/or modify it under the same terms as
  6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).
  7 
  8 #include <ruby.h>
  9 #include <rubyio.h>
 10 #include <stdint.h>
 11 #include <stdlib.h>
 12 #include <unistd.h>
 13 #include <fcntl.h>
 14 #include <sys/time.h>
 15 
 16 // ---------------------------------------------------------------------------
 17 // Useful macros
 18 // ---------------------------------------------------------------------------
 19 
 20 // TODO: Are these portable?
 21 
 22 #define PUTSHORT(s, buf) \
 23     do { \
 24         *buf = s >> 8; ++buf; \
 25         *buf = s & 0xff; ++buf; \
 26     } while(0)
 27 
 28 #define GETSHORT(s, buf) \
 29     do { \
 30         s = (unsigned char)*buf; ++buf; \
 31         s = (s << 8) | (unsigned char)*buf; ++buf; \
 32     } while(0)
 33 
 34 // ---------------------------------------------------------------------------
 35 // Globals
 36 // ---------------------------------------------------------------------------
 37 
 38 // objects/functions created down below
 39 //
 40 static VALUE rb_mROMP = Qnil;
 41 static VALUE rb_cSession = Qnil;
 42 static VALUE rb_cProxy_Object = Qnil;
 43 static VALUE rb_cServer = Qnil;
 44 static VALUE rb_cObject_Reference = Qnil;
 45 static ID id_object_id;
 46 
 47 // objects/functions created elsewhere
 48 //
 49 static VALUE rb_mMarshal = Qnil;
 50 
 51 static ID id_dump;
 52 static ID id_load;
 53 static ID id_message;
 54 static ID id_backtrace;
 55 static ID id_caller;
 56 static ID id_raise;
 57 static ID id_send;
 58 static ID id_get_object;
 59 static ID id_slice_bang;
 60 static ID id_print_exception;
 61 static ID id_lock;
 62 static ID id_unlock;
 63 
 64 static struct timeval zero_timeval;
 65 
 66 static void init_globals() {
 67     rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
 68 
 69     id_dump = rb_intern("dump");
 70     id_load = rb_intern("load");
 71     id_message = rb_intern("message");
 72     id_backtrace = rb_intern("backtrace");
 73     id_caller = rb_intern("caller");
 74     id_raise = rb_intern("raise");
 75     id_send = rb_intern("send");
 76     id_get_object = rb_intern("get_object");
 77     id_slice_bang = rb_intern("slice!");
 78     id_print_exception = rb_intern("print_exception");
 79     id_lock = rb_intern("lock");
 80     id_unlock = rb_intern("unlock");
 81 
 82     zero_timeval.tv_sec = 0;
 83     zero_timeval.tv_usec = 0;
 84 }
 85 
 86 // ---------------------------------------------------------------------------
 87 // Utility functions
 88 // ---------------------------------------------------------------------------
 89 
 90 // Forward declaration
 91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex);
 92 
 93 // Create a Ruby string that won't be collected by the GC.  Be very careful
 94 // with this function!
 95 static void create_tmp_ruby_string(struct RString * str, char * buf, size_t len) {
 96     str->basic.flags = T_STRING;
 97     str->basic.klass = rb_cString;
 98     str->ptr = buf;
 99     str->len = len;
100     str->orig = 0; // ?
101 }
102 
// Issue a single write() of up to count bytes from buf to fd.  Expects fd,
// buf, count, total and write_count to exist in the enclosing scope.
//  - bytes written: advance buf and total, reduce count
//  - error other than EWOULDBLOCK: raise via rb_sys_fail("write")
//  - nothing written although bytes remain: raise IOError "disconnected"
#define WRITE_HELPER \
    do { \
        write_count = write(fd, buf, count); \
        if(write_count > 0) { \
            count -= write_count; \
            buf += write_count; \
            total += write_count; \
        } else if(write_count < 0) { \
            if(errno != EWOULDBLOCK) rb_sys_fail("write"); \
        } else if(count != 0) { \
            rb_raise(rb_eIOError, "disconnected"); \
        } \
    } while(0)
116 
// Issue a single read() of up to count bytes from fd into buf.  Expects fd,
// buf, count, total and read_count to exist in the enclosing scope.
//  - bytes read: advance buf and total, reduce count
//  - error other than EWOULDBLOCK: raise via rb_sys_fail("read")
//  - nothing read although bytes are still expected: raise IOError
//    "disconnected"
#define READ_HELPER \
    do { \
        read_count = read(fd, buf, count); \
        if(read_count > 0) { \
            count -= read_count; \
            buf += read_count; \
            total += read_count; \
        } else if(read_count < 0) { \
            if(errno != EWOULDBLOCK) rb_sys_fail("read"); \
        } else if(count != 0) { \
            rb_raise(rb_eIOError, "disconnected"); \
        } \
    } while(0)
130 
131 // Write to an fd and raise an exception if an error occurs
132 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) {
133     int n;
134     size_t total = 0;
135     ssize_t write_count;
136     fd_set fds, error_fds;
137 
138     if(!nonblock) {
139         FD_ZERO(&fds);
140         FD_SET(fd, &fds);
141         FD_ZERO(&error_fds);
142         FD_SET(fd, &error_fds);
143         n = select(fd + 1, 0, &fds, &fds, 0);
144         if(n > 0) {
145             WRITE_HELPER;
146         }
147     } else {
148         WRITE_HELPER;
149     }
150 
151     while(count > 0) {
152         FD_ZERO(&fds);
153         FD_SET(fd, &fds);
154         FD_ZERO(&error_fds);
155         FD_SET(fd, &error_fds);
156         n = rb_thread_select(fd + 1, 0, &fds, &fds, 0);
157         if(n == -1) {
158             if(errno == EWOULDBLOCK) continue;
159             rb_sys_fail("select");
160         }
161         WRITE_HELPER;
162     };
163     return total;
164 }
165 
166 // Read from an fd and raise an exception if an error occurs
167 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) {
168     int n;
169     size_t total = 0;
170     ssize_t read_count;
171     fd_set fds, error_fds;
172 
173     if(!nonblock) {
174         FD_ZERO(&fds);
175         FD_SET(fd, &fds);
176         FD_ZERO(&error_fds);
177         FD_SET(fd, &error_fds);
178         n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval);
179         if(n > 0) {
180             READ_HELPER;
181         }
182     } else {
183         READ_HELPER;
184     }
185 
186     while(count > 0) {
187         FD_ZERO(&fds);
188         FD_SET(fd, &fds);
189         FD_ZERO(&error_fds);
190         FD_SET(fd, &error_fds);
191         n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0);
192         if(n == -1) {
193             if(errno == EWOULDBLOCK) continue;
194             rb_sys_fail("select");
195         }
196         READ_HELPER;
197     };
198 
199     return total;
200 }
201 
202 // Return the message of an exception
203 static VALUE ruby_exc_message(VALUE exc) {
204     return rb_funcall(exc, id_message, 0);
205 }
206 
207 // Return the backtrace of an exception
208 static VALUE ruby_exc_backtrace(VALUE exc) {
209     return rb_funcall(exc, id_backtrace, 0);
210 }
211 
212 // Return the current Ruby call stack
213 static VALUE ruby_caller() {
214     // TODO: Why does calling caller() with 0 arguments not work?
215     return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0));
216 }
217 
218 // Raise a specific ruby exception with a specific message and backtrace
219 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) {
220     rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt);
221 }
222 
223 // Send a message to a Ruby object
224 static VALUE ruby_send(VALUE obj, VALUE msg) {
225     return rb_apply(obj, id_send, msg);
226 }
227 
228 // Call "get_object" on a Ruby object
229 static VALUE ruby_get_object(VALUE obj, VALUE object_id) {
230     return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id));
231 }
232 
233 // Call slice! on a Ruby object
234 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) {
235     VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0);
236     return rb_funcall(obj, id_slice_bang, 1, range);
237 }
238 
239 // Print an exception to the screen using a function defined in the ROMP
240 // module (TODO: wouldn't it be nice if Ruby's error_print were available to
241 // the user?)
242 static void ruby_print_exception(VALUE exc) {
243     rb_funcall(rb_mROMP, id_print_exception, 1, exc);
244 }
245 
246 // Call lock on an object (a mutex, probably)
247 static VALUE ruby_lock(VALUE obj) {
248     return rb_funcall(obj, id_lock, 0);
249 }
250 
251 // Call unlock on an object (a mutex, probably)
252 static VALUE ruby_unlock(VALUE obj) {
253     return rb_funcall(obj, id_unlock, 0);
254 }
255 
256 // ---------------------------------------------------------------------------
257 // Marshalling functions
258 // ---------------------------------------------------------------------------
259 
260 // Take an object as input and return it as a marshalled string.
261 static VALUE marshal_dump(VALUE obj) {
262     return rb_funcall(rb_mMarshal, id_dump, 1, obj);
263 }
264 
265 // Take a marshalled string as input and return it as an object.
266 static VALUE marshal_load(VALUE str) {
267     return rb_funcall(rb_mMarshal, id_load, 1, str);
268 }
269 
270 // ---------------------------------------------------------------------------
271 // Session functions
272 // ---------------------------------------------------------------------------
273 
// Message types sent from the client to the server.
#define ROMP_REQUEST           0x1001
#define ROMP_REQUEST_BLOCK     0x1002
#define ROMP_ONEWAY            0x1003
#define ROMP_ONEWAY_SYNC       0x1004

// Message types sent from the server back to the client.
#define ROMP_RETVAL            0x2001
#define ROMP_EXCEPTION         0x2002
#define ROMP_YIELD             0x2003

// Synchronization and empty messages.
#define ROMP_SYNC              0x4001
#define ROMP_NULL_MSG          0x4002

// Marker stored in the first two bytes of every message header.
#define ROMP_MSG_START         0x4242

// Exclusive upper bounds for object ids and message types.
#define ROMP_MAX_ID            (1<<16)
#define ROMP_MAX_MSG_TYPE      (1<<16)

// Number of bytes in a message header, and the size of the per-session
// buffer that holds it.
#define ROMP_BUFFER_SIZE       16
288 
289 typedef struct {
290     VALUE io_object;
291     int read_fd, write_fd;
292     char buf[ROMP_BUFFER_SIZE];
293     int nonblock;
294 } ROMP_Session;
295 
// Both header fields are 16 bits wide on the wire (see PUTSHORT/GETSHORT).
typedef uint16_t MESSAGE_TYPE_T;    // one of the ROMP_* message types
typedef uint16_t OBJECT_ID_T;       // id of the object a message is addressed to
298 
299 // A ROMP message is broken into 3 components (see romp.rb for more details)
300 typedef struct {
301     MESSAGE_TYPE_T message_type;
302     OBJECT_ID_T object_id;
303     VALUE message_obj;
304 } ROMP_Message;
305 
306 // Send a message to the server with data data and length len.
307 static void send_message_helper(
308         ROMP_Session * session,
309         char * data,
310         size_t len,
311         MESSAGE_TYPE_T message_type,
312         OBJECT_ID_T object_id) {
313 
314     char * buf = session->buf;
315 
316     PUTSHORT(ROMP_MSG_START,    buf);
317     PUTSHORT(len,               buf);
318     PUTSHORT(message_type,      buf);
319     PUTSHORT(object_id,         buf);
320 
321     ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock);
322     ruby_write_throw(session->write_fd, data, len, session->nonblock);
323 }
324 
325 // Send a message to the server with the data in message.
326 static void send_message(ROMP_Session * session, ROMP_Message * message) {
327     VALUE data;
328     struct RString * data_str;
329 
330     data = marshal_dump(message->message_obj);
331     data_str = RSTRING(data);
332     send_message_helper(
333         session,
334         data_str->ptr,
335         data_str->len,
336         message->message_type,
337         message->object_id);
338 }
339 
340 // Send a null message to the server (no data, data len = 0)
341 static void send_null_message(ROMP_Session * session) {
342     send_message_helper(session, "", 0, ROMP_NULL_MSG, 0);
343 }
344 
345 // Receive a message from the server
346 static void get_message(ROMP_Session * session, ROMP_Message * message) {
347     uint16_t magic          = 0;
348     uint16_t data_len       = 0;
349     char * buf              = 0;
350     // struct RString message_string;
351     VALUE ruby_str;
352 
353     do {
354         buf = session->buf;
355         
356         ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock);
357 
358         GETSHORT(magic,                 buf);
359         GETSHORT(data_len,              buf);
360         GETSHORT(message->message_type, buf);
361         GETSHORT(message->object_id,    buf);
362     } while(magic != ROMP_MSG_START);
363 
364     buf = ALLOCA_N(char, data_len);
365     ruby_read_throw(session->read_fd, buf, data_len, session->nonblock);
366     // create_tmp_ruby_string(&message_string, buf, data_len);
367     ruby_str = rb_str_new(buf, data_len);
368 
369     if(message->message_type != ROMP_NULL_MSG) {
370         message->message_obj = marshal_load(ruby_str);
371     } else {
372         message->message_obj = Qnil;
373     }
374 }
375 
376 // Ideally, this function should return true if the server has disconnected,
377 // but currently always returns false.  The server thread will still exit
378 // when the client has disconnected, but currently does so via an exception.
379 static int session_finished(ROMP_Session * session) {
380     // TODO: Detect a disconnection
381     return 0;
382 }
383 
384 // Send a sync message to the server.
385 static void send_sync(ROMP_Session * session) {
386     ROMP_Message message = { ROMP_SYNC, 0, Qnil };
387     send_message(session, &message);
388 }
389 
390 // Wait for a sync response from the server.  Ignore any messages that are
391 // received while waiting for the response.
392 static void wait_sync(ROMP_Session * session) {
393     ROMP_Message message;
394 
395     // sleep(1);
396     get_message(session, &message);
397     if(   message.message_type != ROMP_SYNC
398        && message.object_id != 1
399        && message.message_obj != Qnil) {
400         rb_raise(rb_eRuntimeError, "ROMP synchronization failed");
401     }
402 }
403 
404 // Send a reply to a sync request.
405 static void reply_sync(ROMP_Session * session, int value) {
406     if(value == 0) {
407         ROMP_Message message = { ROMP_SYNC, 1, Qnil };
408         send_message(session, &message);
409     }
410 }
411 
412 // ----------------------------------------------------------------------------
413 // Server functions
414 // ----------------------------------------------------------------------------
415 
416 // We use this structure to pass data to our exception handler.  This is done
417 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work.
418 typedef struct {
419     ROMP_Session * session;
420     ROMP_Message * message;
421     VALUE obj;
422     int debug;
423 } Server_Info;
424 
425 // Make a method call into a Ruby object.
426 static VALUE server_funcall(VALUE ruby_server_info) {
427     Server_Info * server_info = (Server_Info *)(ruby_server_info);
428     return ruby_send(server_info->obj, server_info->message->message_obj);
429 }
430 
431 // Send a yield message to the client, indicating that it should call
432 // Kernel#yield with the message that is sent.
433 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) {
434     Server_Info * server_info = (Server_Info *)(ruby_server_info);
435 
436     server_info->message->message_type = ROMP_YIELD;
437     server_info->message->object_id = 0;
438     server_info->message->message_obj = retval;
439     send_message(server_info->session, server_info->message);
440 
441     return Qnil;
442 }
443 
444 // Send a return value to the client, indicating that it should return
445 // the message to the caller.
446 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) {
447     Server_Info * server_info = (Server_Info *)(ruby_server_info);
448 
449     server_info->message->message_type = ROMP_RETVAL;
450     server_info->message->object_id = 0;
451     server_info->message->message_obj = retval;
452     send_message(server_info->session, server_info->message);
453 
454     return Qnil;
455 }
456 
457 // Send an exception the client, indicating that it should raise an exception.
458 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) {
459     Server_Info * server_info = (Server_Info *)(ruby_server_info);
460     VALUE caller = ruby_caller();
461     VALUE bt = ruby_exc_backtrace(exc);
462 
463     server_info->message->message_type = ROMP_EXCEPTION;
464     server_info->message->object_id = 0;
465     server_info->message->message_obj = exc;
466 
467     // Get rid of extraneous caller information to make debugging easier.
468     ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1);
469 
470     // If debugging is enabled, then print an exception.
471     if(server_info->debug) {
472         ruby_print_exception(exc);
473     }
474 
475     send_message(server_info->session, server_info->message);
476 
477     return Qnil;
478 }
479 
480 // Proces a request from the client and send an appropriate reply.
481 static VALUE server_reply(VALUE ruby_server_info) {
482     Server_Info * server_info = (Server_Info *)(ruby_server_info);
483     VALUE retval;
484     int status;
485 
486     server_info->obj = ruby_get_object(
487         server_info->obj,
488         server_info->message->object_id);
489 
490     // TODO: The client should be able to pass a callback object to the server;
491     // msg_to_obj can create a Proxy_Object, but it needs a session to make
492     // calls over.
493 
494     // Perform the appropriate action based on message type.
495     switch(server_info->message->message_type) {
496         case ROMP_ONEWAY_SYNC:
497             send_null_message(server_info->session);
498             // fallthrough
499  
500         case ROMP_ONEWAY:
501             rb_protect(server_funcall, ruby_server_info, &status);
502             return Qnil;
503 
504         case ROMP_REQUEST:
505             retval = ruby_send(
506                 server_info->obj,
507                 server_info->message->message_obj);
508             break;
509 
510         case ROMP_REQUEST_BLOCK:
511             retval = rb_iterate(
512                 server_funcall, ruby_server_info,
513                 server_send_yield, ruby_server_info);
514             break;
515  
516         case ROMP_SYNC:
517             reply_sync(
518                 server_info->session,
519                 server_info->message->object_id);
520             return Qnil;
521 
522         default:
523             rb_raise(rb_eRuntimeError, "Bad session request");
524     }
525 
526     server_send_retval(retval, ruby_server_info);
527 
528     return Qnil;
529 }
530 
531 // The main server loop.  Wait for a message from the client, route the
532 // message to the appropriate object, send a response and repeat.
533 static void server_loop(ROMP_Session * session, VALUE resolve_server, int dbg) {
534     ROMP_Message message;
535     Server_Info server_info = { session, &message, resolve_server, dbg };
536     VALUE ruby_server_info = (VALUE)(&server_info);
537 
538     while(!session_finished(session)) {
539         get_message(session, &message);
540         rb_rescue2(
541             server_reply, ruby_server_info,
542             server_exception, ruby_server_info, rb_eException, 0);
543         server_info.obj = resolve_server;
544     }
545 }
546 
547 // ----------------------------------------------------------------------------
548 // Client functions
549 // ----------------------------------------------------------------------------
550 
551 // We use this structure to pass data to our client functions by casting it
552 // to a Ruby VALUE (see above note with Server_Info).
553 typedef struct {
554     ROMP_Session * session;
555     VALUE ruby_session;
556     OBJECT_ID_T object_id;
557     VALUE message;
558     VALUE mutex;
559 } Proxy_Object;
560 
561 // Send a request to the server, wait for a response, and perform an action
562 // based on what that response was.  This is not thread-safe, so the caller
563 // should perform any necessary locking
564 static VALUE client_request(VALUE ruby_proxy_object) {
565     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
566     ROMP_Message msg = {
567         rb_block_given_p() ? ROMP_REQUEST_BLOCK : ROMP_REQUEST,
568         obj->object_id,
569         obj->message
570     };
571     send_message(obj->session, &msg);
572 
573     for(;;) {
574         get_message(obj->session, &msg);
575         switch(msg.message_type) {
576             case ROMP_RETVAL:
577                 return msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex);
578                 break;
579             case ROMP_YIELD:
580                 rb_yield(msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex));
581                 break;
582             case ROMP_EXCEPTION: {
583                 ruby_raise(
584                     msg.message_obj,
585                     ruby_exc_message(msg.message_obj),
586                     rb_ary_concat(ruby_exc_backtrace(msg.message_obj), ruby_caller())
587                 );
588                 break;
589             }
590             case ROMP_SYNC:
591                 reply_sync(obj->session, NUM2INT(msg.message_obj));
592                 break;
593             default:
594                 rb_raise(rb_eRuntimeError, "Invalid msg type received");
595         }
596     }
597 }
598 
599 // Send a oneway message to the server.  This is not thread-safe, so the
600 // caller should perform any necessary locking.
601 static VALUE client_oneway(VALUE ruby_proxy_object) {
602     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
603     ROMP_Message msg = {
604         ROMP_ONEWAY,
605         obj->object_id,
606         obj->message
607     };
608     send_message(obj->session, &msg);
609     return Qnil;
610 }
611 
612 // Send a oneway message to the server and request a message in response.
613 // This is not thread-safe, so the caller should perform any necessary
614 // locking.
615 static VALUE client_oneway_sync(VALUE ruby_proxy_object) {
616     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
617     ROMP_Message msg = {
618         ROMP_ONEWAY_SYNC,
619         obj->object_id,
620         obj->message
621     };
622     send_message(obj->session, &msg);
623     get_message(obj->session, &msg);
624     return Qnil;
625 }
626 
627 // Synchronize with the server.  This is not thread-safe, so the caller should
628 // perform any necessary locking.
629 static VALUE client_sync(VALUE ruby_proxy_object) {
630     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
631     send_sync(obj->session);
632     wait_sync(obj->session);
633     return Qnil;
634 }
635 
636 // ----------------------------------------------------------------------------
637 // Ruby interface functions
638 // ----------------------------------------------------------------------------
639 
640 static void ruby_session_mark(ROMP_Session * session) {
641     rb_gc_mark(session->io_object);
642 }
643 
644 static VALUE ruby_session_new(VALUE self, VALUE io_object) {
645     ROMP_Session * session;
646     VALUE ruby_session;
647     OpenFile * openfile;
648     FILE * read_fp;
649     FILE * write_fp;
650 
651     if(!rb_obj_is_kind_of(io_object, rb_cIO)) {
652         rb_raise(rb_eTypeError, "Expecting an IO object");
653     } 
654     
655     ruby_session = Data_Make_Struct(
656         rb_cSession,
657         ROMP_Session,
658         (RUBY_DATA_FUNC)(ruby_session_mark),
659         (RUBY_DATA_FUNC)(free),
660         session);
661 
662     GetOpenFile(io_object, openfile);
663     read_fp = GetReadFile(openfile);
664     write_fp = GetWriteFile(openfile);
665     session->read_fd = fileno(read_fp);
666     session->write_fd = fileno(write_fp);
667     session->io_object = io_object;
668     session->nonblock = 0;
669 
670     return ruby_session;
671 }
672 
673 static VALUE ruby_set_nonblock(VALUE self, VALUE nonblock) {
674     ROMP_Session * session;
675     Data_Get_Struct(self, ROMP_Session, session);
676     if(nonblock == Qtrue) {
677         session->nonblock = 1;
678     } else if(nonblock == Qfalse) {
679         session->nonblock = 0;
680     } else {
681         rb_raise(rb_eTypeError, "Expecting a boolean");
682     }
683     return Qnil;
684 }
685 
686 static void ruby_proxy_object_mark(Proxy_Object * proxy_object) {
687     rb_gc_mark(proxy_object->ruby_session);
688     rb_gc_mark(proxy_object->mutex);
689 }
690 
691 static VALUE ruby_proxy_object_new(
692         VALUE self, VALUE ruby_session, VALUE ruby_mutex, VALUE ruby_object_id) {
693     ROMP_Session * session;
694     OBJECT_ID_T object_id = NUM2INT(ruby_object_id);
695     Proxy_Object * proxy_object;
696     VALUE ruby_proxy_object;
697 
698     if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
699         rb_raise(rb_eTypeError, "Expecting a session");
700     }
701     Data_Get_Struct(ruby_session, ROMP_Session, session);
702 
703     ruby_proxy_object = Data_Make_Struct(
704         rb_cProxy_Object,
705         Proxy_Object,
706         (RUBY_DATA_FUNC)(ruby_proxy_object_mark),
707         (RUBY_DATA_FUNC)(free),
708         proxy_object);
709     proxy_object->session = session;
710     proxy_object->ruby_session = ruby_session;
711     proxy_object->mutex = ruby_mutex;
712     proxy_object->object_id = object_id;
713 
714     return ruby_proxy_object;
715 }
716 
717 static VALUE ruby_proxy_object_method_missing(VALUE self, VALUE message) {
718     Proxy_Object * proxy_object;
719     Data_Get_Struct(self, Proxy_Object, proxy_object);
720 
721     proxy_object->message = message;
722     ruby_lock(proxy_object->mutex);
723     return rb_ensure(
724         client_request, (VALUE)(proxy_object),
725         ruby_unlock, proxy_object->mutex);
726 }
727 
728 static VALUE ruby_proxy_object_oneway(VALUE self, VALUE message) {
729     Proxy_Object * proxy_object;
730     Data_Get_Struct(self, Proxy_Object, proxy_object);
731 
732     proxy_object->message = message;
733     ruby_lock(proxy_object->mutex);
734     rb_ensure(
735         client_oneway, (VALUE)(proxy_object),
736         ruby_unlock, proxy_object->mutex);
737     return Qnil;
738 }
739 
740 static VALUE ruby_proxy_object_oneway_sync(VALUE self, VALUE message) {
741     Proxy_Object * proxy_object;
742     Data_Get_Struct(self, Proxy_Object, proxy_object);
743 
744     proxy_object->message = message;
745     ruby_lock(proxy_object->mutex);
746     rb_ensure(
747         client_oneway_sync, (VALUE)(proxy_object),
748         ruby_unlock, proxy_object->mutex);
749     return Qnil;
750 }
751 
752 static VALUE ruby_proxy_object_sync(VALUE self) {
753     Proxy_Object * proxy_object;
754     Data_Get_Struct(self, Proxy_Object, proxy_object);
755 
756     ruby_lock(proxy_object->mutex);
757     rb_ensure(
758         client_sync, (VALUE)(proxy_object),
759         ruby_unlock, proxy_object->mutex);
760     return Qnil;
761 }
762 
763 static VALUE ruby_server_loop(VALUE self, VALUE ruby_session) {
764     ROMP_Session * session;
765     VALUE resolve_server;
766     VALUE ruby_debug;
767     int debug;
768 
769     if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
770         rb_raise(rb_eTypeError, "Excpecting a session");
771     }
772     Data_Get_Struct(ruby_session, ROMP_Session, session);
773 
774     resolve_server = rb_iv_get(self, "@resolve_server");
775 
776     ruby_debug = rb_iv_get(self, "@debug");
777     debug = (ruby_debug != Qfalse) && !NIL_P(ruby_debug);
778     server_loop(session, resolve_server, debug);
779     return Qnil;
780 }
781 
782 // Given a message, convert it into an object that can be returned.  This
783 // function really only checks to see if an Object_Reference has been returned
784 // from the server, and creates a new Proxy_Object if this is the case.
785 // Otherwise, the original object is returned to the client.
786 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex) {
787     if(CLASS_OF(message) == rb_cObject_Reference) {
788         return ruby_proxy_object_new(
789             rb_cProxy_Object,
790             session,
791             mutex,
792             rb_funcall(message, id_object_id, 0));
793     } else {
794         return message;
795     }
796 }
797 
798 void Init_romp_helper() {
799     init_globals();
800 
801     rb_mROMP = rb_define_module("ROMP");
802     rb_cSession = rb_define_class_under(rb_mROMP, "Session", rb_cObject);
803 
804     rb_define_const(rb_cSession, "REQUEST", INT2NUM(ROMP_REQUEST));
805     rb_define_const(rb_cSession, "REQUEST_BLOCK", INT2NUM(ROMP_REQUEST_BLOCK));
806     rb_define_const(rb_cSession, "ONEWAY", INT2NUM(ROMP_ONEWAY));
807     rb_define_const(rb_cSession, "ONEWAY_SYNC", INT2NUM(ROMP_ONEWAY_SYNC));
808     rb_define_const(rb_cSession, "RETVAL", INT2NUM(ROMP_RETVAL));
809     rb_define_const(rb_cSession, "EXCEPTION", INT2NUM(ROMP_EXCEPTION));
810     rb_define_const(rb_cSession, "YIELD", INT2NUM(ROMP_YIELD));
811     rb_define_const(rb_cSession, "SYNC", INT2NUM(ROMP_SYNC));
812     rb_define_const(rb_cSession, "NULL_MSG", INT2NUM(ROMP_NULL_MSG));
813     rb_define_const(rb_cSession, "MSG_START", INT2NUM(ROMP_MSG_START));
814     rb_define_const(rb_cSession, "MAX_ID", INT2NUM(ROMP_MAX_ID));
815     rb_define_const(rb_cSession, "MAX_MSG_TYPE", INT2NUM(ROMP_MAX_MSG_TYPE));
816 
817     rb_define_singleton_method(rb_cSession, "new", ruby_session_new, 1);
818     rb_define_method(rb_cSession, "set_nonblock", ruby_set_nonblock, 1);
819 
820     rb_cProxy_Object = rb_define_class_under(rb_mROMP, "Proxy_Object", rb_cObject);
821     rb_define_singleton_method(rb_cProxy_Object, "new", ruby_proxy_object_new, 3);
822     rb_define_method(rb_cProxy_Object, "method_missing", ruby_proxy_object_method_missing, -2);
823     rb_define_method(rb_cProxy_Object, "oneway", ruby_proxy_object_oneway, -2);
824     rb_define_method(rb_cProxy_Object, "oneway_sync", ruby_proxy_object_oneway_sync, -2);
825     rb_define_method(rb_cProxy_Object, "sync", ruby_proxy_object_sync, 0);
826 
827     rb_cServer = rb_define_class_under(rb_mROMP, "Server", rb_cObject);
828     rb_define_method(rb_cServer, "server_loop", ruby_server_loop, 1);
829 
830     rb_cObject_Reference = rb_define_class_under(rb_mROMP, "Object_Reference", rb_cObject);
831 
832     id_object_id = rb_intern("object_id");
833 }