1 require 'socket'
  2 require 'thread'
  3 
  4 =begin
  5 DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
  6 (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
  7 
  8 DRuby is a set of classes for providing distributed object support to a
  9 Ruby program.  You may distribute and/or modify it under the same terms as
 10 Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:
 11 
 12 Client 
 13 ------
 14 client = DRuby::Client.new('localhost', 4242)
 15 obj = client.resolve("foo")
 16 puts obj.foo("foo")
 17 obj.oneway(:foo, "foo")
 18 
 19 Server
 20 ------
 21 class Foo
 22     def foo(x); return x; end
 23 end
 24 obj = Foo.new
 25 server = DRuby::Server.new('localhost', 4242)
 26 server.bind(obj, "foo")
 27 server.thread.join
 28 
You can do all sorts of cool things with DRuby, including passing blocks to
the functions, raising exceptions and propagating them from server to
client, and more.  Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it.  But, alas, it is not as powerful as CORBA.
 34 
 35 On a fast machine, you should expect around 4000 messages per second with
 36 normal method calls, and up to 10000 messages per second with oneway calls.
 37 My dual-processor machine gets 3500/6500 messages per second, though, so
 38 YMMV.
 39 
 40 The DRuby message format is broken into 3 components:
 41     [ msg_type, obj_id, message ]
 42 For each msg_type, there is a specific format to the message.  Additionally,
 43 certain msg_types are only valid when being sent to the server, and others
 44 are valid only when being sent back to the client.  Here is a summary:
 45 
 46 msg_type        send to     meaning of obj_id       msg format
 47 ----------------------------------------------------------------------------
 48 REQUEST         server      obj to talk to          [:method, *args]
 49 REQUEST_BLOCK   server      obj to talk to          [:method, *args]
 50 ONEWAY          server      obj to talk to          [:method, *args]
 51 RETVAL          client      always 0                retval
 52 EXCEPTION       client      always 0                $!
 53 YIELD           client      always 0                [value, value, ...]
 54 RESOLVE         server      always 0                object_name
 55 SYNC            either      0=request, 1=response   nil
 56 
 57 Known bugs:
 58 1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
 59 why.  A workaround is to use a separate connection for each type of call,
 60 but sometimes that doesn't work either.
 61 2) The server does not refuse connections for unauthorized addresses; it will
 62 instead accept a connection and then immediately close it.
 63 3) If the server is flooded with oneway calls and drops packets, it will
 64 listen for data until it encounters a valid message.  If this happens, the
 65 client will not be notified.
 66 =end
 67 
 68 module DRuby
 69 
 70 public
 71 
 72     # The DRuby server class.  Like its drb equivalent, this class spawns off
 73     # a new thread which processes requests, allowing the server to do other
 74     # things while it is doing processing for a distributed object.  This
 75     # means, though, that all objects used with DRuby must be thread-safe.
 class Server
     # Listens on a TCP port, runs one thread per client connection, and
     # dispatches incoming messages to objects registered under small integer
     # ids.  Registered objects are called from several threads at once and
     # must therefore be thread-safe.

 public
     # NOTE(review): :obj is kept for backward compatibility, but nothing in
     # this class ever assigns @obj, so the reader always returns nil.
     attr_reader :host, :port, :obj, :thread

     # Start a server on the given host and port.  An acceptor may be
     # specified to accept/reject connections; it should be a proc that
     # takes a Socket as an argument and returns true or false.
     #
     # The accept loop runs in a background thread (see #thread); each
     # accepted connection gets its own thread running #session_loop.
     def initialize(host, port, acceptor=nil)
         @host = host
         @port = port
         @name_to_id = Hash.new
         @id_to_object = Array.new
         @id_to_acceptor = Array.new
         @mutex = Mutex.new
         @next_id = 1            # id 0 is reserved for "no object"
         @unused_ids = Array.new # ids released by delete_reference
         @thread = Thread.new do
             server = TCPServer.new(@host, @port)
             while (socket = server.accept)
                 if acceptor && !acceptor.call(socket)
                     socket.close
                     next
                 end
                 # IPPROTO_TCP is the portable level for TCP_NODELAY;
                 # SOL_TCP is not defined on every platform.
                 socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
                 Thread.new(socket) do |client|
                     begin
                         session_loop(Session.new(client))
                     ensure
                         # Release the descriptor however the session ends.
                         client.close unless client.closed?
                     end
                 end
             end
         end
     end

     # Register an object with the server and return an Object_Reference
     # wrapping its id.  Ids are handed out sequentially rather than using
     # the object's real id, which would be insecure.  The acceptor argument
     # is a proc that takes a Socket as an argument, and returns true or
     # false depending on whether the request should be allowed.  The
     # supplied object must be thread-safe.
     #
     # Raises RuntimeError ("Object limit exceeded") when no id is free.
     def create_reference(obj, acceptor=nil)
         @mutex.synchronize do
             ref = register_private(obj, acceptor)
             Object_Reference.new(ref) #return
         end
     end

     # Finds an object in linear time and unregisters it.  Its id becomes
     # available for reuse and any names bound to it are dropped.  Does
     # nothing if the object is not registered.  Always returns nil.
     def delete_reference(obj)
         @mutex.synchronize do
             unregister_private(obj)
         end
         nil #return
     end

     # Register an object with the server and bind it to name.  Takes the
     # same lock as create_reference, because both update the id tables.
     # Always returns nil.
     def bind(obj, name, acceptor=nil)
         @mutex.synchronize do
             id = register_private(obj, acceptor)
             @name_to_id[name] = id
         end
         nil #return
     end

     # Main server loop.  Wait for a message, process it, and send back a
     # YIELD, EXCEPTION, or RETVAL message.  No locking is done around the
     # table lookups; registered objects are required to be thread-safe.
     #
     # SECURITY: the method name and arguments come straight from the peer
     # and are dispatched with __send__, which also reaches private methods.
     # Only expose this server to trusted clients.
     def session_loop(session)
         until session.finished
             type, obj_id, message = session.get_message
             begin
                 obj = @id_to_object[obj_id]
                 acceptor = @id_to_acceptor[obj_id]
                 if acceptor && !acceptor.call(session.socket)
                     # A oneway call has no reply channel, so a rejected one
                     # is dropped silently instead of being executed.
                     next if type == Session::ONEWAY
                     raise "No authorization"
                 end
                 case type # In order of most to least common
                 when Session::REQUEST
                     retval = obj.__send__(*message)
                 when Session::ONEWAY
                     # Best effort by design: nobody is waiting for a result.
                     begin; obj.__send__(*message); rescue Exception; end
                     next
                 when Session::REQUEST_BLOCK
                     retval = obj.__send__(*message) do |*i|
                         session.send_message(Session::YIELD, 0, i)
                     end
                 when Session::RESOLVE
                     retval = @name_to_id[message]
                 when Session::SYNC
                     session.reply_sync(obj_id)
                     next
                 else
                     raise "Bad session request"
                 end
                 session.send_message(Session::RETVAL, 0, retval)
             rescue Exception
                 # Deliberately broad: every failure of the remote call is
                 # reported to the client instead of killing the session.
                 session.send_message(Session::EXCEPTION, 0, $!)
             end
         end
     end

 private
     # Store obj and its acceptor under a free id and return that id.
     # Fresh ids are used first; once they run out, ids released by
     # unregister_private are recycled.  Caller must hold @mutex.
     def register_private(obj, acceptor)
         if @next_id < Session::MAX_ID
             id = @next_id
             @next_id = @next_id.succ
         elsif @unused_ids.empty?
             raise "Object limit exceeded"
         else
             id = @unused_ids.pop
         end
         @id_to_acceptor[id] = acceptor
         @id_to_object[id] = obj
         id #return
     end

     # Remove obj (matched by identity) from the tables, drop every name
     # bound to its id, and mark the id as reusable.  Caller must hold
     # @mutex.
     def unregister_private(obj)
         id = (1...@id_to_object.size).find do |i|
             # Skip slots that are already free so an id is never
             # released twice.
             @id_to_object[i].equal?(obj) && !@unused_ids.include?(i)
         end
         return if id.nil?
         @id_to_object[id] = nil
         @id_to_acceptor[id] = nil
         # A stale name must not resolve to whatever reuses this id later.
         @name_to_id.delete_if { |_name, bound| bound == id }
         @unused_ids.push(id)
     end
 end
