require 'rexml/document' require 'drb/drb' require 'xmpp4r' require 'monitor' require 'openssl' module XMPP module HTTPBind def log(s) # $stderr.puts '[' + Time.now.asctime + ' ** ' + Process.pid.to_s + ' ** ' + Thread.current.inspect + '] ' + s end module_function :log # # Interpret the passed xml request and interact with the Communication Manager. # Returns a XML repsonse or an exception. # def parse(xml_string) xml = REXML::Document.new(xml_string) #Jabber::debug = true response = nil root_attrs = xml.root.attributes if xml.root.name == 'body' session_id = root_attrs['sid'] # Not in new session request! type = root_attrs['type'] if session_id == nil # create new session response = ConnectionManager.get.connect(xml_string) else response = ConnectionManager.get.send_xml(session_id, xml_string) if type == 'terminate' ConnectionManager.get.terminate(session_id) end end #DRb.stop_service else raise end response end module_function :parse class Daemon def Daemon.start(logfile_name = nil, app_name = nil) # raise Exception, 'Not using fork' if pid = fork Process.wait(pid) return true # We are the parent end Process::setsid trap 'SIGHUP', 'IGNORE' trap 'SIGTERM', 'IGNORE' trap 'SIGUSR1', 'IGNORE' trap 'SIGUSR2', 'IGNORE' exit!(0) if fork trap 'SIGHUP', 'IGNORE' trap 'SIGTERM', 'IGNORE' trap 'SIGUSR1', 'IGNORE' trap 'SIGUSR2', 'IGNORE' $0 = app_name if app_name #Dir::chdir("/") File::umask(0) # Make sure all file descriptors are closed ObjectSpace.each_object(IO) do |io| unless [STDIN, STDOUT, STDERR].include?(io) begin unless io.closed? io.close end rescue ::Exception end end end begin Daemon.redirect_io(logfile_name) yield if block_given? rescue Object => e XMPP::HTTPBind::log 'Problem in start' + e.inspect XMPP::HTTPBind::log e.backtrace.join("\n") end return false # we are the child end def Daemon.redirect_io(logfile_name) STDIN.reopen "/dev/null" rescue nil if logfile_name begin STDOUT.reopen logfile_name, "a" STDOUT.sync = true rescue ::Exception STDOUT.reopen "/dev/null" rescue nil end else STDOUT.reopen "/dev/null" rescue nil end STDERR.reopen STDOUT rescue nil STDERR.sync = true $stdout = STDOUT $stderr = STDERR end end # # A DRb server that manages XMPP sessions. # class ConnectionManager STARTING = 0 STARTED = 1 STOPPING = 2 STOPPED = 3 URI = 'druby://127.0.0.1:61676' #private_class_method :new # # Factory/locator method to get a reference to the single ConnectionManager # instance shared between all FCGI processes. Tries to start a server in a # daemon process. Falls back to a local server if fork() isn't implemented. # def ConnectionManager.get cm = nil # Stop race conditions try_lock() { begin # First, just try and connect to an existing CM # DRb.start_service cm = DRbObject.new_with_uri(URI) # Force contact with connection_manager XMPP::HTTPBind::log "ping" cm.ping XMPP::HTTPBind::log 'Found CM' rescue Exception => e XMPP::HTTPBind::log 'Starting CM because: ' + e.inspect # No connection_manager exists, try to start one in a daemon process. # This only works on Unix derivatives. begin parent = Daemon.start('cm.log') { # Daemon code here XMPP::HTTPBind::log '*** Server PID: ' + Process.pid.to_s begin cm = new() DRb.start_service(URI, cm, { :verbose => true }) DRb.thread.join() rescue Object => e XMPP::HTTPBind::log 'Exception in DRb server: ' + e.inspect end XMPP::HTTPBind::log 'Quitting DRb server' exit!(0) } sleep 2 cm = DRbObject.new_with_uri(URI) # Force contact with connection_manager cm.ping rescue Exception => e # Start a local service cm = new() DRb.start_service(URI, cm) sleep 2 cm = DRbObject.new_with_uri(URI) end end } return cm end # # Used to see if the ConnectionManager DRb server is running # def ping # Need to synchronize on a state variable and sleep until it transitions # then return a STARTED indication. Also rename from 'ping' to # something more meaningful XMPP::HTTPBind::log "pong" end # # Get a new session id # def new_sid synchronize do (@id_seed += 1).to_s end end # # Create a new connection as requested in the passed XML # def connect(xml_string) xml = REXML::Document.new(xml_string) root_attrs = xml.root.attributes request_id = root_attrs['rid'].to_i # Mandatory to = root_attrs['to'] # Should be in new session request lang = root_attrs['xml:lang'] || 'en' # Should be in new session request wait = (root_attrs['wait'] || '0').to_i hold = (root_attrs['hold'] || '0').to_i secure = (root_attrs['secure'] || 'false') === 'true' route = root_attrs['route'] if route host = route.gsub(/[^:]*:([^:]*):.*/, '\1') port = route.gsub(/[^:]*:[^:]*:([^:]*)/, '\1').to_i else host = to port = 5222 end XMPP::HTTPBind::log "to=#{to}, port=#{port}" sid = new_sid session = CMSession.new(host, port, secure, to, wait, lang, hold, sid) set_session sid, session return session.connect end # # Send the passed request using the session identified with the passed sid # def send_xml(sid, xml_string) session = get_session sid if session != nil return session.send_xml(xml_string) else raise RHBNotFoundException, "No such session: #{sid}" end return nil end # # Close the session identified by the passed sid # def terminate(sid) XMPP::HTTPBind::log "CM Terminate Session #{sid}" session = get_session sid @sessions.synchronize do if session != nil session.terminate @sessions.delete(sid) end if @sessions.size == 0 # TODO: a bit of a race condition here. Should be grabbing the file lock. XMPP::HTTPBind::log 'Stopping server' DRb.stop_service end end end # # Get the inter-process lock (private) # def ConnectionManager.try_lock(options={}) # Set default options lockfile_path = options[:lock_file] || 'rhb.lock' retries = options[:retries] || 10 retry_period = options[:retry_period] || 0.5 # Shared or exclusive lock? locking_method = options[:readonly_lock] ? File::LOCK_SH : File::LOCK_EX retries.times do |attempt| lockfile = File.open(lockfile_path, "a") locked = lockfile.flock(locking_method | File::LOCK_NB) if locked then begin lockfile.truncate(0) lockfile.puts(Process.pid) lockfile.flush retval = yield lockfile.close return retval rescue Exception => ex lockfile.close raise ex end else lockfile.close rescue nil # Calculate exponential random backoff ala ethernet backoff_time = rand * retry_period * (2 ** attempt) STDERR.puts("Lock on '#{lockfile_path}' failed (pid:#{Process.pid}) - " + "#{attempt+1}/#{retries} (backing off " + "#{sprintf("%.2f", backoff_time)} seconds)") sleep(backoff_time) end end # If we get here, we're out of retries raise "Locking Error" end private_class_method :try_lock # # 'new' is private. All creation must go through ConnectionManager.get # def initialize() @state = STARTING @sessions = {} @sessions.extend(MonitorMixin) # Make @sessions synchronized @id_seed = Time.now.to_i self.extend(MonitorMixin) # Create reaper thread Thread.new { while true sleep 1 begin reap_sleepy_sessions rescue Exception => e end end } XMPP::HTTPBind::log 'CM Initialized' end # # Get the session object associated with the passed sid # def get_session(sid) session = nil @sessions.synchronize do session = @sessions[sid] end session end private :get_session # # Associate the passed session object with the given sid # def set_session(sid, session) @sessions.synchronize do @sessions[sid] = session end end private :set_session # # Find inactive sessions and terminate them # def reap_sleepy_sessions sleepy_sessions = [] @sessions.synchronize do @sessions.each { |sid, session| if session.sleepy? sleepy_sessions << sid end } sleepy_sessions.each { |sid| XMPP::HTTPBind::log 'Reaping session' terminate(sid) } end end private :reap_sleepy_sessions end # # Indicates that the web server should return a 403 # class RHBForbiddenException < RuntimeError end # # Indicates that the web server should return a 404 # class RHBNotFoundException < RuntimeError end # # A queue with a condition variable # class ConditionQueue < Array include MonitorMixin def initialize super() @cond = new_cond end def wait(time=nil) @cond.wait(time) end def signal @cond.signal end def count_waiters @cond.count_waiters end end # # Holds an individual XMPP session # # Class private to ConnectionManager # class CMSession < ::Jabber::Stream INACTIVITY = 90 # Disconnect from XMPP server after this many seconds of inactivity REQUESTS = 1 # Allow this many simlutaneous requests POLLING = 2 # Can't issue more than this many requests/sec MAX_WAIT = 30 # Some HTTP server environments will kill a connection if a response is not generated within this number of seconds def initialize(host, port, secure, to, wait, lang, hold, sid) super() @host = host @to = to @wait = wait < MAX_WAIT ? wait : MAX_WAIT @lang = lang @hold = hold < REQUESTS ? hold : REQUESTS @sid = sid @port = port @secure = secure @last_used = Time.now @requestq = ConditionQueue.new @resultq = ConditionQueue.new @sender_thread = Thread.new { begin @requestq.synchronize do while true while @requestq.size == 0 @requestq.wait end while @requestq.size > 0 send @requestq.shift end end end rescue exception => e end } XMPP::HTTPBind::log 'Initialized session' end # # return true if the session hasn't been used during the last # INACTIVITY sconds. # def sleepy? @resultq.synchronize do (@resultq.count_waiters == 0) && (Time.now - @last_used > INACTIVITY) end end # # Connect to the XMPP server # def connect XMPP::HTTPBind::log 'Connecting' @last_used = Time.now response = REXML::Document.new @check_next_poll = false begin add_xml_callback(150, 'CMSession') { |e| @resultq.synchronize do @resultq << e @resultq.signal end true } socket = TCPSocket.new(@host, @port) if (@secure) # Create SSL socket ssl_context = OpenSSL::SSL::SSLContext.new() unless ssl_context.verify_mode ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE end sslsocket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) sslsocket.sync_close = true sslsocket.connect # Make REXML believe its a real socket. class << sslsocket def kind_of?(o) o == IO ? true : super end end @socket = sslsocket else @socket = socket end start(@socket) # Queue send stream start @requestq.synchronize do @requestq << "" @requestq.signal end # Wait for reply @resultq.synchronize do if @resultq.size == 0 @resultq.wait(@wait) end if @resultq.size > 0 # dump the stanzas on the floor @resultq.clear root = response.add_element('body', { 'wait'=>@wait.to_s, 'sid'=>@sid, 'authid'=>@streamid.to_s, 'requests'=>REQUESTS.to_s, # Client can't use more than 2 simultaneous requests. 'inactivity'=>INACTIVITY.to_s, # Client must call at least once every 30 secs. 'polling'=>POLLING.to_s, # Client shouldn't poll more than once every 2 secs. 'xmlns'=>'http://jabber.org/protocol/httpbind' }) else raise RHBNotFoundException, 'Failed to connect to server' end end rescue Exception => e XMPP::HTTPBind::log 'Exception in connect: ' + e.inspect raise end XMPP::HTTPBind::log 'Returning from connect()' return response.to_s end # # Terminate the session # def terminate begin XMPP::HTTPBind::log 'Terminating' send("") sleep 0.2 close @sender_thread.kill rescue Exception => e end end # # Queue some stanzas for sending and wait (a little while) for incoming # stanzas # def send_xml(xml_string) begin XMPP::HTTPBind::log 'Sending' xml = REXML::Document.new(xml_string) rid = xml.root.attributes['rid'] # Get the request ID previous_time = @last_used @last_used = Time.now req = '' xml.root.each { |e| req += e.to_s } # Add stuff to the requestq and wake it up. if req != '' @requestq.synchronize do @requestq << req @requestq.signal end end response = REXML::Document.new root = response.add_element('body', {'xmlns'=>'http://jabber.org/protocol/httpbind'}) @resultq.synchronize do # if there aren't already the maximum number of waiters and there is # nothing in the resultq, hang around if @resultq.count_waiters >= @hold @resultq.signal elsif @resultq.size == 0 @resultq.wait(@wait) end # if @resultq.count_waiters < @hold && @resultq.size == 0 # Jabber::debuglog 'Going bye byes' # @resultq.wait(@wait) @last_used = Time.now # end # Could get here either because there is stuff on the queue or # because we timed-out while @resultq.size > 0 root.add_element @resultq.shift end end XMPP::HTTPBind::log 'Returning from send' return response.to_s rescue Exception => e XMPP::HTTPBind::log 'Exception in sendxml: ' + e.inspect raise e end end end end end