1 // ROMP - The Ruby Object Message Proxy
2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
3 //
4 // ROMP is a set of classes for providing distributed object support to a
5 // Ruby program. You may distribute and/or modify it under the same terms as
6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).
7
8 #include <ruby.h>
9 #include <rubyio.h>
10 #include <stdint.h>
11 #include <stdlib.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <sys/time.h>
15
16 // ---------------------------------------------------------------------------
17 // Useful macros
18 // ---------------------------------------------------------------------------
19
20 // TODO: Are these portable?
21
22 #define PUTSHORT(s, buf) \
23 do { \
24 *buf = s >> 8; ++buf; \
25 *buf = s & 0xff; ++buf; \
26 } while(0)
27
28 #define GETSHORT(s, buf) \
29 do { \
30 s = (unsigned char)*buf; ++buf; \
31 s = (s << 8) | (unsigned char)*buf; ++buf; \
32 } while(0)
33
34 // ---------------------------------------------------------------------------
35 // Globals
36 // ---------------------------------------------------------------------------
37
38 // objects/functions created down below
39 //
40 static VALUE rb_mROMP = Qnil;
41 static VALUE rb_cSession = Qnil;
42 static VALUE rb_cProxy_Object = Qnil;
43 static VALUE rb_cServer = Qnil;
44 static VALUE rb_cObject_Reference = Qnil;
45 static ID id_object_id;
46
47 // objects/functions created elsewhere
48 //
49 static VALUE rb_mMarshal = Qnil;
50
51 static ID id_dump;
52 static ID id_load;
53 static ID id_message;
54 static ID id_backtrace;
55 static ID id_caller;
56 static ID id_raise;
57 static ID id_send;
58 static ID id_get_object;
59 static ID id_slice_bang;
60 static ID id_print_exception;
61 static ID id_lock;
62 static ID id_unlock;
63
64 static struct timeval zero_timeval;
65
66 static void init_globals() {
67 rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
68
69 id_dump = rb_intern("dump");
70 id_load = rb_intern("load");
71 id_message = rb_intern("message");
72 id_backtrace = rb_intern("backtrace");
73 id_caller = rb_intern("caller");
74 id_raise = rb_intern("raise");
75 id_send = rb_intern("send");
76 id_get_object = rb_intern("get_object");
77 id_slice_bang = rb_intern("slice!");
78 id_print_exception = rb_intern("print_exception");
79 id_lock = rb_intern("lock");
80 id_unlock = rb_intern("unlock");
81
82 zero_timeval.tv_sec = 0;
83 zero_timeval.tv_usec = 0;
84 }
85
86 // ---------------------------------------------------------------------------
87 // Utility functions
88 // ---------------------------------------------------------------------------
89
90 // Forward declaration
91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex);
92
93 // Create a Ruby string that won't be collected by the GC. Be very careful
94 // with this function!
95 static void create_tmp_ruby_string(struct RString * str, char * buf, size_t len) {
96 str->basic.flags = T_STRING;
97 str->basic.klass = rb_cString;
98 str->ptr = buf;
99 str->len = len;
100 str->orig = 0; // ?
101 }
102
103 #define WRITE_HELPER \
104 do { \
105 write_count = write(fd, buf, count); \
106 if(write_count < 0) { \
107 if(errno != EWOULDBLOCK) rb_sys_fail("write"); \
108 } else if(write_count == 0 && count != 0) { \
109 rb_raise(rb_eIOError, "disconnected"); \
110 } else { \
111 count -= write_count; \
112 buf += write_count; \
113 total += write_count; \
114 } \
115 } while(0)
116
117 #define READ_HELPER \
118 do { \
119 read_count = read(fd, buf, count); \
120 if(read_count < 0) { \
121 if(errno != EWOULDBLOCK) rb_sys_fail("read"); \
122 } else if(read_count == 0 && count != 0) { \
123 rb_raise(rb_eIOError, "disconnected"); \
124 } else { \
125 count -= read_count; \
126 buf += read_count; \
127 total += read_count; \
128 } \
129 } while(0)
130
131 // Write to an fd and raise an exception if an error occurs
132 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) {
133 int n;
134 size_t total = 0;
135 ssize_t write_count;
136 fd_set fds, error_fds;
137
138 if(!nonblock) {
139 FD_ZERO(&fds);
140 FD_SET(fd, &fds);
141 FD_ZERO(&error_fds);
142 FD_SET(fd, &error_fds);
143 n = select(fd + 1, 0, &fds, &fds, 0);
144 if(n > 0) {
145 WRITE_HELPER;
146 }
147 } else {
148 WRITE_HELPER;
149 }
150
151 while(count > 0) {
152 FD_ZERO(&fds);
153 FD_SET(fd, &fds);
154 FD_ZERO(&error_fds);
155 FD_SET(fd, &error_fds);
156 n = rb_thread_select(fd + 1, 0, &fds, &fds, 0);
157 if(n == -1) {
158 if(errno == EWOULDBLOCK) continue;
159 rb_sys_fail("select");
160 }
161 WRITE_HELPER;
162 };
163 return total;
164 }
165
166 // Read from an fd and raise an exception if an error occurs
167 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) {
168 int n;
169 size_t total = 0;
170 ssize_t read_count;
171 fd_set fds, error_fds;
172
173 if(!nonblock) {
174 FD_ZERO(&fds);
175 FD_SET(fd, &fds);
176 FD_ZERO(&error_fds);
177 FD_SET(fd, &error_fds);
178 n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval);
179 if(n > 0) {
180 READ_HELPER;
181 }
182 } else {
183 READ_HELPER;
184 }
185
186 while(count > 0) {
187 FD_ZERO(&fds);
188 FD_SET(fd, &fds);
189 FD_ZERO(&error_fds);
190 FD_SET(fd, &error_fds);
191 n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0);
192 if(n == -1) {
193 if(errno == EWOULDBLOCK) continue;
194 rb_sys_fail("select");
195 }
196 READ_HELPER;
197 };
198
199 return total;
200 }
201
202 // Return the message of an exception
203 static VALUE ruby_exc_message(VALUE exc) {
204 return rb_funcall(exc, id_message, 0);
205 }
206
207 // Return the backtrace of an exception
208 static VALUE ruby_exc_backtrace(VALUE exc) {
209 return rb_funcall(exc, id_backtrace, 0);
210 }
211
212 // Return the current Ruby call stack
213 static VALUE ruby_caller() {
214 // TODO: Why does calling caller() with 0 arguments not work?
215 return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0));
216 }
217
218 // Raise a specific ruby exception with a specific message and backtrace
219 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) {
220 rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt);
221 }
222
223 // Send a message to a Ruby object
224 static VALUE ruby_send(VALUE obj, VALUE msg) {
225 return rb_apply(obj, id_send, msg);
226 }
227
228 // Call "get_object" on a Ruby object
229 static VALUE ruby_get_object(VALUE obj, VALUE object_id) {
230 return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id));
231 }
232
233 // Call slice! on a Ruby object
234 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) {
235 VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0);
236 return rb_funcall(obj, id_slice_bang, 1, range);
237 }
238
239 // Print an exception to the screen using a function defined in the ROMP
240 // module (TODO: wouldn't it be nice if Ruby's error_print were available to
241 // the user?)
242 static void ruby_print_exception(VALUE exc) {
243 rb_funcall(rb_mROMP, id_print_exception, 1, exc);
244 }
245
246 // Call lock on an object (a mutex, probably)
247 static VALUE ruby_lock(VALUE obj) {
248 return rb_funcall(obj, id_lock, 0);
249 }
250
251 // Call unlock on an object (a mutex, probably)
252 static VALUE ruby_unlock(VALUE obj) {
253 return rb_funcall(obj, id_unlock, 0);
254 }
255
256 // ---------------------------------------------------------------------------
257 // Marshalling functions
258 // ---------------------------------------------------------------------------
259
260 // Take an object as input and return it as a marshalled string.
261 static VALUE marshal_dump(VALUE obj) {
262 return rb_funcall(rb_mMarshal, id_dump, 1, obj);
263 }
264
265 // Take a marshalled string as input and return it as an object.
266 static VALUE marshal_load(VALUE str) {
267 return rb_funcall(rb_mMarshal, id_load, 1, str);
268 }
269
270 // ---------------------------------------------------------------------------
271 // Session functions
272 // ---------------------------------------------------------------------------
273
274 #define ROMP_REQUEST 0x1001
275 #define ROMP_REQUEST_BLOCK 0x1002
276 #define ROMP_ONEWAY 0x1003
277 #define ROMP_ONEWAY_SYNC 0x1004
278 #define ROMP_RETVAL 0x2001
279 #define ROMP_EXCEPTION 0x2002
280 #define ROMP_YIELD 0x2003
281 #define ROMP_SYNC 0x4001
282 #define ROMP_NULL_MSG 0x4002
283 #define ROMP_MSG_START 0x4242
284 #define ROMP_MAX_ID (1<<16)
285 #define ROMP_MAX_MSG_TYPE (1<<16)
286
287 #define ROMP_BUFFER_SIZE 16
288
289 typedef struct {
290 VALUE io_object;
291 int read_fd, write_fd;
292 char buf[ROMP_BUFFER_SIZE];
293 int nonblock;
294 } ROMP_Session;
295
296 typedef uint16_t MESSAGE_TYPE_T;
297 typedef uint16_t OBJECT_ID_T;
298
299 // A ROMP message is broken into 3 components (see romp.rb for more details)
300 typedef struct {
301 MESSAGE_TYPE_T message_type;
302 OBJECT_ID_T object_id;
303 VALUE message_obj;
304 } ROMP_Message;
305
306 // Send a message to the server with data data and length len.
307 static void send_message_helper(
308 ROMP_Session * session,
309 char * data,
310 size_t len,
311 MESSAGE_TYPE_T message_type,
312 OBJECT_ID_T object_id) {
313
314 char * buf = session->buf;
315
316 PUTSHORT(ROMP_MSG_START, buf);
317 PUTSHORT(len, buf);
318 PUTSHORT(message_type, buf);
319 PUTSHORT(object_id, buf);
320
321 ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock);
322 ruby_write_throw(session->write_fd, data, len, session->nonblock);
323 }
324
325 // Send a message to the server with the data in message.
326 static void send_message(ROMP_Session * session, ROMP_Message * message) {
327 VALUE data;
328 struct RString * data_str;
329
330 data = marshal_dump(message->message_obj);
331 data_str = RSTRING(data);
332 send_message_helper(
333 session,
334 data_str->ptr,
335 data_str->len,
336 message->message_type,
337 message->object_id);
338 }
339
340 // Send a null message to the server (no data, data len = 0)
341 static void send_null_message(ROMP_Session * session) {
342 send_message_helper(session, "", 0, ROMP_NULL_MSG, 0);
343 }
344
345 // Receive a message from the server
346 static void get_message(ROMP_Session * session, ROMP_Message * message) {
347 uint16_t magic = 0;
348 uint16_t data_len = 0;
349 char * buf = 0;
350 // struct RString message_string;
351 VALUE ruby_str;
352
353 do {
354 buf = session->buf;
355
356 ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock);
357
358 GETSHORT(magic, buf);
359 GETSHORT(data_len, buf);
360 GETSHORT(message->message_type, buf);
361 GETSHORT(message->object_id, buf);
362 } while(magic != ROMP_MSG_START);
363
364 buf = ALLOCA_N(char, data_len);
365 ruby_read_throw(session->read_fd, buf, data_len, session->nonblock);
366 // create_tmp_ruby_string(&message_string, buf, data_len);
367 ruby_str = rb_str_new(buf, data_len);
368
369 if(message->message_type != ROMP_NULL_MSG) {
370 message->message_obj = marshal_load(ruby_str);
371 } else {
372 message->message_obj = Qnil;
373 }
374 }
375
376 // Ideally, this function should return true if the server has disconnected,
377 // but currently always returns false. The server thread will still exit
378 // when the client has disconnected, but currently does so via an exception.
379 static int session_finished(ROMP_Session * session) {
380 // TODO: Detect a disconnection
381 return 0;
382 }
383
384 // Send a sync message to the server.
385 static void send_sync(ROMP_Session * session) {
386 ROMP_Message message = { ROMP_SYNC, 0, Qnil };
387 send_message(session, &message);
388 }
389
390 // Wait for a sync response from the server. Ignore any messages that are
391 // received while waiting for the response.
392 static void wait_sync(ROMP_Session * session) {
393 ROMP_Message message;
394
395 // sleep(1);
396 get_message(session, &message);
397 if( message.message_type != ROMP_SYNC
398 && message.object_id != 1
399 && message.message_obj != Qnil) {
400 rb_raise(rb_eRuntimeError, "ROMP synchronization failed");
401 }
402 }
403
404 // Send a reply to a sync request.
405 static void reply_sync(ROMP_Session * session, int value) {
406 if(value == 0) {
407 ROMP_Message message = { ROMP_SYNC, 1, Qnil };
408 send_message(session, &message);
409 }
410 }
411
412 // ----------------------------------------------------------------------------
413 // Server functions
414 // ----------------------------------------------------------------------------
415
416 // We use this structure to pass data to our exception handler. This is done
417 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work.
418 typedef struct {
419 ROMP_Session * session;
420 ROMP_Message * message;
421 VALUE obj;
422 int debug;
423 } Server_Info;
424
425 // Make a method call into a Ruby object.
426 static VALUE server_funcall(VALUE ruby_server_info) {
427 Server_Info * server_info = (Server_Info *)(ruby_server_info);
428 return ruby_send(server_info->obj, server_info->message->message_obj);
429 }
430
431 // Send a yield message to the client, indicating that it should call
432 // Kernel#yield with the message that is sent.
433 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) {
434 Server_Info * server_info = (Server_Info *)(ruby_server_info);
435
436 server_info->message->message_type = ROMP_YIELD;
437 server_info->message->object_id = 0;
438 server_info->message->message_obj = retval;
439 send_message(server_info->session, server_info->message);
440
441 return Qnil;
442 }
443
444 // Send a return value to the client, indicating that it should return
445 // the message to the caller.
446 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) {
447 Server_Info * server_info = (Server_Info *)(ruby_server_info);
448
449 server_info->message->message_type = ROMP_RETVAL;
450 server_info->message->object_id = 0;
451 server_info->message->message_obj = retval;
452 send_message(server_info->session, server_info->message);
453
454 return Qnil;
455 }
456
457 // Send an exception the client, indicating that it should raise an exception.
458 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) {
459 Server_Info * server_info = (Server_Info *)(ruby_server_info);
460 VALUE caller = ruby_caller();
461 VALUE bt = ruby_exc_backtrace(exc);
462
463 server_info->message->message_type = ROMP_EXCEPTION;
464 server_info->message->object_id = 0;
465 server_info->message->message_obj = exc;
466
467 // Get rid of extraneous caller information to make debugging easier.
468 ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1);
469
470 // If debugging is enabled, then print an exception.
471 if(server_info->debug) {
472 ruby_print_exception(exc);
473 }
474
475 send_message(server_info->session, server_info->message);
476
477 return Qnil;
478 }
479
480 // Proces a request from the client and send an appropriate reply.
481 static VALUE server_reply(VALUE ruby_server_info) {
482 Server_Info * server_info = (Server_Info *)(ruby_server_info);
483 VALUE retval;
484 int status;
485
486 server_info->obj = ruby_get_object(
487 server_info->obj,
488 server_info->message->object_id);
489
490 // TODO: The client should be able to pass a callback object to the server;
491 // msg_to_obj can create a Proxy_Object, but it needs a session to make
492 // calls over.
493
494 // Perform the appropriate action based on message type.
495 switch(server_info->message->message_type) {
496 case ROMP_ONEWAY_SYNC:
497 send_null_message(server_info->session);
498 // fallthrough
499
500 case ROMP_ONEWAY:
501 rb_protect(server_funcall, ruby_server_info, &status);
502 return Qnil;
503
504 case ROMP_REQUEST:
505 retval = ruby_send(
506 server_info->obj,
507 server_info->message->message_obj);
508 break;
509
510 case ROMP_REQUEST_BLOCK:
511 retval = rb_iterate(
512 server_funcall, ruby_server_info,
513 server_send_yield, ruby_server_info);
514 break;
515
516 case ROMP_SYNC:
517 reply_sync(
518 server_info->session,
519 server_info->message->object_id);
520 return Qnil;
521
522 default:
523 rb_raise(rb_eRuntimeError, "Bad session request");
524 }
525
526 server_send_retval(retval, ruby_server_info);
527
528 return Qnil;
529 }
530
531 // The main server loop. Wait for a message from the client, route the
532 // message to the appropriate object, send a response and repeat.
533 static void server_loop(ROMP_Session * session, VALUE resolve_server, int dbg) {
534 ROMP_Message message;
535 Server_Info server_info = { session, &message, resolve_server, dbg };
536 VALUE ruby_server_info = (VALUE)(&server_info);
537
538 while(!session_finished(session)) {
539 get_message(session, &message);
540 rb_rescue2(
541 server_reply, ruby_server_info,
542 server_exception, ruby_server_info, rb_eException, 0);
543 server_info.obj = resolve_server;
544 }
545 }
546
547 // ----------------------------------------------------------------------------
548 // Client functions
549 // ----------------------------------------------------------------------------
550
551 // We use this structure to pass data to our client functions by casting it
552 // to a Ruby VALUE (see above note with Server_Info).
553 typedef struct {
554 ROMP_Session * session;
555 VALUE ruby_session;
556 OBJECT_ID_T object_id;
557 VALUE message;
558 VALUE mutex;
559 } Proxy_Object;
560
561 // Send a request to the server, wait for a response, and perform an action
562 // based on what that response was. This is not thread-safe, so the caller
563 // should perform any necessary locking
564 static VALUE client_request(VALUE ruby_proxy_object) {
565 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
566 ROMP_Message msg = {
567 rb_block_given_p() ? ROMP_REQUEST_BLOCK : ROMP_REQUEST,
568 obj->object_id,
569 obj->message
570 };
571 send_message(obj->session, &msg);
572
573 for(;;) {
574 get_message(obj->session, &msg);
575 switch(msg.message_type) {
576 case ROMP_RETVAL:
577 return msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex);
578 break;
579 case ROMP_YIELD:
580 rb_yield(msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex));
581 break;
582 case ROMP_EXCEPTION: {
583 ruby_raise(
584 msg.message_obj,
585 ruby_exc_message(msg.message_obj),
586 rb_ary_concat(ruby_exc_backtrace(msg.message_obj), ruby_caller())
587 );
588 break;
589 }
590 case ROMP_SYNC:
591 reply_sync(obj->session, NUM2INT(msg.message_obj));
592 break;
593 default:
594 rb_raise(rb_eRuntimeError, "Invalid msg type received");
595 }
596 }
597 }
598
599 // Send a oneway message to the server. This is not thread-safe, so the
600 // caller should perform any necessary locking.
601 static VALUE client_oneway(VALUE ruby_proxy_object) {
602 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
603 ROMP_Message msg = {
604 ROMP_ONEWAY,
605 obj->object_id,
606 obj->message
607 };
608 send_message(obj->session, &msg);
609 return Qnil;
610 }
611
612 // Send a oneway message to the server and request a message in response.
613 // This is not thread-safe, so the caller should perform any necessary
614 // locking.
615 static VALUE client_oneway_sync(VALUE ruby_proxy_object) {
616 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
617 ROMP_Message msg = {
618 ROMP_ONEWAY_SYNC,
619 obj->object_id,
620 obj->message
621 };
622 send_message(obj->session, &msg);
623 get_message(obj->session, &msg);
624 return Qnil;
625 }
626
627 // Synchronize with the server. This is not thread-safe, so the caller should
628 // perform any necessary locking.
629 static VALUE client_sync(VALUE ruby_proxy_object) {
630 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object);
631 send_sync(obj->session);
632 wait_sync(obj->session);
633 return Qnil;
634 }
635
636 // ----------------------------------------------------------------------------
637 // Ruby interface functions
638 // ----------------------------------------------------------------------------
639
640 static void ruby_session_mark(ROMP_Session * session) {
641 rb_gc_mark(session->io_object);
642 }
643
644 static VALUE ruby_session_new(VALUE self, VALUE io_object) {
645 ROMP_Session * session;
646 VALUE ruby_session;
647 OpenFile * openfile;
648 FILE * read_fp;
649 FILE * write_fp;
650
651 if(!rb_obj_is_kind_of(io_object, rb_cIO)) {
652 rb_raise(rb_eTypeError, "Expecting an IO object");
653 }
654
655 ruby_session = Data_Make_Struct(
656 rb_cSession,
657 ROMP_Session,
658 (RUBY_DATA_FUNC)(ruby_session_mark),
659 (RUBY_DATA_FUNC)(free),
660 session);
661
662 GetOpenFile(io_object, openfile);
663 read_fp = GetReadFile(openfile);
664 write_fp = GetWriteFile(openfile);
665 session->read_fd = fileno(read_fp);
666 session->write_fd = fileno(write_fp);
667 session->io_object = io_object;
668 session->nonblock = 0;
669
670 return ruby_session;
671 }
672
673 static VALUE ruby_set_nonblock(VALUE self, VALUE nonblock) {
674 ROMP_Session * session;
675 Data_Get_Struct(self, ROMP_Session, session);
676 if(nonblock == Qtrue) {
677 session->nonblock = 1;
678 } else if(nonblock == Qfalse) {
679 session->nonblock = 0;
680 } else {
681 rb_raise(rb_eTypeError, "Expecting a boolean");
682 }
683 return Qnil;
684 }
685
686 static void ruby_proxy_object_mark(Proxy_Object * proxy_object) {
687 rb_gc_mark(proxy_object->ruby_session);
688 rb_gc_mark(proxy_object->mutex);
689 }
690
691 static VALUE ruby_proxy_object_new(
692 VALUE self, VALUE ruby_session, VALUE ruby_mutex, VALUE ruby_object_id) {
693 ROMP_Session * session;
694 OBJECT_ID_T object_id = NUM2INT(ruby_object_id);
695 Proxy_Object * proxy_object;
696 VALUE ruby_proxy_object;
697
698 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
699 rb_raise(rb_eTypeError, "Expecting a session");
700 }
701 Data_Get_Struct(ruby_session, ROMP_Session, session);
702
703 ruby_proxy_object = Data_Make_Struct(
704 rb_cProxy_Object,
705 Proxy_Object,
706 (RUBY_DATA_FUNC)(ruby_proxy_object_mark),
707 (RUBY_DATA_FUNC)(free),
708 proxy_object);
709 proxy_object->session = session;
710 proxy_object->ruby_session = ruby_session;
711 proxy_object->mutex = ruby_mutex;
712 proxy_object->object_id = object_id;
713
714 return ruby_proxy_object;
715 }
716
717 static VALUE ruby_proxy_object_method_missing(VALUE self, VALUE message) {
718 Proxy_Object * proxy_object;
719 Data_Get_Struct(self, Proxy_Object, proxy_object);
720
721 proxy_object->message = message;
722 ruby_lock(proxy_object->mutex);
723 return rb_ensure(
724 client_request, (VALUE)(proxy_object),
725 ruby_unlock, proxy_object->mutex);
726 }
727
728 static VALUE ruby_proxy_object_oneway(VALUE self, VALUE message) {
729 Proxy_Object * proxy_object;
730 Data_Get_Struct(self, Proxy_Object, proxy_object);
731
732 proxy_object->message = message;
733 ruby_lock(proxy_object->mutex);
734 rb_ensure(
735 client_oneway, (VALUE)(proxy_object),
736 ruby_unlock, proxy_object->mutex);
737 return Qnil;
738 }
739
740 static VALUE ruby_proxy_object_oneway_sync(VALUE self, VALUE message) {
741 Proxy_Object * proxy_object;
742 Data_Get_Struct(self, Proxy_Object, proxy_object);
743
744 proxy_object->message = message;
745 ruby_lock(proxy_object->mutex);
746 rb_ensure(
747 client_oneway_sync, (VALUE)(proxy_object),
748 ruby_unlock, proxy_object->mutex);
749 return Qnil;
750 }
751
752 static VALUE ruby_proxy_object_sync(VALUE self) {
753 Proxy_Object * proxy_object;
754 Data_Get_Struct(self, Proxy_Object, proxy_object);
755
756 ruby_lock(proxy_object->mutex);
757 rb_ensure(
758 client_sync, (VALUE)(proxy_object),
759 ruby_unlock, proxy_object->mutex);
760 return Qnil;
761 }
762
763 static VALUE ruby_server_loop(VALUE self, VALUE ruby_session) {
764 ROMP_Session * session;
765 VALUE resolve_server;
766 VALUE ruby_debug;
767 int debug;
768
769 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) {
770 rb_raise(rb_eTypeError, "Excpecting a session");
771 }
772 Data_Get_Struct(ruby_session, ROMP_Session, session);
773
774 resolve_server = rb_iv_get(self, "@resolve_server");
775
776 ruby_debug = rb_iv_get(self, "@debug");
777 debug = (ruby_debug != Qfalse) && !NIL_P(ruby_debug);
778 server_loop(session, resolve_server, debug);
779 return Qnil;
780 }
781
782 // Given a message, convert it into an object that can be returned. This
783 // function really only checks to see if an Object_Reference has been returned
784 // from the server, and creates a new Proxy_Object if this is the case.
785 // Otherwise, the original object is returned to the client.
786 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex) {
787 if(CLASS_OF(message) == rb_cObject_Reference) {
788 return ruby_proxy_object_new(
789 rb_cProxy_Object,
790 session,
791 mutex,
792 rb_funcall(message, id_object_id, 0));
793 } else {
794 return message;
795 }
796 }
797
798 void Init_romp_helper() {
799 init_globals();
800
801 rb_mROMP = rb_define_module("ROMP");
802 rb_cSession = rb_define_class_under(rb_mROMP, "Session", rb_cObject);
803
804 rb_define_const(rb_cSession, "REQUEST", INT2NUM(ROMP_REQUEST));
805 rb_define_const(rb_cSession, "REQUEST_BLOCK", INT2NUM(ROMP_REQUEST_BLOCK));
806 rb_define_const(rb_cSession, "ONEWAY", INT2NUM(ROMP_ONEWAY));
807 rb_define_const(rb_cSession, "ONEWAY_SYNC", INT2NUM(ROMP_ONEWAY_SYNC));
808 rb_define_const(rb_cSession, "RETVAL", INT2NUM(ROMP_RETVAL));
809 rb_define_const(rb_cSession, "EXCEPTION", INT2NUM(ROMP_EXCEPTION));
810 rb_define_const(rb_cSession, "YIELD", INT2NUM(ROMP_YIELD));
811 rb_define_const(rb_cSession, "SYNC", INT2NUM(ROMP_SYNC));
812 rb_define_const(rb_cSession, "NULL_MSG", INT2NUM(ROMP_NULL_MSG));
813 rb_define_const(rb_cSession, "MSG_START", INT2NUM(ROMP_MSG_START));
814 rb_define_const(rb_cSession, "MAX_ID", INT2NUM(ROMP_MAX_ID));
815 rb_define_const(rb_cSession, "MAX_MSG_TYPE", INT2NUM(ROMP_MAX_MSG_TYPE));
816
817 rb_define_singleton_method(rb_cSession, "new", ruby_session_new, 1);
818 rb_define_method(rb_cSession, "set_nonblock", ruby_set_nonblock, 1);
819
820 rb_cProxy_Object = rb_define_class_under(rb_mROMP, "Proxy_Object", rb_cObject);
821 rb_define_singleton_method(rb_cProxy_Object, "new", ruby_proxy_object_new, 3);
822 rb_define_method(rb_cProxy_Object, "method_missing", ruby_proxy_object_method_missing, -2);
823 rb_define_method(rb_cProxy_Object, "oneway", ruby_proxy_object_oneway, -2);
824 rb_define_method(rb_cProxy_Object, "oneway_sync", ruby_proxy_object_oneway_sync, -2);
825 rb_define_method(rb_cProxy_Object, "sync", ruby_proxy_object_sync, 0);
826
827 rb_cServer = rb_define_class_under(rb_mROMP, "Server", rb_cObject);
828 rb_define_method(rb_cServer, "server_loop", ruby_server_loop, 1);
829
830 rb_cObject_Reference = rb_define_class_under(rb_mROMP, "Object_Reference", rb_cObject);
831
832 id_object_id = rb_intern("object_id");
833 }