diff options
| -rw-r--r-- | app/workers/cassandra_writer.rb | 2 | ||||
| -rw-r--r-- | app/workers/cloud_queries.rb | 7 | ||||
| -rw-r--r-- | lib/tasks/rabbitmq.rake | 4 |
3 files changed, 8 insertions, 5 deletions
diff --git a/app/workers/cassandra_writer.rb b/app/workers/cassandra_writer.rb index e8ee29d..0d4057a 100644 --- a/app/workers/cassandra_writer.rb +++ b/app/workers/cassandra_writer.rb @@ -1,6 +1,6 @@ class CassandraWriter include Sneakers::Worker - from_queue "worker.events" + from_queue "worker.cassandra" def work(event_json) ack! diff --git a/app/workers/cloud_queries.rb b/app/workers/cloud_queries.rb index a1fa68d..5dd757f 100644 --- a/app/workers/cloud_queries.rb +++ b/app/workers/cloud_queries.rb @@ -9,10 +9,9 @@ class CloudQueries attributes = JSON.parse(json) fingerprint = attributes["data"]["fingerprint"] - disposition = Disposition.find_by(fingerprint: fingerprint) - - disposition = Disposition.create!(fingerprint: fingerprint, state: :unknown) if disposition.nil? - FingerprintLookupJob.perform_later(fingerprint) if disposition.state == :unknown + disposition = Disposition.find_by(fingerprint: fingerprint) || + Disposition.create!(fingerprint: fingerprint, state: :unknown) + FingerprintLookupJob.perform_later(fingerprint) if disposition.unknown? ack! end diff --git a/lib/tasks/rabbitmq.rake b/lib/tasks/rabbitmq.rake index 22b401c..c7baa19 100644 --- a/lib/tasks/rabbitmq.rake +++ b/lib/tasks/rabbitmq.rake @@ -25,6 +25,10 @@ namespace :rabbitmq do # cloud queries bindings queue = channel.queue("worker.queries", durable: true) queue.bind(exchange, routing_key: 'events.scanned.#') + + # cassandra worker bindings + queue = channel.queue("worker.cassandra", durable: true) + queue.bind(exchange, routing_key: 'events.scanned.#') end connection.close |
