More job mgmt overhaul work - warvox - Unnamed repository; edit this file 'description' to name the repository. (DIR) Log (DIR) Files (DIR) Refs (DIR) README --- (DIR) commit aa07abc9e7aaabc1723625d72daac6dd9e1edfcd (DIR) parent 6da2bec72d9c7e6c7e8114fc4dfba5ac5e30598e (HTM) Author: HD Moore <hd_moore@rapid7.com> Date: Mon, 31 Dec 2012 19:10:51 -0600 More job mgmt overhaul work Diffstat: M db/migrate/20121228171549_initial_… | 1 + M lib/warvox/jobs/analysis.rb | 162 ++++++++++++++----------------- M lib/warvox/jobs/dialer.rb | 184 ++++++++++++++++--------------- 3 files changed, 167 insertions(+), 180 deletions(-) --- (DIR) diff --git a/db/migrate/20121228171549_initial_schema.rb b/db/migrate/20121228171549_initial_schema.rb @@ -87,6 +87,7 @@ class InitialSchema < ActiveRecord::Migration t.integer "provider_id", :null => false t.boolean "answered" t.boolean "busy" + t.text "error" t.integer "audio_length" t.integer "ring_length" t.text "caller_id" (DIR) diff --git a/lib/warvox/jobs/analysis.rb b/lib/warvox/jobs/analysis.rb @@ -6,12 +6,7 @@ class Analysis < Base require 'tempfile' require 'open3' - @@kissfft_loaded = false - begin - require 'kissfft' - @@kissfft_loaded = true - rescue ::LoadError - end + require 'kissfft' class Classifier @@ -39,59 +34,91 @@ class Analysis < Base 'analysis' end - def initialize(job_id) - @name = job_id - if(not @@kissfft_loaded) - raise RuntimeError, "The KissFFT module is not available, analysis failed" - end + def initialize(job_id, conf) + @job_id = job_id + @conf = conf + @tasks = [] + @calls = [] end - def get_job - ::DialJob.find(@name) + def stop + @calls = [] + @tasks.each do |t| + t.kill rescue nil + end + @tasks = [] end def start - @status = 'active' - - begin - start_processing() - - model = get_job - model.processed = true + @calls = [] - db_save(model) + query = nil - stop() + ::ActiveRecord::Base.connection_pool.with_connection { - rescue ::Exception => e - $stderr.puts "Exception in the job queue: #{e.class} #{e} #{e.backtrace}" + job = Job.find(@job_id) + if not job + raise RuntimeError, "The parent job no longer exists" end - end - def stop - @status = 'completed' - end + case @conf[:scope] + when 'job' + if @conf[:force] + query = {:job_id => job.id, :answered => true, :busy => false} + else + query = {:job_id => job.id, :answered => true, :busy => false, :analysis_started_at => nil} + end + when 'project' + if @conf[:force] + query = {:project_id => job.project_id, :answered => true, :busy => false} + else + query = {:project_id => job.project_id, :answered => true, :busy => false, :analysis_started_at => nil} + end + when 'global' + if @conf[:force] + query = {:answered => true, :busy => false} + else + query = {:answered => true, :busy => false, :analysis_started_at => nil} + end + end - def start_processing - jobs = ::DialResult.where(:dial_job_id => @name, :processed => false, :completed => true, :busy => false).all max_threads = WarVOX::Config.analysis_threads + last_update = Time.now + + @total_calls = Call.count(:conditions => query) + @completed_calls = 0 - while(not jobs.empty?) - threads = [] - output = [] - 1.upto(max_threads) do - j = jobs.shift || break - output << j - threads << Thread.new { run_analyze_call(j) } + Call.find_each(:conditions => query) do |call| + while @tasks.length < max_threads + call.analysis_started_at = Time.now.utc + call.analysis_job_id = job.id + @tasks << Thread.new(call) { |c| ::ActiveRecord::Base.connection_pool.with_connection { run_analyze_call(c) }} end + clear_stale_tasks - # Wait for the threads to complete - threads.each {|t| t.join} + # Update progress every 10 seconds or so + if Time.now.to_f - last_update.to_f > 10 + update_progress((@completed_calls / @total_calls.to_f) * 100) + last_update = Time.now + end - # Save the results to the database - output.each {|r| db_save(r) if r.processed } + clear_zombies() end + + } + end + + def clear_stale_tasks + # Remove dead threads from the task list + @tasks = @tasks.select{ |x| x.status } + IO.select(nil, nil, nil, 0.25) + end + + def update_progress(pct) + ::ActiveRecord::Base.connection_pool.with_connection { + Job.update({ :progress => pct }, { :id => @job_id }) + } end def run_analyze_call(dr) @@ -121,8 +148,7 @@ class Analysis < Base end end - dr.processed_at = Time.now - dr.processed = true + dr.analysis_completed_at = Time.now.utc rescue ::Interrupt ensure @@ -131,8 +157,9 @@ class Analysis < Base end mr.save + dr.save - true + @completed_calls += 1 end # Takes the raw file path as an argument, returns a hash @@ -341,52 +368,5 @@ class Analysis < Base end -class CallAnalysis < Analysis - - @@kissfft_loaded = false - begin - require 'kissfft' - @@kissfft_loaded = true - rescue ::LoadError - end - - def type - 'call_analysis' - end - - def initialize(result_id) - @name = result_id - if(not @@kissfft_loaded) - raise RuntimeError, "The KissFFT module is not available, analysis failed" - end - end - - def get_job - ::DialResult.find(@name) - end - - def start - @status = 'active' - - begin - start_processing() - stop() - rescue ::Exception => e - $stderr.puts "Exception in the job queue: #{e.class} #{e} #{e.backtrace}" - end - end - - def stop - @status = 'completed' - end - - def start_processing - r = get_job() - return if not r.completed - return if r.busy - analyze_call(r) - end -end - end end (DIR) diff --git a/lib/warvox/jobs/dialer.rb b/lib/warvox/jobs/dialer.rb @@ -8,18 +8,21 @@ class Dialer < Base 'dialer' end - def initialize(job_id) - @name = job_id - @job = get_job - @range = @job.range - @seconds = @job.seconds - @lines = @job.lines + def initialize(job_id, conf) + @job_id = job_id + @conf = conf + @range = @conf[:range] + @seconds = @conf[:seconds] + @lines = @conf[:lines] @nums = shuffle_a(WarVOX::Phone.crack_mask(@range)) + @tasks = [] + @provs = get_providers + # CallerID modes (SELF or a mask) - @cid_self = @job.cid_mask == 'SELF' + @cid_self = @conf[:cid_mask] == 'SELF' if(not @cid_self) - @cid_range = WarVOX::Phone.crack_mask(@job.cid_mask) + @cid_range = WarVOX::Phone.crack_mask(@conf[:cid_mask]) end end @@ -44,52 +47,34 @@ class Dialer < Base def get_providers res = [] - ::Provider.where(:enabled => true).all.each do |prov| - info = { - :name => prov.name, - :id => prov.id, - :port => prov.port, - :host => prov.host, - :user => prov.user, - :pass => prov.pass, - :lines => prov.lines - } - 1.upto(prov.lines) {|i| res.push(info) } - end + ::ActiveRecord::Base.connection_pool.with_connection { + ::Provider.where(:enabled => true).all.each do |prov| + info = { + :name => prov.name, + :id => prov.id, + :port => prov.port, + :host => prov.host, + :user => prov.user, + :pass => prov.pass, + :lines => prov.lines + } + 1.upto(prov.lines) {|i| res.push(info) } + end + } shuffle_a(res) end - def get_job - ::DialJob.find(@name) - end - - def start - begin - - model = get_job - model.status = 'active' - model.started_at = Time.now - db_save(model) - - start_dialing() - - stop() - - rescue ::Exception => e - $stderr.puts "Exception in the job queue: #{$e.class} #{e} #{e.backtrace}" - end - end def stop - @status = 'completed' - model = get_job - model.status = 'completed' - model.completed_at = Time.now - db_save(model) + @nums = [] + @tasks.each do |t| + t.kill rescue nil + end + @tasks = [] end - def start_dialing + def start # Scrub all numbers matching the blacklist list = WarVOX::Config.blacklist_load list.each do |b| @@ -102,24 +87,19 @@ class Dialer < Base end end + last_update = Time.now @nums_total = @nums.length - while(@nums.length > 0) - @calls = [] - @provs = get_providers - tasks = [] - max_tasks = [@provs.length, @lines].min - 1.upto(max_tasks) do - tasks << Thread.new do + max_tasks = [@provs.length, @lines].min - Thread.current.kill if @nums.length == 0 - Thread.current.kill if @provs.length == 0 + while(@nums.length > 0) + while( @tasks.length < max_tasks ) do + tnum = @nums.shift + break unless tnum - num = @nums.shift - prov = @provs.shift + tprov = allocate_provider - Thread.current.kill if not num - Thread.current.kill if not prov + @tasks << Thread.new(tnum,tprov) do |num,prov| out_fd = Tempfile.new("rawfile") out = out_fd.path @@ -165,30 +145,36 @@ class Dialer < Base end end - res = ::DialResult.new - res.number = num - res.cid = cid - res.dial_job_id = @name - res.provider_id = prov[:id] - res.completed = (fail == 0) ? true : false - res.busy = (busy == 1) ? true : false - res.seconds = (byte / 16000) # 8khz @ 16-bit - res.ringtime = ring - res.processed = false - res.save - - if(File.exists?(out)) - File.open(out, "rb") do |fd| - med = res.media - med.audio = fd.read(fd.stat.size) - med.save + :ActiveRecord::Base.connection_pool.with_connection do + job = Job.find(@job_id) + if not job + raise RuntimeError, "The parent job is not available" end - end - out_fd.close - ::FileUtils.rm_f(out) + res = ::Call.new + res.number = num + res.job_id = job.id + res.project_id = job.project_id + res.provider_id = prov[:id] + res.answered = (fail == 0) ? true : false + res.busy = (busy == 1) ? true : false + res.audio_seconds = (byte / 16000) # 8khz @ 16-bit + res.ring_seconds = ring + res.caller_id = cid + + res.save + + if(File.exists?(out)) + File.open(out, "rb") do |fd| + med = res.media + med.audio = fd.read(fd.stat.size) + med.save + end + end - @calls << res + out_fd.close + ::FileUtils.rm_f(out) + end rescue ::Exception => e $stderr.puts "ERROR: #{e.class} #{e} #{e.backtrace} #{num} #{prov.inspect}" @@ -198,24 +184,44 @@ class Dialer < Base # END NEW THREAD end # END SPAWN THREADS - tasks.map{|t| t.join if t} - # Iterate through the results - @calls.each do |r| - db_save(r) - end + clear_stale_tasks - # Update the progress bar - model = get_job - model.progress = ((@nums_total - @nums.length) / @nums_total.to_f) * 100 - db_save(model) + # Update progress every 10 seconds or so + if Time.now.to_f - last_update.to_f > 10 + update_progress(((@nums_total - @nums.length) / @nums_total.to_f) * 100) + last_update = Time.now.to_f + end clear_zombies() end + while @tasks.length > 0 + clear_stale_tasks + end + # ALL DONE end + def clear_stale_tasks + # Remove dead threads from the task list + @tasks = @tasks.select{ |x| x.status } + IO.select(nil, nil, nil, 0.25) + end + + def update_progress(pct) + ::ActiveRecord::Base.connection_pool.with_connection { + Job.update({ :progress => pct }, { :id => @job_id }) + } + end + + def allocate_provider + @prov_idx ||= 0 + prov = @provs[ @prov_idx % @provs.length ] + @prov_idx += 1 + prov + end + end end end