1 require 'socket'
  2 require 'thread'
  3 require 'fcntl'
  4 require 'romp_helper'
  5 
  6 ##
  7 # ROMP - The Ruby Object Message Proxy
  8 # @author Paul Brannan
  9 # @version 0.1
 10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
 11 # 
 12 # <pre>
 13 # ROMP is a set of classes for providing distributed object support to a
 14 # Ruby program.  You may distribute and/or modify it under the same terms as
 15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:
 16 # 
 17 # Client 
 18 # ------
 19 # client = ROMP::Client.new('localhost', 4242)
 20 # obj = client.resolve("foo")
 21 # puts obj.foo("foo")
 22 # obj.oneway(:foo, "foo")
 23 # 
 24 # Server
 25 # ------
 26 # class Foo
 27 #     def foo(x); return x; end
 28 # end
 29 # obj = Foo.new
 30 # server = ROMP::Server.new('localhost', 4242)
 31 # server.bind(obj, "foo")
 32 # server.thread.join
 33 # 
 34 # You can do all sorts of cool things with ROMP, including passing blocks to
 35 # the functions, throwing exceptions and propogating them from server to
 36 # client, and more.  Unlike CORBA, where you must create an interface
 37 # definition and strictly adhere to it, ROMP uses marshalling, so you can
 38 # use almost any object with it.  But, alas, it is not as powerful as CORBA.
 39 # 
 40 # On a fast machine, you should expect around 7000 messages per second with
 41 # normal method calls, up to 10000 messages per second with oneway calls with
 42 # sync, and up to 40000 messages per second for oneway calls without sync.
 43 # These numbers can vary depending on various factors, so YMMV.
 44 # 
 45 # The ROMP message format is broken into 3 components:
 46 #     [ msg_type, obj_id, message ]
 47 # For each msg_type, there is a specific format to the message.  Additionally,
 48 # certain msg_types are only valid when being sent to the server, and others
 49 # are valid only when being sent back to the client.  Here is a summary:
 50 # 
 51 # msg_type         send to     meaning of obj_id       msg format
 52 # ----------------------------------------------------------------------------
 53 # REQUEST          server      obj to talk to          [:method, *args]
 54 # REQUEST_BLOCK    server      obj to talk to          [:method, *args]
 55 # ONEWAY           server      obj to talk to          [:method, *args]
 56 # ONEWAY_SYNC      server      obj to talk to          [:method, *args] 
 57 # RETVAL           client      always 0                retval
 58 # EXCEPTION        client      always 0                $!
 59 # YIELD            client      always 0                [value, value, ...]
 60 # SYNC             either      0=request, 1=response   nil
 61 # NULL_MSG         either      always 0                n/a
 62 # 
 63 # BUGS:
 64 # - On a 2.2 kernel, oneway calls without sync is very slow.
 65 # - UDP support does not currently work.
 66 # </pre>
 67 
 68 module ROMP
 69 
 70 public
 71 
 72     ##
 73     # The ROMP server class.  Like its drb equivalent, this class spawns off
 74     # a new thread which processes requests, allowing the server to do other
 75     # things while it is doing processing for a distributed object.  This
 76     # means, though, that all objects used with ROMP must be thread-safe.
 77     # 
 78     class Server
 79 
 80     public
 81         attr_reader :obj, :thread
 82 
 83         ##
 84         # Start a ROMP server.
 85         #
 86         # @param endpoint An endpoint for the server to listen on; should be specified in URI notation.
 87         # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and returns true or false.
 88         # @param debug Turns on debugging messages if enabled.
 89         # 
 90         def initialize(endpoint, acceptor=nil, debug=false)
 91             @mutex = Mutex.new
 92             @debug = debug
 93             @resolve_server = Resolve_Server.new
 94             @resolve_obj = Resolve_Obj.new(@resolve_server)
 95             @resolve_server.register(@resolve_obj)
 96 
 97             @thread = Thread.new do
 98                 server = Generic_Server.new(endpoint)
 99                 while(socket = server.accept)
