diff options
Diffstat (limited to 'lib/killjoy/consumer.rb')
| -rw-r--r-- | lib/killjoy/consumer.rb | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb index c39d3b0..6f91e24 100644 --- a/lib/killjoy/consumer.rb +++ b/lib/killjoy/consumer.rb @@ -3,11 +3,34 @@ module Killjoy include Sneakers::Worker from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" - def work(message) - log ["session:", session.object_id].inspect - batch = batch_for(JSON.parse(message, symbolize_names: true)) - session.execute(batch) - ack! + if ENV["ASYNC"] == "true" + def work_with_params(message, delivery_info, metadata) + log ["ASYNC session:", session.object_id].inspect + + tag_closure = delivery_info.delivery_tag + batch = batch_for(JSON.parse(message, symbolize_names: true)) + future = session.execute_async(batch) + + future.on_success do |values| + log ["SUCCESS", values] + channel.acknowledge(tag_closure, false) + end + future.on_failure do |error| + log ["FAIL", error] + channel.reject(tag_closure, false) + end + end + + def channel + @channel ||= @queue.instance_variable_get("@channel") + end + else + def work(message) + log ["session:", session.object_id].inspect + batch = batch_for(JSON.parse(message, symbolize_names: true)) + session.execute(batch) + ack! + end end def session |
