1 // ROMP - The Ruby Object Message Proxy
  2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
  3 // 
  4 // ROMP is a set of classes for providing distributed object support to a
  5 // Ruby program.  You may distribute and/or modify it under the same terms as
  6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).
  7 
  8 #include <ruby.h>
  9 #include <rubyio.h>
 10 #include <stdint.h>
 11 #include <stdlib.h>
 12 #include <unistd.h>
 13 #include <fcntl.h>
 14 #include <sys/time.h>
 15 
 16 // ---------------------------------------------------------------------------
 17 // Useful macros
 18 // ---------------------------------------------------------------------------
 19 
 20 // TODO: Are these portable?
 21 
 22 #define PUTSHORT(s, buf) \
 23     do { \
 24         *buf = s >> 8; ++buf; \
 25         *buf = s & 0xff; ++buf; \
 26     } while(0)
 27 
 28 #define GETSHORT(s, buf) \
 29     do { \
 30         s = (unsigned char)*buf; ++buf; \
 31         s = (s << 8) | (unsigned char)*buf; ++buf; \
 32     } while(0)
 33 
 34 // ---------------------------------------------------------------------------
 35 // Globals
 36 // ---------------------------------------------------------------------------
 37 
 38 // objects/functions created down below
 39 //
 40 static VALUE rb_mROMP = Qnil;
 41 static VALUE rb_cSession = Qnil;
 42 static VALUE rb_cProxy_Object = Qnil;
 43 static VALUE rb_cServer = Qnil;
 44 static VALUE rb_cObject_Reference = Qnil;
 45 static ID id_object_id;
 46 
 47 // objects/functions created elsewhere
 48 //
 49 static VALUE rb_mMarshal = Qnil;
 50 
 51 static ID id_dump;
 52 static ID id_load;
 53 static ID id_message;
 54 static ID id_backtrace;
 55 static ID id_caller;
 56 static ID id_raise;
 57 static ID id_send;
 58 static ID id_get_object;
 59 static ID id_slice_bang;
 60 static ID id_print_exception;
 61 static ID id_lock;
 62 static ID id_unlock;
 63 
 64 static struct timeval zero_timeval;
 65 
 66 static void init_globals() {
 67     rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
 68 
 69     id_dump = rb_intern("dump");
 70     id_load = rb_intern("load");
 71     id_message = rb_intern("message");
 72     id_backtrace = rb_intern("backtrace");
 73     id_caller = rb_intern("caller");
 74     id_raise = rb_intern("raise");
 75     id_send = rb_intern("send");
 76     id_get_object = rb_intern("get_object");
 77     id_slice_bang = rb_intern("slice!");
 78     id_print_exception = rb_intern("print_exception");
 79     id_lock = rb_intern("lock");
 80     id_unlock = rb_intern("unlock");
 81 
 82     zero_timeval.tv_sec = 0;
 83     zero_timeval.tv_usec = 0;
 84 }
 85 
 86 // ---------------------------------------------------------------------------
 87 // Utility functions
 88 // ---------------------------------------------------------------------------
 89 
 90 // Forward declaration
 91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex);
 92 
 93 #define WRITE_HELPER \
 94     do { \
 95         write_count = write(fd, buf, count); \
 96         if(write_count < 0) { \
 97             if(errno != EWOULDBLOCK) rb_sys_fail("write"); \
 98         } else if(write_count == 0 && count != 0) { \
 99             rb_raise(rb_eIOError, "disconnected"); \
100         } else { \
101             count -= write_count; \
102             buf += write_count; \
103             total += write_count; \
104         } \
105     } while(0)
106 
// Perform one read() and account for its result.  The expanding scope must
// provide: fd, buf (pointer to where the next byte goes), count (bytes still
// wanted), total (bytes read so far) and read_count (ssize_t scratch).
//
//  - Bytes read: buf, count and total are advanced by that amount.
//  - read() fails with EWOULDBLOCK, EAGAIN or EINTR: nothing changes, so the
//    caller can simply try again.
//  - read() fails with any other errno: raises through rb_sys_fail("read").
//  - read() returns 0 while data is still wanted: raises IOError
//    "disconnected".
//
// buf is advanced through a char pointer because arithmetic on a void
// pointer is a compiler extension, not standard C.
#define READ_HELPER \
    do { \
        read_count = read(fd, buf, count); \
        if(read_count < 0) { \
            if(errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { \
                rb_sys_fail("read"); \
            } \
        } else if(read_count == 0 && count != 0) { \
            rb_raise(rb_eIOError, "disconnected"); \
        } else { \
            count -= read_count; \
            buf = (char *)buf + read_count; \
            total += read_count; \
        } \
    } while(0)
120 
121 // Write to an fd and raise an exception if an error occurs
122 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) {
123     int n;
124     size_t total = 0;
125     ssize_t write_count;
126     fd_set fds, error_fds;
127 
128     if(!nonblock) {
129         FD_ZERO(&fds);
130         FD_SET(fd, &fds);
131         FD_ZERO(&error_fds);
132         FD_SET(fd, &error_fds);
133         n = select(fd + 1, 0, &fds, &fds, 0);
134         if(n > 0) {
135             WRITE_HELPER;
136         }
137     } else {
138         WRITE_HELPER;
139     }
140 
141     while(count > 0) {
142         FD_ZERO(&fds);
143         FD_SET(fd, &fds);
144         FD_ZERO(&error_fds);
145         FD_SET(fd, &error_fds);
146         n = rb_thread_select(fd + 1, 0, &fds, &fds, 0);
147         if(n == -1) {
148             if(errno == EWOULDBLOCK) continue;
149             rb_sys_fail("select");
150         }
151         WRITE_HELPER;
152     };
153     return total;
154 }
155 
156 // Read from an fd and raise an exception if an error occurs
157 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) {
158     int n;
159     size_t total = 0;
160     ssize_t read_count;
161     fd_set fds, error_fds;
162 
163     if(!nonblock) {
164         FD_ZERO(&fds);
165         FD_SET(fd, &fds);
166         FD_ZERO(&error_fds);
167         FD_SET(fd, &error_fds);
168         n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval);
169         if(n > 0) {
170             READ_HELPER;
171         }
172     } else {
173         READ_HELPER;
174     }
175 
176     while(count > 0) {
177         FD_ZERO(&fds);
178         FD_SET(fd, &fds);
179         FD_ZERO(&error_fds);
180         FD_SET(fd, &error_fds);
181         n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0);
182         if(n == -1) {
183             if(errno == EWOULDBLOCK) continue;
184             rb_sys_fail("select");
185         }
186         READ_HELPER;
187     };
188 
189     return total;
190 }
191 
192 // Return the message of an exception
193 static VALUE ruby_exc_message(VALUE exc) {
194     return rb_funcall(exc, id_message, 0);
195 }
196 
197 // Return the backtrace of an exception
198 static VALUE ruby_exc_backtrace(VALUE exc) {
199     return rb_funcall(exc, id_backtrace, 0);
200 }
201 
202 // Return the current Ruby call stack
203 static VALUE ruby_caller() {
204     // TODO: Why does calling caller() with 0 arguments not work?
205     return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0));
206 }
207 
208 // Raise a specific ruby exception with a specific message and backtrace
209 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) {
210     rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt);
211 }
212 
213 // Send a message to a Ruby object
214 static VALUE ruby_send(VALUE obj, VALUE msg) {
215     return rb_apply(obj, id_send, msg);
216 }
217 
218 // Call "get_object" on a Ruby object
219 static VALUE ruby_get_object(VALUE obj, VALUE object_id) {
220     return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id));
221 }
222 
223 // Call slice! on a Ruby object
224 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) {
225     VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0);
226     return rb_funcall(obj, id_slice_bang, 1, range);
227 }
228 
229 // Print an exception to the screen using a function defined in the ROMP
230 // module (TODO: wouldn't it be nice if Ruby's error_print were available to
231 // the user?)
232 static void ruby_print_exception(VALUE exc) {
233     rb_funcall(rb_mROMP, id_print_exception, 1, exc);
234 }
235 
236 // Call lock on an object (a mutex, probably)
237 static VALUE ruby_lock(VALUE obj) {
238     return rb_funcall(obj, id_lock, 0);
239 }
240 
241 // Call unlock on an object (a mutex, probably)
242 static VALUE ruby_unlock(VALUE obj) {
243     return rb_funcall(obj, id_unlock, 0);
244 }
245 
246 // ---------------------------------------------------------------------------
247 // Marshalling functions
248 // ---------------------------------------------------------------------------
249 
250 // Take an object as input and return it as a marshalled string.
251 static VALUE marshal_dump(VALUE obj) {
252     return rb_funcall(rb_mMarshal, id_dump, 1, obj);
253 }
254 
255 // Take a marshalled string as input and return it as an object.
256 static VALUE marshal_load(VALUE str) {
257     return rb_funcall(rb_mMarshal, id_load, 1, str);
258 }
259 
260 // ---------------------------------------------------------------------------
261 // Session functions
262 // ---------------------------------------------------------------------------
263 
// Wire-protocol constants.  All of them except ROMP_BUFFER_SIZE are also
// exported as constants on the Session class by Init_romp_helper().
enum {
    // Message types sent by the client code in this file.
    ROMP_REQUEST       = 0x1001,
    ROMP_REQUEST_BLOCK = 0x1002,
    ROMP_ONEWAY        = 0x1003,
    ROMP_ONEWAY_SYNC   = 0x1004,

