1 require 'socket'
2 require 'thread'
3
4 =begin
5 DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
6 (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
7
8 DRuby is a set of classes for providing distributed object support to a
9 Ruby program. You may distribute and/or modify it under the same terms as
10 Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example:
11
12 Client
13 ------
14 client = DRuby::Client.new('localhost', 4242)
15 obj = client.resolve("foo")
16 puts obj.foo("foo")
17 obj.oneway(:foo, "foo")
18
19 Server
20 ------
21 class Foo
22 def foo(x); return x; end
23 end
24 obj = Foo.new
25 server = DRuby::Server.new('localhost', 4242)
26 server.bind(obj, "foo")
27 server.thread.join
28
You can do all sorts of cool things with DRuby, including passing blocks to
the functions, throwing exceptions and propagating them from server to
client, and more. Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it. But, alas, it is not as powerful as CORBA.
34
35 On a fast machine, you should expect around 4000 messages per second with
36 normal method calls, and up to 10000 messages per second with oneway calls.
37 My dual-processor machine gets 3500/6500 messages per second, though, so
38 YMMV.
39
40 The DRuby message format is broken into 3 components:
41 [ msg_type, obj_id, message ]
42 For each msg_type, there is a specific format to the message. Additionally,
43 certain msg_types are only valid when being sent to the server, and others
44 are valid only when being sent back to the client. Here is a summary:
45
46 msg_type send to meaning of obj_id msg format
47 ----------------------------------------------------------------------------
48 REQUEST server obj to talk to [:method, *args]
49 REQUEST_BLOCK server obj to talk to [:method, *args]
50 ONEWAY server obj to talk to [:method, *args]
51 RETVAL client always 0 retval
52 EXCEPTION client always 0 $!
53 YIELD client always 0 [value, value, ...]
54 RESOLVE server always 0 object_name
55 SYNC either 0=request, 1=response nil
56
57 Known bugs:
58 1) Oneway calls are really slow if mixed with regular calls. I'm not sure
59 why. A workaround is to use a separate connection for each type of call,
60 but sometimes that doesn't work either.
61 2) The server does not refuse connections for unauthorized addresses; it will
62 instead accept a connection and then immediately close it.
63 3) If the server is flooded with oneway calls and drops packets, it will
64 listen for data until it encounters a valid message. If this happens, the
65 client will not be notified.
66 =end
67
68 module DRuby
69
70 public
71
# The DRuby server class. Like its drb equivalent, this class spawns off
# a new thread which processes requests, allowing the server to do other
# things while it is doing processing for a distributed object. Every
# accepted connection is served on its own thread, so all objects used
# with DRuby must be thread-safe.
#
# SECURITY: a connected client may invoke any method (including private
# ones, via __send__) on any registered object it is authorized for. Only
# expose objects to peers you trust.
class Server
  # :obj is never assigned by this class; the reader is kept only so that
  # existing callers do not break.
  attr_reader :host, :port, :obj, :thread

  # Start a server on the given host and port. An acceptor may be
  # specified to accept/reject connections; it should be a proc that
  # takes a Socket as an argument and returns true or false. Rejected
  # connections are accepted and then immediately closed.
  def initialize(host, port, acceptor=nil)
    @host = host
    @port = port
    @name_to_id = {}
    @id_to_object = []
    @id_to_acceptor = []
    @mutex = Mutex.new
    @next_id = 1 # id 0 is reserved for messages not aimed at an object
    @unused_ids = []
    @thread = Thread.new do
      server = TCPServer.new(@host, @port)
      while (socket = server.accept)
        if acceptor && !acceptor.call(socket)
          socket.close
          next
        end
        # IPPROTO_TCP is the portable spelling of the TCP option level.
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
        Thread.new(socket) do |client|
          begin
            session_loop(Session.new(client))
          rescue IOError, SystemCallError
            # The peer went away in mid-conversation; nothing to report to.
          ensure
            client.close unless client.closed?
          end
        end
      end
    end
  end

  # Register an object with the server and return an Object_Reference
  # wrapping the id it was given. We could use the object's real id, but
  # this is insecure. The acceptor argument is a proc that takes a Socket
  # as an argument, and returns true or false depending on whether the
  # request should be allowed. The supplied object must be thread-safe.
  # Raises RuntimeError ("Object limit exceeded") when no id is available.
  def create_reference(obj, acceptor=nil)
    @mutex.synchronize do
      Object_Reference.new(register_private(obj, acceptor))
    end
  end

  # Finds an object in linear time and unregisters it, together with any
  # names bound to it. Unknown objects are ignored. Always returns nil.
  def delete_reference(obj)
    @mutex.synchronize do
      unregister_private(obj)
    end
    nil
  end

  # Register an object with the server and bind it to name, so that
  # clients can look it up with a RESOLVE message. Returns nil.
  def bind(obj, name, acceptor=nil)
    # Registration mutates the shared id tables, so it takes the same
    # lock as create_reference and delete_reference.
    @mutex.synchronize do
      @name_to_id[name] = register_private(obj, acceptor)
    end
    nil
  end

  # Main server loop. Wait for a request message, process it, and send
  # back a YIELD, EXCEPTION, or RETVAL message. ONEWAY and SYNC requests
  # never produce a RETVAL/EXCEPTION reply. Lookups in the id tables are
  # not synchronized here; registered objects are required to be
  # thread-safe.
  def session_loop(session)
    until session.finished
      type, target_id, message = session.get_message
      begin
        obj = @id_to_object[target_id]
        acceptor = @id_to_acceptor[target_id]
        if acceptor && !acceptor.call(session.socket)
          # A oneway caller is not waiting for a reply, so the call is
          # dropped silently instead of being executed unauthorized.
          next if type == Session::ONEWAY
          raise "No authorization"
        end
        case type # In order of most to least common
        when Session::REQUEST
          raise "Unknown object id" if obj.nil?
          retval = obj.__send__(*message)
        when Session::ONEWAY
          begin
            obj.__send__(*message) unless obj.nil?
          rescue Exception
            # Deliberately swallowed: nothing may be sent back for a
            # oneway call, or the client would lose protocol sync.
          end
          next
        when Session::REQUEST_BLOCK
          raise "Unknown object id" if obj.nil?
          retval = obj.__send__(*message) do |*values|
            session.send_message(Session::YIELD, 0, values)
          end
        when Session::RESOLVE
          retval = @name_to_id[message]
        when Session::SYNC
          session.reply_sync(target_id)
          next
        else
          raise "Bad session request"
        end
        session.send_message(Session::RETVAL, 0, retval)
      rescue Exception => e
        # Deliberately broad: whatever the remote call raised is
        # propagated to the client rather than killing the session.
        session.send_message(Session::EXCEPTION, 0, e)
      end
    end
  end

  private

  # Store obj and its acceptor under an id and return that id. Fresh ids
  # are handed out first; once they run out, ids released by
  # unregister_private are reused. Callers must hold @mutex.
  def register_private(obj, acceptor)
    if @next_id < Session::MAX_ID
      id = @next_id
      @next_id += 1
    else
      id = @unused_ids.pop
      raise "Object limit exceeded" if id.nil?
    end
    @id_to_acceptor[id] = acceptor
    @id_to_object[id] = obj
    id
  end

  # Clear the table slots holding obj (matched by identity), drop any
  # names bound to its id, and make the id available for reuse.
  # Callers must hold @mutex.
  def unregister_private(obj)
    return if obj.nil? # empty slots are nil; never "find" one of those
    id = @id_to_object.index { |candidate| candidate.equal?(obj) }
    return if id.nil?
    @id_to_object[id] = nil
    @id_to_acceptor[id] = nil
    @name_to_id.delete_if { |_name, bound_id| bound_id == id }
    @unused_ids.push(id)
  end
