summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-02-07 12:33:06 -0700
committermo khan <mo@mokhan.ca>2015-02-07 12:33:06 -0700
commit65145e6be87df7b0440fb98c11abc23a2b3c49ed (patch)
tree026e5260bf03be83d38d454028023316a555a1ea
parent2bbe4091b513ab40b0540a1315c8946ade21b476 (diff)
use a single topic exchange instead of multiple fanout exchanges.
-rw-r--r--app/controllers/agents/events_controller.rb4
-rw-r--r--app/controllers/agents/files_controller.rb9
-rw-r--r--app/controllers/dispositions_controller.rb3
-rw-r--r--app/jobs/fingerprint_lookup_job.rb1
-rw-r--r--app/services/publisher.rb6
-rw-r--r--app/workers/cloud_queries.rb17
-rw-r--r--lib/tasks/rabbitmq.rake30
7 files changed, 36 insertions, 34 deletions
diff --git a/app/controllers/agents/events_controller.rb b/app/controllers/agents/events_controller.rb
index abe9c04..8b1d158 100644
--- a/app/controllers/agents/events_controller.rb
+++ b/app/controllers/agents/events_controller.rb
@@ -11,7 +11,9 @@ module Agents
end
def create
- Publisher.publish("events", event_params.merge({agent_id: @agent.id}))
+ message = event_params.merge({agent_id: @agent.id})
+ routing_key = "events.#{event_params[:name]}.#{@agent.id}"
+ Publisher.publish(routing_key, message)
redirect_to agent_events_url, notice: 'Event was successfully created.'
end
diff --git a/app/controllers/agents/files_controller.rb b/app/controllers/agents/files_controller.rb
index 76051cf..3c2e47b 100644
--- a/app/controllers/agents/files_controller.rb
+++ b/app/controllers/agents/files_controller.rb
@@ -16,12 +16,19 @@ module Agents
name: params[:name],
data: params[:data]
})
+
+ message = {
+ agent_id: params[:id],
+ name: params[:name],
+ data: params[:data]
+ }
+ Publisher.publish("events.scanned.#{@agent.id}", message)
end
private
def load_agent
- Agent.find(params[:agent_id])
+ @agent = Agent.find(params[:agent_id])
end
end
end
diff --git a/app/controllers/dispositions_controller.rb b/app/controllers/dispositions_controller.rb
index 2581d80..116b7bb 100644
--- a/app/controllers/dispositions_controller.rb
+++ b/app/controllers/dispositions_controller.rb
@@ -26,7 +26,8 @@ class DispositionsController < ApplicationController
# POST /dispositions
# POST /dispositions.json
def create
- Publisher.publish("poke", disposition_params)
+ fingerprint = disposition_params[:fingerprint]
+ Publisher.publish("commands.poke.#{fingerprint}", disposition_params)
respond_to do |format|
format.html { redirect_to dispositions_path, notice: 'Disposition was successfully created.' }
diff --git a/app/jobs/fingerprint_lookup_job.rb b/app/jobs/fingerprint_lookup_job.rb
index 25a17f6..849d9cb 100644
--- a/app/jobs/fingerprint_lookup_job.rb
+++ b/app/jobs/fingerprint_lookup_job.rb
@@ -9,7 +9,6 @@ class FingerprintLookupJob < ActiveJob::Base
apiKey: ENV.fetch("VIRUS_TOTAL_API_KEY"),
})
report = JSON.parse(response.response_body)
- puts report.inspect
disposition = Disposition.find_by(fingerprint: fingerprint)
disposition.file_reports.create!(data: report)
end
diff --git a/app/services/publisher.rb b/app/services/publisher.rb
index ef940e8..704f1e3 100644
--- a/app/services/publisher.rb
+++ b/app/services/publisher.rb
@@ -1,7 +1,7 @@
class Publisher
- def self.publish(exchange, message = {})
- exchange = channel.fanout("malwer.#{exchange}")
- exchange.publish(message.to_json)
+ def self.publish(routing_key, message = {})
+ exchange = channel.topic("malwer")
+ exchange.publish(message.to_json, routing_key: routing_key)
end
def self.channel
diff --git a/app/workers/cloud_queries.rb b/app/workers/cloud_queries.rb
index e944142..fbbe5c4 100644
--- a/app/workers/cloud_queries.rb
+++ b/app/workers/cloud_queries.rb
@@ -11,21 +11,8 @@ class CloudQueries
fingerprint = attributes["data"]["fingerprint"]
disposition = Disposition.find_by(fingerprint: fingerprint)
- publish(JSON.generate({
- name: :scanned,
- agent_id: attributes["agent_id"],
- data: attributes["data"]
- }), to_queue: "worker.events")
-
- if disposition.nil?
- #publish(JSON.generate({
- #command: :request_analysis,
- #agent_id: attributes["agent_id"],
- #fingerprint: fingerprint,
- #}), routing_key: "malwer.commands")
- Disposition.create!(fingerprint: fingerprint, state: :unknown)
- FingerprintLookupJob.perform_later(fingerprint)
- end
+ Disposition.create!(fingerprint: fingerprint, state: :unknown) if disposition.nil?
+ FingerprintLookupJob.perform_later(fingerprint) if disposition.state == :unknown
ack!
end
diff --git a/lib/tasks/rabbitmq.rake b/lib/tasks/rabbitmq.rake
index 0fbe1c7..22b401c 100644
--- a/lib/tasks/rabbitmq.rake
+++ b/lib/tasks/rabbitmq.rake
@@ -6,20 +6,26 @@ namespace :rabbitmq do
connection.start
channel = connection.create_channel
- # event intake bindings
- exchange = channel.fanout("malwer.events")
- queue = channel.queue("worker.events", durable: true)
- queue.bind("malwer.events")
+ # single malwer topic exchange
+ # routing keys:
+ # * commands.command_type.(agent_id/fingerprint)
+ # * commands can be issued for specific agents
+ # * commands can be issued globally. (e.g. poke a dispostion)
+ # * events.event_type.agent_id
- # poke bindings
- exchange = channel.fanout("malwer.poke")
- queue = channel.queue("worker.poke", durable: true)
- queue.bind("malwer.poke")
+ channel.topic("malwer").tap do |exchange|
+ # event intake bindings
+ queue = channel.queue("worker.events", durable: true)
+ queue.bind(exchange, routing_key: "events.#")
- # cloud queries bindings
- exchange = channel.fanout("malwer.queries")
- queue = channel.queue("worker.queries", durable: true)
- queue.bind("malwer.queries")
+ # poke bindings
+ queue = channel.queue("worker.poke", durable: true)
+ queue.bind(exchange, routing_key: "commands.poke.#")
+
+ # cloud queries bindings
+ queue = channel.queue("worker.queries", durable: true)
+ queue.bind(exchange, routing_key: 'events.scanned.#')
+ end
connection.close
end