1 require 'socket'
2 require 'thread'
3
4 =begin
5 DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
6 (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
7
8 DRuby is a set of classes for providing distributed object support to a
9 Ruby program. You may distribute and/or modify it under the same terms as
10 Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example:
11
12 Client
13 ------
14 client = DRuby::Client.new('localhost', 4242)
15 obj = client.resolve("foo")
16 puts obj.foo("foo")
17 obj.oneway(:foo, "foo")
18
19 Server
20 ------
21 class Foo
22 def foo(x); return x; end
23 end
24 obj = Foo.new
25 server = DRuby::Server.new('localhost', 4242)
26 server.register(obj, "foo")
27 server.thread.join
28
29 You can do all sorts of cool things with DRuby, including passing blocks to
30 the functions, throwing exceptions and propagating them from server to
31 client, and more. Unlike CORBA, where you must create an interface
32 definition and strictly adhere to it, DRuby uses marshalling, so you can
33 use almost any object with it. But, alas, it is not as powerful as CORBA.
34
35 On a fast machine, you should expect around 4000 messages per second with
36 normal method calls, and up to 10000 messages per second with oneway calls.
37 My dual-processor machine gets 3500/6500 messages per second, though, so
38 YMMV.
39
40 The DRuby message format is broken into 3 components:
41 [ msg_type, obj_id, message ]
42 For each msg_type, there is a specific format to the message. Additionally,
43 certain msg_types are only valid when being sent to the server, and others
44 are valid only when being sent back to the client. Here is a summary:
45
46 msg_type       send to   meaning of obj_id        msg format
47 ----------------------------------------------------------------------------
48 REQUEST        server    obj to talk to           [:method, *args]
49 REQUEST_BLOCK  server    obj to talk to           [:method, *args]
50 ONEWAY         server    obj to talk to           [:method, *args]
51 RETVAL         client    always 0                 retval
52 EXCEPTION      client    always 0                 $!
53 YIELD          client    always 0                 [value, value, ...]
54 RESOLVE        server    always 0                 object_name
55 SYNC           either    0=request, 1=response    nil
56
57 Known bugs:
58 1) Oneway calls are really slow if mixed with regular calls. I'm not sure
59 why. A workaround is to use a separate connection for each type of call,
60 but sometimes that doesn't work either.
61 2) The server does not refuse connections for unauthorized addresses; it will
62 instead accept a connection and then immediately close it.
63 3) If the server is flooded with oneway calls and drops packets, it will
64 listen for data until it encounters a valid message. If this happens, the
65 client will not be notified.
66 =end
67
68 module DRuby
69
70 public
71
# The DRuby server class.  Like its drb equivalent, this class spawns off
# a new thread which accepts connections, and one further thread per
# connection which processes that connection's requests.  This lets the
# server do other things while it is serving distributed objects, but it
# also means that every object registered with DRuby must be thread-safe.
class Server
  # NOTE: :obj is never assigned by this class; the reader is kept only so
  # that existing callers do not break.
  attr_reader :host, :port, :obj, :thread

  # Start a server on the given host and port.
  #
  # host     - address to bind the listening socket to
  # port     - TCP port to listen on
  # acceptor - optional proc taking the freshly accepted Socket and
  #            returning true (keep the connection) or false (close it)
  #
  # The listening socket is created inside the acceptor thread, so bind
  # errors surface when that thread is joined, not from this constructor.
  def initialize(host, port, acceptor=nil)
    @host = host
    @port = port
    @name_to_id = Hash.new
    @id_to_object = Hash.new
    @id_to_acceptor = Hash.new
    @mutex = Mutex.new
    @next_id = 1
    @thread = Thread.new do
      server = TCPServer.new(@host, @port)
      while (socket = server.accept)
        if acceptor && !acceptor.call(socket)
          socket.close
          next
        end
        socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
        Thread.new(socket) do |client_socket|
          begin
            session_loop(Session.new(client_socket))
          ensure
            # Release the descriptor however the session ended (clean EOF
            # or an error while reading/writing a message).
            client_socket.close unless client_socket.closed?
          end
        end
      end
    end
  end

  # Register an object with the server.  Once an object is registered,
  # it cannot be unregistered.  The object is given an id of @next_id,
  # and @next_id is incremented.  We could use the object's real id, but
  # we want to be sure that the id will fit into a 16-bit short.
  #
  # obj      - the object to expose; it must be thread-safe
  # name     - the name clients pass to resolve
  # acceptor - optional proc taking a Socket and returning true or false
  #            depending on whether a request on that socket is allowed
  #
  # Raises ArgumentError once the id space is exhausted.
  def register(obj, name, acceptor=nil)
    @mutex.synchronize do
      if @next_id >= Session::MAX_ID
        raise ArgumentError, "Object limit exceeded"
      end
      @name_to_id[name] = @next_id
      @id_to_acceptor[@next_id] = acceptor
      @id_to_object[@next_id] = obj
      @next_id = @next_id.succ
    end
  end

  # Main server loop.  Wait for a message, process it, and send back a
  # YIELD, EXCEPTION, or RETVAL message.  No locking is done here: the
  # registered objects are required to be thread-safe, and the object is
  # stored last in register, so a successful lookup means the entry is
  # complete.
  #
  # ONEWAY messages never produce a reply of any kind, because the caller
  # is not reading one; a stray reply would be mistaken for the answer to
  # the client's next call.
  def session_loop(session)
    until session.finished
      type, object_id, message = session.get_message
      begin
        acceptor = @id_to_acceptor[object_id]
        if acceptor && !acceptor.call(session.socket)
          # An unauthorized oneway call is dropped without being executed.
          next if type == Session::ONEWAY
          raise ArgumentError, "No authorization"
        end
        case type
        when Session::REQUEST
          retval = lookup_object(object_id).__send__(*message)
        when Session::REQUEST_BLOCK
          retval = lookup_object(object_id).__send__(*message) do |*values|
            session.send_message(Session::YIELD, 0, values)
          end
        when Session::ONEWAY
          begin
            lookup_object(object_id).__send__(*message)
          rescue Exception
            # Deliberately best-effort: nobody is waiting for the outcome.
          end
          next
        when Session::RESOLVE
          retval = @name_to_id[message]
        when Session::SYNC
          session.reply_sync(object_id)
          next
        else
          raise ArgumentError, "Bad session request"
        end
        session.send_message(Session::RETVAL, 0, retval)
      rescue Exception
        # Every failure is propagated to the caller rather than handled
        # here, which is why the rescue is this broad.
        unless type == Session::ONEWAY
          session.send_message(Session::EXCEPTION, 0, $!)
        end
      end
    end
  end

  private

  # Return the object registered under object_id, or raise ArgumentError
  # so that a request for an unknown id is not dispatched to nil.
  def lookup_object(object_id)
    unless @id_to_object.key?(object_id)
      raise ArgumentError, "Unknown object id #{object_id.inspect}"
    end
    @id_to_object[object_id]
  end
