require 'socket'
require 'thread'

=begin
DRuby (not to be confused with dRuby, written by Masatoshi SEKI)
(C) Copyright 2001 Paul Brannan (cout at rm-f.net)

DRuby is a set of classes for providing distributed object support to a
Ruby program.  You may distribute and/or modify it under the same terms as
Ruby (see http://www.ruby-lang.org/en/LICENSE.txt).  Example:

Client
------
client = DRuby::Client.new('localhost', 4242)
obj = client.resolve("foo")
puts obj.foo("foo")
obj.oneway(:foo, "foo")

Server
------
class Foo
  def foo(x); return x; end
end
obj = Foo.new
server = DRuby::Server.new('localhost', 4242)
server.bind(obj, "foo")
server.thread.join

You can do all sorts of cool things with DRuby, including passing blocks to
the functions, throwing exceptions and propagating them from server to
client, and more.  Unlike CORBA, where you must create an interface
definition and strictly adhere to it, DRuby uses marshalling, so you can
use almost any object with it.  But, alas, it is not as powerful as CORBA.

On a fast machine, you should expect around 4000 messages per second with
normal method calls, and up to 10000 messages per second with oneway calls.
My dual-processor machine gets 3500/6500 messages per second, though, so
YMMV.

The DRuby message format is broken into 3 components:
  [ msg_type, obj_id, message ]
For each msg_type, there is a specific format to the message.  Additionally,
certain msg_types are only valid when being sent to the server, and others
are valid only when being sent back to the client.  Here is a summary:

msg_type       send to   meaning of obj_id        msg format
----------------------------------------------------------------------------
REQUEST        server    obj to talk to           [:method, *args]
REQUEST_BLOCK  server    obj to talk to           [:method, *args]
ONEWAY         server    obj to talk to           [:method, *args]
RETVAL         client    always 0                 retval
EXCEPTION      client    always 0                 $!
YIELD          client    always 0                 [value, value, ...]
RESOLVE        server    always 0                 object_name
SYNC           either    0=request, 1=response    nil

Known bugs:
1) Oneway calls are really slow if mixed with regular calls.  I'm not sure
   why.  A workaround is to use a separate connection for each type of call,
   but sometimes that doesn't work either.
2) The server does not refuse connections for unauthorized addresses; it will
   instead accept a connection and then immediately close it.
3) If the server is flooded with oneway calls and drops packets, it will
   listen for data until it encounters a valid message.  If this happens, the
   client will not be notified.

Security note: every message is decoded with Marshal.load and requests are
dispatched with __send__ (which also reaches private methods).  Only expose
a DRuby server to peers you trust.
=end