100                     puts "Got a connection" if @debug
101                     if acceptor then
102                         if !acceptor.call(socket) then
103                             socket.close
104                             next
105                         end
106                     end
107                     puts "Accepted the connection" if @debug
108                     session = Session.new(socket)
109                     session.set_nonblock(true)
110                     Thread.new(socket) do |socket|
111                         Thread.current.abort_on_exception = true
112                         begin
113                             # TODO: Send a sync message to the client so it
114                             # knows we are ready to receive data.
115                             server_loop(session)
116                         rescue Exception
117                             ROMP::print_exception($!) if @debug
118                         end
119                         puts "Connection closed" if @debug
120                     end
121                 end
122             end
123         end
124 
125         ##
126         # Register an object with the server.  The object will be given an
127         # id of @next_id, and @next_id will be incremented.  We could use the
128         # object's real id, but this is insecure.  The supplied object must
129         # be thread-safe.
130         #
131         # @param obj The object to register.
132         #
133         # @return A new Object_Reference that should be returned to the client.
134         #
135         def create_reference(obj)
136             @mutex.synchronize do
137                 id = @resolve_server.register(obj)
138                 Object_Reference.new(id) #return
139             end
140         end
141 
142         ##
143         # Find an object in linear time and unregister it. Be careful with
144         # this function, because the client may not know the object has
145         # gone away.
146         #
147         # @param obj The object to unregister.
148         #
149         def delete_reference(obj)
150             @mutex.synchronize do
151                 @resolve_server.unregister(obj)
152             end
153             nil #return
154         end
155 
156         ##
157         # Register an object with the server and bind it to name.
158         #
159         # @param obj The object to bind.
160         # @param name The name of to bind the object to.
161         #
162         def bind(obj, name)
163             id = @resolve_server.register(obj)
164             @resolve_server.bind(name, id)
165             nil #return
166         end
167 
168         ##
169         # This keeps the client from seeing our objects when they call inspect
170         #
171         alias_method :__inspect__, :inspect
172         def inspect()
173             return ""
174         end
175 
176     private
177         if false then # the following functions are implemented in C:
178 
179         ##
180         # The server_loop function is the guts of the server.  It takes in
181         # requests from the client and forwards them to already-registered
182         # objects.
183         #
184         # @param session The session to run the loop with.
185         #
186         def server_loop(session)
187         end
188 
189         end # if false
190     end
191 
192     ##
193     # The ROMP client class.  A ROMP server must be started on the given
194     # host and port before instantiating a ROMP client.
195     #
196     class Client
197 
198         ##
199         # Connect to a ROMP server
200         #
201         # @param endpoint The endpoint the server is listening on.
202         # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost.
203         #
204         def initialize(endpoint, sync=true)
205             @server = Generic_Client.new(endpoint)
206             @session = Session.new(@server)
207             @session.set_nonblock(true)
208             @mutex = sync ? Mutex.new : Null_Mutex.new
209             @resolve_obj = Proxy_Object.new(@session, @mutex, 0)
210         end
211 
212         ##
213         # Given a string, return a proxy object that will forward requests
214         # for an object on the server with that name.
215         #
216         # @param object_name The name of the object to resolve.
217         #
218         # @return A Proxy_Object that can be used to make method calls on the object in the server.
219         #
220         def resolve(object_name)
221             @mutex.synchronize do
222                 object_id = @resolve_obj.resolve(object_name)
223                 return Proxy_Object.new(@session, @mutex, object_id)
224             end
225         end
226     end
227 
228 private
229 
230     ##
231     # In case the user does not want synchronization.
232     #
233     class Null_Mutex
234         def synchronize
235             yield
236         end
237 
238         def lock
239         end
240 
241         def unlock
242         end
243     end
244 
245     ##
246     # All the special functions we have to keep track of
247     #
248     class Functions
249         GOOD = [
250             :inspect, :class_variables, :instance_eval, :instance_variables,
251             :to_a, :to_s
252         ]
253 
254         BAD = [
255             :clone, :dup, :display
256         ]
257 
258         METHOD = [
259             :methods, :private_methods, :protected_methods, :public_methods,
260             :singleton_methods
261         ]
262 
263         RESPOND = [
264             [ :method,      "raise NameError" ],
265             [ :respond_to?, "false" ]
266         ]
267     end
268 
269     ##
270     # A ROMP::Object_Reference is created on the server side to represent an
271     # object in the system.  It can be returned from a server object to a
272     # client object, at which point it is converted into a ROMP::Proxy_Object.
273     #
274     class Object_Reference
275         attr_reader :object_id
276 
277         def initialize(object_id)
278             @object_id = object_id
279         end
280     end
281 
282     ##
283     # A ROMP::Object acts as a proxy; it forwards most methods to the server
284     # for execution.  When you make calls to a ROMP server, you will be
285     # making the calls through a Proxy_Object.
286     #
287     class Proxy_Object
288 
289         if false then # the following functions are implemented in C:
290 
291         ##
292         # The method_missing function is called for any method that is not
293         # defined on the client side.  It will forward requests to the server
294         # for processing, and can iterate through a block, raise an exception,
295         # or return a value.
296         #
297         def method_missing(function, *args)
298         end
299 
300         ##
301         # The oneway function is called to make a oneway call to the server
302         # without synchronization.
303         #
304         def onweway(function, *args)
305         end
306 
307         ##
308         # The oneway_sync function is called to make a oneway call to the
309         # server with synchronization (the server will return a null message
310         # to the client before it begins processing).  This is slightly safer
311         # than a normal oneway call, but it is slower (except on a linux 2.2
312         # kernel; see the bug list above).
313         #
314         def oneway_sync(function, *args)
315         end
316         
317         ##
318         # The sync function will synchronize with the server.  It sends a sync
319         # request and waits for a response.
320         #
321         def sync()
322         end
323 
324         end # if false
325 
326         # Make sure certain methods get passed down the wire.
327         Functions::GOOD.each do |method|
328             eval %{
329                 def #{method}(*args)
330                       method_missing(:#{method}, *args) #return
331                 end
332             }
333         end
334 
335         # And make sure others never get called.
336         Functions::BAD.each do |method|
337             eval %{
338                 def #{method}(*args)
339                     raise(NameError,
340                         "undefined method `#{method}' for " +
341                         "\#<#{self.class}:#{self.id}>")
342                 end
343             }
344         end
345 
346         # And remove these function names from any method lists that get
347         # returned; there's nothing we can do about people who decide to
348         # return them from other functions.
349         Functions::METHOD.each do |method|
350             eval %{
351                 def #{method}(*args)
352                     retval = method_missing(:#{method}, *args)
353                     retval.each do |item|
354                         Functions::BAD.each do |bad|
355                             retval.delete(bad.to_s)
356                         end
357                     end
358                     retval #return
359                 end
360             }
361         end
362 
363         # Same here, except don't let the call go through in the first place.
364         Functions::RESPOND.each do |method, action|
365             eval %{
366                 def #{method}(arg, *args)
367                     Functions::BAD.each do |bad|
368                         if arg === bad.to_s then
369                             return eval("#{action}")
370                         end
371                     end
372                     method_missing(:#{method}, arg, *args) #return
373                 end
374             }
375         end
376 
377     end
378 
379     ##
380     # The Resolve_Server class registers objects for the server.  You will
381     # never have to use this class directly.
382     #
383     class Resolve_Server
384         def initialize
385             @next_id = 0
386             @unused_ids = Array.new
387             @id_to_object = Hash.new
388             @name_to_id = Hash.new
389         end
390 
391         def register(obj)
392             if @next_id >= Session::MAX_ID then
393                 if @unused_ids.size == 0 then
394                     raise "Object limit exceeded"
395                 else
396                     id = @unused_ids.pop
397                 end
398             end
399             @id_to_object[@next_id] = obj
400             old_id = @next_id
401             @next_id = @next_id.succ()
402             old_id #return
403         end
404 
405         def get_object(object_id)
406             @id_to_object[object_id] #return
407         end
408 
409         def unregister(obj)
410             delete_obj_from_array_private(@id_to_object, obj)
411         end
412 
413         def bind(name, id)
414             @name_to_id[name] = id
415         end
416 
417         def resolve(name)
418             @name_to_id[name] #return
419         end
420 
421         def delete_obj_from_array_private(array, obj)
422             index = array.index(obj)
423             array[index] = nil unless index == nil
424         end
425     end
426 
427     ##
428     # The Resolve_Obj class handles resolve requests for the client.  It is
429     # a special ROMP object with an object id of 0.  You will never have to
430     # make calls on it directly, but will instead make calls on it through
431     # the Client object.
432     #
433     class Resolve_Obj
434         def initialize(resolve_server)
435             @resolve_server = resolve_server
436         end
437 
438         def resolve(name)
439             @resolve_server.resolve(name) #return
440         end
441     end
442 
443     ##
444     # A Generic_Server creates an endpoint to listen on, waits for connections,
445     # and accepts them when requested.  It can operate on different kinds of
446     # connections.  You will never have to use this object directly.
447     #
448     class Generic_Server
449         def initialize(endpoint)
450             case endpoint
451                 when %r{^(tcp)?romp://(.*?):(.*)}
452                     @type = "tcp"
453                     @host = $2 == "" ? nil : $2
454                     @port = $3
455                     @server = TCPServer.new(@host, @port)
456                 when %r{^(udp)romp://(.*?):(.*)}
457                     @type = "udp"
458                     @host = $2 == "" ? nil : $2
459                     @port = $3
460                     @server = UDPSocket.open()
461                     @server.bind(@host, @port)
462                     @mutex = Mutex.new
463                 when %r{^(unix)romp://(.*)}
464                     @type = "unix"
465                     @path = $2
466                     @server = UNIXServer.open(@path)
467                 else
468                     raise ArgumentError, "Invalid endpoint"
469             end
470         end
471         def accept
472             case @type
473                 when "tcp"
474                     socket = @server.accept
475                     socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
476                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
477                     socket.sync = true
478                     socket #return
479                 when "udp"
480                     @mutex.lock
481                     socket = @server
482                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
483                     socket #return
484                 when "unix"
485                     socket = @server.accept
486                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
487                     socket.sync = true
488                     socket #return
489             end
490         end
491     end
492 
493     ##
494     # A Generic_Client connects to a Generic_Server on a given endpoint.
495     # You will never have to use this object directly.
496     #
497     class Generic_Client
498         def self.new(endpoint)
499             case endpoint
500                 when %r{^(tcp)?romp://(.*?):(.*)}
501                     socket = TCPSocket.open($2, $3)
502                     socket.sync = true
503                     socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
504                     socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
505                     socket #return
506                 when %r{^(udp)romp://(.*?):(.*)}
507                     socket = UDPSocket.open
508                     socket.connect($2, $3)
509                     socket #return
510                 when %r{^(unix)romp://(.*)}
511                     socket = UNIXSocket.open($2)
512                 else
513                     raise ArgumentError, "Invalid endpoint"
514             end
515         end
516     end
517 
518 
519     ##
520     # Print an exception to the screen.  This is necessary, because Ruby does
521     # not give us access to its error_print function from within Ruby.
522     #
523     # @param exc The exception object to print.
524     #
525     def self.print_exception(exc)
526         first = true
527         $!.backtrace.each do |bt|
528             if first then
529                 puts "#{bt}: #{$!} (#{$!.message})"
530             else
531                 puts "\tfrom #{bt}"
532             end
533             first = false
534         end
535     end
536 
537     if false then # the following classes are implemented in C:
538 
539     ##
540     # The Sesssion class is defined in romp_helper.so.  You should never have
541     # to use it directly.
542     #
543     class Session
544     end
545 
546     end # if false
547 
548 end