end
170
# The DRuby client class.  A DRuby server must be started on the given
# host and port before instantiating a DRuby client.
class Client
  attr_reader :host, :port

  # Connect a client to the server on the given host and port.
  #
  # host - server host name or address
  # port - server TCP port
  # sync - pass false to replace the mutex with a no-op one; the original
  #        author reports roughly a 20% speed boost from doing so
  def initialize(host, port, sync=true)
    @host = host
    @port = port
    @server = TCPSocket.open(@host, @port)
    @server.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
    @session = Session.new(@server)
    @mutex = sync ? Mutex.new : Null_Mutex.new
  end

  # Given a name, return a proxy object that will forward requests to the
  # object registered on the server under that name.
  #
  # Raises ArgumentError if the server knows no object by that name (the
  # server answers such a lookup with nil), re-raises any exception the
  # server sent back, and raises RuntimeError on an unexpected reply type.
  def resolve(object_name)
    @mutex.synchronize do
      @session.send_message(Session::RESOLVE, 0, object_name)
      type, _obj, message = @session.get_message
      case type
      when Session::RETVAL
        # Fail here instead of handing out a proxy whose nil id would only
        # blow up when the first call tries to pack it into a header.
        if message.nil?
          raise ArgumentError,
            "No object registered as #{object_name.inspect}"
        end
        return Object.new(@session, @mutex, message)
      when Session::EXCEPTION
        raise message
      else
        raise RuntimeError, "Unexpected reply type #{type.inspect}"
      end
    end
  end