module DRuby

  public

  # The DRuby server class.  Like its drb equivalent, this class spawns off
  # a new thread which processes requests, allowing the server to do other
  # things while it is doing processing for a distributed object.  This
  # means, though, that all objects used with DRuby must be thread-safe.
  class Server

    public

    attr_reader :host, :port, :obj, :thread

    # Start a server on the given host and port.  An acceptor may be
    # specified to accept/reject connections; it should be a proc that
    # takes a Socket as an argument and returns true or false.
    #
    # The listening socket is opened inside the server thread, so a failure
    # to bind surfaces when the thread is joined, not from this constructor.
    def initialize(host, port, acceptor = nil)
      @host = host
      @port = port
      @name_to_id = {}
      @id_to_object = []
      @id_to_acceptor = []
      @mutex = Mutex.new
      @next_id = 1
      @unused_ids = []
      @thread = Thread.new do
        server = TCPServer.new(@host, @port)
        while (socket = server.accept)
          if acceptor && !acceptor.call(socket)
            socket.close
            next
          end
          # IPPROTO_TCP is the portable option level; SOL_TCP is not
          # defined on every platform.
          socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
          Thread.new(socket) do |client|
            begin
              session_loop(Session.new(client))
            ensure
              # Release the descriptor however the session ended.
              client.close unless client.closed?
            end
          end
        end
      end
    end

    # Register an object with the server and return a
    # DRuby::Object_Reference for it.  The object is given a server-chosen
    # id rather than its real object id, because exposing the real id is
    # insecure.  The acceptor argument is a proc that takes a Socket as an
    # argument, and returns true or false depending on whether the request
    # should be allowed.  The supplied object must be thread-safe.
    #
    # Raises RuntimeError when no id is available.
    def create_reference(obj, acceptor = nil)
      @mutex.synchronize do
        Object_Reference.new(register_private(obj, acceptor))
      end
    end

    # Finds an object in linear time and unregisters it, together with any
    # names bound to it.  Does nothing if the object is not registered.
    # Always returns nil.
    def delete_reference(obj)
      @mutex.synchronize do
        unregister_private(obj)
      end
      nil
    end

    # Register an object with the server and bind it to name.  Always
    # returns nil.  Raises RuntimeError when no id is available.
    def bind(obj, name, acceptor = nil)
      # Same lock as create_reference: both mutate the id tables.
      @mutex.synchronize do
        @name_to_id[name] = register_private(obj, acceptor)
      end
      nil
    end

    # Main server loop.  Wait for a message, process it, and send back a
    # YIELD, EXCEPTION, or RETVAL message (ONEWAY requests get no reply;
    # SYNC requests get a SYNC reply).  Note that we don't do any thread
    # synchronization here, because all registered objects are required to
    # be thread-safe, and the @id_to_object lookup is atomic.
    #
    # Errors raised while reading a message (for example EOFError when the
    # peer disconnects mid-message) propagate to the caller.
    def session_loop(session)
      until session.finished
        type, object_id, message = session.get_message
        begin
          unless authorized?(session, object_id)
            # A oneway caller is not waiting for a reply, so a rejected
            # oneway request is dropped instead of answered -- but it must
            # never be executed.
            next if type == Session::ONEWAY
            raise "No authorization"
          end
          case type # In order of most to least common
          when Session::REQUEST
            retval = lookup_object(object_id).__send__(*message)
          when Session::ONEWAY
            begin
              lookup_object(object_id).__send__(*message)
            rescue Exception
              # Best effort by design: there is nobody to report to.
            end
            next
          when Session::REQUEST_BLOCK
            retval = lookup_object(object_id).__send__(*message) do |*values|
              session.send_message(Session::YIELD, 0, values)
            end
          when Session::RESOLVE
            retval = @name_to_id[message]
          when Session::SYNC
            session.reply_sync(object_id)
            next
          else
            raise "Bad session request"
          end
          session.send_message(Session::RETVAL, 0, retval)
        rescue Exception => error
          # Every failure, whatever its class, is shipped back to the
          # client, which re-raises it.
          session.send_message(Session::EXCEPTION, 0, error)
        end
      end
    end

    private

    # True when the object with the given id has no acceptor, or when its
    # acceptor approves the session's socket.
    def authorized?(session, object_id)
      acceptor = @id_to_acceptor[object_id]
      !acceptor || acceptor.call(session.socket)
    end

    # Return the object registered under object_id; raise RuntimeError for
    # an unknown id instead of dispatching the request to nil.
    def lookup_object(object_id)
      obj = @id_to_object[object_id]
      raise "No such object" if obj.nil?
      obj
    end

    # Store obj and its acceptor under a fresh id and return that id.
    # Fresh ids are handed out sequentially; ids freed by unregister_private
    # are recycled only once the sequential range is exhausted.  The caller
    # must hold @mutex.
    def register_private(obj, acceptor)
      if @next_id < Session::MAX_ID
        id = @next_id
        @next_id = @next_id.succ
      elsif @unused_ids.empty?
        raise "Object limit exceeded"
      else
        id = @unused_ids.pop
      end
      # Acceptor first, so the object is never reachable without its guard.
      @id_to_acceptor[id] = acceptor
      @id_to_object[id] = obj
      id
    end

    # Remove obj from the id tables, drop every name bound to it and make
    # its id available for recycling.  The caller must hold @mutex.
    def unregister_private(obj)
      return if obj.nil? # empty slots hold nil; never "unregister" one
      id = delete_obj_from_array_private(@id_to_object, obj)
      return unless id
      @id_to_acceptor[id] = nil
      @name_to_id.delete_if { |_name, bound_id| bound_id == id }
      @unused_ids.push(id)
    end

    # Replace the slot holding obj (compared by identity) with nil and
    # return its index, or nil if obj is not in the array.
    def delete_obj_from_array_private(array, obj)
      index = array.index { |item| item.equal?(obj) }
      array[index] = nil if index
      index
    end
  end

  # The DRuby client class.  A DRuby server must be started on the given
  # host and port before instantiating a DRuby client.
  class Client
    attr_reader :host, :port

    # Connect a client to the server on the given host and port.  The
    # user can specify sync=false to turn off synchronization and get a
    # 20% speed boost.
    def initialize(host, port, sync = true)
      @host = host
      @port = port
      @server = TCPSocket.open(@host, @port)
      @server.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      @session = Session.new(@server)
      @mutex = sync ? Mutex.new : Null_Mutex.new
    end

    # Given a string, return a proxy object that will forward requests
    # for an object on the server with that name.  Raises RuntimeError if
    # the server has nothing bound to the name or answers with an
    # unexpected message type, and re-raises any exception sent back by
    # the server.
    def resolve(object_name)
      @mutex.synchronize do
        @session.send_message(Session::RESOLVE, 0, object_name)
        type, _obj, message = @session.get_message
        case type
        when Session::RETVAL
          if message.nil?
            # A proxy with a nil id could never send a valid request.
            raise "No object bound to #{object_name.inspect}"
          end
          Proxy_Object.new(@session, @mutex, message)
        when Session::EXCEPTION
          raise message
        else
          raise RuntimeError
        end
      end
    end
  end

  private

  # In case the user does not want synchronization.
  class Null_Mutex
    def synchronize
      yield
    end
  end

  # All the special functions we have to keep track of
  class Functions
    # Forwarded to the server even though every local object has them.
    GOOD = [
      :inspect, :class_variables, :instance_eval, :instance_variables,
      :to_a, :to_s
    ].freeze

    # Never available on a proxy.
    BAD = [
      :clone, :dup, :display
    ].freeze

    # Return method lists, from which the BAD names must be removed.
    METHOD = [
      :methods, :private_methods, :protected_methods, :public_methods,
      :singleton_methods
    ].freeze

    # Method name paired with the Ruby source evaluated instead of
    # forwarding when the argument names a BAD method.
    RESPOND = [
      [ :method, "raise NameError" ],
      [ :respond_to?, "false" ]
    ].freeze
  end

  # A DRuby::Object_Reference is created on the server side to represent an
  # object in the system.  It can be returned from a server object to a
  # client object, at which point it is converted into a DRuby::Proxy_Object.
  class Object_Reference
    # The server-assigned id (deliberately shadows Object#object_id).
    attr_reader :object_id

    def initialize(object_id)
      @object_id = object_id
    end
  end

  # A DRuby::Proxy_Object acts as a proxy; it forwards most methods to the
  # server for execution.
  class Proxy_Object

    public

    # session   - the DRuby::Session used to talk to the server
    # mutex     - object responding to synchronize, serializing the session
    # object_id - server-side id of the object this proxy stands for
    def initialize(session, mutex, object_id)
      @object_id = object_id
      @session = session
      @mutex = mutex
    end

    # Client request handler.  The idea here is to send out a REQUEST
    # message, and get back a YIELD, EXCEPTION, or RETVAL message.  YIELD
    # values are passed to the caller's block, an EXCEPTION is re-raised
    # locally, and RETVAL ends the call.  Raises RuntimeError on an
    # unexpected message type.
    def method_missing(method, *args)
      request = [ method, *args ]
      kind = block_given? ? Session::REQUEST_BLOCK : Session::REQUEST
      @mutex.synchronize do
        @session.send_message(kind, @object_id, request)
        loop do
          type, obj, message = @session.get_message
          case type
          when Session::RETVAL
            return msg_to_obj(message)
          when Session::YIELD
            # The server packs the yielded values into one array; splat it
            # so the block sees the same arguments the remote yield passed.
            yield(*message.map { |value| msg_to_obj(value) })
          when Session::EXCEPTION
            raise message
          when Session::SYNC
            @session.reply_sync(obj)
          else
            raise "Invalid msg type"
          end
        end
      end
    end

    # This is a special function that lets you ignore the return value
    # of the calling function to get some extra speed, in exchange for not
    # knowing when the function will complete.  A future version of the
    # server should probably check whether a given function is allowed to
    # be called as a oneway, in order to reduce DoS attacks.
    def oneway(*message)
      @mutex.synchronize do
        @session.send_message(Session::ONEWAY, @object_id, message)
        nil
      end
    end

    # sync() will synchronize the client with the server (useful for
    # determining when a oneway call completes).  Raises RuntimeError if
    # the server's answer is not a SYNC response.
    def sync
      @mutex.synchronize do
        @session.send_sync
        @session.wait_sync
      end
    end

    # Make sure certain methods get passed down the wire.
    Functions::GOOD.each do |name|
      define_method(name) do |*args|
        method_missing(name, *args)
      end
    end

    # And make sure others never get called.
    Functions::BAD.each do |name|
      define_method(name) do |*_args|
        raise NameError,
              "undefined method `#{name}' for " +
              "#<#{self.class}:#{__id__}>"
      end
    end

    # And remove these function names from any method lists that get
    # returned; there's nothing we can do about people who decide to
    # return them from other functions.  Names are matched whether the
    # server reports them as Symbols or as Strings.
    Functions::METHOD.each do |name|
      define_method(name) do |*args|
        method_missing(name, *args).reject do |item|
          item.respond_to?(:to_sym) && Functions::BAD.include?(item.to_sym)
        end
      end
    end

    # Same here, except don't let the call go through in the first place.
    # The argument may be a Symbol or a String.
    Functions::RESPOND.each do |name, action|
      define_method(name) do |arg, *args|
        if Functions::BAD.any? { |bad| bad.to_s == arg.to_s }
          eval(action) # action is a constant from Functions::RESPOND
        else
          method_missing(name, arg, *args)
        end
      end
    end

    private

    # Turn an Object_Reference received from the server into a proxy that
    # shares this proxy's session and mutex; pass anything else through.
    def msg_to_obj(obj)
      if Object_Reference === obj
        Proxy_Object.new(@session, @mutex, obj.object_id)
      else
        obj
      end
    end

  end

  # A DRuby session sends messages back and forth between server and client.
  # The message format is as follows (header fields are 16-bit
  # little-endian integers):
  # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
  # | MSG_START | msg. size | msg. type |  obj. id  | marshalled msg/args |
  # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+
  class Session
    REQUEST       = 0x1001
    REQUEST_BLOCK = 0x1002
    ONEWAY        = 0x1003
    RETVAL        = 0x2001
    EXCEPTION     = 0x2002
    YIELD         = 0x2003
    RESOLVE       = 0x3001
    SYNC          = 0x4001
    MSG_START     = 0x4242

    MAX_ID = 2**16

    # Layout of the fixed-size header: four 16-bit little-endian fields.
    HEADER_FORMAT = "vvvv"
    HEADER_SIZE   = 8
    # Largest payload the 16-bit size field can describe.
    MAX_SIZE      = 0xFFFF

    # obj_id values carried by SYNC messages.
    SYNC_REQUEST  = 0
    SYNC_RESPONSE = 1

    attr_reader :io
    alias_method :socket, :io

    # io must respond to read, write and eof?.
    def initialize(io)
      @io = io
    end

    # Marshal message and write it, preceded by the header, to the peer.
    # Raises ArgumentError if the marshalled payload does not fit in the
    # 16-bit size field (previously the size was silently truncated, which
    # corrupted the stream).
    def send_message(type, obj, message)
      data = Marshal.dump(message)
      if data.bytesize > MAX_SIZE
        raise ArgumentError,
              "message too large (#{data.bytesize} > #{MAX_SIZE} bytes)"
      end
      header = [ MSG_START, data.bytesize, type, obj ].pack(HEADER_FORMAT)
      # One write keeps header and payload together on the wire.
      @io.write(header + data)
    end

    # Read the next message and return [ type, obj, message ].  Headers
    # that do not begin with MSG_START are skipped, HEADER_SIZE bytes at a
    # time.  Raises EOFError if the stream ends before a complete message
    # has been read.
    #
    # NOTE: Marshal.load can instantiate arbitrary objects; it is only safe
    # when the peer is trusted.
    def get_message
      magic, size, type, obj = read_exactly(HEADER_SIZE).unpack(HEADER_FORMAT)
      until magic == MSG_START
        magic, size, type, obj =
          read_exactly(HEADER_SIZE).unpack(HEADER_FORMAT)
      end
      message = Marshal.load(read_exactly(size))
      [ type, obj, message ]
    end

    # True once the peer has closed the connection and no data is left.
    # May block until data or end-of-file arrives.
    def finished
      @io.eof?
    end

    # Ask the peer to acknowledge everything sent so far.
    def send_sync
      send_message(SYNC, SYNC_REQUEST, nil)
    end

    # Block until the next message arrives and check that it is the SYNC
    # response; raise RuntimeError otherwise.
    def wait_sync
      type, obj, message = get_message
      return if type == SYNC && obj == SYNC_RESPONSE && message.nil?
      raise "DRuby synchonization failed"
    end

    # Answer a SYNC request; a SYNC response (value 1) needs no answer.
    def reply_sync(value)
      if value == SYNC_REQUEST
        send_message(SYNC, SYNC_RESPONSE, nil)
      end
    end

    private

    # Read exactly count bytes or raise EOFError.
    def read_exactly(count)
      data = @io.read(count)
      if data.nil? || data.bytesize < count
        raise EOFError, "connection closed in the middle of a message"
      end
      data
    end

  end
end