1 require 'socket' 2 require 'thread' 3 require 'fcntl' 4 require 'romp_helper' 5 6 ## 7 # ROMP - The Ruby Object Message Proxy 8 # @author Paul Brannan 9 # @version 0.1 10 # (C) Copyright 2001 Paul Brannan (cout at rm-f.net) 11 # 12 # <pre> 13 # ROMP is a set of classes for providing distributed object support to a 14 # Ruby program. You may distribute and/or modify it under the same terms as 15 # Ruby (see http://www.ruby-lang.org/en/LICENSE.txt). Example: 16 # 17 # Client 18 # ------ 19 # client = ROMP::Client.new('localhost', 4242) 20 # obj = client.resolve("foo") 21 # puts obj.foo("foo") 22 # obj.oneway(:foo, "foo") 23 # 24 # Server 25 # ------ 26 # class Foo 27 # def foo(x); return x; end 28 # end 29 # obj = Foo.new 30 # server = ROMP::Server.new('localhost', 4242) 31 # server.bind(obj, "foo") 32 # server.thread.join 33 # 34 # You can do all sorts of cool things with ROMP, including passing blocks to 35 # the functions, throwing exceptions and propogating them from server to 36 # client, and more. Unlike CORBA, where you must create an interface 37 # definition and strictly adhere to it, ROMP uses marshalling, so you can 38 # use almost any object with it. But, alas, it is not as powerful as CORBA. 39 # 40 # On a fast machine, you should expect around 7000 messages per second with 41 # normal method calls, up to 10000 messages per second with oneway calls with 42 # sync, and up to 40000 messages per second for oneway calls without sync. 43 # These numbers can vary depending on various factors, so YMMV. 44 # 45 # The ROMP message format is broken into 3 components: 46 # [ msg_type, obj_id, message ] 47 # For each msg_type, there is a specific format to the message. Additionally, 48 # certain msg_types are only valid when being sent to the server, and others 49 # are valid only when being sent back to the client. 
Here is a summary: 50 # 51 # msg_type send to meaning of obj_id msg format 52 # ---------------------------------------------------------------------------- 53 # REQUEST server obj to talk to [:method, *args] 54 # REQUEST_BLOCK server obj to talk to [:method, *args] 55 # ONEWAY server obj to talk to [:method, *args] 56 # ONEWAY_SYNC server obj to talk to [:method, *args] 57 # RETVAL client always 0 retval 58 # EXCEPTION client always 0 $! 59 # YIELD client always 0 [value, value, ...] 60 # SYNC either 0=request, 1=response nil 61 # NULL_MSG either always 0 n/a 62 # 63 # BUGS: 64 # - On a 2.2 kernel, oneway calls without sync is very slow. 65 # - UDP support does not currently work. 66 # </pre> 67 68 module ROMP 69 70 public 71 72 ## 73 # The ROMP server class. Like its drb equivalent, this class spawns off 74 # a new thread which processes requests, allowing the server to do other 75 # things while it is doing processing for a distributed object. This 76 # means, though, that all objects used with ROMP must be thread-safe. 77 # 78 class Server 79 80 public 81 attr_reader :obj, :thread 82 83 ## 84 # Start a ROMP server. 85 # 86 # @param endpoint An endpoint for the server to listen on; should be specified in URI notation. 87 # @param acceptor A proc object that can accept or reject connections; it should take a Socket as an argument and returns true or false. 88 # @param debug Turns on debugging messages if enabled. 
89 # 90 def initialize(endpoint, acceptor=nil, debug=false) 91 @mutex = Mutex.new 92 @debug = debug 93 @resolve_server = Resolve_Server.new 94 @resolve_obj = Resolve_Obj.new(@resolve_server) 95 @resolve_server.register(@resolve_obj) 96 97 @thread = Thread.new do 98 server = Generic_Server.new(endpoint) 99 while(socket = server.accept) 100 puts "Got a connection" if @debug 101 if acceptor then 102 if !acceptor.call(socket) then 103 socket.close 104 next 105 end 106 end 107 puts "Accepted the connection" if @debug 108 session = Session.new(socket) 109 session.set_nonblock(true) 110 Thread.new(socket) do |socket| 111 begin 112 # TODO: Send a sync message to the client so it 113 # knows we are ready to receive data. 114 server_loop(session) 115 rescue Exception 116 ROMP::print_exception($!) if @debug 117 end 118 puts "Connection closed" if @debug 119 end 120 end 121 end 122 end 123 124 ## 125 # Register an object with the server. The object will be given an 126 # id of @next_id, and @next_id will be incremented. We could use the 127 # object's real id, but this is insecure. The supplied object must 128 # be thread-safe. 129 # 130 # @param obj The object to register. 131 # 132 # @return A new Object_Reference that should be returned to the client. 133 # 134 def create_reference(obj) 135 @mutex.synchronize do 136 id = @resolve_server.register(obj) 137 Object_Reference.new(id) #return 138 end 139 end 140 141 ## 142 # Find an object in linear time and unregister it. Be careful with 143 # this function, because the client may not know the object has 144 # gone away. 145 # 146 # @param obj The object to unregister. 147 # 148 def delete_reference(obj) 149 @mutex.synchronize do 150 @resolve_server.unregister(obj) 151 end 152 nil #return 153 end 154 155 ## 156 # Register an object with the server and bind it to name. 157 # 158 # @param obj The object to bind. 159 # @param name The name of to bind the object to. 
160 # 161 def bind(obj, name) 162 id = @resolve_server.register(obj) 163 @resolve_server.bind(name, id) 164 nil #return 165 end 166 167 ## 168 # This keeps the client from seeing our objects when they call inspect 169 # 170 alias_method :__inspect__, :inspect 171 def inspect() 172 return "" 173 end 174 175 private 176 if false then # the following functions are implemented in C: 177 178 ## 179 # The server_loop function is the guts of the server. It takes in 180 # requests from the client and forwards them to already-registered 181 # objects. 182 # 183 # @param session The session to run the loop with. 184 # 185 def server_loop(session) 186 end 187 188 end # if false 189 end 190 191 ## 192 # The ROMP client class. A ROMP server must be started on the given 193 # host and port before instantiating a ROMP client. 194 # 195 class Client 196 197 ## 198 # Connect to a ROMP server 199 # 200 # @param endpoint The endpoint the server is listening on. 201 # @param sync Specifies whether to synchronize between threads; turn this off to get a 20% performance boost. 202 # 203 def initialize(endpoint, sync=true) 204 @server = Generic_Client.new(endpoint) 205 @session = Session.new(@server) 206 @session.set_nonblock(true) 207 @mutex = sync ? Mutex.new : Null_Mutex.new 208 @resolve_obj = Proxy_Object.new(@session, @mutex, 0) 209 end 210 211 ## 212 # Given a string, return a proxy object that will forward requests 213 # for an object on the server with that name. 214 # 215 # @param object_name The name of the object to resolve. 216 # 217 # @return A Proxy_Object that can be used to make method calls on the object in the server. 218 # 219 def resolve(object_name) 220 @mutex.synchronize do 221 object_id = @resolve_obj.resolve(object_name) 222 return Proxy_Object.new(@session, @mutex, object_id) 223 end 224 end 225 end 226 227 private 228 229 ## 230 # In case the user does not want synchronization. 
#
class Null_Mutex
  # Runs the given block immediately, without any locking, and returns the
  # block's value.
  def synchronize
    yield
  end

  # No-op, so that a Null_Mutex can be used wherever a Mutex is expected.
  def lock
  end

  # No-op counterpart of lock.
  def unlock
  end