    // Message types sent by the server code in this file.
    ROMP_RETVAL        = 0x2001,
    ROMP_EXCEPTION     = 0x2002,
    ROMP_YIELD         = 0x2003,

    // Synchronization and empty messages.
    ROMP_SYNC          = 0x4001,
    ROMP_NULL_MSG      = 0x4002,

    // Magic value that begins every message header.
    ROMP_MSG_START     = 0x4242,

    ROMP_MAX_ID        = (1<<16),
    ROMP_MAX_MSG_TYPE  = (1<<16),

    // Size of the session buffer, and the number of header bytes sent and
    // read for each message.
    ROMP_BUFFER_SIZE   = 16
};
278 
279 typedef struct {
280     VALUE io_object;
281     int read_fd, write_fd;
282     char buf[ROMP_BUFFER_SIZE];
283     int nonblock;
284 } ROMP_Session;
285 
286 typedef uint16_t MESSAGE_TYPE_T;
287 typedef uint16_t OBJECT_ID_T;
288 
289 // A ROMP message is broken into 3 components (see romp.rb for more details)
290 typedef struct {
291     MESSAGE_TYPE_T message_type;
292     OBJECT_ID_T object_id;
293     VALUE message_obj;
294 } ROMP_Message;
295 
296 // Send a message to the server with data data and length len.
297 static void send_message_helper(
298         ROMP_Session * session,
299         char * data,
300         size_t len,
301         MESSAGE_TYPE_T message_type,
302         OBJECT_ID_T object_id) {
303 
304     char * buf = session->buf;
305 
306     PUTSHORT(ROMP_MSG_START,    buf);
307     PUTSHORT(len,               buf);
308     PUTSHORT(message_type,      buf);
309     PUTSHORT(object_id,         buf);
310 
311     ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock);
312     ruby_write_throw(session->write_fd, data, len, session->nonblock);
313 }
314 
315 // Send a message to the server with the data in message.
316 static void send_message(ROMP_Session * session, ROMP_Message * message) {
317     VALUE data;
318     struct RString * data_str;
319 
320     data = marshal_dump(message->message_obj);
321     data_str = RSTRING(data);
322     send_message_helper(
323         session,
324         data_str->ptr,
325         data_str->len,
326         message->message_type,
327         message->object_id);
328 }
329 
330 // Send a null message to the server (no data, data len = 0)
331 static void send_null_message(ROMP_Session * session) {
332     send_message_helper(session, "", 0, ROMP_NULL_MSG, 0);
333 }
334 
335 // Receive a message from the server
336 static void get_message(ROMP_Session * session, ROMP_Message * message) {
337     uint16_t magic          = 0;
338     uint16_t data_len       = 0;
339     char * buf              = 0;
340     // struct RString message_string;
341     VALUE ruby_str;
342 
343     do {
344         buf = session->buf;
345         
346         ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock);
347 
348         GETSHORT(magic,                 buf);
349         GETSHORT(data_len,              buf);
350         GETSHORT(message->message_type, buf);
351         GETSHORT(message->object_id,    buf);
352     } while(magic != ROMP_MSG_START);
353 
354     buf = ALLOCA_N(char, data_len);
355     ruby_read_throw(session->read_fd, buf, data_len, session->nonblock);
356     ruby_str = rb_str_new(buf, data_len);
357 
358     if(message->message_type != ROMP_NULL_MSG) {
359         message->message_obj = marshal_load(ruby_str);
360     } else {
361         message->message_obj = Qnil;
362     }
363 }
364 
365 // Ideally, this function should return true if the server has disconnected,
366 // but currently always returns false.  The server thread will still exit
367 // when the client has disconnected, but currently does so via an exception.
368 static int session_finished(ROMP_Session * session) {
369     // TODO: Detect a disconnection
370     return 0;
371 }
372 
373 // Send a sync message to the server.
374 static void send_sync(ROMP_Session * session) {
375     ROMP_Message message = { ROMP_SYNC, 0, Qnil };
376     send_message(session, &message);
377 }
378 
379 // Wait for a sync response from the server.  Ignore any messages that are
380 // received while waiting for the response.
381 static void wait_sync(ROMP_Session * session) {
382     ROMP_Message message;
383 
384     // sleep(1);
385     get_message(session, &message);
386     if(   message.message_type != ROMP_SYNC
387        && message.object_id != 1
388        && message.message_obj != Qnil) {
389         rb_raise(rb_eRuntimeError, "ROMP synchronization failed");
390     }
391 }
392 
393 // Send a reply to a sync request.
394 static void reply_sync(ROMP_Session * session, int value) {
395     if(value == 0) {
396         ROMP_Message message = { ROMP_SYNC, 1, Qnil };
397         send_message(session, &message);
398     }
399 }
400 
401 // ----------------------------------------------------------------------------
402 // Server functions
403 // ----------------------------------------------------------------------------
404 
405 // We use this structure to pass data to our exception handler.  This is done
406 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work.
407 typedef struct {
408     ROMP_Session * session;
409     ROMP_Message * message;
410     VALUE obj;
411     int debug;
412 } Server_Info;
413 
414 // Make a method call into a Ruby object.
415 static VALUE server_funcall(VALUE ruby_server_info) {
416     Server_Info * server_info = (Server_Info *)(ruby_server_info);
417     return ruby_send(server_info->obj, server_info->message->message_obj);
418 }
419 
420 // Send a yield message to the client, indicating that it should call
421 // Kernel#yield with the message that is sent.
422 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) {
423     Server_Info * server_info = (Server_Info *)(ruby_server_info);
424 
425     server_info->message->message_type = ROMP_YIELD;
426     server_info->message->object_id = 0;
427     server_info->message->message_obj = retval;
428     send_message(server_info->session, server_info->message);
429 
430     return Qnil;
431 }
432 
433 // Send a return value to the client, indicating that it should return
434 // the message to the caller.
435 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) {
436     Server_Info * server_info = (Server_Info *)(ruby_server_info);
437 
438     server_info->message->message_type = ROMP_RETVAL;
439     server_info->message->object_id = 0;
440     server_info->message->message_obj = retval;
441     send_message(server_info->session, server_info->message);
442 
443     return Qnil;
444 }
445 
446 // Send an exception the client, indicating that it should raise an exception.
447 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) {
448     Server_Info * server_info = (Server_Info *)(ruby_server_info);
449     VALUE caller = ruby_caller();
450     VALUE bt = ruby_exc_backtrace(exc);
451 
452     server_info->message->message_type = ROMP_EXCEPTION;
453     server_info->message->object_id = 0;
454     server_info->message->message_obj = exc;
455 
456     // Get rid of extraneous caller information to make debugging easier.
457     ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1);
458 
459     // If debugging is enabled, then print an exception.
460     if(server_info->debug) {
461         ruby_print_exception(exc);
462     }
463 
464     send_message(server_info->session, server_info->message);
465 
466     return Qnil;
467 }
468 
469 // Proces a request from the client and send an appropriate reply.
470 static VALUE server_reply(VALUE ruby_server_info) {
471     Server_Info * server_info = (Server_Info *)(ruby_server_info);
472     VALUE retval;
473     int status;
474 
475     server_info->obj = ruby_get_object(
476         server_info->obj,
477         server_info->message->object_id);
478 
479     // TODO: The client should be able to pass a callback object to the server;
480     // msg_to_obj can create a Proxy_Object, but it needs a session to make
481     // calls over.
482 
483     // Perform the appropriate action based on message type.
484     switch(server_info->message->message_type) {
485         case ROMP_ONEWAY_SYNC:
486             send_null_message(server_info->session);
487             // fallthrough
488  
489         case ROMP_ONEWAY:
490             rb_protect(server_funcall, ruby_server_info, &status);
491             return Qnil;
492 
493         case ROMP_REQUEST:
494             retval = ruby_send(
495                 server_info->obj,
496                 server_info->message->message_obj);
497             break;
498 
499         case ROMP_REQUEST_BLOCK:
500             retval = rb_iterate(
501                 server_funcall, ruby_server_info,
502                 server_send_yield, ruby_server_info);
503             break;
504  
505         case ROMP_SYNC:
506             reply_sync(
507                 server_info->session,
508                 server_info->message->object_id);
509             return Qnil;
510 
511         default:
512             rb_raise(rb_eRuntimeError, "Bad session request");
513     }
514 
515     server_send_retval(retval, ruby_server_info);
516 
517     return Qnil;
518 }
519 
520 // The main server loop.  Wait for a message from the client, route the
521 // message to the appropriate object, send a response and repeat.
522 static void server_loop(ROMP_Session * session, VALUE resolve_server, int dbg) {
523     ROMP_Message message;
524     Server_Info server_info = { session, &message, resolve_server, dbg };
525     VALUE ruby_server_info = (VALUE)(&server_info);
526 
527     while(!session_finished(session)) {
528         get_message(session, &message);
529         rb_rescue2(
530             server_reply, ruby_server_info,
531             server_exception, ruby_server_info, rb_eException, 0);
532         server_info.obj = resolve_server;
533     }
534 }
535 
536 // ----------------------------------------------------------------------------
537 // Client functions
538 // ----------------------------------------------------------------------------
539 
540 // We use this structure to pass data to our client functions by casting it
541 // to a Ruby VALUE (see above note with Server_Info).
542 typedef struct {
543     ROMP_Session * session;
544     VALUE ruby_session;
545     OBJECT_ID_T object_id;
546     VALUE message;
547     VALUE mutex;
548 } Proxy_Object;
549 
550 // Send a request to the server, wait for a response, and perform an action
551 // based on what that response was.  This is not thread-safe, so the caller
552 // should perform any necessary locking
553 static VALUE client_request(VALUE ruby_proxy_object) {
554     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
555     ROMP_Message msg = {
556         rb_block_given_p() ? ROMP_REQUEST_BLOCK : ROMP_REQUEST,
557         obj->object_id,
558         obj->message
559     };
560     send_message(obj->session, &msg);
561 
562     for(;;) {
563         get_message(obj->session, &msg);
564         switch(msg.message_type) {
565             case ROMP_RETVAL:
566                 return msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex);
567                 break;
568             case ROMP_YIELD:
569                 rb_yield(msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex));
570                 break;
571             case ROMP_EXCEPTION: {
572                 ruby_raise(
573                     msg.message_obj,
574                     ruby_exc_message(msg.message_obj),
575                     rb_ary_concat(ruby_exc_backtrace(msg.message_obj), ruby_caller())
576                 );
577                 break;
578             }
579             case ROMP_SYNC:
580                 reply_sync(obj->session, NUM2INT(msg.message_obj));
581                 break;
582             default:
583                 rb_raise(rb_eRuntimeError, "Invalid msg type received");
584         }
585     }
586 }
587 
588 // Send a oneway message to the server.  This is not thread-safe, so the
589 // caller should perform any necessary locking.
590 static VALUE client_oneway(VALUE ruby_proxy_object) {
591     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
592     ROMP_Message msg = {
593         ROMP_ONEWAY,
594         obj->object_id,
595         obj->message
596     };
597     send_message(obj->session, &msg);
598     return Qnil;
599 }
600 
601 // Send a oneway message to the server and request a message in response.
602 // This is not thread-safe, so the caller should perform any necessary
603 // locking.
604 static VALUE client_oneway_sync(VALUE ruby_proxy_object) {
605     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
606     ROMP_Message msg = {
607         ROMP_ONEWAY_SYNC,
608         obj->object_id,
609         obj->message
610     };
611     send_message(obj->session, &msg);
612     get_message(obj->session, &msg);
613     return Qnil;
614 }
615 
616 // Synchronize with the server.  This is not thread-safe, so the caller should
617 // perform any necessary locking.
618 static VALUE client_sync(VALUE ruby_proxy_object) {
619     Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
620     send_sync(obj->session);
621     wait_sync(obj->session);
622     return Qnil;
623 }
624 
625 // ----------------------------------------------------------------------------
626 // Ruby interface functions
627 // ----------------------------------------------------------------------------
628 
629 static void ruby_session_mark(ROMP_Session * session) {
630     rb_gc_mark(session->io_object);
631 }
632 
633 static VALUE ruby_session_new(VALUE self, VALUE io_object) {
634     ROMP_Session * session;
635     VALUE ruby_session;
636     OpenFile * openfile;
637     FILE * read_fp;
638     FILE * write_fp;
639 
640     if(!rb_obj_is_kind_of(io_object, rb_cIO)) {
641         rb_raise(rb_eTypeError, "Expecting an IO object");
642     } 
643     
644     ruby_session = Data_Make_Struct(
645         rb_cSession,
646         ROMP_Session,
647         (RUBY_DATA_FUNC)(ruby_session_mark),
648         (RUBY_DATA_FUNC)(free),
649         session);
650 
651     GetOpenFile(io_object, openfile);
652     read_fp = GetReadFile(openfile);
653     write_fp = GetWriteFile(openfile);
654     session->read_fd = fileno(read_fp);
655     session->write_fd = fileno(write_fp);
656     session->io_object = io_object;
657     session->nonblock = 0;
658 
659     return ruby_session;
660 }
661 
662 static VALUE ruby_set_nonblock(VALUE self, VALUE nonblock) {
663     ROMP_Session * session;
664     Data_Get_Struct(self, ROMP_Session, session);
665     if(nonblock == Qtrue) {
666         session->nonblock = 1;
667     } else if(nonblock == Qfalse) {
668         session->nonblock = 0;
669     } else {
670         rb_raise(rb_eTypeError, "Expecting a boolean");
671     }
672     return Qnil;
673 }
674 
675 static void ruby_proxy_object_mark(Proxy_Object * proxy_object) {
676     rb_gc_mark(proxy_object->ruby_session);
677     rb_gc_mark(proxy_object->mutex);
678 }
679 
680 static VALUE ruby_proxy_object_new(
681         VALUE self, VALUE ruby_session, VALUE ruby_mutex, VALUE ruby_object_id) {
682     ROMP_Session * session;
683     OBJECT_ID_T object_id = NUM2INT(ruby_object_id);
684     Proxy_Object * proxy_object;
685     VALUE ruby_proxy_object;
686 
687     if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
688         rb_raise(rb_eTypeError, "Expecting a session");
689     }
690     Data_Get_Struct(ruby_session, ROMP_Session, session);
691 
692     ruby_proxy_object = Data_Make_Struct(
693         rb_cProxy_Object,
694         Proxy_Object,
695         (RUBY_DATA_FUNC)(ruby_proxy_object_mark),
696         (RUBY_DATA_FUNC)(free),
697         proxy_object);
698     proxy_object->session = session;
699     proxy_object->ruby_session = ruby_session;
700     proxy_object->mutex = ruby_mutex;
701     proxy_object->object_id = object_id;
702 
703     return ruby_proxy_object;
704 }
705 
706 static VALUE ruby_proxy_object_method_missing(VALUE self, VALUE message) {
707     Proxy_Object * proxy_object;
708     Data_Get_Struct(self, Proxy_Object, proxy_object);
709 
710     proxy_object->message = message;
711     ruby_lock(proxy_object->mutex);
712     return rb_ensure(
713         client_request, (VALUE)(proxy_object),
714         ruby_unlock, proxy_object->mutex);
715 }
716 
717 static VALUE ruby_proxy_object_oneway(VALUE self, VALUE message) {
718     Proxy_Object * proxy_object;
719     Data_Get_Struct(self, Proxy_Object, proxy_object);
720 
721     proxy_object->message = message;
722     ruby_lock(proxy_object->mutex);
723     rb_ensure(
724         client_oneway, (VALUE)(proxy_object),
725         ruby_unlock, proxy_object->mutex);
726     return Qnil;
727 }
728 
729 static VALUE ruby_proxy_object_oneway_sync(VALUE self, VALUE message) {
730     Proxy_Object * proxy_object;
731     Data_Get_Struct(self, Proxy_Object, proxy_object);
732 
733     proxy_object->message = message;
734     ruby_lock(proxy_object->mutex);
735     rb_ensure(
736         client_oneway_sync, (VALUE)(proxy_object),
737         ruby_unlock, proxy_object->mutex);
738     return Qnil;
739 }
740 
741 static VALUE ruby_proxy_object_sync(VALUE self) {
742     Proxy_Object * proxy_object;
743     Data_Get_Struct(self, Proxy_Object, proxy_object);
744 
745     ruby_lock(proxy_object->mutex);
746     rb_ensure(
747         client_sync, (VALUE)(proxy_object),
748         ruby_unlock, proxy_object->mutex);
749     return Qnil;
750 }
751 
752 static VALUE ruby_server_loop(VALUE self, VALUE ruby_session) {
753     ROMP_Session * session;
754     VALUE resolve_server;
755     VALUE ruby_debug;
756     int debug;
757 
758     if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
759         rb_raise(rb_eTypeError, "Excpecting a session");
760     }
761     Data_Get_Struct(ruby_session, ROMP_Session, session);
762 
763     resolve_server = rb_iv_get(self, "@resolve_server");
764 
765     ruby_debug = rb_iv_get(self, "@debug");
766     debug = (ruby_debug != Qfalse) && !NIL_P(ruby_debug);
767     server_loop(session, resolve_server, debug);
768     return Qnil;
769 }
770 
771 // Given a message, convert it into an object that can be returned.  This
772 // function really only checks to see if an Object_Reference has been returned
773 // from the server, and creates a new Proxy_Object if this is the case.
774 // Otherwise, the original object is returned to the client.
775 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex) {
776     if(CLASS_OF(message) == rb_cObject_Reference) {
777         return ruby_proxy_object_new(
778             rb_cProxy_Object,
779             session,
780             mutex,
781             rb_funcall(message, id_object_id, 0));
782     } else {
783         return message;
784     }
785 }
786 
787 void Init_romp_helper() {
788     init_globals();
789 
790     rb_mROMP = rb_define_module("ROMP");
791     rb_cSession = rb_define_class_under(rb_mROMP, "Session", rb_cObject);
792 
793     rb_define_const(rb_cSession, "REQUEST", INT2NUM(ROMP_REQUEST));
794     rb_define_const(rb_cSession, "REQUEST_BLOCK", INT2NUM(ROMP_REQUEST_BLOCK));
795     rb_define_const(rb_cSession, "ONEWAY", INT2NUM(ROMP_ONEWAY));
796     rb_define_const(rb_cSession, "ONEWAY_SYNC", INT2NUM(ROMP_ONEWAY_SYNC));
797     rb_define_const(rb_cSession, "RETVAL", INT2NUM(ROMP_RETVAL));
798     rb_define_const(rb_cSession, "EXCEPTION", INT2NUM(ROMP_EXCEPTION));
799     rb_define_const(rb_cSession, "YIELD", INT2NUM(ROMP_YIELD));
800     rb_define_const(rb_cSession, "SYNC", INT2NUM(ROMP_SYNC));
801     rb_define_const(rb_cSession, "NULL_MSG", INT2NUM(ROMP_NULL_MSG));
802     rb_define_const(rb_cSession, "MSG_START", INT2NUM(ROMP_MSG_START));
803     rb_define_const(rb_cSession, "MAX_ID", INT2NUM(ROMP_MAX_ID));
804     rb_define_const(rb_cSession, "MAX_MSG_TYPE", INT2NUM(ROMP_MAX_MSG_TYPE));
805 
806     rb_define_singleton_method(rb_cSession, "new", ruby_session_new, 1);
807     rb_define_method(rb_cSession, "set_nonblock", ruby_set_nonblock, 1);
808 
809     rb_cProxy_Object = rb_define_class_under(rb_mROMP, "Proxy_Object", rb_cObject);
810     rb_define_singleton_method(rb_cProxy_Object, "new", ruby_proxy_object_new, 3);
811     rb_define_method(rb_cProxy_Object, "method_missing", ruby_proxy_object_method_missing, -2);
812     rb_define_method(rb_cProxy_Object, "oneway", ruby_proxy_object_oneway, -2);
813     rb_define_method(rb_cProxy_Object, "oneway_sync", ruby_proxy_object_oneway_sync, -2);
814     rb_define_method(rb_cProxy_Object, "sync", ruby_proxy_object_sync, 0);
815 
816     rb_cServer = rb_define_class_under(rb_mROMP, "Server", rb_cObject);
817     rb_define_method(rb_cServer, "server_loop", ruby_server_loop, 1);
818 
819     rb_cObject_Reference = rb_define_class_under(rb_mROMP, "Object_Reference", rb_cObject);
820 
821     id_object_id = rb_intern("object_id");
822 }