summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-02-03 21:10:08 -0700
committermo khan <mo@mokhan.ca>2015-02-03 21:10:08 -0700
commitbab5e66e261774700eb3ad515eb929fc85991a69 (patch)
tree64d52d6825bee7d50b580a7450f4f41cb8ee4f0d
parent59d4503063717dcb6e5eada94976e72c466483cc (diff)
poke disposition in worker.
-rw-r--r--Procfile3
-rw-r--r--app/controllers/dispositions_controller.rb21
-rw-r--r--app/models/disposition.rb1
-rw-r--r--app/workers/events_worker.rb5
-rw-r--r--app/workers/poke.rb16
-rwxr-xr-xbin/setup3
-rw-r--r--lib/tasks/rabbitmq.rake11
7 files changed, 36 insertions, 24 deletions
diff --git a/Procfile b/Procfile
index 10c91ff..c1b3754 100644
--- a/Procfile
+++ b/Procfile
@@ -1,2 +1,3 @@
web: rails s
-worker: env WORKERS=EventsWorker rake sneakers:run
+event_intake: env WORKERS=EventsWorker rake sneakers:run
+poke: env WORKERS=Poke rake sneakers:run
diff --git a/app/controllers/dispositions_controller.rb b/app/controllers/dispositions_controller.rb
index 2e5a1a3..2581d80 100644
--- a/app/controllers/dispositions_controller.rb
+++ b/app/controllers/dispositions_controller.rb
@@ -26,30 +26,21 @@ class DispositionsController < ApplicationController
# POST /dispositions
# POST /dispositions.json
def create
- @disposition = Disposition.new(disposition_params)
+ Publisher.publish("poke", disposition_params)
respond_to do |format|
- if @disposition.save
- format.html { redirect_to @disposition, notice: 'Disposition was successfully created.' }
- format.json { render :show, status: :created, location: @disposition }
- else
- format.html { render :new }
- format.json { render json: @disposition.errors, status: :unprocessable_entity }
- end
+ format.html { redirect_to dispositions_path, notice: 'Disposition was successfully created.' }
+ format.json { head :no_content }
end
end
# PATCH/PUT /dispositions/1
# PATCH/PUT /dispositions/1.json
def update
+ Publisher.publish("poke", disposition_params)
respond_to do |format|
- if @disposition.update(disposition_params)
- format.html { redirect_to @disposition, notice: 'Disposition was successfully updated.' }
- format.json { render :show, status: :ok, location: @disposition }
- else
- format.html { render :edit }
- format.json { render json: @disposition.errors, status: :unprocessable_entity }
- end
+ format.html { redirect_to dispositions_path, notice: 'Disposition was successfully updated.' }
+ format.json { head :no_content }
end
end
diff --git a/app/models/disposition.rb b/app/models/disposition.rb
index 84dae18..1bf43c7 100644
--- a/app/models/disposition.rb
+++ b/app/models/disposition.rb
@@ -3,6 +3,7 @@ class Disposition < ActiveRecord::Base
attr_readonly :fingerprint
validates_uniqueness_of :fingerprint
+ validates_presence_of :fingerprint, :state
def to_param
fingerprint
diff --git a/app/workers/events_worker.rb b/app/workers/events_worker.rb
index 3fcb2c8..dc85009 100644
--- a/app/workers/events_worker.rb
+++ b/app/workers/events_worker.rb
@@ -2,10 +2,7 @@ require 'json'
class EventsWorker
include Sneakers::Worker
- from_queue "worker.events",
- threads: 50,
- prefetch: 50,
- timeout_job_after: 1
+ from_queue "worker.events"
def work(event_json)
logger.info event_json
diff --git a/app/workers/poke.rb b/app/workers/poke.rb
new file mode 100644
index 0000000..f7142ff
--- /dev/null
+++ b/app/workers/poke.rb
@@ -0,0 +1,16 @@
+require 'json'
+
+class Poke
+ include Sneakers::Worker
+ from_queue "worker.poke"
+
+ def work(json)
+ attributes = JSON.parse(json)
+
+ disposition = Disposition.find_or_create_by(fingerprint: attributes["fingerprint"])
+ disposition.state = attributes["state"]
+ disposition.save!
+
+ ack!
+ end
+end
diff --git a/bin/setup b/bin/setup
index acdb2c1..1b6f5a6 100755
--- a/bin/setup
+++ b/bin/setup
@@ -20,6 +20,9 @@ Dir.chdir APP_ROOT do
puts "\n== Preparing database =="
system "bin/rake db:setup"
+ puts "\n== Preparing RabbitMQ bindings =="
+ system "bin/rake rabbitmq:setup"
+
puts "\n== Removing old logs and tempfiles =="
system "rm -f log/*"
system "rm -rf tmp/cache"
diff --git a/lib/tasks/rabbitmq.rake b/lib/tasks/rabbitmq.rake
index 185f993..42cfce9 100644
--- a/lib/tasks/rabbitmq.rake
+++ b/lib/tasks/rabbitmq.rake
@@ -6,13 +6,16 @@ namespace :rabbitmq do
connection.start
channel = connection.create_channel
- # create exchange
+ # event intake bindings
exchange = channel.fanout("malwer.events")
-
- # get or create queue (note the durable setting)
queue = channel.queue("worker.events", durable: true)
- # bind queue to exchange
queue.bind("malwer.events")
+
+ # poke bindings
+ exchange = channel.fanout("malwer.poke")
+ queue = channel.queue("worker.poke", durable: true)
+ queue.bind("malwer.poke")
+
connection.close
end
end