1 require 'socket'
2 require 'thread'
3 require 'fcntl'
4 require 'romp_helper'
5
6 ##
7 # ROMP - The Ruby Object Message Proxy
8 # @author Paul Brannan
9 # @version 0.1
10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
11 #
12 # <pre>
13 # ROMP is a set of classes for providing distributed object support to a
14 # Ruby program. You may distribute and/or modify it under the same terms as
15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example:
16 #
17 # Client
18 # ------
19 # client = ROMP::Client.new('localhost', 4242)
20 # obj = client.resolve("foo")
21 # puts obj.foo("foo")
22 # obj.oneway(:foo, "foo")
23 #
24 # Server
25 # ------
26 # class Foo
27 # def foo(x); return x; end
28 # end
29 # obj = Foo.new
30 # server = ROMP::Server.new('localhost', 4242)
31 # server.bind(obj, "foo")
32 # server.thread.join
33 #
# You can do all sorts of cool things with ROMP, including passing blocks to
# the functions, throwing exceptions and propagating them from server to
# client, and more. Unlike CORBA, where you must create an interface
# definition and strictly adhere to it, ROMP uses marshalling, so you can
# use almost any object with it. But, alas, it is not as powerful as CORBA.
39 #
40 # On a fast machine, you should expect around 7000 messages per second with
41 # normal method calls, up to 10000 messages per second with oneway calls with
42 # sync, and up to 40000 messages per second for oneway calls without sync.
43 # These numbers can vary depending on various factors, so YMMV.
44 #
45 # The ROMP message format is broken into 3 components:
46 # [ msg_type, obj_id, message ]
47 # For each msg_type, there is a specific format to the message. Additionally,
48 # certain msg_types are only valid when being sent to the server, and others
49 # are valid only when being sent back to the client. Here is a summary:
50 #
51 # msg_type send to meaning of obj_id msg format
52 # ----------------------------------------------------------------------------
53 # REQUEST server obj to talk to [:method, *args]
54 # REQUEST_BLOCK server obj to talk to [:method, *args]
55 # ONEWAY server obj to talk to [:method, *args]
56 # ONEWAY_SYNC server obj to talk to [:method, *args]
57 # RETVAL client always 0 retval
58 # EXCEPTION client always 0 $!
59 # YIELD client always 0 [value, value, ...]
60 # SYNC either 0=request, 1=response nil
61 # NULL_MSG either always 0 n/a
62 #
# BUGS:
# - On a 2.2 kernel, oneway calls without sync are very slow.
# - UDP support does not currently work.
66 # </pre>
67
68 module ROMP
69
70 public
71
72 ##
73 # The ROMP server class. Like its drb equivalent, this class spawns off
74 # a new thread which processes requests, allowing the server to do other
75 # things while it is doing processing for a distributed object. This
76 # means, though, that all objects used with ROMP must be thread-safe.
77 #
class Server

  public
  attr_reader :obj, :thread

  ##
  # Start a ROMP server. A listener thread (exposed through #thread) is
  # spawned; it accepts connections on the endpoint and starts one worker
  # thread per accepted connection.
  #
  # @param endpoint An endpoint for the server to listen on; should be specified in URI notation.
  # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and return true or false.
  # @param debug Turns on debugging messages if enabled.
  #
  def initialize(endpoint, acceptor=nil, debug=false)
    @mutex = Mutex.new
    @debug = debug
    @resolve_server = Resolve_Server.new
    @resolve_obj = Resolve_Obj.new(@resolve_server)
    # The resolver object is registered first, before any user object.
    @resolve_server.register(@resolve_obj)

    @thread = Thread.new do
      server = Generic_Server.new(endpoint)
      while(socket = server.accept)
        puts "Got a connection" if @debug
        if acceptor && !acceptor.call(socket)
          socket.close
          next
        end
        puts "Accepted the connection" if @debug
        session = Session.new(socket)
        session.set_nonblock(true)
        # Pass the session as a thread argument. `session` is a single
        # local shared by every iteration of this loop, so a worker that
        # read it from the closure could see the session of a later
        # connection if the next accept finished first.
        Thread.new(session) do |client_session|
          Thread.current.abort_on_exception = true
          begin
            # TODO: Send a sync message to the client so it
            # knows we are ready to receive data.
            server_loop(client_session)
          rescue Exception
            # Deliberately broad: a failing connection must not take
            # the whole server down; the error is only reported in
            # debug mode.
            ROMP::print_exception($!) if @debug
          end
          puts "Connection closed" if @debug
        end
      end
    end
  end

  ##
  # Register an object with the server under a server-assigned id. We
  # could use the object's real id, but this is insecure. The supplied
  # object must be thread-safe.
  #
  # @param obj The object to register.
  #
  # @return A new Object_Reference that should be returned to the client.
  #
  def create_reference(obj)
    @mutex.synchronize do
      id = @resolve_server.register(obj)
      Object_Reference.new(id) #return
    end
  end

  ##
  # Find an object in linear time and unregister it. Be careful with
  # this function, because the client may not know the object has
  # gone away.
  #
  # @param obj The object to unregister.
  #
  # @return nil
  #
  def delete_reference(obj)
    @mutex.synchronize do
      @resolve_server.unregister(obj)
    end
    nil #return
  end

  ##
  # Register an object with the server and bind it to name. The
  # registration and the binding are done under the server mutex, like
  # create_reference and delete_reference, so concurrent callers cannot
  # interleave inside the resolver.
  #
  # @param obj The object to bind.
  # @param name The name to bind the object to.
  #
  # @return nil
  #
  def bind(obj, name)
    @mutex.synchronize do
      id = @resolve_server.register(obj)
      @resolve_server.bind(name, id)
    end
    nil #return
  end

  ##
  # This keeps the client from seeing our objects when they call inspect.
  # The original implementation stays reachable as __inspect__.
  #
  alias_method :__inspect__, :inspect
  def inspect()
    return ""
  end

  private
  if false then # the following functions are implemented in C:

  ##
  # The server_loop function is the guts of the server. It takes in
  # requests from the client and forwards them to already-registered
  # objects.
  #
  # @param session The session to run the loop with.
  #
  def server_loop(session)
  end

  end # if false