208 
209     # The DRuby client class.  A DRuby server must be started on the given
210     # host and port before instantiating a DRuby client.
class Client
    # Client side of a DRuby connection.  Holds one TCP connection to a
    # server and hands out proxies for objects bound on that server.
    attr_reader :host, :port

    # Connect a client to the server on the given host and port.  The
    # user can specify sync=false to turn off synchronization and get a
    # 20% speed boost (only safe if the client is used by a single thread).
    def initialize(host, port, sync=true)
        @host = host
        @port = port
        @server = TCPSocket.open(@host, @port)
        # IPPROTO_TCP is the portable level for TCP_NODELAY; SOL_TCP is
        # not defined on every platform.
        @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        @session = Session.new(@server)
        @mutex = sync ? Mutex.new : Null_Mutex.new
    end

    # Given a string, return a proxy object that will forward requests
    # for an object on the server with that name.
    #
    # Raises NameError if the server has nothing bound to object_name,
    # re-raises any exception the server reports, and raises RuntimeError
    # on an unexpected reply type.
    def resolve(object_name)
        @mutex.synchronize do
            @session.send_message(Session::RESOLVE, 0, object_name)
            type, _obj, message = @session.get_message
            case type
            when Session::RETVAL
                # The server answers nil for an unknown name.  Fail here
                # rather than building a proxy whose nil id would only
                # blow up later when the first request is packed.
                if message.nil?
                    raise NameError,
                        "no object bound to `#{object_name}' on the server"
                end
                Proxy_Object.new(@session, @mutex, message) #return
            when Session::EXCEPTION
                raise message
            else
                raise RuntimeError,
                    "unexpected reply type #{type.inspect} to RESOLVE"
            end
        end
    end
