worker_manager.rb - warvox - VoIP based wardialing tool, forked from rapid7/warvox.
(HTM) git clone git://jay.scot/warvox
(DIR) Log
(DIR) Files
(DIR) Refs
(DIR) README
---
worker_manager.rb (4573B)
---
#!/usr/bin/env ruby
###################

#
# Resolve the real location of this script (following any chain of symlinks)
# and put the sibling lib/ directory at the front of the load path
#
base = __FILE__
base = File.expand_path(File.readlink(base), File.dirname(base)) while File.symlink?(base)

script_dir = File.dirname(base)
bin_dir    = File.expand_path(script_dir)
$LOAD_PATH.unshift(File.join(bin_dir, '..', 'lib'))

# Executable launched once per scheduled job; it lives next to this script
@worker_path = File.expand_path("worker.rb", script_dir)

require 'warvox'
require 'socket'

ENV['RAILS_ENV'] ||= 'production'

# Load the application's boot and environment files from the parent directory
$LOAD_PATH.unshift(File.join(bin_dir, '..'))
require 'config/boot'
require 'config/environment'


# Jobs currently tracked by this manager, stored as { id:, pid: } hashes
@jobs = []
26
# Shuts the manager down in response to a termination signal.
#
# When no jobs are tracked the process exits immediately. Otherwise every
# tracked job is marked "stopped" in the database, each worker is sent TERM,
# the manager waits five seconds, sends KILL to whatever is left, and exits
# with status 0.
def stop
  WarVOX::Log.info("Worker Manager is terminating due to signal")

  exit(0) if @jobs.empty?

  # Update the database. Uses the where(...).update_all(...) form, matching
  # every other bulk update in this file, instead of passing the conditions
  # as a second argument to update_all.
  job_ids = @jobs.map { |job| job[:id] }
  Job.where(id: job_ids).update_all(status: "stopped", completed_at: Time.now.utc)

  # Best-effort signal delivery: a worker may already be gone, and a failure
  # to signal one worker must not prevent signalling the others.
  signal_workers = lambda do |signal_name|
    @jobs.each do |job|
      begin
        Process.kill(signal_name, job[:pid])
      rescue StandardError
        nil
      end
    end
  end

  # Signal running jobs to shut down
  signal_workers.call("TERM")

  # Give the workers five seconds to exit on their own
  sleep(5)

  # Forcibly kill any remaining job processes
  signal_workers.call("KILL")

  exit(0)
end
48
49
# Reaps every child process that has already exited, without blocking.
#
# Keeps calling waitpid with WNOHANG until it returns nothing (children exist
# but none has exited) or raises (for example when there are no children).
def clear_zombies
  loop do
    reaped = begin
      Process.waitpid(-1, Process::WNOHANG)
    rescue StandardError
      nil
    end
    break unless reaped
  end
end
54
# Launches a worker process for the given job and starts tracking it.
#
# j - job record; only its id is used here.
#
# Forks a child that replaces itself with the worker executable, passing the
# job id as its single argument, then appends { id:, pid: } to @jobs.
# Returns @jobs.
def schedule_job(j)
  WarVOX::Log.debug("Worker Manager is launching job #{j.id}")

  # Program and argument are passed separately so the worker path is never
  # word-split or handed to a shell (an install path containing spaces or
  # shell metacharacters would otherwise break the launch).
  worker_pid = Process.fork { exec(@worker_path, j.id.to_s) }

  @jobs << { id: j.id, pid: worker_pid }
end
62
# Sends TERM to the worker of every tracked job whose database status is
# 'cancelled'.
#
# Does nothing when no jobs are tracked. The job entry stays in @jobs; this
# method only delivers the signal.
def stop_cancelled_jobs
  jids = @jobs.map { |entry| entry[:id] }
  return if jids.empty?

  Job.where(status: 'cancelled', id: jids).find_each do |j|
    entry = @jobs.find { |candidate| candidate[:id] == j.id }
    next unless entry && entry[:pid]
    pid = entry[:pid]

    WarVOX::Log.debug("Worker Manager is killing job #{j.id} with PID #{pid}")
    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH, Errno::EPERM
      # The worker exited after the last liveness sweep (or the PID no longer
      # belongs to us). There is nothing left to signal, and this must not
      # abort the manager's main loop.
      nil
    end
  end