end
208
# The DRuby client class. A DRuby server must be started on the given
# host and port before instantiating a DRuby client.
class Client
  attr_reader :host, :port

  # Connect a client to the server on the given host and port. The
  # user can specify sync=false to turn off synchronization (a
  # Null_Mutex is used instead of a Mutex) and get a speed boost; the
  # client must then not be shared between threads.
  def initialize(host, port, sync=true)
    @host = host
    @port = port
    @server = TCPSocket.open(@host, @port)
    # IPPROTO_TCP is the portable spelling of the TCP option level.
    @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    @session = Session.new(@server)
    @mutex = sync ? Mutex.new : Null_Mutex.new
  end

  # Given a string, return a proxy object that will forward requests
  # for an object on the server with that name.
  #
  # Raises NameError when the server has nothing bound to object_name,
  # re-raises any exception the server reports, and raises RuntimeError
  # on an unexpected reply type.
  def resolve(object_name)
    @mutex.synchronize do
      @session.send_message(Session::RESOLVE, 0, object_name)
      type, _obj, message = @session.get_message
      case type
      when Session::RETVAL
        # The server answers nil for an unknown name; a proxy built on a
        # nil id could never be used, so fail here with a clear error.
        if message.nil?
          raise NameError,
            "no object bound to #{object_name.inspect} on the server"
        end
        Proxy_Object.new(@session, @mutex, message)
      when Session::EXCEPTION
        raise message
      else
        raise RuntimeError, "Invalid msg type"
      end
    end
  end

  # Close the connection to the server. Safe to call more than once.
  # Proxies obtained from this client must not be used afterwards.
  def close
    @server.close unless @server.closed?
    nil
  end
