1 // ROMP - The Ruby Object Message Proxy
2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
3 //
4 // ROMP is a set of classes for providing distributed object support to a
5 // Ruby program. You may distribute and/or modify it under the same terms as
6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).
7
8 #include <ruby.h>
9 #include <rubyio.h>
10 #include <stdint.h>
11 #include <stdlib.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <sys/time.h>
15
16 // ---------------------------------------------------------------------------
17 // Useful macros
18 // ---------------------------------------------------------------------------
19
20 // TODO: Are these portable?
21
// Store the low 16 bits of s at buf, most significant byte first, and advance
// buf by two bytes.  buf must be a modifiable pointer to char (or unsigned
// char).  s may be any integer expression; it is evaluated exactly once, so
// compound expressions such as (a | b) are stored correctly.
#define PUTSHORT(s, buf) \
  do { \
    const unsigned long putshort_val_ = (unsigned long)(s); \
    *(buf) = (char)((putshort_val_ >> 8) & 0xff); ++(buf); \
    *(buf) = (char)(putshort_val_ & 0xff); ++(buf); \
  } while(0)
27
// Load a 16-bit value stored most significant byte first at buf into s and
// advance buf by two bytes.  buf must be a modifiable pointer to char (or
// unsigned char); s must be an assignable integer lvalue and is written
// exactly once.
#define GETSHORT(s, buf) \
  do { \
    const unsigned char * getshort_src_ = (const unsigned char *)(buf); \
    (s) = (getshort_src_[0] << 8) | getshort_src_[1]; \
    (buf) += 2; \
  } while(0)
