summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-17 13:48:01 -0600
committermo khan <mo@mokhan.ca>2015-10-17 13:48:01 -0600
commit3a0b09d2248d33d42994854a90073926efe93fbc (patch)
tree6b0251c5a7c8813ce6041ea136b0ee1d56600e83
parentf15f5ac6e62f7552005582206255ed8b1a1952ef (diff)
add option to run workers using futures api.
-rwxr-xr-xexe/killjoy2
-rw-r--r--lib/killjoy/consumer.rb33
2 files changed, 29 insertions, 6 deletions
diff --git a/exe/killjoy b/exe/killjoy
index 6190a69..33a4049 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -40,7 +40,7 @@ Sneakers.configure({
#timeout_job_after: 5,
#workers: 1,
env: ENV.fetch('ENV', 'development'),
- ack: true,
+ ack: ENV["ASYNC"] != "true",
durable: true,
prefetch: Facter.value('processors')['count'].to_i,
threads: Facter.value('processors')['count'].to_i,
diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb
index c39d3b0..6f91e24 100644
--- a/lib/killjoy/consumer.rb
+++ b/lib/killjoy/consumer.rb
@@ -3,11 +3,34 @@ module Killjoy
include Sneakers::Worker
from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
- def work(message)
- log ["session:", session.object_id].inspect
- batch = batch_for(JSON.parse(message, symbolize_names: true))
- session.execute(batch)
- ack!
+ if ENV["ASYNC"] == "true"
+ def work_with_params(message, delivery_info, metadata)
+ log ["ASYNC session:", session.object_id].inspect
+
+ tag_closure = delivery_info.delivery_tag
+ batch = batch_for(JSON.parse(message, symbolize_names: true))
+ future = session.execute_async(batch)
+
+ future.on_success do |values|
+ log ["SUCCESS", values]
+ channel.acknowledge(tag_closure, false)
+ end
+ future.on_failure do |error|
+ log ["FAIL", error]
+ channel.reject(tag_closure, false)
+ end
+ end
+
+ def channel
+ @channel ||= @queue.instance_variable_get("@channel")
+ end
+ else
+ def work(message)
+ log ["session:", session.object_id].inspect
+ batch = batch_for(JSON.parse(message, symbolize_names: true))
+ session.execute(batch)
+ ack!
+ end
end
def session