require 'socket'
require 'thread'

=begin

DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
(C) Copyright 2001 Paul Brannan (cout at rm-f.net)

DRuby is a set of classes for providing distributed object support to a
Ruby program.  You may distribute and/or modify it under the same terms as
Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:

Client
------
client = DRuby::Client.new('localhost', 4242)
obj = client.resolve("foo")
puts obj.foo("foo")
obj.oneway(:foo, "foo")

Server
------
class Foo
    def foo(x); return x; end
end
obj = Foo.new
server = DRuby::Server.new('localhost', 4242)
server.register(obj, "foo")
server.thread.join

You can do all sorts of cool things with DRuby, including passing blocks to
the functions, throwing exceptions and propagating them from server to
client, and more.  Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it.  But, alas, it is not as powerful as CORBA.

On a fast machine, you should expect around 4000 messages per second with
normal method calls, and up to 10000 messages per second with oneway calls.
My dual-processor machine gets 3500/6500 messages per second, though, so
YMMV.

The DRuby message format is broken into 3 components:
    [ msg_type, obj_id, message ]
For each msg_type, there is a specific format to the message.  Additionally,
certain msg_types are only valid when being sent to the server, and others
are valid only when being sent back to the client.  Here is a summary:

msg_type       send to  meaning of obj_id       msg format
----------------------------------------------------------------------------
REQUEST        server   obj to talk to          [:method, *args]
REQUEST_BLOCK  server   obj to talk to          [:method, *args]
ONEWAY         server   obj to talk to          [:method, *args]
RETVAL         client   always 0                retval
EXCEPTION      client   always 0                $!
YIELD          client   always 0                [value, value, ...]
RESOLVE        server   always 0                object_name
SYNC           either   0=request, 1=response   nil

Known bugs:
1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
   why.  A workaround is to use a separate connection for each type of
   call, but sometimes that doesn't work either.
2) The server does not refuse connections for unauthorized addresses; it
   will instead accept a connection and then immediately close it.
3) If the server is flooded with oneway calls and drops packets, it will
   listen for data until it encounters a valid message.  If this happens,
   the client will not be notified.

=end

