require 'socket' require 'thread' require 'fcntl' require 'romp_helper' ## # ROMP - The Ruby Object Message Proxy # @author Paul Brannan # @version 0.1 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net) # #
# ROMP is a set of classes for providing distributed object support to a
# Ruby program.  You may distribute and/or modify it under the same terms as
# Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:
# 
# Client 
# ------
# client = ROMP::Client.new('localhost', 4242)
# obj = client.resolve("foo")
# puts obj.foo("foo")
# obj.oneway(:foo, "foo")
# 
# Server
# ------
# class Foo
#     def foo(x); return x; end
# end
# obj = Foo.new
# server = ROMP::Server.new('localhost', 4242)
# server.bind(obj, "foo")
# server.thread.join
# 
# You can do all sorts of cool things with ROMP, including passing blocks to
# the functions, throwing exceptions and propagating them from server to
# client, and more.  Unlike CORBA, where you must create an interface
# definition and strictly adhere to it, ROMP uses marshalling, so you can
# use almost any object with it.  But, alas, it is not as powerful as CORBA.
# 
# On a fast machine, you should expect around 7000 messages per second with
# normal method calls, up to 10000 messages per second with oneway calls with
# sync, and up to 40000 messages per second for oneway calls without sync.
# These numbers can vary depending on various factors, so YMMV.
# 
# The ROMP message format is broken into 3 components:
#     [ msg_type, obj_id, message ]
# For each msg_type, there is a specific format to the message.  Additionally,
# certain msg_types are only valid when being sent to the server, and others
# are valid only when being sent back to the client.  Here is a summary:
# 
# msg_type         send to     meaning of obj_id       msg format
# ----------------------------------------------------------------------------
# REQUEST          server      obj to talk to          [:method, *args]
# REQUEST_BLOCK    server      obj to talk to          [:method, *args]
# ONEWAY           server      obj to talk to          [:method, *args]
# ONEWAY_SYNC      server      obj to talk to          [:method, *args] 
# RETVAL           client      always 0                retval
# EXCEPTION        client      always 0                $!
# YIELD            client      always 0                [value, value, ...]
# SYNC             either      0=request, 1=response   nil
# NULL_MSG         either      always 0                n/a
# 
# BUGS:
# - On a 2.2 kernel, oneway calls without sync are very slow.
# - UDP support does not currently work.
# 
module ROMP
  public

  ##
  # The ROMP server class.  Like its drb equivalent, this class spawns off
  # a new thread which processes requests, allowing the server to do other
  # things while it is doing processing for a distributed object.  This
  # means, though, that all objects used with ROMP must be thread-safe.
  #
  class Server
    public

    attr_reader :obj, :thread

    ##
    # Start a ROMP server.
    #
    # @param endpoint An endpoint for the server to listen on; should be specified in URI notation.
    # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and return true or false.
    # @param debug Turns on debugging messages if enabled.
    #
    def initialize(endpoint, acceptor=nil, debug=false)
      @mutex = Mutex.new
      @debug = debug
      @resolve_server = Resolve_Server.new
      @resolve_obj = Resolve_Obj.new(@resolve_server)
      # The resolver is the first object registered, so it receives id 0,
      # which is the id Client uses to reach it.
      @resolve_server.register(@resolve_obj)

      @thread = Thread.new do
        server = Generic_Server.new(endpoint)
        while (socket = server.accept)
          puts "Got a connection" if @debug
          if acceptor && !acceptor.call(socket)
            socket.close
            next
          end
          puts "Accepted the connection" if @debug
          session = Session.new(socket)
          session.set_nonblock(true)
          # `session` is a single local shared by every iteration of this
          # loop, so hand it to the worker as a thread argument; closing
          # over it would let the next accept overwrite it before the
          # worker reads it.
          Thread.new(session) do |worker_session|
            begin
              # TODO: Send a sync message to the client so it
              # knows we are ready to receive data.
              server_loop(worker_session)
            rescue Exception => e
              # Deliberately broad: whatever ends a connection's loop is
              # only reported (in debug mode), never re-raised.
              ROMP::print_exception(e) if @debug
            end
            puts "Connection closed" if @debug
          end
        end
      end
    end

    ##
    # Register an object with the server.  The object is given the next id
    # handed out by the Resolve_Server.  We could use the object's real id,
    # but this is insecure.  The supplied object must be thread-safe.
    #
    # @param obj The object to register.
    #
    # @return A new Object_Reference that should be returned to the client.
    #
    def create_reference(obj)
      @mutex.synchronize do
        id = @resolve_server.register(obj)
        Object_Reference.new(id) #return
      end
    end

    ##
    # Find an object in linear time and unregister it.  Be careful with
    # this function, because the client may not know the object has
    # gone away.
    #
    # @param obj The object to unregister.
    #
    def delete_reference(obj)
      @mutex.synchronize do
        @resolve_server.unregister(obj)
      end
      nil #return
    end

    ##
    # Register an object with the server and bind it to name.
    #
    # @param obj The object to bind.
    # @param name The name to bind the object to.
    #
    def bind(obj, name)
      # Guarded by the same mutex as create_reference/delete_reference,
      # since all three mutate the shared Resolve_Server.
      @mutex.synchronize do
        id = @resolve_server.register(obj)
        @resolve_server.bind(name, id)
      end
      nil #return
    end

    ##
    # This keeps the client from seeing our objects when they call inspect.
    # The original implementation stays reachable as __inspect__.
    #
    alias_method :__inspect__, :inspect
    def inspect
      ""
    end

    private

    if false then # the following functions are implemented in C:

      ##
      # The server_loop function is the guts of the server.  It takes in
      # requests from the client and forwards them to already-registered
      # objects.
      #
      # @param session The session to run the loop with.
      #
      def server_loop(session)
      end

    end # if false
  end

  ##
  # The ROMP client class.  A ROMP server must be started on the given
  # host and port before instantiating a ROMP client.
  #
  class Client
    ##
    # Connect to a ROMP server
    #
    # @param endpoint The endpoint the server is listening on.
    # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost.
    #
    def initialize(endpoint, sync=true)
      @server = Generic_Client.new(endpoint)
      @session = Session.new(@server)
      @session.set_nonblock(true)
      @mutex = sync ? Mutex.new : Null_Mutex.new
      # Object id 0 is the resolver object on the server side.
      @resolve_obj = Proxy_Object.new(@session, @mutex, 0)
    end

    ##
    # Given a string, return a proxy object that will forward requests
    # for an object on the server with that name.
    #
    # @param object_name The name of the object to resolve.
    #
    # @return A Proxy_Object that can be used to make method calls on the object in the server.
    #
    def resolve(object_name)
      @mutex.synchronize do
        remote_id = @resolve_obj.resolve(object_name)
        Proxy_Object.new(@session, @mutex, remote_id) #return
      end
    end
  end

  private

  ##
  # In case the user does not want synchronization.  Mirrors the part of
  # the Mutex interface used in this file, without doing any locking.
  #
  class Null_Mutex
    # Run the block immediately and return its value.
    def synchronize
      yield
    end

    # No-op.
    def lock
    end

    # No-op.
    def unlock
    end
  end

  ##
  # All the special functions we have to keep track of
  #
  class Functions
    # Forwarded to the server even though they are defined locally.
    GOOD = [
      :inspect, :class_variables, :instance_eval, :instance_variables,
      :to_a, :to_s
    ].freeze

    # Never allowed on a proxy; calling them raises NameError.
    BAD = [
      :clone, :dup, :display
    ].freeze

    # Method-listing calls whose results have the BAD names filtered out.
    METHOD = [
      :methods, :private_methods, :protected_methods, :public_methods,
      :singleton_methods
    ].freeze

    # [ method name, Ruby source evaluated when the argument names a BAD
    # function ]
    RESPOND = [
      [ :method,      "raise NameError" ],
      [ :respond_to?, "false" ]
    ].freeze
  end

  ##
  # A ROMP::Object_Reference is created on the server side to represent an
  # object in the system.  It can be returned from a server object to a
  # client object, at which point it is converted into a ROMP::Proxy_Object.
  #
  class Object_Reference
    # The ROMP id of the referenced object (this shadows Object#object_id).
    attr_reader :object_id

    # @param object_id The id the server registered the object under.
    def initialize(object_id)
      @object_id = object_id
    end
  end

  ##
  # A ROMP::Object acts as a proxy; it forwards most methods to the server
  # for execution.  When you make calls to a ROMP server, you will be
  # making the calls through a Proxy_Object.
  #
  class Proxy_Object

    if false then # the following functions are implemented in C:

      ##
      # The method_missing function is called for any method that is not
      # defined on the client side.  It will forward requests to the server
      # for processing, and can iterate through a block, raise an exception,
      # or return a value.
      #
      def method_missing(function, *args)
      end

      ##
      # The oneway function is called to make a oneway call to the server
      # without synchronization.
      #
      def oneway(function, *args)
      end

      ##
      # The oneway_sync function is called to make a oneway call to the
      # server with synchronization (the server will return a null message
      # to the client before it begins processing).  This is slightly safer
      # than a normal oneway call, but it is slower (except on a linux 2.2
      # kernel; see the bug list above).
      #
      def oneway_sync(function, *args)
      end

      ##
      # The sync function will synchronize with the server.  It sends a sync
      # request and waits for a response.
      #
      def sync()
      end

    end # if false

    # Names of the forbidden functions, as Strings, for the filters below.
    banned_names = Functions::BAD.map { |bad| bad.to_s }

    # Make sure certain methods get passed down the wire.
    Functions::GOOD.each do |name|
      define_method(name) do |*args|
        method_missing(name, *args) #return
      end
    end

    # And make sure others never get called.  The message is built when the
    # method is invoked, so it names the proxy instance itself.
    Functions::BAD.each do |name|
      define_method(name) do |*_args|
        raise(NameError,
              "undefined method `#{name}' for #<#{self.class}:#{__id__}>")
      end
    end

    # And remove these function names from any method lists that get
    # returned; there's nothing we can do about people who decide to
    # return them from other functions.  Entries are compared by name so
    # both Symbol and String method lists are filtered.
    Functions::METHOD.each do |name|
      define_method(name) do |*args|
        listed = method_missing(name, *args)
        listed.reject { |entry| banned_names.include?(entry.to_s) } #return
      end
    end

    # Same here, except don't let the call go through in the first place.
    # The argument is compared by name so that both :clone and "clone" are
    # caught.  `action` is one of the trusted source strings from
    # Functions::RESPOND, never remote input.
    Functions::RESPOND.each do |name, action|
      define_method(name) do |arg, *rest|
        if banned_names.include?(arg.to_s)
          eval(action) #return
        else
          method_missing(name, arg, *rest) #return
        end
      end
    end
  end

  ##
  # The Resolve_Server class registers objects for the server.  You will
  # never have to use this class directly.
  #
  class Resolve_Server
    def initialize
      @next_id = 0        # next never-used id
      @unused_ids = []    # ids released by unregister, available for reuse
      @id_to_object = {}
      @name_to_id = {}
    end

    ##
    # Store obj under a free id.  Fresh ids are handed out until
    # Session::MAX_ID is reached; after that, ids released by unregister
    # are reused.
    #
    # @param obj The object to register.
    #
    # @return The id assigned to obj.
    #
    # @raise RuntimeError ("Object limit exceeded") when no id is free.
    #
    def register(obj)
      id = allocate_id
      @id_to_object[id] = obj
      id #return
    end

    # @return The object registered under object_id, or nil.
    def get_object(object_id)
      @id_to_object[object_id] #return
    end

    ##
    # Remove obj (found by a linear search), drop any names bound to its
    # id so a stale name cannot resolve to whatever reuses the id, and
    # make the id available to register again.
    #
    # @param obj The object to unregister.
    #
    def unregister(obj)
      id = @id_to_object.key(obj)
      return nil if id.nil?

      @id_to_object.delete(id)
      @name_to_id.delete_if { |_name, bound_id| bound_id == id }
      @unused_ids.push(id)
      nil #return
    end

    # Bind name to an id previously returned by register.
    def bind(name, id)
      @name_to_id[name] = id
    end

    # @return The id bound to name, or nil.
    def resolve(name)
      @name_to_id[name] #return
    end

    ##
    # Blank out the slot holding obj in an Array or Hash.  No longer used
    # by unregister; kept so existing callers continue to work.
    #
    def delete_obj_from_array_private(array, obj)
      index = array.respond_to?(:key) ? array.key(obj) : array.index(obj)
      array[index] = nil unless index.nil?
    end

    private

    # Pick the id for the next registration (see register).
    def allocate_id
      if @next_id < Session::MAX_ID
        id = @next_id
        @next_id = @next_id.succ
        id
      elsif @unused_ids.empty?
        raise "Object limit exceeded"
      else
        @unused_ids.pop
      end
    end
  end

  ##
  # The Resolve_Obj class handles resolve requests for the client.  It is
  # a special ROMP object with an object id of 0.  You will never have to
  # make calls on it directly, but will instead make calls on it through
  # the Client object.
  #
  class Resolve_Obj
    def initialize(resolve_server)
      @resolve_server = resolve_server
    end

    # @return The id bound to name on the server, or nil.
    def resolve(name)
      @resolve_server.resolve(name) #return
    end
  end

  ##
  # Endpoint URI patterns shared by Generic_Server and Generic_Client.
  # For TCP and UDP, capture 2 is the host and capture 3 the port; for
  # UNIX, capture 2 is the socket path.
  #
  module Endpoint
    TCP  = %r{\A(tcp)?romp://(.*?):(.*)}
    UDP  = %r{\A(udp)romp://(.*?):(.*)}
    UNIX = %r{\A(unix)romp://(.*)}
  end

  ##
  # A Generic_Server creates an endpoint to listen on, waits for connections,
  # and accepts them when requested.  It can operate on different kinds of
  # connections.  You will never have to use this object directly.
  #
  class Generic_Server
    ##
    # @param endpoint One of tcpromp://host:port (or romp://host:port),
    #   udpromp://host:port, or unixromp://path.  An empty host is passed
    #   to the socket constructor as nil.
    #
    # @raise ArgumentError if the endpoint matches none of these forms.
    #
    def initialize(endpoint)
      case endpoint
      when Endpoint::TCP
        @type = "tcp"
        @host = blank_to_nil(Regexp.last_match(2))
        @port = Regexp.last_match(3)
        @server = TCPServer.new(@host, @port)
      when Endpoint::UDP
        @type = "udp"
        @host = blank_to_nil(Regexp.last_match(2))
        @port = Regexp.last_match(3)
        @server = UDPSocket.open()
        @server.bind(@host, @port)
        @mutex = Mutex.new
      when Endpoint::UNIX
        @type = "unix"
        @path = Regexp.last_match(2)
        @server = UNIXServer.open(@path)
      else
        raise ArgumentError, "Invalid endpoint"
      end
    end

    ##
    # Wait for the next connection.
    #
    # @return A socket with O_NONBLOCK set.
    #
    def accept
      case @type
      when "tcp"
        socket = @server.accept
        # IPPROTO_TCP rather than SOL_TCP: the latter is not defined on
        # every platform.
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        socket.sync = true
        socket #return
      when "udp"
        # NOTE(review): this lock is never released anywhere in this
        # file, so a second accept will not return the socket again --
        # presumably intentional, since there is only one UDP socket.
        # UDP support is listed as non-working in the file header.
        @mutex.lock
        socket = @server
        socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        socket #return
      when "unix"
        socket = @server.accept
        socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        socket.sync = true
        socket #return
      end
    end

    private

    # Convert an empty host capture to nil.
    def blank_to_nil(text)
      text == "" ? nil : text
    end
  end

  ##
  # A Generic_Client connects to a Generic_Server on a given endpoint.
  # You will never have to use this object directly.
  #
  class Generic_Client
    ##
    # Open a connection to endpoint.  Note that this replaces the usual
    # constructor: it returns the connected socket, not a Generic_Client.
    #
    # @param endpoint The endpoint URI (same forms as Generic_Server).
    #
    # @return The connected socket.
    #
    # @raise ArgumentError if the endpoint matches none of the known forms.
    #
    def self.new(endpoint)
      case endpoint
      when Endpoint::TCP
        socket = TCPSocket.open(Regexp.last_match(2), Regexp.last_match(3))
        socket.sync = true
        # IPPROTO_TCP rather than SOL_TCP: the latter is not defined on
        # every platform.
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        socket #return
      when Endpoint::UDP
        socket = UDPSocket.open
        socket.connect(Regexp.last_match(2), Regexp.last_match(3))
        # connect returns 0, so the socket must be returned explicitly.
        socket #return
      when Endpoint::UNIX
        UNIXSocket.open(Regexp.last_match(2)) #return
      else
        raise ArgumentError, "Invalid endpoint"
      end
    end
  end

  ##
  # Print an exception to the screen.  This is necessary, because Ruby does
  # not give us access to its error_print function from within Ruby.
  # The first line has the form "<frame>: <message> (<class>)" and each
  # remaining backtrace frame is printed as "\tfrom <frame>".  An exception
  # without a backtrace is printed as "<message> (<class>)".
  #
  # @param exc The exception object to print.
  #
  def self.print_exception(exc)
    summary = "#{exc.message} (#{exc.class})"
    frames = exc.backtrace
    if frames.nil? || frames.empty?
      puts summary
      return nil
    end

    puts "#{frames.first}: #{summary}"
    frames.drop(1).each { |frame| puts "\tfrom #{frame}" }
    nil #return
  end

  if false then # the following classes are implemented in C:

    ##
    # The Session class is defined in romp_helper.so.  You should never have
    # to use it directly.
    #
    class Session
    end

  end # if false
end