1 require 'socket' 2 require 'thread' 3 4 =begin 5 DRuby (not to be confused with dRuby, written by Masatoshi SEKI) 6 (C) Copyright 2001 Paul Brannan (cout at rm-f.net) 7 8 DRuby is a set of classes for providing distributed object support to a 9 Ruby program. You may distribute and/or modify it under the same terms as 10 Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example: 11 12 Client 13 ------ 14 client = DRuby::Client.new('localhost', 4242) 15 obj = client.resolve("foo") 16 puts obj.foo("foo") 17 obj.oneway(:foo, "foo") 18 19 Server 20 ------ 21 class Foo 22 def foo(x); return x; end 23 end 24 obj = Foo.new 25 server = DRuby::Server.new('localhost', 4242) 26 server.register(obj, "foo") 27 server.thread.join 28 29 You can do all sorts of cool things with DRuby, including passing blocks to 30 the functions, throwing exceptions and propogating them from server to 31 client, and more. Unlike CORBA, where you must create an interface 32 definition and strictly adhere to it, DRuby uses marshalling, so you can 33 use almost any object with it. But, alas, it is not as powerful as CORBA. 34 35 On a fast machine, you should expect around 4000 messages per second with 36 normal method calls, and up to 10000 messages per second with oneway calls. 37 My dual-processor machine gets 3500/6500 messages per second, though, so 38 YMMV. 39 40 The DRuby message format is broken into 3 components: 41 [ msg_type, obj_id, message ] 42 For each msg_type, there is a specific format to the message. Additionally, 43 certain msg_types are only valid when being sent to the server, and others 44 are valid only when being sent back to the client. Here is a summary: 45 46 msg_type send to meaning of obj_id msg format 47 ---------------------------------------------------------------------------- 48 REQUEST server obj to talk to [:method, *args] 49 REQUEST_BLOCK server obj to talk to [:method, *args] 50 ONEWAY server obj to talk to [:method, *args] 51 RETVAL client always 0 retval 52 EXCEPTION client always 0 $! 53 YIELD client always 0 [value, value, ...] 54 RESOLVE server always 0 object_name 55 SYNC either 0=request, 1=response nil 56 57 Known bugs: 58 1) Oneway calls are really slow if mixed with regular calls. I'm not sure 59 why. A workaround is to use a separate connection for each type of call, 60 but sometimes that doesn't work either. 61 2) The server does not refuse connections for unauthorized addresses; it will 62 instead accept a connection and then immediately close it. 63 3) If the server is flooded with oneway calls and drops packets, it will 64 listen for data until it encounters a valid message. If this happens, the 65 client will not be notified. 66 =end 67 68 module DRuby 69 70 public 71 72 # The DRuby server class. Like its drb equivalent, this class spawns off 73 # a new thread which processes requests, allowing the server to do other 74 # things while it is doing processing for a distributed object. This 75 # means, though, that all objects used with DRuby must be thread-safe. 76 class Server 77 attr_reader :host, :port, :obj, :thread 78 79 # Start a server on the given host and port. An acceptor may be 80 # specified to accept/reject connections; it should be a proc that 81 # takes a Socket as an argument and returns true or false. 82 def initialize(host, port, acceptor=nil) 83 @host = host 84 @port = port 85 @name_to_id = Hash.new 86 @id_to_object = Hash.new 87 @id_to_acceptor = Hash.new 88 @mutex = Mutex.new 89 @next_id = 1 90 @thread = Thread.new do 91 server = TCPServer.new(@host, @port) 92 while(socket = server.accept) 93 if acceptor then 94 if !acceptor.call(socket) then 95 socket.close 96 next 97 end 98 end 99 socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) 100 Thread.new(socket) do |socket| 101 session_loop(Session.new(socket)) 102 end 103 end 104 end 105 end 106 107 # Register an object with the server. Once an object is registered, 108 # it cannot be unregistered. The object will be given an id of 109 # @next_id, and @next_id will be incremented. We could use the 110 # object's real id, but we want to be sure that the id will fit into 111 # a 16-bit short. The acceptor argument is a proc that takes a Socket 112 # as an arugment, and returns true or false depending on whether the 113 # request should be allowed. The supplied object must be thread-safe. 114 def register(obj, name, acceptor=nil) 115 @mutex.synchronize do 116 if @next_id >= Session::MAX_ID then 117 raise ArgumentError, "Object limit exceeded" 118 end 119 @name_to_id[name] = @next_id 120 @id_to_acceptor[@next_id] = acceptor 121 @id_to_object[@next_id] = obj 122 @next_id = @next_id.succ() 123 end 124 end 125 126 # Main server loop. Wait for a REQUEST message, process it, and send 127 # back a YIELD, EXCEPTION, or RETVAL message. Note that we don't do 128 # any thread synchronization here, because all registered objects are 129 # required to be thread-safe, and the @id_to_object lookup is atomic 130 # (and if it succeeds, the object is guaranteed to be fully 131 # registered). 132 def session_loop(session) 133 while not session.finished() 134 type, object_id, message = session.get_message() 135 obj = @id_to_object[object_id] 136 acceptor = @id_to_acceptor[object_id] 137 begin 138 if acceptor then 139 if !acceptor.call(session.socket) then 140 if type != Session::ONEWAY then 141 raise ArgumentError, "No authorization" 142 end 143 end 144 end 145 case type 146 when Session::REQUEST 147 retval = obj.__send__(*message) 148 when Session::REQUEST_BLOCK 149 retval = obj.__send__(*message) do |*i| 150 session.send_message(Session::YIELD, 0, i) 151 end 152 when Session::ONEWAY 153 begin; obj.__send__(*message); rescue Exception; end 154 next 155 when Session::RESOLVE 156 retval = @name_to_id[message] 157 when Session::SYNC 158 session.reply_sync(object_id) 159 next 160 else 161 raise ArgumentError, "Bad session request" 162 end 163 session.send_message(Session::RETVAL, 0, retval) 164 rescue Exception 165 session.send_message(Session::EXCEPTION, 0, $!) 166 end 167 end 168 end 169 end 170 171 # The DRuby client class. A DRuby server must be started on the given 172 # host and port before instantiating a DRuby client. 173 class Client 174 attr_reader :host, :port 175 176 # Connect a client to the server on the given host and port. The 177 # user can specify sync=false to turn off synchronization and get a 178 # 20% speed boost. 179 def initialize(host, port, sync=true) 180 @host = host 181 @port = port 182 @server = TCPSocket.open(@host, @port) 183 @server.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) 184 @session = Session.new(@server) 185 @mutex = sync ? Mutex.new : Null_Mutex.new 186 end 187 188 # Given a string, return a proxy object that will forward requests 189 # for an object on the server with that name. 190 def resolve(object_name) 191 @mutex.synchronize do 192 @session.send_message(Session::RESOLVE, 0, object_name) 193 type, obj, message = @session.get_message() 194 case type 195 when Session::RETVAL 196 return Object.new(@session, @mutex, message) 197 when Session::EXCEPTION ; raise message 198 else ; raise RuntimeError 199 end 200 end 201 end 202 end 203 204 private 205 206 # In case the user does not want synchronization. 207 class Null_Mutex 208 def synchronize 209 yield 210 end 211 end 212 213 # A DRuby::Object acts as a proxy; it forwards most methods to the server 214 # for execution. 215 class Object 216 217 def initialize(session, mutex, object_id) 218 @object_id = object_id 219 @session = session 220 @mutex = mutex 221 end 222 223 # Client request handler. The idea here is to send out a REQUEST 224 # message, and get back a YIELD, EXCEPTION, or RETVAL message. 225 def method_missing(method, *args) 226 message = [ method, *args ] 227 @mutex.synchronize do 228 @session.send_message( 229 (block_given?) ? Session::REQUEST_BLOCK : Session::REQUEST, 230 @object_id, 231 message) 232 loop do 233 type, obj, message = @session.get_message() 234 case type 235 when Session::RETVAL ; return message 236 when Session::YIELD ; yield message 237 when Session::EXCEPTION ; raise message 238 when Session::SYNC ; @session.reply_sync(obj) 239 else ; raise RuntimeError 240 end 241 end 242 end 243 end 244 245 # This is a special function that lets you ignore the return value 246 # of the calling function to get some extra speed, in exchange for not 247 # knowing when the function will complete. A future version of the 248 # server should probably check whether a given function is allowed to 249 # be called as a oneway, in order to reduce DoS attacks. 250 def oneway(*message) 251 @mutex.synchronize do 252 @session.send_message(Session::ONEWAY, @object_id, message) 253 return nil 254 end 255 end 256 257 # sync() will synchonize the client with the server (useful for 258 # determining when a oneway call completes) 259 def sync() 260 @mutex.synchronize do 261 @session.send_sync() 262 @session.wait_sync() 263 end 264 end 265 266 GOOD_FUNCTIONS = [ 267 :inspect, :class_variables, :instance_eval, :instance_variables, 268 :to_a, :to_s 269 ] 270 271 BAD_FUNCTIONS = [ 272 :clone, :dup, :display 273 ] 274 275 METHOD_FUNCTIONS = [ 276 :methods, :private_methods, :protected_methods, :public_methods, 277 :singleton_methods 278 ] 279 280 RESPOND_FUNCTIONS = [ 281 [ :method, "raise NameError" ], 282 [ :respond_to?, "return false" ] 283 ] 284 285 # Make sure certain methods get passed down the wire. 286 GOOD_FUNCTIONS.each do |method| 287 eval %{ 288 def #{method}(*args) 289 return method_missing(:#{method}, *args) 290 end 291 } 292 end 293 294 # And make sure others never get called. 295 BAD_FUNCTIONS.each do |method| 296 undef_method method 297 end 298 299 # And remove these function names from any method lists that get 300 # returned; there's nothing we can do about people who decide to 301 # return them from other functions. 302 METHOD_FUNCTIONS.each do |method| 303 eval %{ 304 def #{method}(*args) 305 retval = method_missing(:#{method}, *args) 306 retval.each do |item| 307 BAD_FUNCTIONS.each do |bad| 308 retval.delete(bad.to_s) 309 end 310 end 311 return retval 312 end 313 } 314 end 315 316 # Same here, except don't let the call go through in the first place. 317 RESPOND_FUNCTIONS.each do |method, action| 318 eval %{ 319 def #{method}(arg, *args) 320 BAD_FUNCTIONS.each do |bad| 321 if arg === bad.to_s then 322 eval("#{action}") 323 end 324 end 325 return method_missing(:#{method}, arg, *args) 326 end 327 } 328 end 329 end 330 331 # A DRuby session sends messages back and forth between server and client. 332 # The message format is as follows: 333 # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+ 334 # | MSG_START | msg. size | msg. type | obj. id | marshalled msg/args | 335 # +-----+-----+-----+-----+-----+-----+-----+-----+-----+--- + ---+-----+ 336 class Session 337 REQUEST = 0x1001 338 REQUEST_BLOCK = 0x1002 339 ONEWAY = 0x1003 340 RETVAL = 0x2001 341 EXCEPTION = 0x2002 342 YIELD = 0x2003 343 RESOLVE = 0x3001 344 SYNC = 0x4001 345 MSG_START = 0x4242 346 347 MAX_ID = 2**16 348 349 attr_reader :io 350 alias_method :socket, :io 351 352 def initialize(io) 353 @io = io 354 end 355 356 def send_message(type, obj, message) 357 data = Marshal.dump(message) 358 header = [MSG_START, data.length, type, obj] 359 @io.write(header.pack("vvvv")) 360 @io.write(data) 361 end 362 363 def get_message() 364 header = @io.read(8) 365 magic, size, type, obj = header.unpack("vvvv") 366 while magic != MSG_START 367 header = @io.read(8) 368 magic, size, type, obj = header.unpack("vvvv") 369 end 370 data = @io.read(size) 371 message = Marshal.load(data) 372 return [ type, obj, message ] 373 end 374 375 def finished() 376 return @io.eof 377 end 378 379 def send_sync() 380 send_message(Session::SYNC, 0, nil) 381 end 382 383 def wait_sync() 384 sleep 1 385 type, obj, message = get_message() 386 if type != Session::SYNC && obj != 0 && message != nil then 387 raise RuntimeError, "DRuby synchonization failed" 388 end 389 end 390 391 def reply_sync(value) 392 if value == 0 then 393 send_message(Session::SYNC, 1, nil) 394 end 395 end 396 397 end 398 399 end