1 require 'socket'
  2 require 'thread'
  3 require 'fcntl'
  4 require 'romp_helper'
  5 
  6 ##
  7 # ROMP - The Ruby Object Message Proxy
  8 # @author Paul Brannan
  9 # @version 0.1
 10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
 11 # 
 12 # <pre>
 13 # ROMP is a set of classes for providing distributed object support to a
 14 # Ruby program.  You may distribute and/or modify it under the same terms as
 15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:
 16 # 
 17 # Client 
 18 # ------
 19 # client = ROMP::Client.new('localhost', 4242)
 20 # obj = client.resolve("foo")
 21 # puts obj.foo("foo")
 22 # obj.oneway(:foo, "foo")
 23 # 
 24 # Server
 25 # ------
 26 # class Foo
 27 #     def foo(x); return x; end
 28 # end
 29 # obj = Foo.new
 30 # server = ROMP::Server.new('localhost', 4242)
 31 # server.bind(obj, "foo")
 32 # server.thread.join
 33 # 
 34 # You can do all sorts of cool things with ROMP, including passing blocks to
# the functions, throwing exceptions and propagating them from server to
 36 # client, and more.  Unlike CORBA, where you must create an interface
 37 # definition and strictly adhere to it, ROMP uses marshalling, so you can
 38 # use almost any object with it.  But, alas, it is not as powerful as CORBA.
 39 # 
 40 # On a fast machine, you should expect around 7000 messages per second with
 41 # normal method calls, up to 10000 messages per second with oneway calls with
 42 # sync, and up to 40000 messages per second for oneway calls without sync.
 43 # These numbers can vary depending on various factors, so YMMV.
 44 # 
 45 # The ROMP message format is broken into 3 components:
 46 #     [ msg_type, obj_id, message ]
 47 # For each msg_type, there is a specific format to the message.  Additionally,
 48 # certain msg_types are only valid when being sent to the server, and others
 49 # are valid only when being sent back to the client.  Here is a summary:
 50 # 
 51 # msg_type         send to     meaning of obj_id       msg format
 52 # ----------------------------------------------------------------------------
 53 # REQUEST          server      obj to talk to          [:method, *args]
 54 # REQUEST_BLOCK    server      obj to talk to          [:method, *args]
 55 # ONEWAY           server      obj to talk to          [:method, *args]
 56 # ONEWAY_SYNC      server      obj to talk to          [:method, *args] 
 57 # RETVAL           client      always 0                retval
 58 # EXCEPTION        client      always 0                $!
 59 # YIELD            client      always 0                [value, value, ...]
 60 # SYNC             either      0=request, 1=response   nil
 61 # NULL_MSG         either      always 0                n/a
 62 # 
 63 # BUGS:
# - On a 2.2 kernel, oneway calls without sync are very slow.
 65 # - UDP support does not currently work.
 66 # </pre>
 67 
 68 module ROMP
 69 
 70 public
 71 
 72     ##
 73     # The ROMP server class.  Like its drb equivalent, this class spawns off
 74     # a new thread which processes requests, allowing the server to do other
 75     # things while it is doing processing for a distributed object.  This
 76     # means, though, that all objects used with ROMP must be thread-safe.
 77     # 
 78     class Server
 79 
 80     public
 81         attr_reader :obj, :thread
 82 
 83         ##
 84         # Start a ROMP server.
 85         #
 86         # @param endpoint An endpoint for the server to listen on; should be specified in URI notation.
 87         # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and returns true or false.
 88         # @param debug Turns on debugging messages if enabled.
 89         # 
 90         def initialize(endpoint, acceptor=nil, debug=false)
 91             @mutex = Mutex.new
 92             @debug = debug
 93             @resolve_server = Resolve_Server.new
 94             @resolve_obj = Resolve_Obj.new(@resolve_server)
 95             @resolve_server.register(@resolve_obj)
 96 
 97             @thread = Thread.new do
 98                 server = Generic_Server.new(endpoint)
 99                 while(socket = server.accept)
