1 // ROMP - The Ruby Object Message Proxy 2 // (C) Copyright 2001 Paul Brannan (cout at rm-f.net) 3 // 4 // ROMP is a set of classes for providing distributed object support to a 5 // Ruby program. You may distribute and/or modify it under the same terms as 6 // Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). 7 8 #include <ruby.h> 9 #include <rubyio.h> 10 #include <stdint.h> 11 #include <stdlib.h> 12 #include <unistd.h> 13 #include <fcntl.h> 14 #include <sys/time.h> 15 16 // --------------------------------------------------------------------------- 17 // Useful macros 18 // --------------------------------------------------------------------------- 19 20 // TODO: Are these portable? 21 22 #define PUTSHORT(s, buf) \ 23 do { \ 24 *buf = s >> 8; ++buf; \ 25 *buf = s & 0xff; ++buf; \ 26 } while(0) 27 28 #define GETSHORT(s, buf) \ 29 do { \ 30 s = (unsigned char)*buf; ++buf; \ 31 s = (s << 8) | (unsigned char)*buf; ++buf; \ 32 } while(0) 33 34 // --------------------------------------------------------------------------- 35 // Globals 36 // --------------------------------------------------------------------------- 37 38 // objects/functions created down below 39 // 40 static VALUE rb_mROMP = Qnil; 41 static VALUE rb_cSession = Qnil; 42 static VALUE rb_cProxy_Object = Qnil; 43 static VALUE rb_cServer = Qnil; 44 static VALUE rb_cObject_Reference = Qnil; 45 static ID id_object_id; 46 47 // objects/functions created elsewhere 48 // 49 static VALUE rb_mMarshal = Qnil; 50 51 static ID id_dump; 52 static ID id_load; 53 static ID id_message; 54 static ID id_backtrace; 55 static ID id_caller; 56 static ID id_raise; 57 static ID id_send; 58 static ID id_get_object; 59 static ID id_slice_bang; 60 static ID id_print_exception; 61 static ID id_lock; 62 static ID id_unlock; 63 64 static struct timeval zero_timeval; 65 66 static void init_globals() { 67 rb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal")); 68 69 id_dump = rb_intern("dump"); 70 id_load = rb_intern("load"); 71 id_message = rb_intern("message"); 72 id_backtrace = rb_intern("backtrace"); 73 id_caller = rb_intern("caller"); 74 id_raise = rb_intern("raise"); 75 id_send = rb_intern("send"); 76 id_get_object = rb_intern("get_object"); 77 id_slice_bang = rb_intern("slice!"); 78 id_print_exception = rb_intern("print_exception"); 79 id_lock = rb_intern("lock"); 80 id_unlock = rb_intern("unlock"); 81 82 zero_timeval.tv_sec = 0; 83 zero_timeval.tv_usec = 0; 84 } 85 86 // --------------------------------------------------------------------------- 87 // Utility functions 88 // --------------------------------------------------------------------------- 89 90 // Forward declaration 91 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex); 92 93 // Create a Ruby string that won't be collected by the GC. Be very careful 94 // with this function! 95 static void create_tmp_ruby_string(struct RString * str, char * buf, size_t len) { 96 str->basic.flags = T_STRING; 97 str->basic.klass = rb_cString; 98 str->ptr = buf; 99 str->len = len; 100 str->orig = 0; // ? 101 } 102 103 #define WRITE_HELPER \ 104 do { \ 105 write_count = write(fd, buf, count); \ 106 if(write_count < 0) { \ 107 if(errno != EWOULDBLOCK) rb_sys_fail("write"); \ 108 } else if(write_count == 0 && count != 0) { \ 109 rb_raise(rb_eIOError, "disconnected"); \ 110 } else { \ 111 count -= write_count; \ 112 buf += write_count; \ 113 total += write_count; \ 114 } \ 115 } while(0) 116 117 #define READ_HELPER \ 118 do { \ 119 read_count = read(fd, buf, count); \ 120 if(read_count < 0) { \ 121 if(errno != EWOULDBLOCK) rb_sys_fail("read"); \ 122 } else if(read_count == 0 && count != 0) { \ 123 rb_raise(rb_eIOError, "disconnected"); \ 124 } else { \ 125 count -= read_count; \ 126 buf += read_count; \ 127 total += read_count; \ 128 } \ 129 } while(0) 130 131 // Write to an fd and raise an exception if an error occurs 132 static ssize_t ruby_write_throw(int fd, const void * buf, size_t count, int nonblock) { 133 int n; 134 size_t total = 0; 135 ssize_t write_count; 136 fd_set fds, error_fds; 137 138 if(!nonblock) { 139 FD_ZERO(&fds); 140 FD_SET(fd, &fds); 141 FD_ZERO(&error_fds); 142 FD_SET(fd, &error_fds); 143 n = select(fd + 1, 0, &fds, &fds, 0); 144 if(n > 0) { 145 WRITE_HELPER; 146 } 147 } else { 148 WRITE_HELPER; 149 } 150 151 while(count > 0) { 152 FD_ZERO(&fds); 153 FD_SET(fd, &fds); 154 FD_ZERO(&error_fds); 155 FD_SET(fd, &error_fds); 156 n = rb_thread_select(fd + 1, 0, &fds, &fds, 0); 157 if(n == -1) { 158 if(errno == EWOULDBLOCK) continue; 159 rb_sys_fail("select"); 160 } 161 WRITE_HELPER; 162 }; 163 return total; 164 } 165 166 // Read from an fd and raise an exception if an error occurs 167 static ssize_t ruby_read_throw(int fd, void * buf, size_t count, int nonblock) { 168 int n; 169 size_t total = 0; 170 ssize_t read_count; 171 fd_set fds, error_fds; 172 173 if(!nonblock) { 174 FD_ZERO(&fds); 175 FD_SET(fd, &fds); 176 FD_ZERO(&error_fds); 177 FD_SET(fd, &error_fds); 178 n = select(fd + 1, &fds, 0, &error_fds, &zero_timeval); 179 if(n > 0) { 180 READ_HELPER; 181 } 182 } else { 183 READ_HELPER; 184 } 185 186 while(count > 0) { 187 FD_ZERO(&fds); 188 FD_SET(fd, &fds); 189 FD_ZERO(&error_fds); 190 FD_SET(fd, &error_fds); 191 n = rb_thread_select(fd + 1, &fds, 0, &error_fds, 0); 192 if(n == -1) { 193 if(errno == EWOULDBLOCK) continue; 194 rb_sys_fail("select"); 195 } 196 READ_HELPER; 197 }; 198 199 return total; 200 } 201 202 // Return the message of an exception 203 static VALUE ruby_exc_message(VALUE exc) { 204 return rb_funcall(exc, id_message, 0); 205 } 206 207 // Return the backtrace of an exception 208 static VALUE ruby_exc_backtrace(VALUE exc) { 209 return rb_funcall(exc, id_backtrace, 0); 210 } 211 212 // Return the current Ruby call stack 213 static VALUE ruby_caller() { 214 // TODO: Why does calling caller() with 0 arguments not work? 215 return rb_funcall(rb_mKernel, id_caller, 1, INT2NUM(0)); 216 } 217 218 // Raise a specific ruby exception with a specific message and backtrace 219 static void ruby_raise(VALUE exc, VALUE msg, VALUE bt) { 220 rb_funcall(rb_mKernel, id_raise, 3, exc, msg, bt); 221 } 222 223 // Send a message to a Ruby object 224 static VALUE ruby_send(VALUE obj, VALUE msg) { 225 return rb_apply(obj, id_send, msg); 226 } 227 228 // Call "get_object" on a Ruby object 229 static VALUE ruby_get_object(VALUE obj, VALUE object_id) { 230 return rb_funcall(obj, id_get_object, 1, INT2NUM(object_id)); 231 } 232 233 // Call slice! on a Ruby object 234 static VALUE ruby_slice_bang(VALUE obj, size_t min, size_t max) { 235 VALUE range = rb_range_new(INT2NUM(min), INT2NUM(max), 0); 236 return rb_funcall(obj, id_slice_bang, 1, range); 237 } 238 239 // Print an exception to the screen using a function defined in the ROMP 240 // module (TODO: wouldn't it be nice if Ruby's error_print were available to 241 // the user?) 242 static void ruby_print_exception(VALUE exc) { 243 rb_funcall(rb_mROMP, id_print_exception, 1, exc); 244 } 245 246 // Call lock on an object (a mutex, probably) 247 static VALUE ruby_lock(VALUE obj) { 248 return rb_funcall(obj, id_lock, 0); 249 } 250 251 // Call unlock on an object (a mutex, probably) 252 static VALUE ruby_unlock(VALUE obj) { 253 return rb_funcall(obj, id_unlock, 0); 254 } 255 256 // --------------------------------------------------------------------------- 257 // Marshalling functions 258 // --------------------------------------------------------------------------- 259 260 // Take an object as input and return it as a marshalled string. 261 static VALUE marshal_dump(VALUE obj) { 262 return rb_funcall(rb_mMarshal, id_dump, 1, obj); 263 } 264 265 // Take a marshalled string as input and return it as an object. 266 static VALUE marshal_load(VALUE str) { 267 return rb_funcall(rb_mMarshal, id_load, 1, str); 268 } 269 270 // --------------------------------------------------------------------------- 271 // Session functions 272 // --------------------------------------------------------------------------- 273 274 #define ROMP_REQUEST 0x1001 275 #define ROMP_REQUEST_BLOCK 0x1002 276 #define ROMP_ONEWAY 0x1003 277 #define ROMP_ONEWAY_SYNC 0x1004 278 #define ROMP_RETVAL 0x2001 279 #define ROMP_EXCEPTION 0x2002 280 #define ROMP_YIELD 0x2003 281 #define ROMP_SYNC 0x4001 282 #define ROMP_NULL_MSG 0x4002 283 #define ROMP_MSG_START 0x4242 284 #define ROMP_MAX_ID (1<<16) 285 #define ROMP_MAX_MSG_TYPE (1<<16) 286 287 #define ROMP_BUFFER_SIZE 16 288 289 typedef struct { 290 VALUE io_object; 291 int read_fd, write_fd; 292 char buf[ROMP_BUFFER_SIZE]; 293 int nonblock; 294 } ROMP_Session; 295 296 typedef uint16_t MESSAGE_TYPE_T; 297 typedef uint16_t OBJECT_ID_T; 298 299 // A ROMP message is broken into 3 components (see romp.rb for more details) 300 typedef struct { 301 MESSAGE_TYPE_T message_type; 302 OBJECT_ID_T object_id; 303 VALUE message_obj; 304 } ROMP_Message; 305 306 // Send a message to the server with data data and length len. 307 static void send_message_helper( 308 ROMP_Session * session, 309 char * data, 310 size_t len, 311 MESSAGE_TYPE_T message_type, 312 OBJECT_ID_T object_id) { 313 314 char * buf = session->buf; 315 316 PUTSHORT(ROMP_MSG_START, buf); 317 PUTSHORT(len, buf); 318 PUTSHORT(message_type, buf); 319 PUTSHORT(object_id, buf); 320 321 ruby_write_throw(session->write_fd, session->buf, ROMP_BUFFER_SIZE, session->nonblock); 322 ruby_write_throw(session->write_fd, data, len, session->nonblock); 323 } 324 325 // Send a message to the server with the data in message. 326 static void send_message(ROMP_Session * session, ROMP_Message * message) { 327 VALUE data; 328 struct RString * data_str; 329 330 data = marshal_dump(message->message_obj); 331 data_str = RSTRING(data); 332 send_message_helper( 333 session, 334 data_str->ptr, 335 data_str->len, 336 message->message_type, 337 message->object_id); 338 } 339 340 // Send a null message to the server (no data, data len = 0) 341 static void send_null_message(ROMP_Session * session) { 342 send_message_helper(session, "", 0, ROMP_NULL_MSG, 0); 343 } 344 345 // Receive a message from the server 346 static void get_message(ROMP_Session * session, ROMP_Message * message) { 347 uint16_t magic = 0; 348 uint16_t data_len = 0; 349 char * buf = 0; 350 // struct RString message_string; 351 VALUE ruby_str; 352 353 do { 354 buf = session->buf; 355 356 ruby_read_throw(session->read_fd, buf, ROMP_BUFFER_SIZE, session->nonblock); 357 358 GETSHORT(magic, buf); 359 GETSHORT(data_len, buf); 360 GETSHORT(message->message_type, buf); 361 GETSHORT(message->object_id, buf); 362 } while(magic != ROMP_MSG_START); 363 364 buf = ALLOCA_N(char, data_len); 365 ruby_read_throw(session->read_fd, buf, data_len, session->nonblock); 366 // create_tmp_ruby_string(&message_string, buf, data_len); 367 ruby_str = rb_str_new(buf, data_len); 368 369 if(message->message_type != ROMP_NULL_MSG) { 370 message->message_obj = marshal_load(ruby_str); 371 } else { 372 message->message_obj = Qnil; 373 } 374 } 375 376 // Ideally, this function should return true if the server has disconnected, 377 // but currently always returns false. The server thread will still exit 378 // when the client has disconnected, but currently does so via an exception. 379 static int session_finished(ROMP_Session * session) { 380 // TODO: Detect a disconnection 381 return 0; 382 } 383 384 // Send a sync message to the server. 385 static void send_sync(ROMP_Session * session) { 386 ROMP_Message message = { ROMP_SYNC, 0, Qnil }; 387 send_message(session, &message); 388 } 389 390 // Wait for a sync response from the server. Ignore any messages that are 391 // received while waiting for the response. 392 static void wait_sync(ROMP_Session * session) { 393 ROMP_Message message; 394 395 // sleep(1); 396 get_message(session, &message); 397 if( message.message_type != ROMP_SYNC 398 && message.object_id != 1 399 && message.message_obj != Qnil) { 400 rb_raise(rb_eRuntimeError, "ROMP synchronization failed"); 401 } 402 } 403 404 // Send a reply to a sync request. 405 static void reply_sync(ROMP_Session * session, int value) { 406 if(value == 0) { 407 ROMP_Message message = { ROMP_SYNC, 1, Qnil }; 408 send_message(session, &message); 409 } 410 } 411 412 // ---------------------------------------------------------------------------- 413 // Server functions 414 // ---------------------------------------------------------------------------- 415 416 // We use this structure to pass data to our exception handler. This is done 417 // by casting a pointer to a Ruby VALUE... not 100% kosher, but it should work. 418 typedef struct { 419 ROMP_Session * session; 420 ROMP_Message * message; 421 VALUE obj; 422 int debug; 423 } Server_Info; 424 425 // Make a method call into a Ruby object. 426 static VALUE server_funcall(VALUE ruby_server_info) { 427 Server_Info * server_info = (Server_Info *)(ruby_server_info); 428 return ruby_send(server_info->obj, server_info->message->message_obj); 429 } 430 431 // Send a yield message to the client, indicating that it should call 432 // Kernel#yield with the message that is sent. 433 static VALUE server_send_yield(VALUE retval, VALUE ruby_server_info) { 434 Server_Info * server_info = (Server_Info *)(ruby_server_info); 435 436 server_info->message->message_type = ROMP_YIELD; 437 server_info->message->object_id = 0; 438 server_info->message->message_obj = retval; 439 send_message(server_info->session, server_info->message); 440 441 return Qnil; 442 } 443 444 // Send a return value to the client, indicating that it should return 445 // the message to the caller. 446 static VALUE server_send_retval(VALUE retval, VALUE ruby_server_info) { 447 Server_Info * server_info = (Server_Info *)(ruby_server_info); 448 449 server_info->message->message_type = ROMP_RETVAL; 450 server_info->message->object_id = 0; 451 server_info->message->message_obj = retval; 452 send_message(server_info->session, server_info->message); 453 454 return Qnil; 455 } 456 457 // Send an exception the client, indicating that it should raise an exception. 458 static VALUE server_exception(VALUE ruby_server_info, VALUE exc) { 459 Server_Info * server_info = (Server_Info *)(ruby_server_info); 460 VALUE caller = ruby_caller(); 461 VALUE bt = ruby_exc_backtrace(exc); 462 463 server_info->message->message_type = ROMP_EXCEPTION; 464 server_info->message->object_id = 0; 465 server_info->message->message_obj = exc; 466 467 // Get rid of extraneous caller information to make debugging easier. 468 ruby_slice_bang(bt, RARRAY(bt)->len - RARRAY(caller)->len - 1, -1); 469 470 // If debugging is enabled, then print an exception. 471 if(server_info->debug) { 472 ruby_print_exception(exc); 473 } 474 475 send_message(server_info->session, server_info->message); 476 477 return Qnil; 478 } 479 480 // Proces a request from the client and send an appropriate reply. 481 static VALUE server_reply(VALUE ruby_server_info) { 482 Server_Info * server_info = (Server_Info *)(ruby_server_info); 483 VALUE retval; 484 int status; 485 486 server_info->obj = ruby_get_object( 487 server_info->obj, 488 server_info->message->object_id); 489 490 // TODO: The client should be able to pass a callback object to the server; 491 // msg_to_obj can create a Proxy_Object, but it needs a session to make 492 // calls over. 493 494 // Perform the appropriate action based on message type. 495 switch(server_info->message->message_type) { 496 case ROMP_ONEWAY_SYNC: 497 send_null_message(server_info->session); 498 // fallthrough 499 500 case ROMP_ONEWAY: 501 rb_protect(server_funcall, ruby_server_info, &status); 502 return Qnil; 503 504 case ROMP_REQUEST: 505 retval = ruby_send( 506 server_info->obj, 507 server_info->message->message_obj); 508 break; 509 510 case ROMP_REQUEST_BLOCK: 511 retval = rb_iterate( 512 server_funcall, ruby_server_info, 513 server_send_yield, ruby_server_info); 514 break; 515 516 case ROMP_SYNC: 517 reply_sync( 518 server_info->session, 519 server_info->message->object_id); 520 return Qnil; 521 522 default: 523 rb_raise(rb_eRuntimeError, "Bad session request"); 524 } 525 526 server_send_retval(retval, ruby_server_info); 527 528 return Qnil; 529 } 530 531 // The main server loop. Wait for a message from the client, route the 532 // message to the appropriate object, send a response and repeat. 533 static void server_loop(ROMP_Session * session, VALUE resolve_server, int dbg) { 534 ROMP_Message message; 535 Server_Info server_info = { session, &message, resolve_server, dbg }; 536 VALUE ruby_server_info = (VALUE)(&server_info); 537 538 while(!session_finished(session)) { 539 get_message(session, &message); 540 rb_rescue2( 541 server_reply, ruby_server_info, 542 server_exception, ruby_server_info, rb_eException, 0); 543 server_info.obj = resolve_server; 544 } 545 } 546 547 // ---------------------------------------------------------------------------- 548 // Client functions 549 // ---------------------------------------------------------------------------- 550 551 // We use this structure to pass data to our client functions by casting it 552 // to a Ruby VALUE (see above note with Server_Info). 553 typedef struct { 554 ROMP_Session * session; 555 VALUE ruby_session; 556 OBJECT_ID_T object_id; 557 VALUE message; 558 VALUE mutex; 559 } Proxy_Object; 560 561 // Send a request to the server, wait for a response, and perform an action 562 // based on what that response was. This is not thread-safe, so the caller 563 // should perform any necessary locking 564 static VALUE client_request(VALUE ruby_proxy_object) { 565 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object); 566 ROMP_Message msg = { 567 rb_block_given_p() ? ROMP_REQUEST_BLOCK : ROMP_REQUEST, 568 obj->object_id, 569 obj->message 570 }; 571 send_message(obj->session, &msg); 572 573 for(;;) { 574 get_message(obj->session, &msg); 575 switch(msg.message_type) { 576 case ROMP_RETVAL: 577 return msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex); 578 break; 579 case ROMP_YIELD: 580 rb_yield(msg_to_obj(msg.message_obj, obj->ruby_session, obj->mutex)); 581 break; 582 case ROMP_EXCEPTION: { 583 ruby_raise( 584 msg.message_obj, 585 ruby_exc_message(msg.message_obj), 586 rb_ary_concat(ruby_exc_backtrace(msg.message_obj), ruby_caller()) 587 ); 588 break; 589 } 590 case ROMP_SYNC: 591 reply_sync(obj->session, NUM2INT(msg.message_obj)); 592 break; 593 default: 594 rb_raise(rb_eRuntimeError, "Invalid msg type received"); 595 } 596 } 597 } 598 599 // Send a oneway message to the server. This is not thread-safe, so the 600 // caller should perform any necessary locking. 601 static VALUE client_oneway(VALUE ruby_proxy_object) { 602 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object); 603 ROMP_Message msg = { 604 ROMP_ONEWAY, 605 obj->object_id, 606 obj->message 607 }; 608 send_message(obj->session, &msg); 609 return Qnil; 610 } 611 612 // Send a oneway message to the server and request a message in response. 613 // This is not thread-safe, so the caller should perform any necessary 614 // locking. 615 static VALUE client_oneway_sync(VALUE ruby_proxy_object) { 616 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object); 617 ROMP_Message msg = { 618 ROMP_ONEWAY_SYNC, 619 obj->object_id, 620 obj->message 621 }; 622 send_message(obj->session, &msg); 623 get_message(obj->session, &msg); 624 return Qnil; 625 } 626 627 // Synchronize with the server. This is not thread-safe, so the caller should 628 // perform any necessary locking. 629 static VALUE client_sync(VALUE ruby_proxy_object) { 630 Proxy_Object * obj = (Proxy_Object *)(ruby_proxy_object); 631 send_sync(obj->session); 632 wait_sync(obj->session); 633 return Qnil; 634 } 635 636 // ---------------------------------------------------------------------------- 637 // Ruby interface functions 638 // ---------------------------------------------------------------------------- 639 640 static void ruby_session_mark(ROMP_Session * session) { 641 rb_gc_mark(session->io_object); 642 } 643 644 static VALUE ruby_session_new(VALUE self, VALUE io_object) { 645 ROMP_Session * session; 646 VALUE ruby_session; 647 OpenFile * openfile; 648 FILE * read_fp; 649 FILE * write_fp; 650 651 if(!rb_obj_is_kind_of(io_object, rb_cIO)) { 652 rb_raise(rb_eTypeError, "Expecting an IO object"); 653 } 654 655 ruby_session = Data_Make_Struct( 656 rb_cSession, 657 ROMP_Session, 658 (RUBY_DATA_FUNC)(ruby_session_mark), 659 (RUBY_DATA_FUNC)(free), 660 session); 661 662 GetOpenFile(io_object, openfile); 663 read_fp = GetReadFile(openfile); 664 write_fp = GetWriteFile(openfile); 665 session->read_fd = fileno(read_fp); 666 session->write_fd = fileno(write_fp); 667 session->io_object = io_object; 668 session->nonblock = 0; 669 670 return ruby_session; 671 } 672 673 static VALUE ruby_set_nonblock(VALUE self, VALUE nonblock) { 674 ROMP_Session * session; 675 Data_Get_Struct(self, ROMP_Session, session); 676 if(nonblock == Qtrue) { 677 session->nonblock = 1; 678 } else if(nonblock == Qfalse) { 679 session->nonblock = 0; 680 } else { 681 rb_raise(rb_eTypeError, "Expecting a boolean"); 682 } 683 return Qnil; 684 } 685 686 static void ruby_proxy_object_mark(Proxy_Object * proxy_object) { 687 rb_gc_mark(proxy_object->ruby_session); 688 rb_gc_mark(proxy_object->mutex); 689 } 690 691 static VALUE ruby_proxy_object_new( 692 VALUE self, VALUE ruby_session, VALUE ruby_mutex, VALUE ruby_object_id) { 693 ROMP_Session * session; 694 OBJECT_ID_T object_id = NUM2INT(ruby_object_id); 695 Proxy_Object * proxy_object; 696 VALUE ruby_proxy_object; 697 698 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) { 699 rb_raise(rb_eTypeError, "Expecting a session"); 700 } 701 Data_Get_Struct(ruby_session, ROMP_Session, session); 702 703 ruby_proxy_object = Data_Make_Struct( 704 rb_cProxy_Object, 705 Proxy_Object, 706 (RUBY_DATA_FUNC)(ruby_proxy_object_mark), 707 (RUBY_DATA_FUNC)(free), 708 proxy_object); 709 proxy_object->session = session; 710 proxy_object->ruby_session = ruby_session; 711 proxy_object->mutex = ruby_mutex; 712 proxy_object->object_id = object_id; 713 714 return ruby_proxy_object; 715 } 716 717 static VALUE ruby_proxy_object_method_missing(VALUE self, VALUE message) { 718 Proxy_Object * proxy_object; 719 Data_Get_Struct(self, Proxy_Object, proxy_object); 720 721 proxy_object->message = message; 722 ruby_lock(proxy_object->mutex); 723 return rb_ensure( 724 client_request, (VALUE)(proxy_object), 725 ruby_unlock, proxy_object->mutex); 726 } 727 728 static VALUE ruby_proxy_object_oneway(VALUE self, VALUE message) { 729 Proxy_Object * proxy_object; 730 Data_Get_Struct(self, Proxy_Object, proxy_object); 731 732 proxy_object->message = message; 733 ruby_lock(proxy_object->mutex); 734 rb_ensure( 735 client_oneway, (VALUE)(proxy_object), 736 ruby_unlock, proxy_object->mutex); 737 return Qnil; 738 } 739 740 static VALUE ruby_proxy_object_oneway_sync(VALUE self, VALUE message) { 741 Proxy_Object * proxy_object; 742 Data_Get_Struct(self, Proxy_Object, proxy_object); 743 744 proxy_object->message = message; 745 ruby_lock(proxy_object->mutex); 746 rb_ensure( 747 client_oneway_sync, (VALUE)(proxy_object), 748 ruby_unlock, proxy_object->mutex); 749 return Qnil; 750 } 751 752 static VALUE ruby_proxy_object_sync(VALUE self) { 753 Proxy_Object * proxy_object; 754 Data_Get_Struct(self, Proxy_Object, proxy_object); 755 756 ruby_lock(proxy_object->mutex); 757 rb_ensure( 758 client_sync, (VALUE)(proxy_object), 759 ruby_unlock, proxy_object->mutex); 760 return Qnil; 761 } 762 763 static VALUE ruby_server_loop(VALUE self, VALUE ruby_session) { 764 ROMP_Session * session; 765 VALUE resolve_server; 766 VALUE ruby_debug; 767 int debug; 768 769 if(!rb_obj_is_kind_of(ruby_session, rb_cSession)) { 770 rb_raise(rb_eTypeError, "Excpecting a session"); 771 } 772 Data_Get_Struct(ruby_session, ROMP_Session, session); 773 774 resolve_server = rb_iv_get(self, "@resolve_server"); 775 776 ruby_debug = rb_iv_get(self, "@debug"); 777 debug = (ruby_debug != Qfalse) && !NIL_P(ruby_debug); 778 server_loop(session, resolve_server, debug); 779 return Qnil; 780 } 781 782 // Given a message, convert it into an object that can be returned. This 783 // function really only checks to see if an Object_Reference has been returned 784 // from the server, and creates a new Proxy_Object if this is the case. 785 // Otherwise, the original object is returned to the client. 786 static VALUE msg_to_obj(VALUE message, VALUE session, VALUE mutex) { 787 if(CLASS_OF(message) == rb_cObject_Reference) { 788 return ruby_proxy_object_new( 789 rb_cProxy_Object, 790 session, 791 mutex, 792 rb_funcall(message, id_object_id, 0)); 793 } else { 794 return message; 795 } 796 } 797 798 void Init_romp_helper() { 799 init_globals(); 800 801 rb_mROMP = rb_define_module("ROMP"); 802 rb_cSession = rb_define_class_under(rb_mROMP, "Session", rb_cObject); 803 804 rb_define_const(rb_cSession, "REQUEST", INT2NUM(ROMP_REQUEST)); 805 rb_define_const(rb_cSession, "REQUEST_BLOCK", INT2NUM(ROMP_REQUEST_BLOCK)); 806 rb_define_const(rb_cSession, "ONEWAY", INT2NUM(ROMP_ONEWAY)); 807 rb_define_const(rb_cSession, "ONEWAY_SYNC", INT2NUM(ROMP_ONEWAY_SYNC)); 808 rb_define_const(rb_cSession, "RETVAL", INT2NUM(ROMP_RETVAL)); 809 rb_define_const(rb_cSession, "EXCEPTION", INT2NUM(ROMP_EXCEPTION)); 810 rb_define_const(rb_cSession, "YIELD", INT2NUM(ROMP_YIELD)); 811 rb_define_const(rb_cSession, "SYNC", INT2NUM(ROMP_SYNC)); 812 rb_define_const(rb_cSession, "NULL_MSG", INT2NUM(ROMP_NULL_MSG)); 813 rb_define_const(rb_cSession, "MSG_START", INT2NUM(ROMP_MSG_START)); 814 rb_define_const(rb_cSession, "MAX_ID", INT2NUM(ROMP_MAX_ID)); 815 rb_define_const(rb_cSession, "MAX_MSG_TYPE", INT2NUM(ROMP_MAX_MSG_TYPE)); 816 817 rb_define_singleton_method(rb_cSession, "new", ruby_session_new, 1); 818 rb_define_method(rb_cSession, "set_nonblock", ruby_set_nonblock, 1); 819 820 rb_cProxy_Object = rb_define_class_under(rb_mROMP, "Proxy_Object", rb_cObject); 821 rb_define_singleton_method(rb_cProxy_Object, "new", ruby_proxy_object_new, 3); 822 rb_define_method(rb_cProxy_Object, "method_missing", ruby_proxy_object_method_missing, -2); 823 rb_define_method(rb_cProxy_Object, "oneway", ruby_proxy_object_oneway, -2); 824 rb_define_method(rb_cProxy_Object, "oneway_sync", ruby_proxy_object_oneway_sync, -2); 825 rb_define_method(rb_cProxy_Object, "sync", ruby_proxy_object_sync, 0); 826 827 rb_cServer = rb_define_class_under(rb_mROMP, "Server", rb_cObject); 828 rb_define_method(rb_cServer, "server_loop", ruby_server_loop, 1); 829 830 rb_cObject_Reference = rb_define_class_under(rb_mROMP, "Object_Reference", rb_cObject); 831 832 id_object_id = rb_intern("object_id"); 833 }