33
34 // ---------------------------------------------------------------------------
35 // Globals
36 // ---------------------------------------------------------------------------
37
38 // objects/functions created down below
39 //
40 static VALUE rb_mROMP = Qnil;
41 static VALUE rb_cSession = Qnil;
42 static VALUE rb_cProxy_Object = Qnil;
43 static VALUE rb_cServer = Qnil;
44 static VALUE rb_cObject_Reference = Qnil;
45 static ID id_object_id;
46
47 // objects/functions created elsewhere
48 //
49 static VALUE rb_mMarshal = Qnil;
50
51 static ID id_dump;
52 static ID id_load;
53 static ID id_message;
54 static ID id_backtrace;
55 static ID id_caller;
56 static ID id_raise;
57 static ID id_send;
58 static ID id_get_object;
59 static ID id_slice_bang;
60 static ID id_print_exception;
61 static ID id_lock;
62 static ID id_unlock;
63
64 static struct timeval zero_timeval;
65
66 static void init_globals() {
67 rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
68
69 id_dump = rb_intern("dump");
70 id_load = rb_intern("load");
71 id_message = rb_intern("message");
72 id_backtrace = rb_intern("backtrace");
73 id_caller = rb_intern("caller");
74 id_raise = rb_intern("raise");
75 id_send = rb_intern("send");
76 id_get_object = rb_intern("get_object");
77 id_slice_bang = rb_intern("slice!");
78 id_print_exception = rb_intern("print_exception");
79 id_lock = rb_intern("lock");
80 id_unlock = rb_intern("unlock");
81
82 zero_timeval.tv_sec = 0;
83 zero_timeval.tv_usec = 0;
84 }
85
86 // ---------------------------------------------------------------------------
87 // Utility functions
88 // ---------------------------------------------------------------------------
89
90 // Forward declaration
91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex);
92
// One write(2) attempt on behalf of ruby_write_throw().  The enclosing scope
// must provide: int fd, a pointer-to-const buf, size_t count, size_t total
// and ssize_t write_count.
//
// On success buf/count/total are advanced by the number of bytes written.
// EWOULDBLOCK, EAGAIN and EINTR are treated as "nothing written, try again
// later" and leave the state untouched; any other errno raises through
// rb_sys_fail("write").  A zero-byte write while bytes are still pending
// raises IOError("disconnected").
#define WRITE_HELPER \
  do { \
    write_count = write(fd, buf, count); \
    if(write_count < 0) { \
      /* Transient conditions are left for the caller's loop to retry. */ \
      if(errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { \
        rb_sys_fail("write"); \
      } \
    } else if(write_count == 0 && count != 0) { \
      rb_raise(rb_eIOError, "disconnected"); \
    } else { \
      count -= write_count; \
      /* Step through a char pointer: arithmetic on void * is not ISO C. */ \
      buf = (const char *)buf + write_count; \
      total += write_count; \
    } \
  } while(0)
106
// One read(2) attempt on behalf of ruby_read_throw().  The enclosing scope
// must provide: int fd, a writable pointer buf, size_t count, size_t total
// and ssize_t read_count.
//
// On success buf/count/total are advanced by the number of bytes read.
// EWOULDBLOCK, EAGAIN and EINTR are treated as "nothing read, try again
// later" and leave the state untouched; any other errno raises through
// rb_sys_fail("read").  A zero-byte read while bytes are still expected
// raises IOError("disconnected").
#define READ_HELPER \
  do { \
    read_count = read(fd, buf, count); \
    if(read_count < 0) { \
      /* Transient conditions are left for the caller's loop to retry. */ \
      if(errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) { \
        rb_sys_fail("read"); \
      } \
    } else if(read_count == 0 && count != 0) { \
      rb_raise(rb_eIOError, "disconnected"); \
    } else { \
      count -= read_count; \
      /* Step through a char pointer: arithmetic on void * is not ISO C. */ \
      buf = (char *)buf + read_count; \
      total += read_count; \
    } \
  } while(0)
120
121 // Write to an fd and raise an exception if an error occurs
122 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) {
123 int n;
124 size_t total = 0;
125 ssize_t write_count;
126 fd_set fds, error_fds;
127
128 if(!nonblock) {
129 FD_ZERO(&fds);
130 FD_SET(fd, &fds);
131 FD_ZERO(&error_fds);
132 FD_SET(fd, &error_fds);
133 n = select(fd + 1, 0, &fds, &fds, 0);
134 if(n > 0) {
135 WRITE_HELPER;
136 }
137 } else {
138 WRITE_HELPER;
139 }
140
141 while(count > 0) {
142 FD_ZERO(&fds);
143 FD_SET(fd, &fds);
144 FD_ZERO(&error_fds);
145 FD_SET(fd, &error_fds);
146 n = rb_thread_select(fd + 1, 0, &fds, &fds, 0);
147 if(n == -1) {
148 if(errno == EWOULDBLOCK) continue;
149 rb_sys_fail("select");
150 }
151 WRITE_HELPER;
152 };
153 return total;
154 }
155
156 // Read from an fd and raise an exception if an error occurs
157 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) {
158 int n;
159 size_t total = 0;
160 ssize_t read_count;
161 fd_set fds, error_fds;
162
163 if(!nonblock) {
164 FD_ZERO(&fds);
165 FD_SET(fd, &fds);
166 FD_ZERO(&error_fds);
167 FD_SET(fd, &error_fds);
168 n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval);
169 if(n > 0) {
170 READ_HELPER;
171 }
172 } else {
173 READ_HELPER;
174 }
175
176 while(count > 0) {
177 FD_ZERO(&fds);
178 FD_SET(fd, &fds);
179 FD_ZERO(&error_fds);
180 FD_SET(fd, &error_fds);
181 n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0);
182 if(n == -1) {
183 if(errno == EWOULDBLOCK) continue;
184 rb_sys_fail("select");
185 }
186 READ_HELPER;
187 };
188
189 return total;
190 }
191
192 // Return the message of an exception
193 static VALUE ruby_exc_message(VALUE exc) {
194 return rb_funcall(exc, id_message, 0);
195 }
196
197 // Return the backtrace of an exception
198 static VALUE ruby_exc_backtrace(VALUE exc) {
199 return rb_funcall(exc, id_backtrace, 0);
200 }
201
202 // Return the current Ruby call stack
203 static VALUE ruby_caller() {
204 // TODO: Why does calling caller() with 0 arguments not work?
205 return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0));
206 }
207
208 // Raise a specific ruby exception with a specific message and backtrace
209 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) {
210 rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt);
211 }
212
213 // Send a message to a Ruby object
214 static VALUE ruby_send(VALUE obj, VALUE msg) {
215 return rb_apply(obj, id_send, msg);
216 }
217
218 // Call "get_object" on a Ruby object
219 static VALUE ruby_get_object(VALUE obj, VALUE object_id) {
220 return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id));
221 }
222
223 // Call slice! on a Ruby object
224 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) {
225 VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0);
226 return rb_funcall(obj, id_slice_bang, 1, range);
227 }
228
229 // Print an exception to the screen using a function defined in the ROMP
230 // module (TODO: wouldn't it be nice if Ruby's error_print were available to
231 // the user?)
232 static void ruby_print_exception(VALUE exc) {
233 rb_funcall(rb_mROMP, id_print_exception, 1, exc);
234 }
235
236 // Call lock on an object (a mutex, probably)
237 static VALUE ruby_lock(VALUE obj) {
238 return rb_funcall(obj, id_lock, 0);
239 }
240
241 // Call unlock on an object (a mutex, probably)
242 static VALUE ruby_unlock(VALUE obj) {
243 return rb_funcall(obj, id_unlock, 0);
244 }
245
246 // ---------------------------------------------------------------------------
247 // Marshalling functions
248 // ---------------------------------------------------------------------------
249
250 // Take an object as input and return it as a marshalled string.
251 static VALUE marshal_dump(VALUE obj) {
252 return rb_funcall(rb_mMarshal, id_dump, 1, obj);
253 }
254
255 // Take a marshalled string as input and return it as an object.
256 static VALUE marshal_load(VALUE str) {
257 return rb_funcall(rb_mMarshal, id_load, 1, str);
258 }
259
260 // ---------------------------------------------------------------------------
261 // Session functions
262 // ---------------------------------------------------------------------------
263
// Wire-protocol constants.  All of them except ROMP_BUFFER_SIZE are also
// exported as constants of ROMP::Session by Init_romp_helper().
enum {
  // Message types sent by the client functions.
  ROMP_REQUEST       = 0x1001,
  ROMP_REQUEST_BLOCK = 0x1002,
  ROMP_ONEWAY        = 0x1003,
  ROMP_ONEWAY_SYNC   = 0x1004,