end
191
192 ##
193 # The ROMP client class. A ROMP server must be started on the given
194 # host and port before instantiating a ROMP client.
195 #
class Client

  ##
  # Connect to a ROMP server.
  #
  # @param endpoint The endpoint the server is listening on.
  # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost.
  #
  def initialize(endpoint, sync=true)
    @server = Generic_Client.new(endpoint)
    @session = Session.new(@server)
    @session.set_nonblock(true)
    # Without synchronization a do-nothing stand-in replaces the mutex.
    if sync
      @mutex = Mutex.new
    else
      @mutex = Null_Mutex.new
    end
    # Object id 0 is used for the proxy that answers resolve requests.
    @resolve_obj = Proxy_Object.new(@session, @mutex, 0)
  end

  ##
  # Look a name up on the server and wrap the resulting object id in a
  # proxy that shares this client's session and mutex.
  #
  # @param object_name The name of the object to resolve.
  #
  # @return A Proxy_Object that can be used to make method calls on the object in the server.
  #
  def resolve(object_name)
    @mutex.synchronize do
      remote_id = @resolve_obj.resolve(object_name)
      Proxy_Object.new(@session, @mutex, remote_id)
    end
  end
end
227
228 private
229
230 ##
231 # In case the user does not want synchronization.
232 #
class Null_Mutex
  # Run the caller's block straight away; there is nothing to acquire.
  # Returns whatever the block returns.
  def synchronize
    yield
  end

  # No-op counterpart of Mutex#lock; returns nil.
  def lock; nil; end

  # No-op counterpart of Mutex#unlock; returns nil.
  def unlock; nil; end