end
241
242 private
243
# Stand-in for Mutex used when the user has asked for no synchronization:
# it offers the same synchronize call but never takes a lock.
class Null_Mutex
  # Run the caller's block right away and hand back whatever it returns.
  def synchronize; yield; end
end
250
# Tables of method names that the proxy object has to treat specially.
class Functions
  # Methods that are explicitly forwarded down the wire.
  GOOD = %w(
    inspect
    class_variables
    instance_eval
    instance_variables
    to_a
    to_s
  ).map { |name| name.to_sym }

  # Methods that must never be callable through a proxy.
  BAD = %w(clone dup display).map { |name| name.to_sym }

  # Methods returning method-name lists, from which BAD names are removed.
  METHOD = %w(
    methods
    private_methods
    protected_methods
    public_methods
    singleton_methods
  ).map { |name| name.to_sym }

  # Name-query methods, each paired with the Ruby source to evaluate
  # when the queried name is one of the BAD methods.
  RESPOND = [
    [:method, "raise NameError"],
    [:respond_to?, "false"]
  ]
end
272
# A DRuby::Object_Reference is created on the server side to stand for a
# registered object. When one is returned from a server object to a
# client, the client turns it into a DRuby::Proxy_Object.
class Object_Reference
  # Wrap the server-side id under which the object was registered.
  def initialize(object_id)
    @object_id = object_id
  end

  # The wrapped server-side id. Note that this replaces Ruby's built-in
  # object_id for instances of this class.
  def object_id
    @object_id
  end
end
283
# A DRuby::Proxy_Object acts as a proxy; it forwards most methods to the
# server for execution against the object registered under its id.
class Proxy_Object
  # session   - the Session used to talk to the server
  # mutex     - lock (Mutex or Null_Mutex) guarding the session
  # object_id - server-side id of the object this proxy stands for
  def initialize(session, mutex, object_id)
    @object_id = object_id
    @session = session
    @mutex = mutex
  end

  # Client request handler. Sends a REQUEST (or REQUEST_BLOCK when a
  # block is given) message and then processes replies until the call
  # finishes:
  #   RETVAL    - returned to the caller
  #   YIELD     - the values are yielded to the caller's block
  #   EXCEPTION - the remote exception is raised locally
  #   SYNC      - answered via Session#reply_sync
  # Object_References in return values and yielded values are converted
  # into proxies. Raises RuntimeError on any other message type.
  def method_missing(method, *args)
    request = [method, *args]
    request_type = block_given? ? Session::REQUEST_BLOCK : Session::REQUEST
    @mutex.synchronize do
      @session.send_message(request_type, @object_id, request)
      # A plain while loop is used on purpose: Kernel#loop would swallow
      # a StopIteration raised on behalf of the server.
      while true
        type, obj, message = @session.get_message
        case type
        when Session::RETVAL
          return msg_to_obj(message)
        when Session::YIELD
          # A YIELD message carries the list of yielded values; splat it
          # so the block receives the same arguments it would locally.
          yield(*Array(message).map { |value| msg_to_obj(value) })
        when Session::EXCEPTION
          raise message
        when Session::SYNC
          @session.reply_sync(obj)
        else
          raise "Invalid msg type"
        end
      end
    end
  end

  # This is a special function that lets you ignore the return value
  # of the calling function to get some extra speed, in exchange for not
  # knowing when the function will complete. The message is the method
  # name followed by its arguments. Returns nil. A future version of the
  # server should probably check whether a given function is allowed to
  # be called as a oneway, in order to reduce DoS attacks.
  def oneway(*message)
    @mutex.synchronize do
      @session.send_message(Session::ONEWAY, @object_id, message)
      nil
    end
  end

  # sync() will synchronize the client with the server (useful for
  # determining when a oneway call completes).
  def sync()
    @mutex.synchronize do
      @session.send_sync
      @session.wait_sync
    end
  end

  # String forms of the forbidden method names, shared by the generated
  # methods below. A class-body local keeps it out of the proxy's own
  # method and constant namespace.
  hidden_names = Functions::BAD.map { |name| name.to_s }

  # Make sure certain methods get passed down the wire even though
  # Object already defines them locally.
  Functions::GOOD.each do |name|
    define_method(name) do |*args, &block|
      method_missing(name, *args, &block)
    end
  end

  # And make sure others never get called.
  Functions::BAD.each do |name|
    define_method(name) do |*args|
      # Built per call so the message names the actual proxy instance.
      raise NameError,
        "undefined method `#{name}' for #<#{self.class}:#{__id__}>"
    end
  end

  # And remove these function names from any method lists that get
  # returned (whether the server reports them as Strings or Symbols);
  # there's nothing we can do about people who decide to return them
  # from other functions.
  Functions::METHOD.each do |name|
    define_method(name) do |*args|
      listed = method_missing(name, *args)
      listed.reject { |item| hidden_names.include?(item.to_s) }
    end
  end

  # Same here, except don't let the call go through in the first place
  # when it asks about a forbidden name (given as String or Symbol).
  Functions::RESPOND.each do |name, action|
    define_method(name) do |arg, *args|
      if hidden_names.include?(arg.to_s)
        # action is Ruby source taken from the Functions::RESPOND table.
        eval(action)
      else
        method_missing(name, arg, *args)
      end
    end
  end

  private

  # Convert an Object_Reference received from the server into a proxy
  # sharing this proxy's session and mutex; pass anything else through.
  def msg_to_obj(obj)
    if Object_Reference === obj
      Proxy_Object.new(@session, @mutex, obj.object_id)
    else
      obj
    end
  end