module DRuby

    # The DRuby server class.  Like its drb equivalent, this class spawns off
    # a new thread which processes requests, allowing the server to do other
    # things while it is doing processing for a distributed object.  This
    # means, though, that all objects used with DRuby must be thread-safe.
    class Server
        # NOTE: :obj is never assigned anywhere in this class (it always
        # reads nil); the reader is kept only for interface compatibility.
        attr_reader :host, :port, :obj, :thread

        # Start a server on the given host and port.  An acceptor may be
        # specified to accept/reject connections; it should be a proc that
        # takes a Socket as an argument and returns true or false.
        #
        # The listening socket is bound here, before the accept thread is
        # spawned, so that the port is already listening when the
        # constructor returns and bind errors (e.g. Errno::EADDRINUSE) are
        # raised to the caller instead of being lost inside the thread.
        def initialize(host, port, acceptor=nil)
            @host = host
            @port = port
            @name_to_id = {}
            @id_to_object = {}
            @id_to_acceptor = {}
            @mutex = Mutex.new
            @next_id = 1

            listener = TCPServer.new(@host, @port)
            @thread = Thread.new do
                while (socket = listener.accept)
                    if acceptor && !acceptor.call(socket)
                        socket.close
                        next
                    end
                    # IPPROTO_TCP is the portable spelling of the TCP option
                    # level (SOL_TCP is not defined on every platform).
                    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
                    Thread.new(socket) do |connection|
                        begin
                            session_loop(Session.new(connection))
                        ensure
                            # Release the descriptor once the peer is done.
                            connection.close unless connection.closed?
                        end
                    end
                end
            end
        end

        # Register an object with the server.  Once an object is registered,
        # it cannot be unregistered.  The object will be given an id of
        # @next_id, and @next_id will be incremented.  We could use the
        # object's real id, but we want to be sure that the id will fit into
        # a 16-bit short.  The acceptor argument is a proc that takes a
        # Socket as an argument, and returns true or false depending on
        # whether the request should be allowed.  The supplied object must
        # be thread-safe.
        #
        # Raises ArgumentError once Session::MAX_ID objects are registered.
        def register(obj, name, acceptor=nil)
            @mutex.synchronize do
                if @next_id >= Session::MAX_ID
                    raise ArgumentError, "Object limit exceeded"
                end
                # Store the acceptor and the object before publishing the
                # name: session_loop reads these tables without locking, so
                # an id must never be resolvable before its entry is complete.
                @id_to_acceptor[@next_id] = acceptor
                @id_to_object[@next_id] = obj
                @name_to_id[name] = @next_id
                @next_id += 1
            end
        end

        # Main server loop.  Wait for a REQUEST message, process it, and send
        # back a YIELD, EXCEPTION, or RETVAL message.  Note that we don't do
        # any thread synchronization here, because all registered objects are
        # required to be thread-safe, and the @id_to_object lookup is atomic
        # (and if it succeeds, the object is guaranteed to be fully
        # registered).
        #
        # SECURITY NOTE: the method name and arguments come straight off the
        # wire and are dispatched with __send__, so any method of a
        # registered object (including private ones) can be invoked by a
        # connected peer.  Only expose this server to trusted clients.
        def session_loop(session)
            until session.finished
                type, object_id, message = session.get_message
                acceptor = @id_to_acceptor[object_id]
                begin
                    if acceptor && !acceptor.call(session.socket)
                        # A oneway caller is not waiting for a reply, so the
                        # rejected request is dropped without an answer --
                        # but it must NOT be executed.
                        next if type == Session::ONEWAY
                        raise ArgumentError, "No authorization"
                    end
                    case type
                    when Session::REQUEST
                        retval = target_object(object_id).__send__(*message)
                    when Session::REQUEST_BLOCK
                        retval = target_object(object_id).__send__(*message) do |*values|
                            session.send_message(Session::YIELD, 0, values)
                        end
                    when Session::ONEWAY
                        begin
                            target_object(object_id).__send__(*message)
                        rescue Exception
                            # Deliberate best effort: nobody is listening
                            # for the outcome of a oneway call.
                        end
                        next
                    when Session::RESOLVE
                        retval = @name_to_id[message]
                    when Session::SYNC
                        session.reply_sync(object_id)
                        next
                    else
                        raise ArgumentError, "Bad session request"
                    end
                    session.send_message(Session::RETVAL, 0, retval)
                rescue Exception => error
                    # Exception (not just StandardError) is rescued on
                    # purpose: whatever the remote call raised is handed
                    # back to the client rather than killing the session.
                    send_exception(session, error)
                end
            end
        end

        private

        # Look up a registered object, raising ArgumentError for unknown ids
        # instead of silently dispatching the call to nil.
        def target_object(object_id)
            @id_to_object.fetch(object_id) do
                raise ArgumentError, "No such object: #{object_id}"
            end
        end

        # Send an EXCEPTION reply; if the exception itself cannot be
        # marshalled (or is too large for one frame), send a plain
        # RuntimeError carrying a summary of it instead.
        def send_exception(session, error)
            session.send_message(Session::EXCEPTION, 0, error)
        rescue TypeError, ArgumentError
            summary = "#{error.class}: #{error.message}"[0, 1024]
            session.send_message(Session::EXCEPTION, 0, RuntimeError.new(summary))
        end
    end

    # The DRuby client class.  A DRuby server must be started on the given
    # host and port before instantiating a DRuby client.
    class Client
        attr_reader :host, :port

        # Connect a client to the server on the given host and port.  The
        # user can specify sync=false to turn off synchronization and get a
        # 20% speed boost.
        def initialize(host, port, sync=true)
            @host = host
            @port = port
            @server = TCPSocket.open(@host, @port)
            @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
            @session = Session.new(@server)
            @mutex = sync ? Mutex.new : Null_Mutex.new
        end

        # Given a string, return a proxy object that will forward requests
        # for an object on the server with that name.
        #
        # Raises ArgumentError if the server knows no object by that name,
        # re-raises any exception sent back by the server, and raises
        # RuntimeError on an unexpected reply type.
        def resolve(object_name)
            @mutex.synchronize do
                @session.send_message(Session::RESOLVE, 0, object_name)
                type, _obj, reply = @session.get_message
                case type
                when Session::RETVAL
                    # The server answers nil for an unregistered name; a
                    # proxy built around a nil id could never be used.
                    if reply.nil?
                        raise ArgumentError, "No such object: #{object_name}"
                    end
                    Object.new(@session, @mutex, reply)
                when Session::EXCEPTION
                    raise reply
                else
                    raise RuntimeError
                end
            end
        end
    end

    # In case the user does not want synchronization: an object with the
    # same synchronize interface as Mutex that simply runs the block.
    class Null_Mutex
        def synchronize
            yield
        end
    end

    # A DRuby::Object acts as a proxy; it forwards most methods to the server
    # for execution.
    class Object
        # session   - the Session used to talk to the server
        # mutex     - a Mutex (or Null_Mutex) guarding the session
        # object_id - the server-side id of the object being proxied
        def initialize(session, mutex, object_id)
            @object_id = object_id
            @session = session
            @mutex = mutex
        end

        # Client request handler.  The idea here is to send out a REQUEST
        # message, and get back a YIELD, EXCEPTION, or RETVAL message.
        #
        # Methods listed in BAD_FUNCTIONS are rejected locally with
        # NoMethodError.  (undef_method alone is not enough: calls to an
        # undefined method are routed here.)
        def method_missing(method, *args)
            if BAD_FUNCTIONS.include?(method)
                raise NoMethodError, "undefined method '#{method}' for DRuby proxy"
            end
            request = [ method, *args ]
            @mutex.synchronize do
                @session.send_message(
                    block_given? ? Session::REQUEST_BLOCK : Session::REQUEST,
                    @object_id,
                    request)
                # A plain while loop, not Kernel#loop: loop rescues
                # StopIteration, which would swallow a StopIteration raised
                # by the remote call or by the caller's block.
                while true
                    type, obj, reply = @session.get_message
                    case type
                    when Session::RETVAL
                        return reply
                    when Session::YIELD
                        # The server sends the yielded values as an array;
                        # splat it so the block sees what the remote method
                        # actually yielded.
                        yield(*reply)
                    when Session::EXCEPTION
                        raise reply
                    when Session::SYNC
                        @session.reply_sync(obj)
                    else
                        raise RuntimeError
                    end
                end
            end
        end

        # This is a special function that lets you ignore the return value
        # of the calling function to get some extra speed, in exchange for not
        # knowing when the function will complete.  A future version of the
        # server should probably check whether a given function is allowed to
        # be called as a oneway, in order to reduce DoS attacks.
        # Always returns nil.
        def oneway(*message)
            @mutex.synchronize do
                @session.send_message(Session::ONEWAY, @object_id, message)
            end
            nil
        end

        # sync() will synchronize the client with the server (useful for
        # determining when a oneway call completes)
        def sync
            @mutex.synchronize do
                @session.send_sync
                @session.wait_sync
            end
        end

        GOOD_FUNCTIONS = [
            :inspect, :class_variables, :instance_eval, :instance_variables,
            :to_a, :to_s
        ].freeze

        BAD_FUNCTIONS = [
            :clone, :dup, :display
        ].freeze

        METHOD_FUNCTIONS = [
            :methods, :private_methods, :protected_methods, :public_methods,
            :singleton_methods
        ].freeze

        # Pairs of [method name, Ruby statement to run when the method is
        # asked about one of the BAD_FUNCTIONS].
        RESPOND_FUNCTIONS = [
            [ :method,      "raise NameError" ],
            [ :respond_to?, "return false"    ]
        ].freeze

        # Make sure certain methods get passed down the wire.
        GOOD_FUNCTIONS.each do |name|
            define_method(name) do |*args, &block|
                method_missing(name, *args, &block)
            end
        end

        # And make sure others never get called.
        BAD_FUNCTIONS.each do |name|
            undef_method name
        end

        # And remove these function names from any method lists that get
        # returned; there's nothing we can do about people who decide to
        # return them from other functions.  Names are compared as strings
        # so that both Symbol and String listings are filtered.
        METHOD_FUNCTIONS.each do |name|
            define_method(name) do |*args, &block|
                listing = method_missing(name, *args, &block)
                if listing.is_a?(Array)
                    listing.reject do |item|
                        BAD_FUNCTIONS.any? { |bad| bad.to_s == item.to_s }
                    end
                else
                    listing
                end
            end
        end

        # Same here, except don't let the call go through in the first place.
        # The action text comes from the constant table above and is spliced
        # into the method body once, when the class is defined.
        RESPOND_FUNCTIONS.each do |name, action|
            class_eval <<-RUBY, __FILE__, __LINE__ + 1
                def #{name}(arg, *args, &block)
                    if BAD_FUNCTIONS.any? { |bad| bad.to_s == arg.to_s }
                        #{action}
                    end
                    method_missing(:#{name}, arg, *args, &block)
                end
            RUBY
        end
    end

    # A DRuby session sends messages back and forth between server and client.
    # The message format is as follows:
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
    # | MSG_START | msg. size | msg. type |  obj. id  | marshalled msg/args |
    # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
    # Each header field is a 16-bit little-endian unsigned integer.
    class Session
        REQUEST         = 0x1001
        REQUEST_BLOCK   = 0x1002
        ONEWAY          = 0x1003
        RETVAL          = 0x2001
        EXCEPTION       = 0x2002
        YIELD           = 0x2003
        RESOLVE         = 0x3001
        SYNC            = 0x4001

        MSG_START       = 0x4242

        MAX_ID          = 2**16

        # Four 16-bit little-endian fields: magic, size, type, object id.
        HEADER_FORMAT    = "vvvv"
        HEADER_SIZE      = 8
        # Largest payload whose length fits the 16-bit size field.
        MAX_MESSAGE_SIZE = 0xFFFF

        attr_reader :io
        alias_method :socket, :io

        # io is any object providing write, read(n) and eof (normally a
        # TCPSocket).
        def initialize(io)
            @io = io
        end

        # Marshal message and write it, preceded by its header, in a single
        # write call.
        #
        # Raises ArgumentError if the marshalled payload does not fit the
        # 16-bit size field; writing it anyway would truncate the recorded
        # size and corrupt the stream for every following message.
        def send_message(type, obj, message)
            data = Marshal.dump(message)
            if data.bytesize > MAX_MESSAGE_SIZE
                raise ArgumentError,
                    "Message too large (#{data.bytesize} bytes, limit #{MAX_MESSAGE_SIZE})"
            end
            header = [MSG_START, data.bytesize, type, obj].pack(HEADER_FORMAT)
            @io.write(header + data)
        end

        # Read the next message and return [ type, obj, message ].  Headers
        # that do not start with MSG_START are skipped, one header-sized
        # chunk at a time.
        #
        # Raises EOFError if the stream ends before a complete message has
        # been read.
        #
        # SECURITY NOTE: the payload is passed to Marshal.load, which is not
        # safe on data from an untrusted peer.
        def get_message
            size = type = obj = nil
            loop do
                header = @io.read(HEADER_SIZE)
                if header.nil? || header.bytesize < HEADER_SIZE
                    raise EOFError, "DRuby connection closed"
                end
                magic, size, type, obj = header.unpack(HEADER_FORMAT)
                break if magic == MSG_START
            end
            data = @io.read(size)
            if data.nil? || data.bytesize < size
                raise EOFError, "DRuby connection closed"
            end
            [ type, obj, Marshal.load(data) ]
        end

        # True once the underlying io reports end of file.
        def finished
            @io.eof
        end

        # Send a SYNC request (object id 0).
        def send_sync
            send_message(Session::SYNC, 0, nil)
        end

        # Wait for the SYNC response (type SYNC, object id 1, nil payload)
        # and raise RuntimeError if anything else arrives.
        def wait_sync
            # NOTE(review): this one-second pause is inherited from the
            # original implementation; get_message already blocks until the
            # reply arrives, so it looks unnecessary -- confirm before
            # removing.
            sleep 1
            type, obj, message = get_message
            unless type == Session::SYNC && obj == 1 && message.nil?
                raise RuntimeError, "DRuby synchonization failed"
            end
        end

        # Answer a SYNC request: a value of 0 marks a request, which is
        # answered with a SYNC message carrying object id 1.  Any other
        # value is ignored.
        def reply_sync(value)
            if value == 0
                send_message(Session::SYNC, 1, nil)
            end
        end
    end
end