end

##
# All the special functions we have to keep track of
#
class Functions
  # Methods every Ruby object already has that must nevertheless be sent
  # to the server.
  GOOD = [
    :inspect, :class_variables, :instance_eval, :instance_variables,
    :to_a, :to_s
  ].freeze

  # Methods that must never be callable on a proxy.
  BAD = [
    :clone, :dup, :display
  ].freeze

  # Methods that return method lists; BAD names are stripped from the result.
  METHOD = [
    :methods, :private_methods, :protected_methods, :public_methods,
    :singleton_methods
  ].freeze

  # [method, action] pairs: when the method is asked about a BAD name, the
  # action (a snippet of Ruby source) is evaluated instead of forwarding.
  RESPOND = [
    [ :method, "raise NameError" ],
    [ :respond_to?, "false" ]
  ].freeze
end

##
# A ROMP::Object_Reference is created on the server side to represent an
# object in the system.  It can be returned from a server object to a
# client object, at which point it is converted into a ROMP::Proxy_Object.
#
class Object_Reference
  # The id the object was registered under (this intentionally replaces
  # Ruby's own object_id for instances of this class).
  attr_reader :object_id

  # @param object_id The id the referenced object was registered under.
  def initialize(object_id)
    @object_id = object_id
  end
end

##
# A ROMP::Proxy_Object acts as a proxy; it forwards most methods to the
# server for execution.  When you make calls to a ROMP server, you will be
# making the calls through a Proxy_Object.
#
class Proxy_Object

  if false then # the following functions are implemented in C:

  ##
  # The method_missing function is called for any method that is not
  # defined on the client side.  It will forward requests to the server
  # for processing, and can iterate through a block, raise an exception,
  # or return a value.
  #
  def method_missing(function, *args)
  end

  ##
  # The oneway function is called to make a oneway call to the server
  # without synchronization.
  #
  def oneway(function, *args)
  end

  ##
  # The oneway_sync function is called to make a oneway call to the
  # server with synchronization (the server will return a null message
  # to the client before it begins processing).  This is slightly safer
  # than a normal oneway call, but it is slower (except on a linux 2.2
  # kernel; see the bug list above).
  #
  def oneway_sync(function, *args)
  end

  ##
  # The sync function will synchronize with the server.  It sends a sync
  # request and waits for a response.
  #
  def sync()
  end

  end # if false

  # Make sure certain methods get passed down the wire.
  Functions::GOOD.each do |name|
    define_method(name) do |*args, &block|
      method_missing(name, *args, &block)
    end
  end

  # And make sure others never get called.  The message is built when the
  # method is invoked, so it describes the proxy instance being called.
  Functions::BAD.each do |name|
    define_method(name) do |*_args|
      raise(NameError,
            "undefined method `#{name}' for #<#{self.class}:#{__id__}>")
    end
  end

  # And remove these function names from any method lists that get
  # returned; there's nothing we can do about people who decide to
  # return them from other functions.  Entries are compared by name so
  # that both String and Symbol entries are filtered.
  Functions::METHOD.each do |name|
    define_method(name) do |*args, &block|
      listed = method_missing(name, *args, &block)
      hidden = Functions::BAD.map { |bad| bad.to_s }
      listed.reject { |entry| hidden.include?(entry.to_s) }
    end
  end

  # Same here, except don't let the call go through in the first place.
  # The argument is compared by name, so :clone and "clone" are both caught.
  Functions::RESPOND.each do |name, action|
    define_method(name) do |arg, *args, &block|
      if Functions::BAD.any? { |bad| bad.to_s == arg.to_s }
        # +action+ comes from the Functions::RESPOND constant, never from
        # the caller, so evaluating it is safe.
        eval(action)
      else
        method_missing(name, arg, *args, &block)
      end
    end
  end