end
244
245 ##
246 # All the special functions we have to keep track of
247 #
class Functions
  # Methods every Object already has locally but which must still be
  # sent down the wire to the remote object.
  GOOD = [
    :inspect, :class_variables, :instance_eval, :instance_variables,
    :to_a, :to_s
  ].freeze

  # Methods that must never be invoked through a proxy.
  BAD = [
    :clone, :dup, :display
  ].freeze

  # Methods returning method-name lists; BAD names are stripped from
  # their results.
  METHOD = [
    :methods, :private_methods, :protected_methods, :public_methods,
    :singleton_methods
  ].freeze

  # Methods that take a method name, paired with the Ruby code to
  # evaluate when that name is one of BAD.
  RESPOND = [
    [ :method, "raise NameError" ].freeze,
    [ :respond_to?, "false" ].freeze
  ].freeze

  ##
  # Tell whether name refers to one of the BAD methods. Symbols and
  # Strings are both recognised; any other object is never a match and
  # is not sent any message.
  #
  # @param name The method name to test.
  #
  # @return true or false.
  #
  def self.bad?(name)
    return false unless name.is_a?(Symbol) || name.is_a?(String)
    BAD.any? { |bad| bad.to_s == name.to_s }
  end
end

##
# A ROMP::Object_Reference is created on the server side to represent an
# object in the system. It can be returned from a server object to a
# client object, at which point it is converted into a ROMP::Proxy_Object.
#
class Object_Reference
  # The server-assigned id of the referenced object (this intentionally
  # replaces Object#object_id for instances of this class).
  attr_reader :object_id

  def initialize(object_id)
    @object_id = object_id
  end
end

##
# A ROMP::Proxy_Object acts as a proxy; it forwards most methods to the
# server for execution. When you make calls to a ROMP server, you will be
# making the calls through a Proxy_Object.
#
class Proxy_Object

  if false then # the following functions are implemented in C:

  ##
  # The method_missing function is called for any method that is not
  # defined on the client side. It will forward requests to the server
  # for processing, and can iterate through a block, raise an exception,
  # or return a value.
  #
  def method_missing(function, *args)
  end

  ##
  # The oneway function is called to make a oneway call to the server
  # without synchronization.
  #
  def oneway(function, *args)
  end

  ##
  # The oneway_sync function is called to make a oneway call to the
  # server with synchronization (the server will return a null message
  # to the client before it begins processing). This is slightly safer
  # than a normal oneway call, but it is slower (except on a linux 2.2
  # kernel; see the bug list above).
  #
  def oneway_sync(function, *args)
  end

  ##
  # The sync function will synchronize with the server. It sends a sync
  # request and waits for a response.
  #
  def sync()
  end

  end # if false

  # Make sure certain methods get passed down the wire. Blocks given to
  # these methods are not forwarded.
  Functions::GOOD.each do |name|
    define_method(name) do |*args|
      method_missing(name, *args) #return
    end
  end

  # And make sure others never get called. The message is built when the
  # method is invoked, so it names the proxy instance; __id__ is used
  # because Object#id no longer exists in current Ruby versions.
  Functions::BAD.each do |name|
    define_method(name) do |*args|
      raise(NameError,
            "undefined method `#{name}' for #<#{self.class}:#{__id__}>")
    end
  end

  # And remove these function names from any method lists that get
  # returned; there's nothing we can do about people who decide to
  # return them from other functions. Names are compared as text, so
  # both Symbol and String entries are removed. The returned list is
  # filtered in place, in a single pass.
  Functions::METHOD.each do |name|
    define_method(name) do |*args|
      retval = method_missing(name, *args)
      retval.delete_if { |item| Functions.bad?(item) }
      retval #return
    end
  end

  # Same here, except don't let the call go through in the first place.
  # The name may be given as a Symbol or as a String.
  Functions::RESPOND.each do |name, action|
    define_method(name) do |arg, *args|
      if Functions.bad?(arg)
        # action is one of the fixed code strings from Functions::RESPOND.
        eval(action)
      else
        method_missing(name, arg, *args)
      end
    end
  end

