1 require 'socket' 2 require 'thread' 3 require 'fcntl' 4 require 'romp_helper' 5 6 ## 7 # ROMP - The Ruby Object Message Proxy 8 # @author Paul Brannan 9 # @version 0.1 10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net) 11 # 12 # <pre> 13 # ROMP is a set of classes for providing distributed object support to a 14 # Ruby program. You may distribute and/or modify it under the same terms as 15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example: 16 # 17 # Client 18 # ------ 19 # client = ROMP::Client.new('localhost', 4242) 20 # obj = client.resolve("foo") 21 # puts obj.foo("foo") 22 # obj.oneway(:foo, "foo") 23 # 24 # Server 25 # ------ 26 # class Foo 27 # def foo(x); return x; end 28 # end 29 # obj = Foo.new 30 # server = ROMP::Server.new('localhost', 4242) 31 # server.bind(obj, "foo") 32 # server.thread.join 33 # 34 # You can do all sorts of cool things with ROMP, including passing blocks to 35 # the functions, throwing exceptions and propogating them from server to 36 # client, and more. Unlike CORBA, where you must create an interface 37 # definition and strictly adhere to it, ROMP uses marshalling, so you can 38 # use almost any object with it. But, alas, it is not as powerful as CORBA. 39 # 40 # On a fast machine, you should expect around 7000 messages per second with 41 # normal method calls, up to 10000 messages per second with oneway calls with 42 # sync, and up to 40000 messages per second for oneway calls without sync. 43 # These numbers can vary depending on various factors, so YMMV. 44 # 45 # The ROMP message format is broken into 3 components: 46 # [ msg_type, obj_id, message ] 47 # For each msg_type, there is a specific format to the message. Additionally, 48 # certain msg_types are only valid when being sent to the server, and others 49 # are valid only when being sent back to the client. Here is a summary: 50 # 51 # msg_type send to meaning of obj_id msg format 52 # ---------------------------------------------------------------------------- 53 # REQUEST server obj to talk to [:method, *args] 54 # REQUEST_BLOCK server obj to talk to [:method, *args] 55 # ONEWAY server obj to talk to [:method, *args] 56 # ONEWAY_SYNC server obj to talk to [:method, *args] 57 # RETVAL client always 0 retval 58 # EXCEPTION client always 0 $! 59 # YIELD client always 0 [value, value, ...] 60 # SYNC either 0=request, 1=response nil 61 # NULL_MSG either always 0 n/a 62 # 63 # BUGS: 64 # - On a 2.2 kernel, oneway calls without sync is very slow. 65 # - UDP support does not currently work. 66 # </pre> 67 68 module ROMP 69 70 public 71 72 ## 73 # The ROMP server class. Like its drb equivalent, this class spawns off 74 # a new thread which processes requests, allowing the server to do other 75 # things while it is doing processing for a distributed object. This 76 # means, though, that all objects used with ROMP must be thread-safe. 77 # 78 class Server 79 80 public 81 attr_reader :obj, :thread 82 83 ## 84 # Start a ROMP server. 85 # 86 # @param endpoint An endpoint for the server to listen on; should be specified in URI notation. 87 # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and returns true or false. 88 # @param debug Turns on debugging messages if enabled. 89 # 90 def initialize(endpoint, acceptor=nil, debug=false) 91 @mutex = Mutex.new 92 @debug = debug 93 @resolve_server = Resolve_Server.new 94 @resolve_obj = Resolve_Obj.new(@resolve_server) 95 @resolve_server.register(@resolve_obj) 96 97 @thread = Thread.new do 98 server = Generic_Server.new(endpoint) 99 while(socket = server.accept) 100 puts "Got a connection" if @debug 101 if acceptor then 102 if !acceptor.call(socket) then 103 socket.close 104 next 105 end 106 end 107 puts "Accepted the connection" if @debug 108 session = Session.new(socket) 109 session.set_nonblock(true) 110 Thread.new(socket) do |socket| 111 Thread.current.abort_on_exception = true 112 begin 113 # TODO: Send a sync message to the client so it 114 # knows we are ready to receive data. 115 server_loop(session) 116 rescue Exception 117 ROMP::print_exception($!) if @debug 118 end 119 puts "Connection closed" if @debug 120 end 121 end 122 end 123 end 124 125 ## 126 # Register an object with the server. The object will be given an 127 # id of @next_id, and @next_id will be incremented. We could use the 128 # object's real id, but this is insecure. The supplied object must 129 # be thread-safe. 130 # 131 # @param obj The object to register. 132 # 133 # @return A new Object_Reference that should be returned to the client. 134 # 135 def create_reference(obj) 136 @mutex.synchronize do 137 id = @resolve_server.register(obj) 138 Object_Reference.new(id) #return 139 end 140 end 141 142 ## 143 # Find an object in linear time and unregister it. Be careful with 144 # this function, because the client may not know the object has 145 # gone away. 146 # 147 # @param obj The object to unregister. 148 # 149 def delete_reference(obj) 150 @mutex.synchronize do 151 @resolve_server.unregister(obj) 152 end 153 nil #return 154 end 155 156 ## 157 # Register an object with the server and bind it to name. 158 # 159 # @param obj The object to bind. 160 # @param name The name of to bind the object to. 161 # 162 def bind(obj, name) 163 id = @resolve_server.register(obj) 164 @resolve_server.bind(name, id) 165 nil #return 166 end 167 168 ## 169 # This keeps the client from seeing our objects when they call inspect 170 # 171 alias_method :__inspect__, :inspect 172 def inspect() 173 return "" 174 end 175 176 private 177 if false then # the following functions are implemented in C: 178 179 ## 180 # The server_loop function is the guts of the server. It takes in 181 # requests from the client and forwards them to already-registered 182 # objects. 183 # 184 # @param session The session to run the loop with. 185 # 186 def server_loop(session) 187 end 188 189 end # if false 190 end 191 192 ## 193 # The ROMP client class. A ROMP server must be started on the given 194 # host and port before instantiating a ROMP client. 195 # 196 class Client 197 198 ## 199 # Connect to a ROMP server 200 # 201 # @param endpoint The endpoint the server is listening on. 202 # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost. 203 # 204 def initialize(endpoint, sync=true) 205 @server = Generic_Client.new(endpoint) 206 @session = Session.new(@server) 207 @session.set_nonblock(true) 208 @mutex = sync ? Mutex.new : Null_Mutex.new 209 @resolve_obj = Proxy_Object.new(@session, @mutex, 0) 210 end 211 212 ## 213 # Given a string, return a proxy object that will forward requests 214 # for an object on the server with that name. 215 # 216 # @param object_name The name of the object to resolve. 217 # 218 # @return A Proxy_Object that can be used to make method calls on the object in the server. 219 # 220 def resolve(object_name) 221 @mutex.synchronize do 222 object_id = @resolve_obj.resolve(object_name) 223 return Proxy_Object.new(@session, @mutex, object_id) 224 end 225 end 226 end 227 228 private 229 230 ## 231 # In case the user does not want synchronization. 232 # 233 class Null_Mutex 234 def synchronize 235 yield 236 end 237 238 def lock 239 end 240 241 def unlock 242 end 243 end 244 245 ## 246 # All the special functions we have to keep track of 247 # 248 class Functions 249 GOOD = [ 250 :inspect, :class_variables, :instance_eval, :instance_variables, 251 :to_a, :to_s 252 ] 253 254 BAD = [ 255 :clone, :dup, :display 256 ] 257 258 METHOD = [ 259 :methods, :private_methods, :protected_methods, :public_methods, 260 :singleton_methods 261 ] 262 263 RESPOND = [ 264 [ :method, "raise NameError" ], 265 [ :respond_to?, "false" ] 266 ] 267 end 268 269 ## 270 # A ROMP::Object_Reference is created on the server side to represent an 271 # object in the system. It can be returned from a server object to a 272 # client object, at which point it is converted into a ROMP::Proxy_Object. 273 # 274 class Object_Reference 275 attr_reader :object_id 276 277 def initialize(object_id) 278 @object_id = object_id 279 end 280 end 281 282 ## 283 # A ROMP::Object acts as a proxy; it forwards most methods to the server 284 # for execution. When you make calls to a ROMP server, you will be 285 # making the calls through a Proxy_Object. 286 # 287 class Proxy_Object 288 289 if false then # the following functions are implemented in C: 290 291 ## 292 # The method_missing function is called for any method that is not 293 # defined on the client side. It will forward requests to the server 294 # for processing, and can iterate through a block, raise an exception, 295 # or return a value. 296 # 297 def method_missing(function, *args) 298 end 299 300 ## 301 # The oneway function is called to make a oneway call to the server 302 # without synchronization. 303 # 304 def onweway(function, *args) 305 end 306 307 ## 308 # The oneway_sync function is called to make a oneway call to the 309 # server with synchronization (the server will return a null message 310 # to the client before it begins processing). This is slightly safer 311 # than a normal oneway call, but it is slower (except on a linux 2.2 312 # kernel; see the bug list above). 313 # 314 def oneway_sync(function, *args) 315 end 316 317 ## 318 # The sync function will synchronize with the server. It sends a sync 319 # request and waits for a response. 320 # 321 def sync() 322 end 323 324 end # if false 325 326 # Make sure certain methods get passed down the wire. 327 Functions::GOOD.each do |method| 328 eval %{ 329 def #{method}(*args) 330 method_missing(:#{method}, *args) #return 331 end 332 } 333 end 334 335 # And make sure others never get called. 336 Functions::BAD.each do |method| 337 eval %{ 338 def #{method}(*args) 339 raise(NameError, 340 "undefined method `#{method}' for " + 341 "\#<#{self.class}:#{self.id}>") 342 end 343 } 344 end 345 346 # And remove these function names from any method lists that get 347 # returned; there's nothing we can do about people who decide to 348 # return them from other functions. 349 Functions::METHOD.each do |method| 350 eval %{ 351 def #{method}(*args) 352 retval = method_missing(:#{method}, *args) 353 retval.each do |item| 354 Functions::BAD.each do |bad| 355 retval.delete(bad.to_s) 356 end 357 end 358 retval #return 359 end 360 } 361 end 362 363 # Same here, except don't let the call go through in the first place. 364 Functions::RESPOND.each do |method, action| 365 eval %{ 366 def #{method}(arg, *args) 367 Functions::BAD.each do |bad| 368 if arg === bad.to_s then 369 return eval("#{action}") 370 end 371 end 372 method_missing(:#{method}, arg, *args) #return 373 end 374 } 375 end 376 377 end 378 379 ## 380 # The Resolve_Server class registers objects for the server. You will 381 # never have to use this class directly. 382 # 383 class Resolve_Server 384 def initialize 385 @next_id = 0 386 @unused_ids = Array.new 387 @id_to_object = Hash.new 388 @name_to_id = Hash.new 389 end 390 391 def register(obj) 392 if @next_id >= Session::MAX_ID then 393 if @unused_ids.size == 0 then 394 raise "Object limit exceeded" 395 else 396 id = @unused_ids.pop 397 end 398 end 399 @id_to_object[@next_id] = obj 400 old_id = @next_id 401 @next_id = @next_id.succ() 402 old_id #return 403 end 404 405 def get_object(object_id) 406 @id_to_object[object_id] #return 407 end 408 409 def unregister(obj) 410 delete_obj_from_array_private(@id_to_object, obj) 411 end 412 413 def bind(name, id) 414 @name_to_id[name] = id 415 end 416 417 def resolve(name) 418 @name_to_id[name] #return 419 end 420 421 def delete_obj_from_array_private(array, obj) 422 index = array.index(obj) 423 array[index] = nil unless index == nil 424 end 425 end 426 427 ## 428 # The Resolve_Obj class handles resolve requests for the client. It is 429 # a special ROMP object with an object id of 0. You will never have to 430 # make calls on it directly, but will instead make calls on it through 431 # the Client object. 432 # 433 class Resolve_Obj 434 def initialize(resolve_server) 435 @resolve_server = resolve_server 436 end 437 438 def resolve(name) 439 @resolve_server.resolve(name) #return 440 end 441 end 442 443 ## 444 # A Generic_Server creates an endpoint to listen on, waits for connections, 445 # and accepts them when requested. It can operate on different kinds of 446 # connections. You will never have to use this object directly. 447 # 448 class Generic_Server 449 def initialize(endpoint) 450 case endpoint 451 when %r{^(tcp)?romp://(.*?):(.*)} 452 @type = "tcp" 453 @host = $2 == "" ? nil : $2 454 @port = $3 455 @server = TCPServer.new(@host, @port) 456 when %r{^(udp)romp://(.*?):(.*)} 457 @type = "udp" 458 @host = $2 == "" ? nil : $2 459 @port = $3 460 @server = UDPSocket.open() 461 @server.bind(@host, @port) 462 @mutex = Mutex.new 463 when %r{^(unix)romp://(.*)} 464 @type = "unix" 465 @path = $2 466 @server = UNIXServer.open(@path) 467 else 468 raise ArgumentError, "Invalid endpoint" 469 end 470 end 471 def accept 472 case @type 473 when "tcp" 474 socket = @server.accept 475 socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) 476 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) 477 socket.sync = true 478 socket #return 479 when "udp" 480 @mutex.lock 481 socket = @server 482 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) 483 socket #return 484 when "unix" 485 socket = @server.accept 486 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) 487 socket.sync = true 488 socket #return 489 end 490 end 491 end 492 493 ## 494 # A Generic_Client connects to a Generic_Server on a given endpoint. 495 # You will never have to use this object directly. 496 # 497 class Generic_Client 498 def self.new(endpoint) 499 case endpoint 500 when %r{^(tcp)?romp://(.*?):(.*)} 501 socket = TCPSocket.open($2, $3) 502 socket.sync = true 503 socket.setsockopt(Socket::SOL_TCP, Socket::TCP_NODELAY, 1) 504 socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) 505 socket #return 506 when %r{^(udp)romp://(.*?):(.*)} 507 socket = UDPSocket.open 508 socket.connect($2, $3) 509 socket #return 510 when %r{^(unix)romp://(.*)} 511 socket = UNIXSocket.open($2) 512 else 513 raise ArgumentError, "Invalid endpoint" 514 end 515 end 516 end 517 518 519 ## 520 # Print an exception to the screen. This is necessary, because Ruby does 521 # not give us access to its error_print function from within Ruby. 522 # 523 # @param exc The exception object to print. 524 # 525 def self.print_exception(exc) 526 first = true 527 $!.backtrace.each do |bt| 528 if first then 529 puts "#{bt}: #{$!} (#{$!.message})" 530 else 531 puts "\tfrom #{bt}" 532 end 533 first = false 534 end 535 end 536 537 if false then # the following classes are implemented in C: 538 539 ## 540 # The Sesssion class is defined in romp_helper.so. You should never have 541 # to use it directly. 542 # 543 class Session 544 end 545 546 end # if false 547 548 end