100                     puts "Got a connection" if @debug
101                     if acceptor then
102                         if !acceptor.call(socket) then
103                             socket.close
104                             next
105                         end
106                     end
107                     puts "Accepted the connection" if @debug
108                     session = Session.new(socket)
109                     session.set_nonblock(true)
110                     Thread.new(socket) do |socket|
111                         begin
112                             # TODO: Send a sync message to the client so it
113                             # knows we are ready to receive data.
114                             server_loop(session)
115                         rescue Exception
116                             ROMP::print_exception($!) if @debug
117                         end
118                         puts "Connection closed" if @debug
119                     end
120                 end
121             end
122         end
123 
124         ##
125         # Register an object with the server.  The object will be given an
126         # id of @next_id, and @next_id will be incremented.  We could use the
127         # object's real id, but this is insecure.  The supplied object must
128         # be thread-safe.
129         #
130         # @param obj The object to register.
131         #
132         # @return A new Object_Reference that should be returned to the client.
133         #
134         def create_reference(obj)
135             @mutex.synchronize do
136                 id = @resolve_server.register(obj)
137                 Object_Reference.new(id) #return
138             end
139         end
140 
141         ##
142         # Find an object in linear time and unregister it. Be careful with
143         # this function, because the client may not know the object has
144         # gone away.
145         #
146         # @param obj The object to unregister.
147         #
148         def delete_reference(obj)
149             @mutex.synchronize do
150                 @resolve_server.unregister(obj)
151             end
152             nil #return
153         end
154 
155         ##
156         # Register an object with the server and bind it to name.
157         #
158         # @param obj The object to bind.
159         # @param name The name of to bind the object to.
160         #
161         def bind(obj, name)
162             id = @resolve_server.register(obj)
163             @resolve_server.bind(name, id)
164             nil #return
165         end
166 
167         ##
168         # This keeps the client from seeing our objects when they call inspect
169         #
170         alias_method :__inspect__, :inspect
171         def inspect()
172             return ""
173         end
174 
175     private
176         if false then # the following functions are implemented in C:
177 
178         ##
179         # The server_loop function is the guts of the server.  It takes in
180         # requests from the client and forwards them to already-registered
181         # objects.
182         #
183         # @param session The session to run the loop with.
184         #
185         def server_loop(session)
186         end
187 
188         end # if false
189     end
190 
191     ##
192     # The ROMP client class.  A ROMP server must be started on the given
193     # host and port before instantiating a ROMP client.
194     #
195     class Client
196 
197         ##
198         # Connect to a ROMP server
199         #
200         # @param endpoint The endpoint the server is listening on.
201         # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost.
202         #
203         def initialize(endpoint, sync=true)
204             @server = Generic_Client.new(endpoint)
205             @session = Session.new(@server)
206             @session.set_nonblock(true)
207             @mutex = sync ? Mutex.new : Null_Mutex.new
208             @resolve_obj = Proxy_Object.new(@session, @mutex, 0)
209         end
210 
211         ##
212         # Given a string, return a proxy object that will forward requests
213         # for an object on the server with that name.
214         #
215         # @param object_name The name of the object to resolve.
216         #
217         # @return A Proxy_Object that can be used to make method calls on the object in the server.
218         #
219         def resolve(object_name)
220             @mutex.synchronize do
221                 object_id = @resolve_obj.resolve(object_name)
222                 return Proxy_Object.new(@session, @mutex, object_id)
223             end
224         end
225     end
226 
227 private
228 
229     ##
230     # In case the user does not want synchronization.
231     #
232     class Null_Mutex
233         def synchronize
234             yield
235         end
236 
237         def lock
238         end
239 
240         def unlock
241         end
242     end
243 
244     ##
245     # All the special functions we have to keep track of
246     #
247     class Functions
248         GOOD = [
249             :inspect, :class_variables, :instance_eval, :instance_variables,
250             :to_a, :to_s
251         ]
252 
253         BAD = [
254             :clone, :dup, :display
255         ]
256 
257         METHOD = [
258             :methods, :private_methods, :protected_methods, :public_methods,
259             :singleton_methods
260         ]
261 
262         RESPOND = [
263             [ :method,      "raise NameError" ],
264             [ :respond_to?, "false" ]
265         ]
266     end
267 
268     ##
269     # A ROMP::Object_Reference is created on the server side to represent an
270     # object in the system.  It can be returned from a server object to a
271     # client object, at which point it is converted into a ROMP::Proxy_Object.
272     #
273     class Object_Reference
274         attr_reader :object_id
275 
276         def initialize(object_id)
277             @object_id = object_id
278         end
279     end
280 
281     ##
    # A ROMP::Proxy_Object acts as a proxy; it forwards most methods to the server