end
378
379 ##
380 # The Resolve_Server class registers objects for the server. You will
381 # never have to use this class directly.
382 #
class Resolve_Server
  def initialize
    @next_id = 0              # next never-used id
    @unused_ids = Array.new   # ids released by unregister, ready for reuse
    @id_to_object = Hash.new
    @name_to_id = Hash.new
  end

  ##
  # Store obj and hand out an id for it. Fresh ids are used until
  # Session::MAX_ID is reached; after that, ids released by unregister
  # are recycled.
  #
  # @param obj The object to register.
  #
  # @return The id under which obj can be retrieved with get_object.
  #
  # @raise RuntimeError ("Object limit exceeded") when no fresh id and no
  #   recycled id is available.
  #
  def register(obj)
    if @next_id < Session::MAX_ID then
      id = @next_id
      @next_id = @next_id.succ()
    elsif @unused_ids.empty? then
      raise "Object limit exceeded"
    else
      id = @unused_ids.pop
    end
    @id_to_object[id] = obj
    id #return
  end

  ##
  # @param object_id An id previously returned by register.
  #
  # @return The registered object, or nil if the id is not in use.
  #
  def get_object(object_id)
    @id_to_object[object_id] #return
  end

  ##
  # Remove obj (found by a linear search using ==) from the registry.
  # Its id becomes available for reuse, and every name bound to that id
  # is dropped so that a recycled id can never be reached through a
  # stale name. Does nothing if obj is not registered.
  #
  # @param obj The object to unregister.
  #
  # @return nil
  #
  def unregister(obj)
    id = @id_to_object.key(obj)
    unless id.nil?
      @id_to_object.delete(id)
      @name_to_id.delete_if { |_name, bound_id| bound_id == id }
      @unused_ids.push(id)
    end
    nil #return
  end

  ##
  # Associate name with an id returned by register.
  #
  def bind(name, id)
    @name_to_id[name] = id
  end

  ##
  # @return The id bound to name, or nil if the name is unknown.
  #
  def resolve(name)
    @name_to_id[name] #return
  end

  ##
  # Blank out the first slot of array that holds obj. Works on an Array
  # (slot = index) as well as on a Hash (slot = key). No longer used by
  # unregister; kept so existing callers continue to work.
  #
  def delete_obj_from_array_private(array, obj)
    index = array.respond_to?(:key) ? array.key(obj) : array.index(obj)
    array[index] = nil unless index.nil?
  end
end
426
427 ##
428 # The Resolve_Obj class handles resolve requests for the client. It is
429 # a special ROMP object with an object id of 0. You will never have to
430 # make calls on it directly, but will instead make calls on it through
431 # the Client object.
432 #
class Resolve_Obj
  # @param resolve_server The registry that name lookups are delegated to.
  def initialize(resolve_server)
    @resolve_server = resolve_server
  end

  # Hand the lookup straight to the registry and pass its answer back.
  #
  # @param name The name to look up.
  #
  # @return Whatever the registry's resolve returns for name.
  def resolve(name)
    registry = @resolve_server
    registry.resolve(name)
  end
