summaryrefslogtreecommitdiff
path: root/lib/killjoy/consumer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/killjoy/consumer.rb')
-rw-r--r--lib/killjoy/consumer.rb33
1 files changed, 28 insertions, 5 deletions
diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb
index c39d3b0..6f91e24 100644
--- a/lib/killjoy/consumer.rb
+++ b/lib/killjoy/consumer.rb
@@ -3,11 +3,34 @@ module Killjoy
include Sneakers::Worker
from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
- def work(message)
- log ["session:", session.object_id].inspect
- batch = batch_for(JSON.parse(message, symbolize_names: true))
- session.execute(batch)
- ack!
+ if ENV["ASYNC"] == "true"
+ def work_with_params(message, delivery_info, metadata)
+ log ["ASYNC session:", session.object_id].inspect
+
+ tag_closure = delivery_info.delivery_tag
+ batch = batch_for(JSON.parse(message, symbolize_names: true))
+ future = session.execute_async(batch)
+
+ future.on_success do |values|
+ log ["SUCCESS", values]
+ channel.acknowledge(tag_closure, false)
+ end
+ future.on_failure do |error|
+ log ["FAIL", error]
+ channel.reject(tag_closure, false)
+ end
+ end
+
+ def channel
+ @channel ||= @queue.instance_variable_get("@channel")
+ end
+ else
+ def work(message)
+ log ["session:", session.object_id].inspect
+ batch = batch_for(JSON.parse(message, symbolize_names: true))
+ session.execute(batch)
+ ack!
+ end
end
def session