end

##
# The Resolve_Server class registers objects for the server.  You will
# never have to use this class directly.
#
class Resolve_Server
  def initialize
    @next_id = 0        # next never-used id
    @unused_ids = []    # ids released by unregister, available for reuse
    @id_to_object = {}
    @name_to_id = {}
  end

  ##
  # Store obj and return the id it was stored under.  Fresh ids are handed
  # out until Session::MAX_ID is reached; after that, ids released by
  # unregister are reused.
  #
  # @param obj The object to register.
  #
  # @return The id assigned to obj.
  #
  # @raise RuntimeError ("Object limit exceeded") when no id is available.
  #
  def register(obj)
    id = allocate_id
    @id_to_object[id] = obj
    id
  end

  # @return The object registered under object_id, or nil if there is none.
  def get_object(object_id)
    @id_to_object[object_id]
  end

  ##
  # Remove the first registration whose object equals obj.  Its id becomes
  # available for reuse, and any names bound to that id are dropped so
  # that a name can never resolve to a different object that later
  # receives the same id.
  #
  # @param obj The object to unregister.
  #
  # @return nil
  #
  def unregister(obj)
    id = @id_to_object.key(obj)
    return nil if id.nil?

    @id_to_object.delete(id)
    @name_to_id.delete_if { |_name, bound_id| bound_id == id }
    @unused_ids.push(id)
    nil
  end

  # Bind name to an id previously returned by register.
  def bind(name, id)
    @name_to_id[name] = id
  end

  # @return The id bound to name, or nil if the name is unknown.
  def resolve(name)
    @name_to_id[name]
  end

  ##
  # Set the first slot of +array+ holding obj to nil.  Works on both Hashes
  # (looked up with key) and Arrays (looked up with index).  Retained for
  # compatibility; unregister no longer relies on it.
  #
  def delete_obj_from_array_private(array, obj)
    index = array.respond_to?(:key) ? array.key(obj) : array.index(obj)
    array[index] = nil unless index.nil?
  end

  private

  # Pick the id for the next registration (see register).
  def allocate_id
    if @next_id < Session::MAX_ID
      id = @next_id
      @next_id = id.succ
      id
    else
      raise "Object limit exceeded" if @unused_ids.empty?
      @unused_ids.pop
    end
  end
