1 require 'socket'
  2 require 'thread'
  3 
  4 =begin
  5 DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
  6 (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
  7 
  8 DRuby is a set of classes for providing distributed object support to a
  9 Ruby program.  You may distribute and/or modify it under the same terms as
 10 Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:
 11 
 12 Client 
 13 ------
 14 client = DRuby::Client.new('localhost', 4242)
 15 obj = client.resolve("foo")
 16 puts obj.foo("foo")
 17 obj.oneway(:foo, "foo")
 18 
 19 Server
 20 ------
 21 class Foo
 22     def foo(x); return x; end
 23 end
 24 obj = Foo.new
 25 server = DRuby::Server.new('localhost', 4242)
 26 server.register(obj, "foo")
 27 server.thread.join
 28 
 29 You can do all sorts of cool things with DRuby, including passing blocks to
 30 the functions, throwing exceptions and propogating them from server to
 31 client, and more.  Unlike CORBA, where you must create an interface
 32 definition and strictly adhere to it, DRuby uses marshalling, so you can
 33 use almost any object with it.  But, alas, it is not as powerful as CORBA.
 34 
 35 On a fast machine, you should expect around 4000 messages per second with
 36 normal method calls, and up to 10000 messages per second with oneway calls.
 37 My dual-processor machine gets 3500/6500 messages per second, though, so
 38 YMMV.
 39 
 40 The DRuby message format is broken into 3 components:
 41     [ msg_type, obj_id, message ]
 42 For each msg_type, there is a specific format to the message.  Additionally,
 43 certain msg_types are only valid when being sent to the server, and others
 44 are valid only when being sent back to the client.  Here is a summary:
 45 
 46 msg_type        send to     meaning of obj_id       msg format
 47 ----------------------------------------------------------------------------
 48 REQUEST         server      obj to talk to          [:method, *args]
 49 REQUEST_BLOCK   server      obj to talk to          [:method, *args]
 50 ONEWAY          server      obj to talk to          [:method, *args]
 51 RETVAL          client      always 0                retval
 52 EXCEPTION       client      always 0                $!
 53 YIELD           client      always 0                [value, value, ...]
 54 RESOLVE         server      always 0                object_name
 55 SYNC            either      0=request, 1=response   nil
 56 
 57 Known bugs:
 58 1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
 59 why.  A workaround is to use a separate connection for each type of call,
 60 but sometimes that doesn't work either.
 61 2) The server does not refuse connections for unauthorized addresses; it will
 62 instead accept a connection and then immediately close it.
 63 3) If the server is flooded with oneway calls and drops packets, it will
 64 listen for data until it encounters a valid message.  If this happens, the
 65 client will not be notified.
 66 =end
 67 
 68 module DRuby
 69 
 70 public
 71 
 72     # The DRuby server class.  Like its drb equivalent, this class spawns off
 73     # a new thread which processes requests, allowing the server to do other
 74     # things while it is doing processing for a distributed object.  This
 75     # means, though, that all objects used with DRuby must be thread-safe.
 76     class Server
 77         attr_reader :host, :port, :obj, :thread
 78 
 79         # Start a server on the given host and port.  An acceptor may be
 80         # specified to accept/reject connections; it should be a proc that
 81         # takes a Socket as an argument and returns true or false.
 82         def initialize(host, port, acceptor=nil)
 83             @host = host
 84             @port = port
 85             @name_to_id = Hash.new
 86             @id_to_object = Hash.new
 87             @id_to_acceptor = Hash.new
 88             @mutex = Mutex.new
 89             @next_id = 1
 90             @thread = Thread.new do
 91                 server = TCPServer.new(@host, @port)
 92                 while(socket = server.accept)
 93                     if acceptor then
 94                         if !acceptor.call(socket) then
 95                             socket.close
 96                             next
 97                         end
 98                     end
 99                     socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