end
203
204 private
205
# Stand-in for Mutex used when the caller asked for no synchronization:
# it offers the same synchronize call but takes no lock at all.
class Null_Mutex
  # Run the given block straight away and hand back its result.
  def synchronize
    return yield
  end
end
212
# A DRuby::Object acts as a proxy; it forwards most methods to the server
# for execution.
class Object

  # session   - the Session used to talk to the server
  # mutex     - lock (or no-op lock) serializing use of the session
  # object_id - id of the remote object, as returned by a RESOLVE request
  def initialize(session, mutex, object_id)
    @object_id = object_id
    @session = session
    @mutex = mutex
  end

  # Client request handler.  Sends a REQUEST (or REQUEST_BLOCK when a
  # block was given) and then reads replies until the call is finished:
  #
  #   RETVAL    - returned to the caller
  #   YIELD     - the carried values are yielded to the caller's block
  #   EXCEPTION - the carried exception is raised here
  #   SYNC      - answered, then reading continues
  #
  # Any other reply type raises RuntimeError.
  def method_missing(method, *args)
    request = [ method, *args ]
    @mutex.synchronize do
      @session.send_message(
        block_given? ? Session::REQUEST_BLOCK : Session::REQUEST,
        @object_id,
        request)
      # A plain while loop, not Kernel#loop: loop rescues StopIteration,
      # which would silently swallow that exception if the server (or the
      # caller's block) raised it.
      while true
        type, obj, message = @session.get_message
        case type
        when Session::RETVAL
          return message
        when Session::YIELD
          # The payload is the array of yielded values; splat it so the
          # block receives them as separate arguments.
          yield(*message)
        when Session::EXCEPTION
          raise message
        when Session::SYNC
          @session.reply_sync(obj)
        else
          raise RuntimeError, "Unexpected reply type #{type.inspect}"
        end
      end
    end
  end

  # This is a special function that lets you ignore the return value
  # of the calling function to get some extra speed, in exchange for not
  # knowing when the function will complete.  A future version of the
  # server should probably check whether a given function is allowed to
  # be called as a oneway, in order to reduce DoS attacks.
  #
  # message - the method name followed by its arguments
  #
  # Always returns nil.
  def oneway(*message)
    @mutex.synchronize do
      @session.send_message(Session::ONEWAY, @object_id, message)
      return nil
    end
  end

  # sync() will synchronize the client with the server (useful for
  # determining when a oneway call completes).
  def sync()
    @mutex.synchronize do
      @session.send_sync
      @session.wait_sync
    end
  end

  # Methods every Ruby object already has, but which should be answered
  # by the remote object rather than by this proxy.
  # NOTE(security): forwarding :instance_eval means a string argument is
  # evaluated by the receiving object on the server.
  GOOD_FUNCTIONS = [
    :inspect, :class_variables, :instance_eval, :instance_variables,
    :to_a, :to_s
  ].freeze

  # Methods that must never be usable on a proxy.
  BAD_FUNCTIONS = [
    :clone, :dup, :display
  ].freeze

  # Method-list queries whose remote answer is filtered of BAD_FUNCTIONS.
  METHOD_FUNCTIONS = [
    :methods, :private_methods, :protected_methods, :public_methods,
    :singleton_methods
  ].freeze

  # Queries taking a method name, paired with the statement to run when
  # that name is one of BAD_FUNCTIONS.
  RESPOND_FUNCTIONS = [
    [ :method, "raise NameError" ],
    [ :respond_to?, "return false" ]
  ].freeze

  # Make sure certain methods get passed down the wire.
  GOOD_FUNCTIONS.each do |name|
    define_method(name) do |*args|
      method_missing(name, *args)
    end
  end

  # And make sure others never get called.
  BAD_FUNCTIONS.each do |name|
    undef_method name
  end

  # And remove these function names from any method lists that get
  # returned; there's nothing we can do about people who decide to
  # return them from other functions.  Names are compared as strings so
  # that both String and Symbol method lists are filtered.
  METHOD_FUNCTIONS.each do |name|
    define_method(name) do |*args|
      remote_names = method_missing(name, *args)
      remote_names.reject do |item|
        BAD_FUNCTIONS.any? { |bad| bad.to_s == item.to_s }
      end
    end
  end

  # Same here, except don't let the call go through in the first place.
  # The action is compiled into the method once, here, rather than being
  # eval'ed on every call; the name is compared as a string so that both
  # "clone" and :clone are caught.
  RESPOND_FUNCTIONS.each do |name, action|
    class_eval <<-RUBY, __FILE__, __LINE__ + 1
      def #{name}(arg, *args)
        #{action} if BAD_FUNCTIONS.any? { |bad| bad.to_s == arg.to_s }
        method_missing(:#{name}, arg, *args)
      end
    RUBY
  end
end
330
# A DRuby session sends messages back and forth between server and client.
# Every message is four little-endian 16-bit fields followed by the
# marshalled payload:
#
#   +-----------+-----------+-----------+-----------+---------------------+
#   | MSG_START | msg. size | msg. type |  obj. id  | marshalled msg/args |
#   +-----------+-----------+-----------+-----------+---------------------+
class Session
  REQUEST       = 0x1001
  REQUEST_BLOCK = 0x1002
  ONEWAY        = 0x1003
  RETVAL        = 0x2001
  EXCEPTION     = 0x2002
  YIELD         = 0x2003
  RESOLVE       = 0x3001
  SYNC          = 0x4001
  MSG_START     = 0x4242

  MAX_ID = 2**16

  # pack/unpack template for the header: four 16-bit little-endian words.
  HEADER_FORMAT = "vvvv"
  # Size of the packed header in bytes.
  HEADER_SIZE = 8
  # Largest payload the 16-bit size field can describe.
  MAX_MESSAGE_SIZE = 0xFFFF

  attr_reader :io
  alias_method :socket, :io

  # io - the stream (normally a TCP socket) messages travel over
  def initialize(io)
    @io = io
  end

  # Marshal message and send it with the given type and object id.
  #
  # Raises ArgumentError if the marshalled payload does not fit the
  # 16-bit size field; packing it anyway would truncate the length and
  # corrupt the stream for every following message.
  def send_message(type, obj, message)
    data = Marshal.dump(message)
    if data.bytesize > MAX_MESSAGE_SIZE
      raise ArgumentError,
        "Message too large (#{data.bytesize} bytes, limit #{MAX_MESSAGE_SIZE})"
    end
    header = [MSG_START, data.bytesize, type, obj].pack(HEADER_FORMAT)
    # One write, so header and payload cannot be separated on the wire.
    @io.write(header + data)
  end

  # Read the next message and return [ type, obj, message ].
  #
  # Headers that do not begin with MSG_START are skipped, eight bytes at
  # a time, until a valid one is found.
  #
  # Raises EOFError if the stream ends before a full message was read.
  def get_message
    size = type = obj = nil
    loop do
      magic, size, type, obj = read_exactly(HEADER_SIZE).unpack(HEADER_FORMAT)
      break if magic == MSG_START
    end
    # NOTE(security): Marshal.load on bytes from the peer can instantiate
    # arbitrary objects; only talk to peers you trust.
    message = Marshal.load(read_exactly(size))
    [ type, obj, message ]
  end

  # True once the underlying stream has reached end-of-file.
  def finished
    @io.eof?
  end

  # Ask the peer to acknowledge that it has caught up with this point.
  def send_sync
    send_message(Session::SYNC, 0, nil)
  end

  # Block until the peer's acknowledgement arrives.
  #
  # Raises RuntimeError unless the next message is a SYNC response
  # (object id 1, nil payload).
  def wait_sync
    type, obj, message = get_message
    unless type == Session::SYNC && obj == 1 && message.nil?
      raise RuntimeError, "DRuby synchonization failed"
    end
  end

  # Answer a SYNC message.  Only requests (value 0) are answered, with
  # value 1; anything else is ignored.
  def reply_sync(value)
    if value == 0
      send_message(Session::SYNC, 1, nil)
    end
  end

  private

  # Read exactly count bytes, raising EOFError on a short or empty read.
  def read_exactly(count)
    chunk = @io.read(count)
    if chunk.nil? || chunk.bytesize < count
      raise EOFError, "Connection closed in the middle of a DRuby message"
    end
    chunk
  end
end
398
399 end