end

##
# The Resolve_Obj class handles resolve requests for the client.  It is
# a special ROMP object with an object id of 0.  You will never have to
# make calls on it directly, but will instead make calls on it through
# the Client object.
#
class Resolve_Obj
  # @param resolve_server The Resolve_Server that lookups are delegated to.
  def initialize(resolve_server)
    @resolve_server = resolve_server
  end

  # @return The id bound to name on the server, or nil if it is unknown.
  def resolve(name)
    @resolve_server.resolve(name)
  end
end

##
# A Generic_Server creates an endpoint to listen on, waits for connections,
# and accepts them when requested.  It can operate on different kinds of
# connections.  You will never have to use this object directly.
#
class Generic_Server
  ##
  # @param endpoint One of "romp://host:port" or "tcpromp://host:port"
  #   (TCP), "udpromp://host:port" (UDP) or "unixromp://path" (UNIX
  #   socket).  An empty host is passed to the socket layer as nil.
  #
  # @raise ArgumentError if the endpoint matches none of these forms.
  #
  def initialize(endpoint)
    # \A anchors at the start of the string; ^ would also match after an
    # embedded newline.
    case endpoint
    when %r{\A(tcp)?romp://(.*?):(.*)}
      match = Regexp.last_match
      @type = "tcp"
      @host = match[2].empty? ? nil : match[2]
      @port = match[3]
      @server = TCPServer.new(@host, @port)
    when %r{\A(udp)romp://(.*?):(.*)}
      match = Regexp.last_match
      @type = "udp"
      @host = match[2].empty? ? nil : match[2]
      @port = match[3]
      @server = UDPSocket.open
      @server.bind(@host, @port)
      @mutex = Mutex.new
    when %r{\A(unix)romp://(.*)}
      @type = "unix"
      @path = Regexp.last_match(2)
      @server = UNIXServer.open(@path)
    else
      raise ArgumentError, "Invalid endpoint"
    end
  end

  ##
  # Wait for the next connection.
  #
  # @return A socket switched to non-blocking mode.
  #
  def accept
    case @type
    when "tcp"
      socket = @server.accept
      # IPPROTO_TCP is the portable level for TCP_NODELAY.
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      socket.sync = true
      socket
    when "udp"
      # NOTE(review): the mutex is locked and never released here, which
      # presumably is meant to hand out the single UDP socket only once;
      # UDP support is listed as not working in the file header.
      @mutex.lock
      socket = @server
      socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      socket
    when "unix"
      socket = @server.accept
      socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      socket.sync = true
      socket
    end
  end
end

##
# A Generic_Client connects to a Generic_Server on a given endpoint.
# You will never have to use this object directly.
#
class Generic_Client
  ##
  # Open a connection to endpoint.  Note that this replaces the usual
  # constructor: it returns the connected socket, not a Generic_Client.
  #
  # @param endpoint An endpoint in the same notation Generic_Server accepts.
  #
  # @return The connected socket.
  #
  # @raise ArgumentError if the endpoint matches none of the known forms.
  #
  def self.new(endpoint)
    case endpoint
    when %r{\A(tcp)?romp://(.*?):(.*)}
      match = Regexp.last_match
      socket = TCPSocket.open(match[2], match[3])
      socket.sync = true
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
      socket
    when %r{\A(udp)romp://(.*?):(.*)}
      match = Regexp.last_match
      socket = UDPSocket.open
      socket.connect(match[2], match[3])
      # connect returns 0, so the socket has to be returned explicitly.
      socket
    when %r{\A(unix)romp://(.*)}
      socket = UNIXSocket.open(Regexp.last_match(2))
      socket.sync = true
      socket
    else
      raise ArgumentError, "Invalid endpoint"
    end
  end
end


##
# Print an exception to the screen.  This is necessary, because Ruby does
# not give us access to its error_print function from within Ruby.
#
# @param exc The exception object to print.
522 # 523 def self.print_exception(exc) 524 first = true 525 $!.backtrace.each do |bt| 526 if first then 527 puts "#{bt}: #{$!} (#{$!.message})" 528 else 529 puts "\tfrom #{bt}" 530 end 531 first = false 532 end 533 end 534 535 if false then # the following classes are implemented in C: 536 537 ## 538 # The Sesssion class is defined in romp_helper.so. You should never have 539 # to use it directly. 540 # 541 class Session 542 end 543 544 end # if false 545 546 end