end
401
# A DRuby session sends messages back and forth between server and client.
# The message format is as follows:
#   +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
#   | MSG_START | msg. size | msg. type |  obj. id  | marshalled msg/args |
#   +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
# Each header field is a 16-bit little-endian unsigned integer.
class Session
  REQUEST = 0x1001
  REQUEST_BLOCK = 0x1002
  ONEWAY = 0x1003
  RETVAL = 0x2001
  EXCEPTION = 0x2002
  YIELD = 0x2003
  RESOLVE = 0x3001
  SYNC = 0x4001
  MSG_START = 0x4242

  MAX_ID = 2**16

  # Wire layout of the header: four 16-bit little-endian fields.
  HEADER_FORMAT = "vvvv"
  HEADER_SIZE = 8
  # Largest payload whose length still fits in the 16-bit size field.
  MAX_MESSAGE_SIZE = 0xFFFF

  attr_reader :io
  alias_method :socket, :io

  # io is the stream (normally a TCP socket) that messages travel over.
  def initialize(io)
    @io = io
  end

  # Marshal message and send it with a header carrying type and obj.
  # Raises ArgumentError, before anything is written, if the marshalled
  # payload does not fit in the 16-bit size field; sending it anyway
  # would wrap the size and desynchronize the stream.
  def send_message(type, obj, message)
    data = Marshal.dump(message)
    if data.length > MAX_MESSAGE_SIZE
      raise ArgumentError,
        "DRuby message too large (#{data.length} bytes, limit #{MAX_MESSAGE_SIZE})"
    end
    header = [MSG_START, data.length, type, obj].pack(HEADER_FORMAT)
    # One write keeps header and body together.
    @io.write(header + data)
  end

  # Read the next message and return [type, obj, message]. Headers whose
  # first field is not MSG_START are skipped, eight bytes at a time.
  # Raises EOFError if the stream ends before a full message arrives.
  #
  # SECURITY: the payload goes through Marshal.load, which must only be
  # fed data from trusted peers.
  def get_message()
    header = read_fully(HEADER_SIZE).unpack(HEADER_FORMAT)
    until header.first == MSG_START
      header = read_fully(HEADER_SIZE).unpack(HEADER_FORMAT)
    end
    _magic, size, type, obj = header
    message = Marshal.load(read_fully(size))
    [type, obj, message]
  end

  # True once the stream is at end of file.
  def finished()
    @io.eof?
  end

  # Ask the peer to answer with a SYNC response.
  def send_sync()
    send_message(Session::SYNC, 0, nil)
  end

  # Block until the next message arrives and check that it is the SYNC
  # response produced by reply_sync (type SYNC, obj 1, nil payload).
  # Raises RuntimeError if it is anything else.
  def wait_sync()
    type, obj, message = get_message
    if type != Session::SYNC || obj != 1 || !message.nil?
      raise "DRuby synchonization failed"
    end
  end

  # Answer a SYNC message: a request (value 0) gets a response (obj 1);
  # a response is not answered again.
  def reply_sync(value)
    send_message(Session::SYNC, 1, nil) if value == 0
  end

  private

  # Read exactly count bytes, raising EOFError if the stream ends first.
  def read_fully(count)
    data = @io.read(count)
    if data.nil? || data.length < count
      raise EOFError, "DRuby connection closed in mid-message"
    end
    data
  end
end
469 end