worker_manager.rb - warvox - VoIP based wardialing tool, forked from rapid7/warvox.
 (HTM) git clone git://jay.scot/warvox
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) README
       ---
       worker_manager.rb (4573B)
       ---
            1 #!/usr/bin/env ruby
            2 ###################
            3 
            #
            # Load the library path
            #
            # Follow symlinks until the real script file is reached, so that every
            # path below is computed relative to the file itself and not to a link.
            base = __FILE__
            base = File.expand_path(File.readlink(base), File.dirname(base)) while File.symlink?(base)

            script_dir = File.expand_path(File.dirname(base))
            $LOAD_PATH.unshift(File.join(script_dir, '..', 'lib'))

            # Absolute path of the worker script located next to this file
            @worker_path = File.expand_path("worker.rb", File.dirname(base))

            require 'warvox'
            require 'socket'

            # Default to the production environment unless one was already chosen
            ENV['RAILS_ENV'] ||= 'production'

            # Make the parent directory loadable, then load the application config
            $LOAD_PATH.unshift(File.join(script_dir, '..'))
            require 'config/boot'
            require 'config/environment'


            # Workers launched by this manager; entries look like { id: ..., pid: ... }
            @jobs = []
           26 
           # Shutdown routine run when the manager receives a termination signal.
           #
           # Marks every job tracked in @jobs as stopped in the database, sends TERM
           # to each worker PID, waits five seconds, sends KILL to each worker PID,
           # and then exits. When no jobs are tracked it exits immediately.
           #
           # Never returns normally; always finishes with exit(0).
           def stop
             WarVOX::Log.info("Worker Manager is terminating due to signal")

             # Nothing was launched, so there is nothing to clean up
             exit(0) if @jobs.empty?

             # Update the database. This uses the where(...).update_all(...) form,
             # like the other queries in this file; the two-argument
             # update_all(updates, conditions) form is rejected by newer
             # ActiveRecord releases.
             Job.where(id: @jobs.map { |j| j[:id] }).update_all({ status: "stopped", completed_at: Time.now.utc })

             # Signal running jobs to shut down (best effort: a worker may be gone)
             @jobs.each do |j|
               begin
                 Process.kill("TERM", j[:pid])
               rescue StandardError
                 nil
               end
             end

             # Sleep for five seconds
             sleep(5)

             # Forcibly kill any remaining job processes (best effort as above)
             @jobs.each do |j|
               begin
                 Process.kill("KILL", j[:pid])
               rescue StandardError
                 nil
               end
             end

             exit(0)
           end
           48 
           49 
           # Reaps exited child processes without blocking.
           #
           # Keeps calling Process.waitpid(-1, WNOHANG) for as long as it returns a
           # PID; stops when it returns nil or raises (any StandardError is treated
           # as "nothing left to reap").
           #
           # Returns nil.
           def clear_zombies
             loop do
               reaped = begin
                 Process.waitpid(-1, Process::WNOHANG)
               rescue StandardError
                 nil
               end
               break unless reaped
             end
           end
           54 
           # Forks a worker process for job +j+ and records it in @jobs.
           #
           # j - an object responding to #id; the id is passed to the worker script
           #     as its only command-line argument.
           #
           # Returns @jobs after appending { id: ..., pid: ... } for the new worker.
           def schedule_job(j)
             WarVOX::Log.debug("Worker Manager is launching job #{j.id}")

             # Program and argument are passed as separate strings so the command
             # line is never split on whitespace or handed to a shell; a worker
             # path containing spaces or shell metacharacters still runs as-is.
             worker_pid = Process.fork { exec(@worker_path, j.id.to_s) }

             @jobs << { id: j.id, pid: worker_pid }
           end
           62 
           # Sends TERM to the worker of every tracked job whose database status is
           # 'cancelled'.
           #
           # Does nothing (and issues no query) when @jobs is empty.
           def stop_cancelled_jobs
             jids = @jobs.map { |x| x[:id] }
             return if jids.empty?

             Job.where(status: 'cancelled', id: jids).find_each do |j|
               job = @jobs.find { |o| o[:id] == j.id }
               next unless job && job[:pid]
               pid = job[:pid]

               WarVOX::Log.debug("Worker Manager is killing job #{j.id} with PID #{pid}")
               begin
                 Process.kill('TERM', pid)
               rescue Errno::ESRCH
                 # The worker has already exited. This must not crash the manager
                 # loop; clear_completed_jobs drops dead PIDs from @jobs.
                 nil
               end
             end
           end
           79 
           # Drops workers that are no longer running from @jobs and stamps their
           # jobs as completed.
           #
           # A worker counts as running when Process.kill(0, pid) succeeds; any
           # StandardError from that probe is treated as "not running".
           #
           # Returns nil when every worker is still running, otherwise the result of
           # the update_all call.
           def clear_completed_jobs
             finished = @jobs.reject do |entry|
               begin
                 Process.kill(0, entry[:pid])
               rescue StandardError
                 nil
               end
             end
             return if finished.empty?

             finished_pids = finished.map { |entry| entry[:pid] }
             finished_ids  = finished.map { |entry| entry[:id] }

             WarVOX::Log.debug("Worker Manager is clearing #{finished_pids.length} completed jobs")

             @jobs = @jobs.select { |entry| !finished_pids.include?(entry[:pid]) }

             # Only jobs that never recorded a completion time are touched, so a
             # worker that finished normally keeps its own timestamp.
             Job.where(id: finished_ids, completed_at: nil).update_all({completed_at: Time.now.utc})
           end
          100 
          # Marks unfinished jobs as abandoned when they are locked by a process on
          # this host that is no longer running.
          #
          # Candidates are jobs with no completed_at whose locked_by value starts
          # with "<hostname>^" and which are not tracked in @jobs. The PID is the
          # second "^"-separated field of locked_by; values without at least three
          # fields are skipped.
          #
          # Returns nil when nothing is abandoned, otherwise the result of the
          # update_all call.
          def clear_stale_jobs
            tracked_ids  = @jobs.map { |entry| entry[:id] }
            host_pattern = Socket.gethostname + "^%"

            candidates =
              if tracked_ids.empty?
                Job.where("completed_at IS NULL AND locked_by LIKE ?", host_pattern)
              else
                Job.where("completed_at IS NULL AND locked_by LIKE ? AND id NOT IN (?)", host_pattern, tracked_ids)
              end

            # Group the candidate jobs by the PID found in their lock cookie
            jobs_by_pid = Hash.new { |hash, key| hash[key] = [] }
            candidates.each do |job|
              _host, owner_pid, nonce = job.locked_by.to_s.split("^", 3)
              jobs_by_pid[owner_pid] << job if owner_pid && nonce
            end

            # Probe each owner with signal 0 (must be same user or root); any
            # error from the probe is treated as "process is gone"
            abandoned = []
            jobs_by_pid.each do |owner_pid, jobs|
              running = begin
                Process.kill(0, owner_pid.to_i)
              rescue StandardError
                nil
              end
              abandoned.concat(jobs.map { |job| job.id }) unless running
            end

            return if abandoned.empty?

            WarVOX::Log.debug("Worker Manager is marking #{abandoned.length} jobs as abandoned")
            Job.where(id: abandoned).update_all({locked_by: nil, status: 'abandoned'})
          end
          137 
          # Claims one submitted, unlocked job for this manager and launches it.
          #
          # The claim is a conditional update (locked_by must still be nil)
          # followed by a re-read that checks the lock now holds @cookie; when the
          # re-read finds nothing, another candidate is fetched and tried.
          #
          # Returns true after launching a job, nil when no submitted job without
          # an owner is left.
          def schedule_submitted_jobs
            # Look for a candidate job with no current owner
            while (candidate = Job.where(status: 'submitted', locked_by: nil).limit(1).first)
              # Try to get a lock on this job
              Job.where(id: candidate.id, locked_by: nil).update_all({locked_by: @cookie, locked_at: Time.now.utc, status: 'scheduled'})

              # See if we actually got the lock; go around again if we lost the race
              claimed = Job.where(id: candidate.id, status: 'scheduled', locked_by: @cookie).limit(1).first
              next unless claimed

              # We own the job, so run it
              schedule_job(claimed)
              return true
            end

            nil
          end
          159 
          #
          # Main
          #

          # On an interrupt or termination signal, run the shutdown routine from a
          # new thread rather than directly inside the trap handler.
          %w[SIGINT SIGTERM].each do |signal_name|
            trap(signal_name) { Thread.new { stop } }
          end

          # Lock cookie: hostname, this process's PID and an eight-hex-digit random
          # value joined with "^"
          @cookie   = [Socket.gethostname, $$.to_s, format("%.8x", rand(0x100000000))].join("^")
          @max_jobs = 3


          WarVOX::Log.info("Worker Manager initialized with cookie #{@cookie}")

          loop do
            # Show the current state in the process title
            $0 = "warvox manager: #{@jobs.length} active jobs (cookie : #{@cookie})"

            # Clear any zombie processes
            clear_zombies

            # Clear any completed jobs
            clear_completed_jobs

            # Stop any jobs cancelled by the user
            stop_cancelled_jobs

            # Clear locks on any stale jobs from this host
            clear_stale_jobs

            # Launch jobs until the limit is reached or nothing is left to schedule
            until @jobs.length >= @max_jobs
              break unless schedule_submitted_jobs
            end

            # Sleep for 3 to 7 seconds before re-entering the loop
            sleep(rand(5) + 3)
          end