end
241 
242 private
243 
244     # In case the user does not want synchronization.
class Null_Mutex
    # Stand-in for Mutex that does no locking at all: the block is run
    # right away and its value is returned.
    def synchronize; yield; end
end
250 
251     # All the special functions we have to keep track of
class Functions
    # Method names that get an explicit wrapper, grouped by how the
    # wrapper should treat them.

    # Names in the first list are forwarded even though the local object
    # already responds to them.
    GOOD = [
        :inspect,
        :class_variables,
        :instance_eval,
        :instance_variables,
        :to_a,
        :to_s
    ]

    # Names that are rejected outright.
    BAD = [
        :clone,
        :dup,
        :display
    ]

    # Names of method-listing calls.
    METHOD = [
        :methods,
        :private_methods,
        :protected_methods,
        :public_methods,
        :singleton_methods
    ]

    # Pairs of [name, Ruby source to evaluate] for calls that take a
    # method name as their first argument.
    RESPOND = [
        [ :method, "raise NameError" ],
        [ :respond_to?, "false" ]
    ]
end
272 
273     # A DRuby::Object_Reference is created on the server side to represent an
274     # object in the system.  It can be returned from a server object to a
275     # client object, at which point it is converted into a DRuby::Proxy_Object.
class Object_Reference
    # Wraps the id under which an object is registered.
    def initialize(object_id)
        @object_id = object_id
    end

    # The wrapped id.  This intentionally replaces Ruby's built-in
    # object_id for instances of this class.
    def object_id
        @object_id
    end
