diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-17 16:06:34 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-17 16:06:34 -0600 |
| commit | 991fca5de7e62cfa078869f29902c14c24c23678 (patch) | |
| tree | 383b3add44b33a7318df68aeff61f3a8b3ab2cb7 | |
| parent | 1e6a2a3be437ed41f42fb22ea5e0447693dd4305 (diff) | |
use ASYNC ENV var to run asynchronously and trap TERM to stop cassandra reactor thread.
| -rw-r--r-- | Procfile | 16 | ||||
| -rwxr-xr-x | exe/killjoy | 10 | ||||
| -rw-r--r-- | lib/killjoy.rb | 1 | ||||
| -rw-r--r-- | lib/killjoy/after_fork.rb | 5 | ||||
| -rw-r--r-- | lib/killjoy/async_consumer.rb | 30 | ||||
| -rw-r--r-- | lib/killjoy/cassandra_writer.rb | 6 |
6 files changed, 38 insertions, 30 deletions
@@ -6,11 +6,11 @@ worker5: RMQ_SHARD=4 exe/killjoy worker6: RMQ_SHARD=5 exe/killjoy worker7: RMQ_SHARD=6 exe/killjoy worker8: RMQ_SHARD=7 exe/killjoy -worker9: RMQ_SHARD=8 exe/killjoy -worker10: RMQ_SHARD=9 exe/killjoy -worker11: RMQ_SHARD=10 exe/killjoy -worker12: RMQ_SHARD=11 exe/killjoy -worker13: RMQ_SHARD=12 exe/killjoy -worker14: RMQ_SHARD=13 exe/killjoy -worker15: RMQ_SHARD=14 exe/killjoy -worker16: RMQ_SHARD=15 exe/killjoy +worker9: ASYNC=true RMQ_SHARD=8 exe/killjoy +worker10: ASYNC=true RMQ_SHARD=9 exe/killjoy +worker11: ASYNC=true RMQ_SHARD=10 exe/killjoy +worker12: ASYNC=true RMQ_SHARD=11 exe/killjoy +worker13: ASYNC=true RMQ_SHARD=12 exe/killjoy +worker14: ASYNC=true RMQ_SHARD=13 exe/killjoy +worker15: ASYNC=true RMQ_SHARD=14 exe/killjoy +worker16: ASYNC=true RMQ_SHARD=15 exe/killjoy diff --git a/exe/killjoy b/exe/killjoy index a9bdac5..0ad795a 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -11,6 +11,8 @@ require "statsd-ruby" cpus = Facter.value('processors')['count'].to_i +run_asynchronously = ENV["ASYNC"] == 'true' + Sneakers.configure({ amqp: ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost:5672"), exchange: 'shard.killjoy', @@ -22,7 +24,7 @@ Sneakers.configure({ #timeout_job_after: 5, #workers: 1, env: ENV.fetch('ENV', 'development'), - ack: ENV["ASYNC"] != "true", + ack: run_asynchronously == false, durable: true, prefetch: cpus, threads: cpus, @@ -31,4 +33,8 @@ Sneakers.configure({ } }) Sneakers.logger.level = Logger::INFO -Sneakers::Runner.new([ Killjoy::Consumer ]).run +if run_asynchronously + Sneakers::Runner.new([ Killjoy::AsyncConsumer ]).run +else + Sneakers::Runner.new([ Killjoy::Consumer ]).run +end diff --git a/lib/killjoy.rb b/lib/killjoy.rb index 44cfbbe..4329cab 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -7,6 +7,7 @@ require "spank" require "virtus" require "killjoy/after_fork" +require "killjoy/async_consumer" require "killjoy/cassandra_db" require "killjoy/cassandra_writer" require "killjoy/consumer" diff --git a/lib/killjoy/after_fork.rb b/lib/killjoy/after_fork.rb index b9797a0..6585775 100644 --- a/lib/killjoy/after_fork.rb +++ b/lib/killjoy/after_fork.rb @@ -6,6 +6,11 @@ module Killjoy Spank::IOC.bind_to(container) Spank::IOC.resolve(:session).execute("select * from system.hints;") end + + Signal.trap("TERM") do + Spank::IOC.resolve(:session).close + Spank::IOC.resolve(:cluster).close + end rescue => error puts [error.message, error.backtrace].inspect end diff --git a/lib/killjoy/async_consumer.rb b/lib/killjoy/async_consumer.rb index 7b0b7df..3cdbe06 100644 --- a/lib/killjoy/async_consumer.rb +++ b/lib/killjoy/async_consumer.rb @@ -1,20 +1,24 @@ module Killjoy - class AyncConsumer + class AsyncConsumer include Sneakers::Worker from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" def work_with_params(raw_message, delivery_info, metadata) - tag_closure = delivery_info.delivery_tag message = JSON.parse(raw_message, symbolize_names: true) + writes = writers.map do |writer| + writer.write(message) + end + process(::Cassandra::Future.all(writes), delivery_info.delivery_tag) + end - batch = batch_for(message) - future = session.execute_async(batch) + private - future.on_success do |values| - channel.acknowledge(tag_closure, false) + def process(future, tag) + future.on_success do |rows| + channel.acknowledge(tag, false) end future.on_failure do |error| - channel.reject(tag_closure, false) + channel.reject(tag, false) end end @@ -29,17 +33,5 @@ module Killjoy def writers @writers ||= Spank::IOC.resolve_all(:writer) end - - private - - def batch_for(json) - session.batch do |batch| - writers.each do |writer| - writer.save(json) do |statement, parameters| - batch.add(statement, parameters) - end - end - end - end end end diff --git a/lib/killjoy/cassandra_writer.rb b/lib/killjoy/cassandra_writer.rb index 45fc952..329ff70 100644 --- a/lib/killjoy/cassandra_writer.rb +++ b/lib/killjoy/cassandra_writer.rb @@ -9,7 +9,11 @@ module Killjoy def write(message) batch = batch_for(message) - session.execute(batch) + if ENV['ASYNC'] == 'true' + session.execute_async(batch) + else + session.execute(batch) + end end private |
