diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-17 13:48:01 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-17 13:48:01 -0600 |
| commit | 3a0b09d2248d33d42994854a90073926efe93fbc (patch) | |
| tree | 6b0251c5a7c8813ce6041ea136b0ee1d56600e83 | |
| parent | f15f5ac6e62f7552005582206255ed8b1a1952ef (diff) | |
add option to run workers using futures api.
| -rwxr-xr-x | exe/killjoy | 2 | ||||
| -rw-r--r-- | lib/killjoy/consumer.rb | 33 |
2 files changed, 29 insertions, 6 deletions
diff --git a/exe/killjoy b/exe/killjoy index 6190a69..33a4049 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -40,7 +40,7 @@ Sneakers.configure({ #timeout_job_after: 5, #workers: 1, env: ENV.fetch('ENV', 'development'), - ack: true, + ack: ENV["ASYNC"] != "true", durable: true, prefetch: Facter.value('processors')['count'].to_i, threads: Facter.value('processors')['count'].to_i, diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb index c39d3b0..6f91e24 100644 --- a/lib/killjoy/consumer.rb +++ b/lib/killjoy/consumer.rb @@ -3,11 +3,34 @@ module Killjoy include Sneakers::Worker from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" - def work(message) - log ["session:", session.object_id].inspect - batch = batch_for(JSON.parse(message, symbolize_names: true)) - session.execute(batch) - ack! + if ENV["ASYNC"] == "true" + def work_with_params(message, delivery_info, metadata) + log ["ASYNC session:", session.object_id].inspect + + tag_closure = delivery_info.delivery_tag + batch = batch_for(JSON.parse(message, symbolize_names: true)) + future = session.execute_async(batch) + + future.on_success do |values| + log ["SUCCESS", values] + channel.acknowledge(tag_closure, false) + end + future.on_failure do |error| + log ["FAIL", error] + channel.reject(tag_closure, false) + end + end + + def channel + @channel ||= @queue.instance_variable_get("@channel") + end + else + def work(message) + log ["session:", session.object_id].inspect + batch = batch_for(JSON.parse(message, symbolize_names: true)) + session.execute(batch) + ack! + end end def session |