end
283 
284     # A DRuby::Object acts as a proxy; it forwards most methods to the server
285     # for execution.
class Proxy_Object
    # Client-side stand-in for one object registered on the server.  Any
    # method the proxy does not define locally is turned into a request
    # message and sent through the shared session.

public

    # session   - the Session used to talk to the server
    # mutex     - lock shared by every proxy on that session, so that one
    #             request/reply exchange is never interleaved with another
    # object_id - id of the remote object
    def initialize(session, mutex, object_id)
        @object_id = object_id
        @session = session
        @mutex = mutex
    end

    # Client request handler.  The idea here is to send out a REQUEST
    # message (REQUEST_BLOCK if a block was given), and get back YIELD,
    # EXCEPTION, or RETVAL messages.
    #
    # Returns the remote return value, with an Object_Reference turned
    # into a new proxy.  Re-raises whatever exception the server reports.
    # Raises RuntimeError ("Invalid msg type") on an unknown reply.
    def method_missing(method, *args)
        request = [ method, *args ]
        kind = block_given? ? Session::REQUEST_BLOCK : Session::REQUEST
        @mutex.synchronize do
            @session.send_message(kind, @object_id, request)
            loop do
                type, obj, reply = @session.get_message
                case type
                when Session::RETVAL
                    return msg_to_obj(reply)
                when Session::YIELD
                    # A YIELD carries the list of yielded values.  Splat
                    # it so the local block sees the same arguments the
                    # remote method yielded, converting each reference.
                    yield(*reply.map { |value| msg_to_obj(value) })
                when Session::EXCEPTION
                    raise reply
                when Session::SYNC
                    @session.reply_sync(obj)
                else
                    raise "Invalid msg type"
                end
            end
        end
    end

    # This is a special function that lets you ignore the return value
    # of the calling function to get some extra speed, in exchange for not
    # knowing when the function will complete.  A future version of the
    # server should probably check whether a given function is allowed to
    # be called as a oneway, in order to reduce DoS attacks.
    #
    # message is the method name followed by its arguments.  Returns nil.
    def oneway(*message)
        @mutex.synchronize do
            @session.send_message(Session::ONEWAY, @object_id, message)
            nil #return
        end
    end

    # sync() will synchronize the client with the server (useful for
    # determining when a oneway call completes).
    def sync()
        @mutex.synchronize do
            @session.send_sync()
            @session.wait_sync()
        end
    end

    # Make sure certain methods get passed down the wire, even though
    # every local object already responds to them.
    Functions::GOOD.each do |name|
        define_method(name) do |*args|
            method_missing(name, *args) #return
        end
    end

    # And make sure others never get called.  The message is built when
    # the method runs, so it describes the proxy instance itself.
    Functions::BAD.each do |name|
        define_method(name) do |*_args|
            raise(NameError,
                "undefined method `#{name}' for " +
                "#<#{self.class}:#{__id__}>")
        end
    end

    # And remove these function names from any method lists that get
    # returned; there's nothing we can do about people who decide to
    # return them from other functions.  Names are compared as strings
    # so that both Symbol and String lists are filtered.
    Functions::METHOD.each do |name|
        define_method(name) do |*args|
            hidden = Functions::BAD.map { |bad| bad.to_s }
            listed = method_missing(name, *args)
            listed.reject { |item| hidden.include?(item.to_s) } #return
        end
    end

    # Same here, except don't let the call go through in the first place.
    # The action is a trusted source string taken from Functions::RESPOND.
    Functions::RESPOND.each do |name, action|
        define_method(name) do |arg, *args|
            if Functions::BAD.any? { |bad| bad.to_s == arg.to_s }
                eval(action) #return (or raise)
            else
                method_missing(name, arg, *args) #return
            end
        end
    end

private

    # Turn an Object_Reference received from the server into a proxy on
    # the same session; pass every other value through unchanged.
    def msg_to_obj(obj)
        if Object_Reference === obj then
            Proxy_Object.new(@session, @mutex, obj.object_id) #return
        else
            obj #return
        end
    end