end
442
443 ##
444 # A Generic_Server creates an endpoint to listen on, waits for connections,
445 # and accepts them when requested. It can operate on different kinds of
446 # connections. You will never have to use this object directly.
447 #
class Generic_Server
  ##
  # Create the listening endpoint.
  #
  # @param endpoint One of "romp://host:port" or "tcpromp://host:port"
  #   (TCP), "udpromp://host:port" (UDP) or "unixromp://path" (UNIX
  #   domain socket). An empty host is passed to the socket layer as nil.
  #
  # @raise ArgumentError if the endpoint matches none of these forms.
  #
  def initialize(endpoint)
    case endpoint
    when %r{^(tcp)?romp://(.*?):(.*)}
      @type = "tcp"
      @host = $2 == "" ? nil : $2
      @port = $3
      @server = TCPServer.new(@host, @port)
    when %r{^(udp)romp://(.*?):(.*)}
      @type = "udp"
      @host = $2 == "" ? nil : $2
      @port = $3
      @server = UDPSocket.open()
      @server.bind(@host, @port)
      @mutex = Mutex.new
    when %r{^(unix)romp://(.*)}
      @type = "unix"
      @path = $2
      @server = UNIXServer.open(@path)
    else
      raise ArgumentError, "Invalid endpoint"
    end
  end

  ##
  # Wait for the next connection and return its socket, switched to
  # non-blocking mode.
  #
  # @return The connected socket (for UDP, the bound socket itself).
  #
  def accept
    case @type
    when "tcp"
      socket = @server.accept
      # IPPROTO_TCP is the portable level for TCP_NODELAY; SOL_TCP is
      # not defined on every platform.
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      make_nonblocking(socket)
      socket.sync = true
      socket #return
    when "udp"
      # NOTE(review): the mutex is locked here and never unlocked in
      # this class, so only the first accept can return the socket.
      # The file header lists UDP support as not working; confirm the
      # intent before relying on this branch.
      @mutex.lock
      socket = @server
      make_nonblocking(socket)
      socket #return
    when "unix"
      socket = @server.accept
      make_nonblocking(socket)
      socket.sync = true
      socket #return
    end
  end

  private

  # Add O_NONBLOCK to the socket's status flags while keeping the flags
  # that are already set (a bare F_SETFL would overwrite them).
  def make_nonblocking(socket)
    flags = socket.fcntl(Fcntl::F_GETFL)
    socket.fcntl(Fcntl::F_SETFL, flags | Fcntl::O_NONBLOCK)
  end
end
492
493 ##
494 # A Generic_Client connects to a Generic_Server on a given endpoint.
495 # You will never have to use this object directly.
496 #
class Generic_Client
  ##
  # Open a connection to the given endpoint. Note that this replaces
  # Class#new: the return value is the connected socket, not a
  # Generic_Client instance.
  #
  # @param endpoint One of "romp://host:port" or "tcpromp://host:port"
  #   (TCP), "udpromp://host:port" (UDP) or "unixromp://path" (UNIX
  #   domain socket).
  #
  # @return The connected socket.
  #
  # @raise ArgumentError if the endpoint matches none of these forms.
  #
  def self.new(endpoint)
    case endpoint
    when %r{^(tcp)?romp://(.*?):(.*)}
      socket = TCPSocket.open($2, $3)
      socket.sync = true
      # IPPROTO_TCP is the portable level for TCP_NODELAY; SOL_TCP is
      # not defined on every platform.
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      # Add O_NONBLOCK without discarding the flags already set.
      flags = socket.fcntl(Fcntl::F_GETFL)
      socket.fcntl(Fcntl::F_SETFL, flags | Fcntl::O_NONBLOCK)
      socket #return
    when %r{^(udp)romp://(.*?):(.*)}
      socket = UDPSocket.open
      socket.connect($2, $3)
      socket #return
    when %r{^(unix)romp://(.*)}
      UNIXSocket.open($2) #return
    else
      raise ArgumentError, "Invalid endpoint"
    end
  end
end
517
518
519 ##
520 # Print an exception to the screen. This is necessary, because Ruby does
521 # not give us access to its error_print function from within Ruby.
522 #
523 # @param exc The exception object to print.
524 #
def self.print_exception(exc)
  # Report the exception that was passed in. $! is only set while a
  # rescue clause is running, so relying on it breaks callers that
  # print a stored exception later.
  trace = exc.backtrace
  if trace.nil? || trace.empty?
    # An exception that was never raised has no backtrace; still show
    # what it says.
    puts "#{exc} (#{exc.message})"
  else
    trace.each_with_index do |bt, depth|
      if depth == 0 then
        puts "#{bt}: #{exc} (#{exc.message})"
      else
        puts "\tfrom #{bt}"
      end
    end
  end
  trace #return
end
536
if false then # the following classes are implemented in C:

##
# The Session class is defined in romp_helper.so. You should never have
# to use it directly. This empty definition is never evaluated (it sits
# inside `if false`); it exists only to carry the documentation.
#
class Session
end

end # if false
547
548 end