283     # for execution.  When you make calls to a ROMP server, you will be
284     # making the calls through a Proxy_Object.
285     #
286     class Proxy_Object
287 
288         if false then # the following functions are implemented in C:
289 
290         ##
291         # The method_missing function is called for any method that is not
292         # defined on the client side.  It will forward requests to the server
293         # for processing, and can iterate through a block, raise an exception,
294         # or return a value.
295         #
296         def method_missing(function, *args)
297         end
298 
299         ##
300         # The oneway function is called to make a oneway call to the server
301         # without synchronization.
302         #
303         def onweway(function, *args)
304         end
305 
306         ##
307         # The oneway_sync function is called to make a oneway call to the
308         # server with synchronization (the server will return a null message
309         # to the client before it begins processing).  This is slightly safer
310         # than a normal oneway call, but it is slower (except on a linux 2.2
311         # kernel; see the bug list above).
312         #
313         def oneway_sync(function, *args)
314         end
315         
316         ##
317         # The sync function will synchronize with the server.  It sends a sync
318         # request and waits for a response.
319         #
320         def sync()
321         end
322 
323         end # if false
324 
325         # Make sure certain methods get passed down the wire.
326         Functions::GOOD.each do |method|
327             eval %{
328                 def #{method}(*args)
329                       method_missing(:#{method}, *args) #return
330                 end
331             }
332         end
333 
334         # And make sure others never get called.
335         Functions::BAD.each do |method|
336             eval %{
337                 def #{method}(*args)
338                     raise(NameError,
339                         "undefined method `#{method}' for " +
340                         "\#<#{self.class}:#{self.id}>")
341                 end
342             }
343         end
344 
345         # And remove these function names from any method lists that get
346         # returned; there's nothing we can do about people who decide to
347         # return them from other functions.
348         Functions::METHOD.each do |method|
349             eval %{
350                 def #{method}(*args)
351                     retval = method_missing(:#{method}, *args)
352                     retval.each do |item|
353                         Functions::BAD.each do |bad|
354                             retval.delete(bad.to_s)
355                         end
356                     end
357                     retval #return
358                 end
359             }
360         end
361 
362         # Same here, except don't let the call go through in the first place.
363         Functions::RESPOND.each do |method, action|
364             eval %{
365                 def #{method}(arg, *args)
366                     Functions::BAD.each do |bad|
367                         if arg === bad.to_s then
368                             return eval("#{action}")
369                         end
370                     end
371                     method_missing(:#{method}, arg, *args) #return
372                 end
373             }
374         end
375 
376     end
377 
378     ##
379     # The Resolve_Server class registers objects for the server.  You will
380     # never have to use this class directly.
381     #
382     class Resolve_Server
383         def initialize
384             @next_id = 0
385             @unused_ids = Array.new
386             @id_to_object = Hash.new
387             @name_to_id = Hash.new
388         end
389 
390         def register(obj)
391             if @next_id >= Session::MAX_ID then
392                 if @unused_ids.size == 0 then
393                     raise "Object limit exceeded"
394                 else
395                     id = @unused_ids.pop
396                 end
397             end
398             @id_to_object[@next_id] = obj
399             old_id = @next_id
400             @next_id = @next_id.succ()
401             old_id #return
402         end
403 
404         def get_object(object_id)
405             @id_to_object[object_id] #return
406         end
407 
408         def unregister(obj)
409             delete_obj_from_array_private(@id_to_object, obj)
410         end
411 
412         def bind(name, id)
413             @name_to_id[name] = id
414         end
415 
416         def resolve(name)
417             @name_to_id[name] #return
418         end
419 
420         def delete_obj_from_array_private(array, obj)
421             index = array.index(obj)
422             array[index] = nil unless index == nil
423         end
424     end
425 
426     ##
427     # The Resolve_Obj class handles resolve requests for the client.  It is
428     # a special ROMP object with an object id of 0.  You will never have to
429     # make calls on it directly, but will instead make calls on it through
430     # the Client object.
431     #
432     class Resolve_Obj
433         def initialize(resolve_server)
434             @resolve_server = resolve_server
435         end
436 
437         def resolve(name)
438             @resolve_server.resolve(name) #return
439         end
440     end
441 
442     ##
443     # A Generic_Server creates an endpoint to listen on, waits for connections,
444     # and accepts them when requested.  It can operate on different kinds of
445     # connections.  You will never have to use this object directly.
446     #
447     class Generic_Server
448         def initialize(endpoint)
449             case endpoint
450                 when %r{^(tcp)?romp://(.*?):(.*)}
451                     @type = "tcp"
452                     @host = $2 == "" ? nil : $2
453                     @port = $3
454                     @server = TCPServer.new(@host, @port)
455                 when %r{^(udp)romp://(.*?):(.*)}
456                     @type = "udp"
457                     @host = $2 == "" ? nil : $2
458                     @port = $3
459                     @server = UDPSocket.open()
460                     @server.bind(@host, @port)
461                     @mutex = Mutex.new
462                 when %r{^(unix)romp://(.*)}
463                     @type = "unix"
464                     @path = $2
465                     @server = UNIXServer.open(@path)
466                 else
467                     raise ArgumentError, "Invalid endpoint"
468             end
469         end
470         def accept
471             case @type
472                 when "tcp"
473                     socket = @server.accept
474                     socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
475                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
476                     socket.sync = true
477                     socket #return
478                 when "udp"
479                     @mutex.lock
480                     socket = @server
481                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
482                     socket #return
483                 when "unix"
484                     socket = @server.accept
485                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
486                     socket.sync = true
487                     socket #return
488             end
489         end
490     end
491 
492     ##
493     # A Generic_Client connects to a Generic_Server on a given endpoint.
494     # You will never have to use this object directly.
495     #
496     class Generic_Client
497         def self.new(endpoint)
498             case endpoint
499                 when %r{^(tcp)?romp://(.*?):(.*)}
500                     socket = TCPSocket.open($2, $3)
501                     socket.sync = true
502                     socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
503                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
504                     socket #return
505                 when %r{^(udp)romp://(.*?):(.*)}
506                     socket = UDPSocket.open
507                     socket.connect($2, $3)
508                 when %r{^(unix)romp://(.*)}
509                     socket = UNIXSocket.open($2)
510                 else
511                     raise ArgumentError, "Invalid endpoint"
512             end
513         end
514     end
515 
516 
517     ##
518     # Print an exception to the screen.  This is necessary, because Ruby does
519     # not give us access to its error_print function from within Ruby.
520     #
521     # @param exc The exception object to print.
522     #
523     def self.print_exception(exc)
524         first = true
525         $!.backtrace.each do |bt|
526             if first then
527                 puts "#{bt}: #{$!} (#{$!.message})"
528             else
529                 puts "\tfrom #{bt}"
530             end
531             first = false
532         end
533     end
534 
535     if false then # the following classes are implemented in C:
536 
537     ##
538     # The Sesssion class is defined in romp_helper.so.  You should never have
539     # to use it directly.
540     #
541     class Session
542     end
543 
544     end # if false
545 
546 end