end
401 
402     # A DRuby session sends messages back and forth between server and client.
403     # The message format is as follows:
404     # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
405     # | MSG_START | msg. size | msg. type |  obj. id  |  marshalled msg/args  |
406     # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
class Session
    # Frames messages on an IO object.  Each message is an 8-byte header
    # of four 16-bit little-endian fields (start marker, payload size,
    # message type, object id) followed by the marshalled payload.

    REQUEST         = 0x1001
    REQUEST_BLOCK   = 0x1002
    ONEWAY          = 0x1003
    RETVAL          = 0x2001
    EXCEPTION       = 0x2002
    YIELD           = 0x2003
    RESOLVE         = 0x3001
    SYNC            = 0x4001
    MSG_START       = 0x4242

    MAX_ID = 2**16

    HEADER_FORMAT    = "vvvv"   # four unsigned 16-bit little-endian ints
    HEADER_SIZE      = 8
    MAX_MESSAGE_SIZE = 0xFFFF   # largest value the size field can hold

    attr_reader :io
    alias_method :socket, :io

    # io is the stream messages are written to and read from.
    def initialize(io)
        @io = io
    end

    # Marshal message and write it with a header of the given type and
    # object id.
    #
    # Raises ArgumentError if the marshalled payload does not fit in the
    # 16-bit size field; packing it anyway would silently truncate the
    # size and corrupt the stream for the peer.
    def send_message(type, obj, message)
        data = Marshal.dump(message)
        if data.bytesize > MAX_MESSAGE_SIZE
            raise ArgumentError,
                "marshalled message is #{data.bytesize} bytes; " +
                "the limit is #{MAX_MESSAGE_SIZE}"
        end
        header = [MSG_START, data.bytesize, type, obj].pack(HEADER_FORMAT)
        # One write, so header and payload go out together.
        @io.write(header + data)
    end

    # Read the next message and return [type, obj, message].  Headers
    # that do not begin with MSG_START are skipped, eight bytes at a
    # time, until a valid one is found.
    #
    # Raises EOFError if the stream ends before a whole message is read.
    #
    # SECURITY: the payload goes through Marshal.load, which can build
    # arbitrary objects.  Only use this with trusted peers.
    def get_message()
        magic, size, type, obj = read_header
        until magic == MSG_START
            magic, size, type, obj = read_header
        end
        data = @io.read(size)
        if data.nil? || data.bytesize < size
            raise EOFError, "stream ended in the middle of a message"
        end
        [ type, obj, Marshal.load(data) ] #return
    end

    # True once the stream has no more data.  Delegates to IO#eof.
    def finished()
        @io.eof #return
    end

    # Send a synchronization request (SYNC with object id 0).
    def send_sync()
        send_message(Session::SYNC, 0, nil)
    end

    # Block until the reply to send_sync arrives.  The read itself
    # blocks, so no extra delay is needed.
    #
    # Raises RuntimeError unless the next message is a SYNC response:
    # type SYNC, object id 1, nil payload.
    def wait_sync()
        type, obj, message = get_message()
        if type != Session::SYNC || obj != 1 || !message.nil? then
            raise "DRuby synchonization failed"
        end
    end

    # Answer a synchronization request.  A value of 0 marks a request and
    # is answered with SYNC/1; any other value is ignored.
    def reply_sync(value)
        if value == 0 then
            send_message(Session::SYNC, 1, nil)
        end
    end

private

    # Read one header and return its four fields.  Raises EOFError if
    # fewer than HEADER_SIZE bytes are available.
    def read_header
        header = @io.read(HEADER_SIZE)
        if header.nil? || header.bytesize < HEADER_SIZE
            raise EOFError, "stream ended while reading a message header"
        end
        header.unpack(HEADER_FORMAT)
    end

end
469 end