1 // ROMP - The Ruby Object Message Proxy
  2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
  3 // 
  4 // ROMP is a set of classes for providing distributed object support to a
  5 // Ruby program.  You may distribute and/or modify it under the same terms as
  6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).
  7 
  8 #include <ruby.h>
  9 #include <rubyio.h>
 10 #include <stdint.h>
 11 #include <stdlib.h>
 12 #include <unistd.h>
 13 #include <fcntl.h>
 14 #include <sys/time.h>
 15 
 16 // ---------------------------------------------------------------------------
 17 // Useful macros
 18 // ---------------------------------------------------------------------------
 19 
 20 // TODO: Are these portable?
 21 
 22 #define PUTSHORT(s, buf) \
 23     do { \
 24         *buf = s >> 8; ++buf; \
 25         *buf = s & 0xff; ++buf; \
 26     } while(0)
 27 
 28 #define GETSHORT(s, buf) \
 29     do { \
 30         s = (unsigned char)*buf; ++buf; \
 31         s = (s << 8) | (unsigned char)*buf; ++buf; \
 32     } while(0)
 33 
 34 // ---------------------------------------------------------------------------
 35 // Globals
 36 // ---------------------------------------------------------------------------
 37 
 38 // objects/functions created down below
 39 //
 40 static VALUE rb_mROMP = Qnil;
 41 static VALUE rb_cSession = Qnil;
 42 static VALUE rb_cProxy_Object = Qnil;
 43 static VALUE rb_cServer = Qnil;
 44 static VALUE rb_cObject_Reference = Qnil;
 45 static ID id_object_id;
 46 
 47 // objects/functions created elsewhere
 48 //
 49 static VALUE rb_mMarshal = Qnil;
 50 
 51 static ID id_dump;
 52 static ID id_load;
 53 static ID id_message;
 54 static ID id_backtrace;
 55 static ID id_caller;
 56 static ID id_raise;
 57 static ID id_send;
 58 static ID id_get_object;
 59 static ID id_slice_bang;
 60 static ID id_print_exception;
 61 static ID id_lock;
 62 static ID id_unlock;
 63 
 64 static struct timeval zero_timeval;
 65 
 66 static void init_globals() {
 67     rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
 68 
 69     id_dump = rb_intern("dump");
 70     id_load = rb_intern("load");
 71     id_message = rb_intern("message");
 72     id_backtrace = rb_intern("backtrace");
 73     id_caller = rb_intern("caller");
 74     id_raise = rb_intern("raise");
 75     id_send = rb_intern("send");
 76     id_get_object = rb_intern("get_object");
 77     id_slice_bang = rb_intern("slice!");
 78     id_print_exception = rb_intern("print_exception");
 79     id_lock = rb_intern("lock");
 80     id_unlock = rb_intern("unlock");
 81 
 82     zero_timeval.tv_sec = 0;
 83     zero_timeval.tv_usec = 0;
 84 }
 85 
 86 // ---------------------------------------------------------------------------
 87 // Utility functions
 88 // ---------------------------------------------------------------------------
 89 
 90 // Forward declaration
 91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex);
 92 
 93 #define WRITE_HELPER \
 94     do { \
 95         write_count = write(fd, buf, count); \
 96         if(write_count < 0) { \
 97             if(errno != EWOULDBLOCK) rb_sys_fail("write"); \
 98         } else if(write_count == 0 && count != 0) { \
 99             rb_raise(rb_eIOError, "disconnected"); \
100         } else { \
101             count -= write_count; \
102             buf += write_count; \
103             total += write_count; \
104         } \
105     } while(0)
106 
107 #define READ_HELPER \
108     do { \
109         read_count = read(fd, buf, count); \
110         if(read_count < 0) { \
111             if(errno != EWOULDBLOCK) rb_sys_fail("read"); \
112         } else if(read_count == 0 && count != 0) { \
113             rb_raise(rb_eIOError, "disconnected"); \
114         } else { \
115             count -= read_count; \
116             buf += read_count; \
117             total += read_count; \
118         } \
119     } while(0)
120 
121 // Write to an fd and raise an exception if an error occurs
122 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) {
123     int n;
124     size_t total = 0;
125     ssize_t write_count;
126     fd_set fds, error_fds;
127 
128     if(!nonblock) {
129         FD_ZERO(&fds);
130         FD_SET(fd, &fds);
131         FD_ZERO(&error_fds);
132         FD_SET(fd, &error_fds);
133         n = select(fd + 1, 0, &fds, &fds, 0);
134         if(n > 0) {
135             WRITE_HELPER;
136         }
137     } else {
138         WRITE_HELPER;
139     }
140 
141     while(count > 0) {
142         FD_ZERO(&fds);
143         FD_SET(fd, &fds);
144         FD_ZERO(&error_fds);
145         FD_SET(fd, &error_fds);
146         n = rb_thread_select(fd + 1, 0, &fds, &fds, 0);
147         if(n == -1) {
148             if(errno == EWOULDBLOCK) continue;
149             rb_sys_fail("select");
150         }
151         WRITE_HELPER;
152     };
153     return total;
154 }
155 
156 // Read from an fd and raise an exception if an error occurs
157 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) {
158     int n;
159     size_t total = 0;
160     ssize_t read_count;
161     fd_set fds, error_fds;
162 
163     if(!nonblock) {
164         FD_ZERO(&fds);
165         FD_SET(fd, &fds);
166         FD_ZERO(&error_fds);
167         FD_SET(fd, &error_fds);
168         n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval);
169         if(n > 0) {
170             READ_HELPER;
171         }
172     } else {
173         READ_HELPER;
174     }
175 
176     while(count > 0) {
177         FD_ZERO(&fds);
178         FD_SET(fd, &fds);
179         FD_ZERO(&error_fds);
180         FD_SET(fd, &error_fds);
181         n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0);
182         if(n == -1) {
183             if(errno == EWOULDBLOCK) continue;
184             rb_sys_fail("select");
185         }
186         READ_HELPER;
187     };
188 
189     return total;
190 }
191 
192 // Return the message of an exception
193 static VALUE ruby_exc_message(VALUE exc) {
194     return rb_funcall(exc, id_message, 0);
195 }
196 
197 // Return the backtrace of an exception
198 static VALUE ruby_exc_backtrace(VALUE exc) {
199     return rb_funcall(exc, id_backtrace, 0);
200 }
201 
202 // Return the current Ruby call stack
203 static VALUE ruby_caller() {
204     // TODO: Why does calling caller() with 0 arguments not work?
205     return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0));
206 }
207 
208 // Raise a specific ruby exception with a specific message and backtrace
209 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) {
210     rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt);
211 }
212 
213 // Send a message to a Ruby object
214 static VALUE ruby_send(VALUE obj, VALUE msg) {
215     return rb_apply(obj, id_send, msg);
216 }
217 
218 // Call "get_object" on a Ruby object
219 static VALUE ruby_get_object(VALUE obj, VALUE object_id) {
220     return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id));
221 }
222 
223 // Call slice! on a Ruby object
224 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) {
225     VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0);
226     return rb_funcall(obj, id_slice_bang, 1, range);
227 }
228 
229 // Print an exception to the screen using a function defined in the ROMP
230 // module (TODO: wouldn't it be nice if Ruby's error_print were available to
231 // the user?)
232 static void ruby_print_exception(VALUE exc) {
233     rb_funcall(rb_mROMP, id_print_exception, 1, exc);
234 }
235 
236 // Call lock on an object (a mutex, probably)
237 static VALUE ruby_lock(VALUE obj) {
238     return rb_funcall(obj, id_lock, 0);
239 }
240 
241 // Call unlock on an object (a mutex, probably)
242 static VALUE ruby_unlock(VALUE obj) {
243     return rb_funcall(obj, id_unlock, 0);
244 }
245 
246 // ---------------------------------------------------------------------------
247 // Marshalling functions
248 // ---------------------------------------------------------------------------
249 
250 // Take an object as input and return it as a marshalled string.
251 static VALUE marshal_dump(VALUE obj) {
252     return rb_funcall(rb_mMarshal, id_dump, 1, obj);
253 }
254 
255 // Take a marshalled string as input and return it as an object.
256 static VALUE marshal_load(VALUE str) {
257     return rb_funcall(rb_mMarshal, id_load, 1, str);
258 }
259 
260 // ---------------------------------------------------------------------------
261 // Session functions
262 // ---------------------------------------------------------------------------
263 
264 #define ROMP_REQUEST           0x1001
265 #define ROMP_REQUEST_BLOCK     0x1002
266 #define ROMP_ONEWAY            0x1003
267 #define ROMP_ONEWAY_SYNC       0x1004
268 #define ROMP_RETVAL            0x2001
269 #define ROMP_EXCEPTION         0x2002
270 #define ROMP_YIELD             0x2003
271 #define ROMP_SYNC              0x4001
272 #define ROMP_NULL_MSG          0x4002
273 #define ROMP_MSG_START         0x4242
274 #define ROMP_MAX_ID            (1<<16)
275 #define ROMP_MAX_MSG_TYPE      (1<<16)
276 
277 #define ROMP_BUFFER_SIZE       16
278 
279 typedef struct {
280     VALUE io_object;
281     int read_fd, write_fd;
282     char buf[ROMP_BUFFER_SIZE];
283     int nonblock;
284 } ROMP_Session;
285 
286 typedef uint16_t MESSAGE_TYPE_T;
287 typedef uint16_t OBJECT_ID_T;
288 
289 // A ROMP message is broken into 3 components (see romp.rb for more details)
290 typedef struct {
291     MESSAGE_TYPE_T message_type;
292     OBJECT_ID_T object_id;
293     VALUE message_obj;
294 } ROMP_Message;
295 
296 // Send a message to the server with data data and length len.
297 static void send_message_helper(
298         ROMP_Session * session,
299         char * data,
300         size_t len,
301         MESSAGE_TYPE_T message_type,
302         OBJECT_ID_T object_id) {
303 
304     char * buf = session->buf;
305 
306     PUTSHORT(ROMP_MSG_START,    buf);
307     PUTSHORT(len,               buf);
308     PUTSHORT(message_type,      buf);
309     PUTSHORT(object_id,         buf);
310 
311     ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock);
312     ruby_write_throw(session->write_fd, data, len, session->nonblock);
313 }
314 
315 // Send a message to the server with the data in message.
316 static void send_message(ROMP_Session * session, ROMP_Message * message) {
317     VALUE data;
318     struct RString * data_str;
319 
320     data = marshal_dump(message->message_obj);
321     data_str = RSTRING(data);
322     send_message_helper(
323         session,
324         data_str->ptr,
325         data_str->len,
326         message->message_type,
327         message->object_id);
328 }
329 
330 // Send a null message to the server (no data, data len = 0)
331 static void send_null_message(ROMP_Session * session) {
332     send_message_helper(session, "", 0, ROMP_NULL_MSG, 0);
333 }
334 
335 // Receive a message from the server
336 static void get_message(ROMP_Session * session, ROMP_Message * message) {
337     uint16_t magic          = 0;
338     uint16_t data_len       = 0;
339     char * buf              = 0;
340     // struct RString message_string;
341     VALUE ruby_str;
342 
343     do {
344         buf = session->buf;
345         
346         ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock);
347 
348         GETSHORT(magic,                 buf);
349         GETSHORT(data_len,              buf);
350         GETSHORT(message->message_type, buf);
351         GETSHORT(message->object_id,    buf);
352     } while(magic != ROMP_MSG_START);
353 
354     buf = ALLOCA_N(char, data_len);
355     ruby_read_throw(session->read_fd, buf, data_len, session->nonblock);
356     ruby_str = rb_str_new(buf, data_len);
357 
358     if(message->message_type != ROMP_NULL_MSG) {
359         message->message_obj = marshal_load(ruby_str);
360     } else {
361         message->message_obj = Qnil;
362     }
363 }
364 
365 // Ideally, this function should return true if the server has disconnected,
366 // but currently always returns false.  The server thread will still exit
367 // when the client has disconnected, but currently does so via an exception.
368 static int session_finished(ROMP_Session * session) {
369     // TODO: Detect a disconnection
370     return 0;
371 }
372 
373 // Send a sync message to the server.
374 static void send_sync(ROMP_Session * session) {
375     ROMP_Message message = { ROMP_SYNC, 0, Qnil };
376     send_message(session, &message);
377 }
378 
379 // Wait for a sync response from the server.  Ignore any messages that are
380 // received while waiting for the response.
381 static void wait_sync(ROMP_Session * session) {
382     ROMP_Message message;
383 
384     // sleep(1);
385     get_message(session, &message);
386     if(   message.message_type != ROMP_SYNC
387        && message.object_id != 1
388        && message.message_obj != Qnil) {
389         rb_raise(rb_eRuntimeError, "ROMP synchronization failed");
390     }
391 }
392 
393 // Send a reply to a sync request.
394 static void reply_sync(ROMP_Session * session, int value) {
395     if(value == 0) {
396         ROMP_Message message = { ROMP_SYNC, 1, Qnil };
397         send_message(session, &message);
398     }
399 }
400 
401 // ----------------------------------------------------------------------------
402 // Server functions
403 // ----------------------------------------------------------------------------
404 
405 // We use this structure to pass data to our exception handler.  This is done
406 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work.
407 typedef struct {
408     ROMP_Session * session;
409     ROMP_Message * message;
410     VALUE obj;
411     int debug;
412 } Server_Info;
413 
414 // Make a method call into a Ruby object.
415 static VALUE server_funcall(VALUE ruby_server_info) {
416     Server_Info * server_info = (Server_Info *)(ruby_server_info);
417     return ruby_send(server_info->obj, server_info->message->message_obj);
418 }
419 
420 // Send a yield message to the client, indicating that it should call
421 // Kernel#yield with the message that is sent.
422 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) {
423     Server_Info * server_info = (Server_Info *)(ruby_server_info);
424 
425     server_info->message->message_type = ROMP_YIELD;
426     server_info->message->object_id = 0;
427     server_info->message->message_obj = retval;
428     send_message(server_info->session, server_info->message);
429 
430     return Qnil;
431 }
432 
433 // Send a return value to the client, indicating that it should return
434 // the message to the caller.
435 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) {
436     Server_Info * server_info = (Server_Info *)(ruby_server_info);
437 
438     server_info->message->message_type = ROMP_RETVAL;
439     server_info->message->object_id = 0;
440     server_info->message->message_obj = retval;
441     send_message(server_info->session, server_info->message);
442 
443     return Qnil;
444 }
445 
446 // Send an exception the client, indicating that it should raise an exception.
447 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) {
448     Server_Info * server_info = (Server_Info *)(ruby_server_info);
449     VALUE caller = ruby_caller();
450     VALUE bt = ruby_exc_backtrace(exc);
451 
452     server_info->message->message_type = ROMP_EXCEPTION;
453     server_info->message->object_id = 0;
454     server_info->message->message_obj = exc;
455 
456     // Get rid of extraneous caller information to make debugging easier.
457     ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1);
458 
459     // If debugging is enabled, then print an exception.
460     if(server_info->debug) {
461         ruby_print_exception(exc);
462     }
463 
464     send_message(server_info->session, server_info->message);
465 
466     return Qnil;
467 }
468 
469 // Proces a request from the client and send an appropriate reply.
470 static VALUE server_reply(VALUE ruby_server_info) {
471     Server_Info * server_info = (Server_Info *)(ruby_server_info);
472     VALUE retval;
473     int status;
474 
475     server_info->obj = ruby_get_object(
476         server_info->obj,
477         server_info->message->object_id);
478 
479     // TODO: The client should be able to pass a callback object to the server;
480     // msg_to_obj can create a Proxy_Object, but it needs a