1 require 'socket'
2 require 'thread'
3 require 'fcntl'
4 require 'romp_helper'
5
6 ##
7 # ROMP - The Ruby Object Message Proxy
8 # @author Paul Brannan
9 # @version 0.1
10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net)
11 #
12 # <pre>
13 # ROMP is a set of classes for providing distributed object support to a
14 # Ruby program. You may distribute and/or modify it under the same terms as
15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example:
16 #
17 # Client
18 # ------
19 # client = ROMP::Client.new('localhost', 4242)
20 # obj = client.resolve("foo")
21 # puts obj.foo("foo")
22 # obj.oneway(:foo, "foo")
23 #
24 # Server
25 # ------
26 # class Foo
27 # def foo(x); return x; end
28 # end
29 # obj = Foo.new
30 # server = ROMP::Server.new('localhost', 4242)
31 # server.bind(obj, "foo")
32 # server.thread.join
33 #
34 # You can do all sorts of cool things with ROMP, including passing blocks to
35 # the functions, throwing exceptions and propogating them from server to
36 # client, and more. Unlike CORBA, where you must create an interface
37 # definition and strictly adhere to it, ROMP uses marshalling, so you can
38 # use almost any object with it. But, alas, it is not as powerful as CORBA.
39 #
40 # On a fast machine, you should expect around 7000 messages per second with
41 # normal method calls, up to 10000 messages per second with oneway calls with
42 # sync, and up to 40000 messages per second for oneway calls without sync.
43 # These numbers can vary depending on various factors, so YMMV.
44 #
45 # The ROMP message format is broken into 3 components:
46 # [ msg_type, obj_id, message ]
47 # For each msg_type, there is a specific format to the message. Additionally,
48 # certain msg_types are only valid when being sent to the server, and others
49 # are valid only when being sent back to the client. Here is a summary:
50 #
51 # msg_type send to meaning of obj_id msg format
52 # ----------------------------------------------------------------------------
53 # REQUEST server obj to talk to [:method, *args]
54 # REQUEST_BLOCK server obj to talk to [:method, *args]
55 # ONEWAY server obj to talk to [:method, *args]
56 # ONEWAY_SYNC server obj to talk to [:method, *args]
57 # RETVAL client always 0 retval
58 # EXCEPTION client always 0 $!
59 # YIELD client always 0 [value, value, ...]
60 # SYNC either 0=request, 1=response nil
61 # NULL_MSG either always 0 n/a
62 #
63 # BUGS:
64 # - On a 2.2 kernel, oneway calls without sync is very slow.
65 # - UDP support does not currently work.
66 # </pre>
67
68 module ROMP
69
70 public
71
72 ##
73 # The ROMP server class. Like its drb equivalent, this class spawns off
74 # a new thread which processes requests, allowing the server to do other
75 # things while it is doing processing for a distributed object. This
76 # means, though, that all objects used with ROMP must be thread-safe.
77 #
78 class Server
79
80 public
81 attr_reader :obj, :thread
82
83 ##
84 # Start a ROMP server.
85 #
86 # @param endpoint An endpoint for the server to listen on; should be specified in URI notation.
87 # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and returns true or false.
88 # @param debug Turns on debugging messages if enabled.
89 #
90 def initialize(endpoint, acceptor=nil, debug=false)
91 @mutex = Mutex.new
92 @debug = debug
93 @resolve_server = Resolve_Server.new
94 @resolve_obj = Resolve_Obj.new(@resolve_server)
95 @resolve_server.register(@resolve_obj)
96
97 @thread = Thread.new do
98 server = Generic_Server.new(endpoint)
99 while(socket = server.accept)
100 puts "Got a connection" if @debug
101 if acceptor then
102 if !acceptor.call(socket) then
103 socket.close
104 next
105 end
106 end
107 puts "Accepted the connection" if @debug
108 session = Session.new(socket)
109 session.set_nonblock(true)
110 Thread.new(socket) do |socket|
111 begin
112 # TODO: Send a sync message to the client so it
113 # knows we are ready to receive data.
114 server_loop(session)
115 rescue Exception
116 ROMP::print_exception($!) if @debug
117 end
118 puts "Connection closed" if @debug
119 end
120 end
121 end
122 end
123
124 ##
125 # Register an object with the server. The object will be given an
126 # id of @next_id, and @next_id will be incremented. We could use the
127 # object's real id, but this is insecure. The supplied object must
128 # be thread-safe.
129 #
130 # @param obj The object to register.
131 #
132 # @return A new Object_Reference that should be returned to the client.
133 #
134 def create_reference(obj)
135 @mutex.synchronize do
136 id = @resolve_server.register(obj)
137 Object_Reference.new(id) #return
138 end
139 end
140
141 ##
142 # Find an object in linear time and unregister it. Be careful with
143 # this function, because the client may not know the object has
144 # gone away.
145 #
146 # @param obj The object to unregister.
147 #
148 def delete_reference(obj)
149 @mutex.synchronize do
150 @resolve_server.unregister(obj)
151 end
152 nil #return
153 end
154
155 ##
156 # Register an object with the server and bind it to name.
157 #
158 # @param obj The object to bind.
159 # @param name The name of to bind the object to.
160 #
161 def bind(obj, name)
162 id = @resolve_server.register(obj)
163 @resolve_server.bind(name, id)
164 nil #return
165 end
166
167 ##
168 # This keeps the client from seeing our objects when they call inspect
169 #
170 alias_method :__inspect__, :inspect
171 def inspect()
172 return ""
173 end
174
175 private
176 if false then # the following functions are implemented in C:
177
178 ##
179 # The server_loop function is the guts of the server. It takes in
180 # requests from the client and forwards them to already-registered
181 # objects.
182 #
183 # @param session The session to run the loop with.
184 #
185 def server_loop(session)
186 end
187
188 end # if false
189 end
190
191 ##
192 # The ROMP client class. A ROMP server must be started on the given
193 # host and port before instantiating a ROMP client.
194 #
195 class Client
196
197 ##
198 # Connect to a ROMP server
199 #
200 # @param endpoint The endpoint the server is listening on.
201 # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost.
202 #
203 def initialize(endpoint, sync=true)
204 @server = Generic_Client.new(endpoint)
205 @session = Session.new(@server)
206 @session.set_nonblock(true)
207 @mutex = sync ? Mutex.new : Null_Mutex.new
208 @resolve_obj = Proxy_Object.new(@session, @mutex, 0)
209 end
210
211 ##
212 # Given a string, return a proxy object that will forward requests
213 # for an object on the server with that name.
214 #
215 # @param object_name The name of the object to resolve.
216 #
217 # @return A Proxy_Object that can be used to make method calls on the object in the server.
218 #
219 def resolve(object_name)
220 @mutex.synchronize do
221 object_id = @resolve_obj.resolve(object_name)
222 return Proxy_Object.new(@session, @mutex, object_id)
223 end
224 end
225 end
226
227 private
228
229 ##
230 # In case the user does not want synchronization.
231 #
232 class Null_Mutex
233 def synchronize
234 yield
235 end
236
237 def lock
238 end
239
240 def unlock
241 end
242 end
243
244 ##
245 # All the special functions we have to keep track of
246 #
247 class Functions
248 GOOD = [
249 :inspect, :class_variables, :instance_eval, :instance_variables,
250 :to_a, :to_s
251 ]
252
253 BAD = [
254 :clone, :dup, :display
255 ]
256
257 METHOD = [
258 :methods, :private_methods, :protected_methods, :public_methods,
259 :singleton_methods
260 ]
261
262 RESPOND = [
263 [ :method, "raise NameError" ],
264 [ :respond_to?, "false" ]
265 ]
266 end
267
268 ##
269 # A ROMP::Object_Reference is created on the server side to represent an
270 # object in the system. It can be returned from a server object to a
271 # client object, at which point it is converted into a ROMP::Proxy_Object.
272 #
273 class Object_Reference
274 attr_reader :object_id
275
276 def initialize(object_id)
277 @object_id = object_id
278 end
279 end
280
281 ##
282 # A ROMP::Object acts as a proxy; it forwards most methods to the server
283 # for execution. When you make calls to a ROMP server, you will be
284 # making the calls through a Proxy_Object.
285 #
286 class Proxy_Object
287
288 if false then # the following functions are implemented in C:
289
290 ##
291 # The method_missing function is called for any method that is not
292 # defined on the client side. It will forward requests to the server
293 # for processing, and can iterate through a block, raise an exception,
294 # or return a value.
295 #
296 def method_missing(function, *args)
297 end
298
299 ##
300 # The oneway function is called to make a oneway call to the server
301 # without synchronization.
302 #
303 def onweway(function, *args)
304 end
305
306 ##
307 # The oneway_sync function is called to make a oneway call to the
308 # server with synchronization (the server will return a null message
309 # to the client before it begins processing). This is slightly safer
310 # than a normal oneway call, but it is slower (except on a linux 2.2
311 # kernel; see the bug list above).
312 #
313 def oneway_sync(function, *args)
314 end
315
316 ##
317 # The sync function will synchronize with the server. It sends a sync
318 # request and waits for a response.
319 #
320 def sync()
321 end
322
323 end # if false
324
325 # Make sure certain methods get passed down the wire.
326 Functions::GOOD.each do |method|
327 eval %{
328 def #{method}(*args)
329 method_missing(:#{method}, *args) #return
330 end
331 }
332 end
333
334 # And make sure others never get called.
335 Functions::BAD.each do |method|
336 eval %{
337 def #{method}(*args)
338 raise(NameError,
339 "undefined method `#{method}' for " +
340 "\#<#{self.class}:#{self.id}>")
341 end
342 }
343 end
344
345 # And remove these function names from any method lists that get
346 # returned; there's nothing we can do about people who decide to
347 # return them from other functions.
348 Functions::METHOD.each do |method|
349 eval %{
350 def #{method}(*args)
351 retval = method_missing(:#{method}, *args)
352 retval.each do |item|
353 Functions::BAD.each do |bad|
354 retval.delete(bad.to_s)
355 end
356 end
357 retval #return
358 end
359 }
360 end
361
362 # Same here, except don't let the call go through in the first place.
363 Functions::RESPOND.each do |method, action|
364 eval %{
365 def #{method}(arg, *args)
366 Functions::BAD.each do |bad|
367 if arg === bad.to_s then
368 return eval("#{action}")
369 end
370 end
371 method_missing(:#{method}, arg, *args) #return
372 end
373 }
374 end
375
376 end
377
378 ##
379 # The Resolve_Server class registers objects for the server. You will
380 # never have to use this class directly.
381 #
382 class Resolve_Server
383 def initialize
384 @next_id = 0
385 @unused_ids = Array.new
386 @id_to_object = Hash.new
387 @name_to_id = Hash.new
388 end
389
390 def register(obj)
391 if @next_id >= Session::MAX_ID then
392 if @unused_ids.size == 0 then
393 raise "Object limit exceeded"
394 else
395 id = @unused_ids.pop
396 end
397 end
398 @id_to_object[@next_id] = obj
399 old_id = @next_id
400 @next_id = @next_id.succ()
401 old_id #return
402 end
403
404 def get_object(object_id)
405 @id_to_object[object_id] #return
406 end
407
408 def unregister(obj)
409 delete_obj_from_array_private(@id_to_object, obj)
410 end
411
412 def bind(name, id)
413 @name_to_id[name] = id
414 end
415
416 def resolve(name)
417 @name_to_id[name] #return
418 end
419
420 def delete_obj_from_array_private(array, obj)
421 index = array.index(obj)
422 array[index] = nil unless index == nil
423 end
424 end
425
426 ##
427 # The Resolve_Obj class handles resolve requests for the client. It is
428 # a special ROMP object with an object id of 0. You will never have to
429 # make calls on it directly, but will instead make calls on it through
430 # the Client object.
431 #
432 class Resolve_Obj
433 def initialize(resolve_server)
434 @resolve_server = resolve_server
435 end
436
437 def resolve(name)
438 @resolve_server.resolve(name) #return
439 end
440 end
441
442 ##
443 # A Generic_Server creates an endpoint to listen on, waits for connections,
444 # and accepts them when requested. It can operate on different kinds of
445 # connections. You will never have to use this object directly.
446 #
447 class Generic_Server
448 def initialize(endpoint)
449 case endpoint
450 when %r{^(tcp)?romp://(.*?):(.*)}
451 @type = "tcp"
452 @host = $2 == "" ? nil : $2
453 @port = $3
454 @server = TCPServer.new(@host, @port)
455 when %r{^(udp)romp://(.*?):(.*)}
456 @type = "udp"
457 @host = $2 == "" ? nil : $2
458 @port = $3
459 @server = UDPSocket.open()
460 @server.bind(@host, @port)
461 @mutex = Mutex.new
462 when %r{^(unix)romp://(.*)}
463 @type = "unix"
464 @path = $2
465 @server = UNIXServer.open(@path)
466 else
467 raise ArgumentError, "Invalid endpoint"
468 end
469 end
470 def accept
471 case @type
472 when "tcp"
473 socket = @server.accept
474 socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
475 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
476 socket.sync = true
477 socket #return
478 when "udp"
479 @mutex.lock
480 socket = @server
481 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
482 socket #return
483 when "unix"
484 socket = @server.accept
485 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
486 socket.sync = true
487 socket #return
488 end
489 end
490 end
491
492 ##
493 # A Generic_Client connects to a Generic_Server on a given endpoint.
494 # You will never have to use this object directly.
495 #
496 class Generic_Client
497 def self.new(endpoint)
498 case endpoint
499 when %r{^(tcp)?romp://(.*?):(.*)}
500 socket = TCPSocket.open($2, $3)
501 socket.sync = true
502 socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1)
503 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
504 socket #return
505 when %r{^(udp)romp://(.*?):(.*)}
506 socket = UDPSocket.open
507 socket.connect($2, $3)
508 when %r{^(unix)romp://(.*)}
509 socket = UNIXSocket.open($2)
510 else
511 raise ArgumentError, "Invalid endpoint"
512 end
513 end
514 end
515
516
517 ##
518 # Print an exception to the screen. This is necessary, because Ruby does
519 # not give us access to its error_print function from within Ruby.
520 #
521 # @param exc The exception object to print.
522 #
523 def self.print_exception(exc)
524 first = true
525 $!.backtrace.each do |bt|
526 if first then
527 puts "#{bt}: #{$!} (#{$!.message})"
528 else
529 puts "\tfrom #{bt}"
530 end
531 first = false
532 end
533 end
534
535 if false then # the following classes are implemented in C:
536
537 ##
538 # The Sesssion class is defined in romp_helper.so. You should never have
539 # to use it directly.
540 #
541 class Session
542 end
543
544 end # if false
545
546 end