require 'socket' require 'thread' =begin DRuby (not to be confused with dRuby, written by Masatoshi SEKI) (C) Copyright 2001 Paul Brannan (cout at rm-f.net) DRuby is a set of classes for providing distributed object support to a Ruby program. You may distribute and/or modify it under the same terms as Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example: Client ------ client = DRuby::Client.new('localhost', 4242) obj = client.resolve("foo") puts obj.foo("foo") obj.oneway(:foo, "foo") Server ------ class Foo def foo(x); return x; end end obj = Foo.new server = DRuby::Server.new('localhost', 4242) server.bind(obj, "foo") server.thread.join You can do all sorts of cool things with DRuby, including passing blocks to the functions, throwing exceptions and propogating them from server to client, and more. Unlike CORBA, where you must create an interface definition and strictly adhere to it, DRuby uses marshalling, so you can use almost any object with it. But, alas, it is not as powerful as CORBA. On a fast machine, you should expect around 4000 messages per second with normal method calls, and up to 10000 messages per second with oneway calls. My dual-processor machine gets 3500/6500 messages per second, though, so YMMV. The DRuby message format is broken into 3 components: [ msg_type, obj_id, message ] For each msg_type, there is a specific format to the message. Additionally, certain msg_types are only valid when being sent to the server, and others are valid only when being sent back to the client. Here is a summary: msg_type send to meaning of obj_id msg format ---------------------------------------------------------------------------- REQUEST server obj to talk to [:method, *args] REQUEST_BLOCK server obj to talk to [:method, *args] ONEWAY server obj to talk to [:method, *args] RETVAL client always 0 retval EXCEPTION client always 0 $! YIELD client always 0 [value, value, ...] RESOLVE server always 0 object_name SYNC either 0=request, 1=response nil Known bugs: 1) Oneway calls are really slow if mixed with regular calls. I'm not sure why. A workaround is to use a separate connection for each type of call, but sometimes that doesn't work either. 2) The server does not refuse connections for unauthorized addresses; it will instead accept a connection and then immediately close it. 3) If the server is flooded with oneway calls and drops packets, it will listen for data until it encounters a valid message. If this happens, the client will not be notified. =end module DRuby public # The DRuby server class. Like its drb equivalent, this class spawns off # a new thread which processes requests, allowing the server to do other # things while it is doing processing for a distributed object. This # means, though, that all objects used with DRuby must be thread-safe. class Server public attr_reader :host, :port, :obj, :thread # Start a server on the given host and port. An acceptor may be # specified to accept/reject connections; it should be a proc that # takes a Socket as an argument and returns true or false. def initialize(host, port, acceptor=nil) @host = host @port = port @name_to_id = Hash.new @id_to_object = Array.new @id_to_acceptor = Array.new @mutex = Mutex.new @next_id = 1 @unused_ids = Array.new @thread = Thread.new do server = TCPServer.new(@host, @port) while(socket = server.accept) if acceptor then if !acceptor.call(socket) then socket.close next end end socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) Thread.new(socket) do |socket| session_loop(Session.new(socket)) end end end end # Register an object with the server. The object will be given an # id of @next_id, and @next_id will be incremented. We could use the # object's real id, but this is insecure. The acceptor argument is # a proc that takes a Socket as an arugment, and returns true or false # depending on whether the request should be allowed. The supplied # object must be thread-safe. def create_reference(obj, acceptor=nil) @mutex.synchronize do ref = register_private(obj, acceptor) Object_Reference.new(ref) #return end end # Finds an object in linear time and unregister it. If you need a # faster function, write one and send me a patch. def delete_reference(obj) @mutex.synchronize do unregister_private(obj) end nil #return end # Register an object with the server and bind it to name def bind(obj, name, acceptor=nil) id = register_private(obj, acceptor) @name_to_id[name] = id nil #return end # Main server loop. Wait for a REQUEST message, process it, and send # back a YIELD, EXCEPTION, or RETVAL message. Note that we don't do # any thread synchronization here, because all registered objects are # required to be thread-safe, and the @id_to_object lookup is atomic # (and if it succeeds, the object is guaranteed to be fully # registered). def session_loop(session) while not session.finished() type, object_id, message = session.get_message() begin obj = @id_to_object[object_id] if (acceptor = @id_to_acceptor[object_id]) then if !acceptor.call(session.socket) then if type != Session::ONEWAY then raise "No authorization" end end end case type # In order of most to least common when Session::REQUEST retval = obj.__send__(*message) when Session::ONEWAY begin; obj.__send__(*message); rescue Exception; end next when Session::REQUEST_BLOCK retval = obj.__send__(*message) do |*i| session.send_message(Session::YIELD, 0, i) end when Session::RESOLVE retval = @name_to_id[message] when Session::SYNC session.reply_sync(object_id) next else raise "Bad session request" end session.send_message(Session::RETVAL, 0, retval) rescue Exception session.send_message(Session::EXCEPTION, 0, $!) end end end private def register_private(obj, acceptor) if @next_id >= Session::MAX_ID then if @unused_ids.size == 0 then raise "Object limit exceeded" else id = @unused_ids.pop end end @id_to_acceptor[@next_id] = acceptor @id_to_object[@next_id] = obj old_id = @next_id @next_id = @next_id.succ() old_id #return end def unregister_private(obj) delete_obj_from_array_private(@id_to_acceptor, @next_id) delete_obj_from_array_private(@id_to_object, @next_id) end def delete_obj_from_array_private(array, obj) index = array.index(obj) array[index] = nil unless index == nil end end # The DRuby client class. A DRuby server must be started on the given # host and port before instantiating a DRuby client. class Client attr_reader :host, :port # Connect a client to the server on the given host and port. The # user can specify sync=false to turn off synchronization and get a # 20% speed boost. def initialize(host, port, sync=true) @host = host @port = port @server = TCPSocket.open(@host, @port) @server.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) @session = Session.new(@server) @mutex = sync ? Mutex.new : Null_Mutex.new end # Given a string, return a proxy object that will forward requests # for an object on the server with that name. def resolve(object_name) @mutex.synchronize do @session.send_message(Session::RESOLVE, 0, object_name) type, obj, message = @session.get_message() case type when Session::RETVAL Proxy_Object.new(@session, @mutex, message) #return when Session::EXCEPTION ; raise message else ; raise RuntimeError end end end end private # In case the user does not want synchronization. class Null_Mutex def synchronize yield end end # All the special functions we have to keep track of class Functions GOOD = [ :inspect, :class_variables, :instance_eval, :instance_variables, :to_a, :to_s ] BAD = [ :clone, :dup, :display ] METHOD = [ :methods, :private_methods, :protected_methods, :public_methods, :singleton_methods ] RESPOND = [ [ :method, "raise NameError" ], [ :respond_to?, "false" ] ] end # A DRuby::Object_Reference is created on the server side to represent an # object in the system. It can be returned from a server object to a # client object, at which point it is converted into a DRuby::Proxy_Object. class Object_Reference attr_reader :object_id def initialize(object_id) @object_id = object_id end end # A DRuby::Object acts as a proxy; it forwards most methods to the server # for execution. class Proxy_Object public def initialize(session, mutex, object_id) @object_id = object_id @session = session @mutex = mutex end # Client request handler. The idea here is to send out a REQUEST # message, and get back a YIELD, EXCEPTION, or RETVAL message. def method_missing(method, *args) message = [ method, *args ] @mutex.synchronize do @session.send_message( (block_given?) ? Session::REQUEST_BLOCK : Session::REQUEST, @object_id, message) loop do type, obj, message = @session.get_message() case type when Session::RETVAL ; return msg_to_obj(message) when Session::YIELD ; yield msg_to_obj(message) when Session::EXCEPTION ; raise message when Session::SYNC ; @session.reply_sync(obj) else ; raise "Invalid msg type" end end end end # This is a special function that lets you ignore the return value # of the calling function to get some extra speed, in exchange for not # knowing when the function will complete. A future version of the # server should probably check whether a given function is allowed to # be called as a oneway, in order to reduce DoS attacks. def oneway(*message) @mutex.synchronize do @session.send_message(Session::ONEWAY, @object_id, message) nil #return end end # sync() will synchonize the client with the server (useful for # determining when a oneway call completes) def sync() @mutex.synchronize do @session.send_sync() @session.wait_sync() end end # Make sure certain methods get passed down the wire. Functions::GOOD.each do |method| eval %{ def #{method}(*args) method_missing(:#{method}, *args) #return end } end # And make sure others never get called. Functions::BAD.each do |method| eval %{ def #{method}(*args) raise(NameError, "undefined method `#{method}' for " + "\#<#{self.class}:#{self.id}>") end } end # And remove these function names from any method lists that get # returned; there's nothing we can do about people who decide to # return them from other functions. Functions::METHOD.each do |method| eval %{ def #{method}(*args) retval = method_missing(:#{method}, *args) retval.each do |item| Functions::BAD.each do |bad| retval.delete(bad.to_s) end end retval #return end } end # Same here, except don't let the call go through in the first place. Functions::RESPOND.each do |method, action| eval %{ def #{method}(arg, *args) Functions::BAD.each do |bad| if arg === bad.to_s then return eval("#{action}") end end method_missing(:#{method}, arg, *args) #return end } end private def msg_to_obj(obj) if Object_Reference === obj then Proxy_Object.new(@session, @mutex, obj.object_id) #return else obj #return end end end # A DRuby session sends messages back and forth between server and client. # The message format is as follows: # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+ # | MSG_START | msg. size | msg. type | obj. id | marshalled msg/args | # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+ class Session REQUEST = 0x1001 REQUEST_BLOCK = 0x1002 ONEWAY = 0x1003 RETVAL = 0x2001 EXCEPTION = 0x2002 YIELD = 0x2003 RESOLVE = 0x3001 SYNC = 0x4001 MSG_START = 0x4242 MAX_ID = 2**16 attr_reader :io alias_method :socket, :io def initialize(io) @io = io end def send_message(type, obj, message) data = Marshal.dump(message) header = [MSG_START, data.length, type, obj] @io.write(header.pack("vvvv")) @io.write(data) end def get_message() header = @io.read(8) magic, size, type, obj = header.unpack("vvvv") while magic != MSG_START header = @io.read(8) magic, size, type, obj = header.unpack("vvvv") end data = @io.read(size) message = Marshal.load(data) [ type, obj, message ] #return end def finished() @io.eof #return end def send_sync() send_message(Session::SYNC, 0, nil) end def wait_sync() sleep 1 type, obj, message = get_message() if type != Session::SYNC && obj != 0 && message != nil then raise "DRuby synchonization failed" end end def reply_sync(value) if value == 0 then send_message(Session::SYNC, 1, nil) end end end end