require 'rcs-common/trace'
require 'rcs-common/systemstatus'
require 'fileutils'

require_relative 'extractor'
require_relative 'pool'

module RCS
  module Connector
    # Polls ConnectorQueue and hands each queued item to its connector,
    # using one pool thread per queue scope.
    #
    # The module extends itself, so every method below is callable directly
    # on the module (e.g. Dispatcher.run).
    module Dispatcher
      extend RCS::Tracer
      extend self
      include SystemStatusMixin

      # Entry point: creates the thread pool and then, on every polling
      # cycle, starts a dispatch thread for each scope that does not already
      # have one and publishes an :ok status ("Idle" or "Working").
      #
      # Does not return under normal operation (see #loop_and_wait).
      def run
        @pool = Pool.new

        loop_and_wait do
          ConnectorQueue.scopes.each do |scope|
            # The check is made right before each defer (not up front for
            # the whole list) so a scope is never deferred twice in a cycle.
            @pool.defer(scope) { dispatch(scope) } unless @pool.has_thread?(scope)
          end

          status_message = if @pool.empty?
                             "Idle"
                           else
                             "Working"
                           end
          change_status(:ok, status_message)
        end
      end

      # Yields to the given block forever, sleeping 30 seconds after each
      # invocation.
      def loop_and_wait
        loop do
          yield
          sleep 30
        end
      end

      # Drains the queue for +scope+: takes items one at a time, processes
      # each and destroys it afterwards, until ConnectorQueue.take returns a
      # falsy value.
      #
      # @warning: Exceptions are suppressed here. Every exception (the rescue
      #   is on Exception, not just StandardError) is logged and turned into
      #   an :error status instead of propagating to the caller. The item
      #   being processed when the exception was raised is not destroyed.
      def dispatch(scope)
        loop do
          queued_item = ConnectorQueue.take(scope)
          break unless queued_item

          process(queued_item)
          queued_item.destroy
        end
      rescue Exception => error
        trace :error, "Exception in dispatcher thread #{scope}: [#{error.class}] #{error.message}, #{error.backtrace}"
        change_status(:error, "Some errors occured. Check the logfile.")
      end

      # Handles a single queue item according to its type:
      #
      # * :send_sync_event - forwards the event to the connector's archive node
      # * :send_evidence   - sends the evidence to the connector's archive node
      # * :dump_evidence   - exports the evidence locally (see #dump)
      #
      # Items whose connector is missing, or whose evidence is missing (for
      # the two evidence types), are logged with a warning and skipped.
      # Any other type is silently ignored.
      def process(connector_queue)
        trace :debug, "Processing #{connector_queue}"

        connector = connector_queue.connector
        payload   = connector_queue.data
        kind      = connector_queue.type

        unless connector
          trace :warn, "Was about to process #{connector_queue}, but the connector is missing."
          return
        end

        archive_node = connector.archive_node

        # Sync events carry no evidence, so they are handled before the
        # evidence lookup below.
        if kind == :send_sync_event
          archive_node.send_sync_event(
            event:    payload['event'],
            params:   payload['params'],
            agent_id: payload['path'].last
          )
          return
        end

        evidence = connector_queue.evidence

        unless evidence
          trace :warn, "Was about to process #{connector_queue}, but the evidence is missing."
          return
        end

        case kind
        when :send_evidence
          archive_node.send_evidence(evidence, path: payload['path'])
        when :dump_evidence
          dump(evidence, connector)
        end
      end

      # Logs the export and writes +evidence+ out through an Extractor built
      # from the connector's destination and format.
      def dump(evidence, connector)
        evidence_id     = evidence.id
        evidence_kind   = evidence.type.to_s.upcase
        connector_label = connector.name.inspect
        trace :info, "Export evidence #{evidence_id} (#{evidence_kind}) with connector #{connector_label}"

        extractor = Extractor.new(evidence, connector.dest, connector.format)
        extractor.dump
      end
    end
  end
end