diff options
| author | mo khan <mo@mokhan.ca> | 2015-02-07 12:33:06 -0700 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-02-07 12:33:06 -0700 |
| commit | 65145e6be87df7b0440fb98c11abc23a2b3c49ed (patch) | |
| tree | 026e5260bf03be83d38d454028023316a555a1ea | |
| parent | 2bbe4091b513ab40b0540a1315c8946ade21b476 (diff) | |
use a single topic exchange instead of multiple fanout exchanges.
| -rw-r--r-- | app/controllers/agents/events_controller.rb | 4 | ||||
| -rw-r--r-- | app/controllers/agents/files_controller.rb | 9 | ||||
| -rw-r--r-- | app/controllers/dispositions_controller.rb | 3 | ||||
| -rw-r--r-- | app/jobs/fingerprint_lookup_job.rb | 1 | ||||
| -rw-r--r-- | app/services/publisher.rb | 6 | ||||
| -rw-r--r-- | app/workers/cloud_queries.rb | 17 | ||||
| -rw-r--r-- | lib/tasks/rabbitmq.rake | 30 |
7 files changed, 36 insertions, 34 deletions
diff --git a/app/controllers/agents/events_controller.rb b/app/controllers/agents/events_controller.rb index abe9c04..8b1d158 100644 --- a/app/controllers/agents/events_controller.rb +++ b/app/controllers/agents/events_controller.rb @@ -11,7 +11,9 @@ module Agents end def create - Publisher.publish("events", event_params.merge({agent_id: @agent.id})) + message = event_params.merge({agent_id: @agent.id}) + routing_key = "events.#{event_params[:name]}.#{@agent.id}" + Publisher.publish(routing_key, message) redirect_to agent_events_url, notice: 'Event was successfully created.' end diff --git a/app/controllers/agents/files_controller.rb b/app/controllers/agents/files_controller.rb index 76051cf..3c2e47b 100644 --- a/app/controllers/agents/files_controller.rb +++ b/app/controllers/agents/files_controller.rb @@ -16,12 +16,19 @@ module Agents name: params[:name], data: params[:data] }) + + message = { + agent_id: params[:id], + name: params[:name], + data: params[:data] + } + Publisher.publish("events.scanned.#{@agent.id}", message) end private def load_agent - Agent.find(params[:agent_id]) + @agent = Agent.find(params[:agent_id]) end end end diff --git a/app/controllers/dispositions_controller.rb b/app/controllers/dispositions_controller.rb index 2581d80..116b7bb 100644 --- a/app/controllers/dispositions_controller.rb +++ b/app/controllers/dispositions_controller.rb @@ -26,7 +26,8 @@ class DispositionsController < ApplicationController # POST /dispositions # POST /dispositions.json def create - Publisher.publish("poke", disposition_params) + fingerprint = disposition_params[:fingerprint] + Publisher.publish("commands.poke.#{fingerprint}", disposition_params) respond_to do |format| format.html { redirect_to dispositions_path, notice: 'Disposition was successfully created.' } diff --git a/app/jobs/fingerprint_lookup_job.rb b/app/jobs/fingerprint_lookup_job.rb index 25a17f6..849d9cb 100644 --- a/app/jobs/fingerprint_lookup_job.rb +++ b/app/jobs/fingerprint_lookup_job.rb @@ -9,7 +9,6 @@ class FingerprintLookupJob < ActiveJob::Base apiKey: ENV.fetch("VIRUS_TOTAL_API_KEY"), }) report = JSON.parse(response.response_body) - puts report.inspect disposition = Disposition.find_by(fingerprint: fingerprint) disposition.file_reports.create!(data: report) end diff --git a/app/services/publisher.rb b/app/services/publisher.rb index ef940e8..704f1e3 100644 --- a/app/services/publisher.rb +++ b/app/services/publisher.rb @@ -1,7 +1,7 @@ class Publisher - def self.publish(exchange, message = {}) - exchange = channel.fanout("malwer.#{exchange}") - exchange.publish(message.to_json) + def self.publish(routing_key, message = {}) + exchange = channel.topic("malwer") + exchange.publish(message.to_json, routing_key: routing_key) end def self.channel diff --git a/app/workers/cloud_queries.rb b/app/workers/cloud_queries.rb index e944142..fbbe5c4 100644 --- a/app/workers/cloud_queries.rb +++ b/app/workers/cloud_queries.rb @@ -11,21 +11,8 @@ class CloudQueries fingerprint = attributes["data"]["fingerprint"] disposition = Disposition.find_by(fingerprint: fingerprint) - publish(JSON.generate({ - name: :scanned, - agent_id: attributes["agent_id"], - data: attributes["data"] - }), to_queue: "worker.events") - - if disposition.nil? - #publish(JSON.generate({ - #command: :request_analysis, - #agent_id: attributes["agent_id"], - #fingerprint: fingerprint, - #}), routing_key: "malwer.commands") - Disposition.create!(fingerprint: fingerprint, state: :unknown) - FingerprintLookupJob.perform_later(fingerprint) - end + Disposition.create!(fingerprint: fingerprint, state: :unknown) if disposition.nil? + FingerprintLookupJob.perform_later(fingerprint) if disposition.state == :unknown ack! end diff --git a/lib/tasks/rabbitmq.rake b/lib/tasks/rabbitmq.rake index 0fbe1c7..22b401c 100644 --- a/lib/tasks/rabbitmq.rake +++ b/lib/tasks/rabbitmq.rake @@ -6,20 +6,26 @@ namespace :rabbitmq do connection.start channel = connection.create_channel - # event intake bindings - exchange = channel.fanout("malwer.events") - queue = channel.queue("worker.events", durable: true) - queue.bind("malwer.events") + # single malwer topic exchange + # routing keys: + # * commands.command_type.(agent_id/fingerprint) + # * commands can be issued for specific agents + # * commands can be issued globally. (e.g. poke a dispostion) + # * events.event_type.agent_id - # poke bindings - exchange = channel.fanout("malwer.poke") - queue = channel.queue("worker.poke", durable: true) - queue.bind("malwer.poke") + channel.topic("malwer").tap do |exchange| + # event intake bindings + queue = channel.queue("worker.events", durable: true) + queue.bind(exchange, routing_key: "events.#") - # cloud queries bindings - exchange = channel.fanout("malwer.queries") - queue = channel.queue("worker.queries", durable: true) - queue.bind("malwer.queries") + # poke bindings + queue = channel.queue("worker.poke", durable: true) + queue.bind(exchange, routing_key: "commands.poke.#") + + # cloud queries bindings + queue = channel.queue("worker.queries", durable: true) + queue.bind(exchange, routing_key: 'events.scanned.#') + end connection.close end |