100                     Thread.new(socket) do |socket|
101                         session_loop(Session.new(socket))
102                     end
103                 end
104             end
105         end
106 
107         # Register an object with the server.  Once an object is registered,
108         # it cannot be unregistered.  The object will be given an id of
109         # @next_id, and @next_id will be incremented.  We could use the
110         # object's real id, but we want to be sure that the id will fit into
111         # a 16-bit short.  The acceptor argument is a proc that takes a Socket
112         # as an arugment, and returns true or false depending on whether the
113         # request should be allowed.  The supplied object must be thread-safe.
114         def register(obj, name, acceptor=nil)
115             @mutex.synchronize do
116                 if @next_id >= Session::MAX_ID then
117                     raise ArgumentError, "Object limit exceeded"
118                 end
119                 @name_to_id[name] = @next_id
120                 @id_to_acceptor[@next_id] = acceptor
121                 @id_to_object[@next_id] = obj
122                 @next_id = @next_id.succ()
123             end
124         end
125 
126         # Main server loop.  Wait for a REQUEST message, process it, and send
127         # back a YIELD, EXCEPTION, or RETVAL message.  Note that we don't do
128         # any thread synchronization here, because all registered objects are
129         # required to be thread-safe, and the @id_to_object lookup is atomic
130         # (and if it succeeds, the object is guaranteed to be fully
131         # registered).
132         def session_loop(session)
133             while not session.finished()
134                 type, object_id, message = session.get_message()
135                 obj = @id_to_object[object_id]
136                 acceptor = @id_to_acceptor[object_id]
137                 begin
138                     if acceptor then
139                         if !acceptor.call(session.socket) then
140                             if type != Session::ONEWAY then
141                                 raise ArgumentError, "No authorization"
142                             end
143                         end
144                     end
145                     case type
146                         when Session::REQUEST
147                             retval = obj.__send__(*message)
148                         when Session::REQUEST_BLOCK
149                             retval = obj.__send__(*message) do |*i|
150                                 session.send_message(Session::YIELD, 0, i)
151                             end
152                         when Session::ONEWAY
153                             begin; obj.__send__(*message); rescue Exception; end
154                             next
155                         when Session::RESOLVE
156                             retval = @name_to_id[message]
157                         when Session::SYNC
158                             session.reply_sync(object_id)
159                             next
160                         else
161                             raise ArgumentError, "Bad session request"
162                     end
163                     session.send_message(Session::RETVAL, 0, retval)
164                 rescue Exception
165                     session.send_message(Session::EXCEPTION, 0, $!)
166                 end
167             end
168         end
169     end
170 
171     # The DRuby client class.  A DRuby server must be started on the given
172     # host and port before instantiating a DRuby client.
173     class Client
174         attr_reader :host, :port
175         
176         # Connect a client to the server on the given host and port.  The
177         # user can specify sync=false to turn off synchronization and get a
178         # 20% speed boost.
179         def initialize(host, port, sync=true)
180             @host = host
181             @port = port
182             @server = TCPSocket.open(@host, @port)
183             @server.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
184             @session = Session.new(@server)
185             @mutex = sync ? Mutex.new : Null_Mutex.new
186         end
187 
188         # Given a string, return a proxy object that will forward requests
189         # for an object on the server with that name.
190         def resolve(object_name)
191             @mutex.synchronize do
192                 @session.send_message(Session::RESOLVE, 0, object_name)
193                 type, obj, message = @session.get_message()
194                 case type
195                     when Session::RETVAL
196                         return Object.new(@session, @mutex, message)
197                     when Session::EXCEPTION     ; raise message
198                     else                        ; raise RuntimeError
199                 end
200             end
201         end
202     end
203 
204 private
205 
206     # In case the user does not want synchronization.
207     class Null_Mutex
208         def synchronize
209             yield
210         end
211     end
212 
213     # A DRuby::Object acts as a proxy; it forwards most methods to the server
214     # for execution.
215     class Object
216 
217         def initialize(session, mutex, object_id)
218             @object_id = object_id
219             @session = session
220             @mutex = mutex
221         end
222 
223         # Client request handler.  The idea here is to send out a REQUEST
224         # message, and get back a YIELD, EXCEPTION, or RETVAL message.
225         def method_missing(method, *args)
226             message = [ method, *args ]
227             @mutex.synchronize do
228                 @session.send_message(
229                     (block_given?) ? Session::REQUEST_BLOCK : Session::REQUEST,
230                     @object_id,
231                     message)
232                 loop do
233                     type, obj, message = @session.get_message()
234                     case type
235                         when Session::RETVAL     ; return message
236                         when Session::YIELD      ; yield message
237                         when Session::EXCEPTION  ; raise message
238                         when Session::SYNC       ; @session.reply_sync(obj)
239                         else                     ; raise RuntimeError
240                     end
241                 end
242             end
243         end
244 
245         # This is a special function that lets you ignore the return value
246         # of the calling function to get some extra speed, in exchange for not
247         # knowing when the function will complete.  A future version of the
248         # server should probably check whether a given function is allowed to
249         # be called as a oneway, in order to reduce DoS attacks.
250         def oneway(*message)
251             @mutex.synchronize do
252                 @session.send_message(Session::ONEWAY, @object_id, message)
253                 return nil
254             end
255         end
256 
257         # sync() will synchonize the client with the server (useful for
258         # determining when a oneway call completes)
259         def sync()
260             @mutex.synchronize do
261                 @session.send_sync()
262                 @session.wait_sync()
263             end
264         end
265 
266         GOOD_FUNCTIONS = [
267             :inspect, :class_variables, :instance_eval, :instance_variables,
268             :to_a, :to_s
269         ]
270 
271         BAD_FUNCTIONS = [
272             :clone, :dup, :display
273         ]
274 
275         METHOD_FUNCTIONS = [
276             :methods, :private_methods, :protected_methods, :public_methods,
277             :singleton_methods
278         ]
279 
280         RESPOND_FUNCTIONS = [
281             [ :method,      "raise NameError" ],
282             [ :respond_to?, "return false" ]
283         ]
284 
285         # Make sure certain methods get passed down the wire.
286         GOOD_FUNCTIONS.each do |method|
287             eval %{
288                 def #{method}(*args)
289                       return method_missing(:#{method}, *args)
290                 end
291             }
292         end
293 
294         # And make sure others never get called.
295         BAD_FUNCTIONS.each do |method|
296             undef_method method
297         end
298 
299         # And remove these function names from any method lists that get
300         # returned; there's nothing we can do about people who decide to
301         # return them from other functions.
302         METHOD_FUNCTIONS.each do |method|
303             eval %{
304                 def #{method}(*args)
305                     retval = method_missing(:#{method}, *args)
306                     retval.each do |item|
307                         BAD_FUNCTIONS.each do |bad|
308                             retval.delete(bad.to_s)
309                         end
310                     end
311                     return retval
312                 end
313             }
314         end
315 
316         # Same here, except don't let the call go through in the first place.
317         RESPOND_FUNCTIONS.each do |method, action|
318             eval %{
319                 def #{method}(arg, *args)
320                     BAD_FUNCTIONS.each do |bad|
321                         if arg === bad.to_s then
322                             eval("#{action}")
323                         end
324                     end
325                     return method_missing(:#{method}, arg, *args)
326                 end
327             }
328         end
329     end
330 
331     # A DRuby session sends messages back and forth between server and client.
332     # The message format is as follows:
333     # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
334     # | MSG_START | msg. size | msg. type |  obj. id  |  marshalled msg/args  |
335     # +-----+-----+-----+-----+-----+-----+-----+-----+-----+---  +  ---+-----+
336     class Session
337         REQUEST         = 0x1001
338         REQUEST_BLOCK   = 0x1002
339         ONEWAY          = 0x1003
340         RETVAL          = 0x2001
341         EXCEPTION       = 0x2002
342         YIELD           = 0x2003
343         RESOLVE         = 0x3001
344         SYNC            = 0x4001
345         MSG_START       = 0x4242
346 
347         MAX_ID = 2**16
348 
349         attr_reader :io
350         alias_method :socket, :io
351 
352         def initialize(io)
353             @io = io
354         end
355 
356         def send_message(type, obj, message)
357             data = Marshal.dump(message)
358             header = [MSG_START, data.length, type, obj]
359             @io.write(header.pack("vvvv"))
360             @io.write(data)
361         end
362 
363         def get_message()
364             header = @io.read(8)
365             magic, size, type, obj = header.unpack("vvvv")
366             while magic != MSG_START
367                 header = @io.read(8)
368                 magic, size, type, obj = header.unpack("vvvv")
369             end
370             data = @io.read(size)
371             message = Marshal.load(data)
372             return [ type, obj, message ]
373         end
374 
375         def finished()
376             return @io.eof
377         end
378         
379         def send_sync()
380             send_message(Session::SYNC, 0, nil)
381         end
382         
383         def wait_sync()
384             sleep 1
385             type, obj, message = get_message()
386             if type != Session::SYNC && obj != 0 && message != nil then
387                 raise RuntimeError, "DRuby synchonization failed"
388             end
389         end
390 
391         def reply_sync(value)
392             if value == 0 then
393                 send_message(Session::SYNC, 1, nil)
394             end
395         end
396 
397     end
398 
399 end