  // Message types sent by the server functions in reply.
  ROMP_RETVAL        = 0x2001,
  ROMP_EXCEPTION     = 0x2002,
  ROMP_YIELD         = 0x2003,

  // Synchronization and empty messages.
  ROMP_SYNC          = 0x4001,
  ROMP_NULL_MSG      = 0x4002,

  // Marker written first in every header; get_message() keeps reading
  // headers until it sees it.
  ROMP_MSG_START     = 0x4242,

  ROMP_MAX_ID        = (1<<16),
  ROMP_MAX_MSG_TYPE  = (1<<16),

  // Bytes transferred per message header (size of ROMP_Session.buf).
  ROMP_BUFFER_SIZE   = 16
};
278
279 typedef struct {
280 VALUE io_object;
281 int read_fd, write_fd;
282 char buf[ROMP_BUFFER_SIZE];
283 int nonblock;
284 } ROMP_Session;
285
286 typedef uint16_t MESSAGE_TYPE_T;
287 typedef uint16_t OBJECT_ID_T;
288
289 // A ROMP message is broken into 3 components (see romp.rb for more details)
290 typedef struct {
291 MESSAGE_TYPE_T message_type;
292 OBJECT_ID_T object_id;
293 VALUE message_obj;
294 } ROMP_Message;
295
296 // Send a message to the server with data data and length len.
297 static void send_message_helper(
298 ROMP_Session * session,
299 char * data,
300 size_t len,
301 MESSAGE_TYPE_T message_type,
302 OBJECT_ID_T object_id) {
303
304 char * buf = session->buf;
305
306 PUTSHORT(ROMP_MSG_START, buf);
307 PUTSHORT(len, buf);
308 PUTSHORT(message_type, buf);
309 PUTSHORT(object_id, buf);
310
311 ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock);
312 ruby_write_throw(session->write_fd, data, len, session->nonblock);
313 }
314
315 // Send a message to the server with the data in message.
316 static void send_message(ROMP_Session * session, ROMP_Message * message) {
317 VALUE data;
318 struct RString * data_str;
319
320 data = marshal_dump(message->message_obj);
321 data_str = RSTRING(data);
322 send_message_helper(
323 session,
324 data_str->ptr,
325 data_str->len,
326 message->message_type,
327 message->object_id);
328 }
329
330 // Send a null message to the server (no data, data len = 0)
331 static void send_null_message(ROMP_Session * session) {
332 send_message_helper(session, "", 0, ROMP_NULL_MSG, 0);
333 }
334
335 // Receive a message from the server
336 static void get_message(ROMP_Session * session, ROMP_Message * message) {
337 uint16_t magic = 0;
338 uint16_t data_len = 0;
339 char * buf = 0;
340 // struct RString message_string;
341 VALUE ruby_str;
342
343 do {
344 buf = session->buf;
345
346 ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock);
347
348 GETSHORT(magic, buf);
349 GETSHORT(data_len, buf);
350 GETSHORT(message->message_type, buf);
351 GETSHORT(message->object_id, buf);
352 } while(magic != ROMP_MSG_START);
353
354 buf = ALLOCA_N(char, data_len);
355 ruby_read_throw(session->read_fd, buf, data_len, session->nonblock);
356 ruby_str = rb_str_new(buf, data_len);
357
358 if(message->message_type != ROMP_NULL_MSG) {
359 message->message_obj = marshal_load(ruby_str);
360 } else {
361 message->message_obj = Qnil;
362 }
363 }
364
365 // Ideally, this function should return true if the server has disconnected,
366 // but currently always returns false. The server thread will still exit
367 // when the client has disconnected, but currently does so via an exception.
368 static int session_finished(ROMP_Session * session) {
369 // TODO: Detect a disconnection
370 return 0;
371 }
372
373 // Send a sync message to the server.
374 static void send_sync(ROMP_Session * session) {
375 ROMP_Message message = { ROMP_SYNC, 0, Qnil };
376 send_message(session, &message);
377 }
378
379 // Wait for a sync response from the server. Ignore any messages that are
380 // received while waiting for the response.
381 static void wait_sync(ROMP_Session * session) {
382 ROMP_Message message;
383
384 // sleep(1);
385 get_message(session, &message);
386 if( message.message_type != ROMP_SYNC
387 && message.object_id != 1
388 && message.message_obj != Qnil) {
389 rb_raise(rb_eRuntimeError, "ROMP synchronization failed");
390 }
391 }
392
393 // Send a reply to a sync request.
394 static void reply_sync(ROMP_Session * session, int value) {
395 if(value == 0) {
396 ROMP_Message message = { ROMP_SYNC, 1, Qnil };
397 send_message(session, &message);
398 }
399 }
400
401 // ----------------------------------------------------------------------------
402 // Server functions
403 // ----------------------------------------------------------------------------
404
405 // We use this structure to pass data to our exception handler. This is done
406 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work.
407 typedef struct {
408 ROMP_Session * session;
409 ROMP_Message * message;
410 VALUE obj;
411 int debug;
412 } Server_Info;
413
414 // Make a method call into a Ruby object.
415 static VALUE server_funcall(VALUE ruby_server_info) {
416 Server_Info * server_info = (Server_Info *)(ruby_server_info);
417 return ruby_send(server_info->obj, server_info->message->message_obj);
418 }
419
420 // Send a yield message to the client, indicating that it should call
421 // Kernel#yield with the message that is sent.
422 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) {
423 Server_Info * server_info = (Server_Info *)(ruby_server_info);
424
425 server_info->message->message_type = ROMP_YIELD;
426 server_info->message->object_id = 0;
427 server_info->message->message_obj = retval;
428 send_message(server_info->session, server_info->message);
429
430 return Qnil;
431 }
432
433 // Send a return value to the client, indicating that it should return
434 // the message to the caller.
435 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) {
436 Server_Info * server_info = (Server_Info *)(ruby_server_info);
437
438 server_info->message->message_type = ROMP_RETVAL;
439 server_info->message->object_id = 0;
440 server_info->message->message_obj = retval;
441 send_message(server_info->session, server_info->message);
442
443 return Qnil;
444 }
445
446 // Send an exception the client, indicating that it should raise an exception.
447 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) {
448 Server_Info * server_info = (Server_Info *)(ruby_server_info);
449 VALUE caller = ruby_caller();
450 VALUE bt = ruby_exc_backtrace(exc);
451
452 server_info->message->message_type = ROMP_EXCEPTION;
453 server_info->message->object_id = 0;
454 server_info->message->message_obj = exc;
455
456 // Get rid of extraneous caller information to make debugging easier.
457 ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1);
458
459 // If debugging is enabled, then print an exception.
460 if(server_info->debug) {
461 ruby_print_exception(exc);
462 }
463
464 send_message(server_info->session, server_info->message);
465
466 return Qnil;
467 }
468
469 // Proces a request from the client and send an appropriate reply.
470 static VALUE server_reply(VALUE ruby_server_info) {
471 Server_Info * server_info = (Server_Info *)(ruby_server_info);
472 VALUE retval;
473 int status;
474
475 server_info->obj = ruby_get_object(
476 server_info->obj,
477 server_info->message->object_id);
478
479 // TODO: The client should be able to pass a callback object to the server;
480 // msg_to_obj can create a Proxy_Object, but it needs a session to make
481 // calls over.
482
483 // Perform the appropriate action based on message type.
484 switch(server_info->message->message_type) {
485 case ROMP_ONEWAY_SYNC:
486 send_null_message(server_info->session);
487 // fallthrough
488
489 case ROMP_ONEWAY:
490 rb_protect(server_funcall, ruby_server_info, &status);
491 return Qnil;
492
493 case ROMP_REQUEST:
494 retval = ruby_send(
495 server_info->obj,
496 server_info->message->message_obj);
497 break;
498
499 case ROMP_REQUEST_BLOCK:
500 retval = rb_iterate(
501 server_funcall, ruby_server_info,
502 server_send_yield, ruby_server_info);
503 break;
504
505 case ROMP_SYNC:
506 reply_sync(
507 server_info->session,
508 server_info->message->object_id);
509 return Qnil;
510
511 default:
512 rb_raise(rb_eRuntimeError, "Bad session request");
513 }
514
515 server_send_retval(retval, ruby_server_info);
516
517 return Qnil;
518 }
519
520 // The main server loop. Wait for a message from the client, route the
521 // message to the appropriate object, send a response and repeat.
522 static void server_loop(ROMP_Session * session, VALUE resolve_server, int dbg) {
523 ROMP_Message message;
524 Server_Info server_info = { session, &message, resolve_server, dbg };
525 VALUE ruby_server_info = (VALUE)(&server_info);
526
527 while(!session_finished(session)) {
528 get_message(session, &message);
529 rb_rescue2(
530 server_reply, ruby_server_info,
531 server_exception, ruby_server_info, rb_eException, 0);
532 server_info.obj = resolve_server;
533 }
534 }
535
536 // ----------------------------------------------------------------------------
537 // Client functions
538 // ----------------------------------------------------------------------------
539
540 // We use this structure to pass data to our client functions by casting it
541 // to a Ruby VALUE (see above note with Server_Info).
542 typedef struct {
543 ROMP_Session * session;
544 VALUE ruby_session;
545 OBJECT_ID_T object_id;
546 VALUE message;
547 VALUE mutex;
548 } Proxy_Object;
549
550 // Send a request to the server, wait for a response, and perform an action
551 // based on what that response was. This is not thread-safe, so the caller
552 // should perform any necessary locking
553 static VALUE client_request(VALUE ruby_proxy_object) {
554 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
555 ROMP_Message msg = {
556 rb_block_given_p() ? ROMP_REQUEST_BLOCK : ROMP_REQUEST,
557 obj->object_id,
558 obj->message
559 };
560 send_message(obj->session, &msg);
561
562 for(;;) {
563 get_message(obj->session, &msg);
564 switch(msg.message_type) {
565 case ROMP_RETVAL:
566 return msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex);
567 break;
568 case ROMP_YIELD:
569 rb_yield(msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex));
570 break;
571 case ROMP_EXCEPTION: {
572 ruby_raise(
573 msg.message_obj,
574 ruby_exc_message(msg.message_obj),
575 rb_ary_concat(ruby_exc_backtrace(msg.message_obj), ruby_caller())
576 );
577 break;
578 }
579 case ROMP_SYNC:
580 reply_sync(obj->session, NUM2INT(msg.message_obj));
581 break;
582 default:
583 rb_raise(rb_eRuntimeError, "Invalid msg type received");
584 }
585 }
586 }
587
588 // Send a oneway message to the server. This is not thread-safe, so the
589 // caller should perform any necessary locking.
590 static VALUE client_oneway(VALUE ruby_proxy_object) {
591 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
592 ROMP_Message msg = {
593 ROMP_ONEWAY,
594 obj->object_id,
595 obj->message
596 };
597 send_message(obj->session, &msg);
598 return Qnil;
599 }
600
601 // Send a oneway message to the server and request a message in response.
602 // This is not thread-safe, so the caller should perform any necessary
603 // locking.
604 static VALUE client_oneway_sync(VALUE ruby_proxy_object) {
605 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
606 ROMP_Message msg = {
607 ROMP_ONEWAY_SYNC,
608 obj->object_id,
609 obj->message
610 };
611 send_message(obj->session, &msg);
612 get_message(obj->session, &msg);
613 return Qnil;
614 }
615
616 // Synchronize with the server. This is not thread-safe, so the caller should
617 // perform any necessary locking.
618 static VALUE client_sync(VALUE ruby_proxy_object) {
619 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
620 send_sync(obj->session);
621 wait_sync(obj->session);
622 return Qnil;
623 }
624
625 // ----------------------------------------------------------------------------
626 // Ruby interface functions
627 // ----------------------------------------------------------------------------
628
629 static void ruby_session_mark(ROMP_Session * session) {
630 rb_gc_mark(session->io_object);
631 }
632
633 static VALUE ruby_session_new(VALUE self, VALUE io_object) {
634 ROMP_Session * session;
635 VALUE ruby_session;
636 OpenFile * openfile;
637 FILE * read_fp;
638 FILE * write_fp;
639
640 if(!rb_obj_is_kind_of(io_object, rb_cIO)) {
641 rb_raise(rb_eTypeError, "Expecting an IO object");
642 }
643
644 ruby_session = Data_Make_Struct(
645 rb_cSession,
646 ROMP_Session,
647 (RUBY_DATA_FUNC)(ruby_session_mark),
648 (RUBY_DATA_FUNC)(free),
649 session);
650
651 GetOpenFile(io_object, openfile);
652 read_fp = GetReadFile(openfile);
653 write_fp = GetWriteFile(openfile);
654 session->read_fd = fileno(read_fp);
655 session->write_fd = fileno(write_fp);
656 session->io_object = io_object;
657 session->nonblock = 0;
658
659 return ruby_session;
660 }
661
662 static VALUE ruby_set_nonblock(VALUE self, VALUE nonblock) {
663 ROMP_Session * session;
664 Data_Get_Struct(self, ROMP_Session, session);
665 if(nonblock == Qtrue) {
666 session->nonblock = 1;
667 } else if(nonblock == Qfalse) {
668 session->nonblock = 0;
669 } else {
670 rb_raise(rb_eTypeError, "Expecting a boolean");
671 }
672 return Qnil;
673 }
674
675 static void ruby_proxy_object_mark(Proxy_Object * proxy_object) {
676 rb_gc_mark(proxy_object->ruby_session);
677 rb_gc_mark(proxy_object->mutex);
678 }
679
680 static VALUE ruby_proxy_object_new(
681 VALUE self, VALUE ruby_session, VALUE ruby_mutex, VALUE ruby_object_id) {
682 ROMP_Session * session;
683 OBJECT_ID_T object_id = NUM2INT(ruby_object_id);
684 Proxy_Object * proxy_object;
685 VALUE ruby_proxy_object;
686
687 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
688 rb_raise(rb_eTypeError, "Expecting a session");
689 }
690 Data_Get_Struct(ruby_session, ROMP_Session, session);
691
692 ruby_proxy_object = Data_Make_Struct(
693 rb_cProxy_Object,
694 Proxy_Object,
695 (RUBY_DATA_FUNC)(ruby_proxy_object_mark),
696 (RUBY_DATA_FUNC)(free),
697 proxy_object);
698 proxy_object->session = session;
699 proxy_object->ruby_session = ruby_session;
700 proxy_object->mutex = ruby_mutex;
701 proxy_object->object_id = object_id;
702
703 return ruby_proxy_object;
704 }
705
706 static VALUE ruby_proxy_object_method_missing(VALUE self, VALUE message) {
707 Proxy_Object * proxy_object;
708 Data_Get_Struct(self, Proxy_Object, proxy_object);
709
710 proxy_object->message = message;
711 ruby_lock(proxy_object->mutex);
712 return rb_ensure(
713 client_request, (VALUE)(proxy_object),
714 ruby_unlock, proxy_object->mutex);
715 }
716
717 static VALUE ruby_proxy_object_oneway(VALUE self, VALUE message) {
718 Proxy_Object * proxy_object;
719 Data_Get_Struct(self, Proxy_Object, proxy_object);
720
721 proxy_object->message = message;
722 ruby_lock(proxy_object->mutex);
723 rb_ensure(
724 client_oneway, (VALUE)(proxy_object),
725 ruby_unlock, proxy_object->mutex);
726 return Qnil;
727 }
728
729 static VALUE ruby_proxy_object_oneway_sync(VALUE self, VALUE message) {
730 Proxy_Object * proxy_object;
731 Data_Get_Struct(self, Proxy_Object, proxy_object);
732
733 proxy_object->message = message;
734 ruby_lock(proxy_object->mutex);
735 rb_ensure(
736 client_oneway_sync, (VALUE)(proxy_object),
737 ruby_unlock, proxy_object->mutex);
738 return Qnil;
739 }
740
741 static VALUE ruby_proxy_object_sync(VALUE self) {
742 Proxy_Object * proxy_object;
743 Data_Get_Struct(self, Proxy_Object, proxy_object);
744
745 ruby_lock(proxy_object->mutex);
746 rb_ensure(
747 client_sync, (VALUE)(proxy_object),
748 ruby_unlock, proxy_object->mutex);
749 return Qnil;
750 }
751
752 static VALUE ruby_server_loop(VALUE self, VALUE ruby_session) {
753 ROMP_Session * session;
754 VALUE resolve_server;
755 VALUE ruby_debug;
756 int debug;
757
758 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
759 rb_raise(rb_eTypeError, "Excpecting a session");
760 }
761 Data_Get_Struct(ruby_session, ROMP_Session, session);
762
763 resolve_server = rb_iv_get(self, "@resolve_server");
764
765 ruby_debug = rb_iv_get(self, "@debug");
766 debug = (ruby_debug != Qfalse) && !NIL_P(ruby_debug);
767 server_loop(session, resolve_server, debug);
768 return Qnil;
769 }
770
771 // Given a message, convert it into an object that can be returned. This
772 // function really only checks to see if an Object_Reference has been returned
773 // from the server, and creates a new Proxy_Object if this is the case.
774 // Otherwise, the original object is returned to the client.
775 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex) {
776 if(CLASS_OF(message) == rb_cObject_Reference) {
777 return ruby_proxy_object_new(
778 rb_cProxy_Object,
779 session,
780 mutex,
781 rb_funcall(message, id_object_id, 0));
782 } else {
783 return message;
784 }
785 }
786
787 void Init_romp_helper() {
788 init_globals();
789
790 rb_mROMP = rb_define_module("ROMP");
791 rb_cSession = rb_define_class_under(rb_mROMP, "Session", rb_cObject);
792
793 rb_define_const(rb_cSession, "REQUEST", INT2NUM(ROMP_REQUEST));
794 rb_define_const(rb_cSession, "REQUEST_BLOCK", INT2NUM(ROMP_REQUEST_BLOCK));
795 rb_define_const(rb_cSession, "ONEWAY", INT2NUM(ROMP_ONEWAY));
796 rb_define_const(rb_cSession, "ONEWAY_SYNC", INT2NUM(ROMP_ONEWAY_SYNC));
797 rb_define_const(rb_cSession, "RETVAL", INT2NUM(ROMP_RETVAL));
798 rb_define_const(rb_cSession, "EXCEPTION", INT2NUM(ROMP_EXCEPTION));
799 rb_define_const(rb_cSession, "YIELD", INT2NUM(ROMP_YIELD));
800 rb_define_const(rb_cSession, "SYNC", INT2NUM(ROMP_SYNC));
801 rb_define_const(rb_cSession, "NULL_MSG", INT2NUM(ROMP_NULL_MSG));
802 rb_define_const(rb_cSession, "MSG_START", INT2NUM(ROMP_MSG_START));
803 rb_define_const(rb_cSession, "MAX_ID", INT2NUM(ROMP_MAX_ID));
804 rb_define_const(rb_cSession, "MAX_MSG_TYPE", INT2NUM(ROMP_MAX_MSG_TYPE));
805
806 rb_define_singleton_method(rb_cSession, "new", ruby_session_new, 1);
807 rb_define_method(rb_cSession, "set_nonblock", ruby_set_nonblock, 1);
808
809 rb_cProxy_Object = rb_define_class_under(rb_mROMP, "Proxy_Object", rb_cObject);
810 rb_define_singleton_method(rb_cProxy_Object, "new", ruby_proxy_object_new, 3);
811 rb_define_method(rb_cProxy_Object, "method_missing", ruby_proxy_object_method_missing, -2);
812 rb_define_method(rb_cProxy_Object, "oneway", ruby_proxy_object_oneway, -2);
813 rb_define_method(rb_cProxy_Object, "oneway_sync", ruby_proxy_object_oneway_sync, -2);
814 rb_define_method(rb_cProxy_Object, "sync", ruby_proxy_object_sync, 0);
815
816 rb_cServer = rb_define_class_under(rb_mROMP, "Server", rb_cObject);
817 rb_define_method(rb_cServer, "server_loop", ruby_server_loop, 1);
818
819 rb_cObject_Reference = rb_define_class_under(rb_mROMP, "Object_Reference", rb_cObject);
820
821 id_object_id = rb_intern("object_id");
822 }