end
79
# Drops tracked jobs whose worker process no longer answers signal 0 and
# stamps completed_at on any of them that do not have it set yet.
#
# Returns nil when every tracked worker is still alive.
def clear_completed_jobs
  finished = @jobs.reject do |entry|
    # Signal 0 only probes for existence; any failure counts as "not alive"
    begin
      Process.kill(0, entry[:pid])
    rescue StandardError
      nil
    end
  end

  return if finished.empty?

  finished_pids = finished.map { |entry| entry[:pid] }
  finished_jids = finished.map { |entry| entry[:id] }

  WarVOX::Log.debug("Worker Manager is clearing #{finished_pids.length} completed jobs")

  @jobs = @jobs.reject { |entry| finished_pids.include?(entry[:pid]) }

  # Mark failed/crashed jobs as completed (jobs that already recorded a
  # completion time are excluded by the completed_at: nil condition)
  Job.where(id: finished_jids, completed_at: nil).update_all({completed_at: Time.now.utc})
end
100
# Releases locks held on this host by processes that no longer exist.
#
# Looks at incomplete jobs whose locked_by cookie starts with this hostname
# and that are not tracked in @jobs, reads the PID out of each cookie
# (host^pid^uniq), and marks the jobs of every unreachable PID as
# 'abandoned' with the lock cleared. Returns nil when nothing is abandoned.
def clear_stale_jobs
  tracked_ids  = @jobs.map { |entry| entry[:id] }
  host_pattern = Socket.gethostname + "^%"

  candidates =
    if tracked_ids.empty?
      Job.where("completed_at IS NULL AND locked_by LIKE ?", host_pattern)
    else
      Job.where("completed_at IS NULL AND locked_by LIKE ? AND id NOT IN (?)", host_pattern, tracked_ids)
    end

  # Group candidate jobs by the PID embedded in their lock cookie; cookies
  # that do not have all three parts are ignored
  jobs_by_pid = Hash.new { |hash, key| hash[key] = [] }
  candidates.each do |job|
    _host, pid, uniq = job.locked_by.to_s.split("^", 3)
    jobs_by_pid[pid] << job if pid && uniq
  end

  # Collect the jobs of every PID that does not answer signal 0. The probe
  # only succeeds for processes this user may signal (same user or root).
  abandoned = []
  jobs_by_pid.each do |pid, jobs|
    owner_alive = begin
      Process.kill(0, pid.to_i)
    rescue StandardError
      nil
    end
    abandoned.concat(jobs.map(&:id)) unless owner_alive
  end

  return if abandoned.empty?

  # Mark these jobs as abandoned
  WarVOX::Log.debug("Worker Manager is marking #{abandoned.length} jobs as abandoned")
  Job.where(id: abandoned).update_all({locked_by: nil, status: 'abandoned'})
end
137
# Claims one submitted, unowned job for this manager and launches it.
#
# Returns true once a job has been claimed and handed to schedule_job, or nil
# when no submitted job without an owner is left.
def schedule_submitted_jobs
  loop do
    # Look for a candidate job with no current owner
    candidate = Job.where(status: 'submitted', locked_by: nil).limit(1).first
    return unless candidate

    # Try to take the lock; the locked_by: nil condition means the update
    # only applies if nobody else has claimed the job in the meantime
    claim = {locked_by: @cookie, locked_at: Time.now.utc, status: 'scheduled'}
    Job.where(id: candidate.id, locked_by: nil).update_all(claim)

    # Re-read the job to see whether our cookie is the one that stuck
    claimed = Job.where(id: candidate.id, status: 'scheduled', locked_by: @cookie).limit(1).first

    # Lost the race: go around again and look for another candidate
    next unless claimed

    # Hurray, we got a job, run it
    schedule_job(claimed)
    return true
  end
end
159
#
# Main
#

# Run the shutdown routine on a new thread instead of inside the trap handler
%w[SIGINT SIGTERM].each do |signal_name|
  trap(signal_name) { Thread.new { stop } }
end

# Lock cookie written to jobs.locked_by: hostname ^ manager PID ^ 8 random hex digits
@cookie   = "#{Socket.gethostname}^#{Process.pid}^#{format('%.8x', rand(0x100000000))}"
@max_jobs = 3


WarVOX::Log.info("Worker Manager initialized with cookie #{@cookie}")

loop do
  # Expose the current state in the process title
  $0 = "warvox manager: #{@jobs.length} active jobs (cookie : #{@cookie})"

  # Clear any zombie processes
  clear_zombies

  # Clear any completed jobs
  clear_completed_jobs

  # Stop any jobs cancelled by the user
  stop_cancelled_jobs

  # Clear locks on any stale jobs from this host
  clear_stale_jobs

  # Fill the free worker slots until none remain or nothing is left to schedule
  until @jobs.length >= @max_jobs
    break unless schedule_submitted_jobs
  end

  # Sleep between 3-8 seconds before re-entering the loop
  sleep(3 + rand(5))
end