summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-17 16:06:34 -0600
committermo khan <mo@mokhan.ca>2015-10-17 16:06:34 -0600
commit991fca5de7e62cfa078869f29902c14c24c23678 (patch)
tree383b3add44b33a7318df68aeff61f3a8b3ab2cb7
parent1e6a2a3be437ed41f42fb22ea5e0447693dd4305 (diff)
use ASYNC ENV var to run asynchronously and trap TERM to stop cassandra reactor thread.
-rw-r--r--Procfile16
-rwxr-xr-xexe/killjoy10
-rw-r--r--lib/killjoy.rb1
-rw-r--r--lib/killjoy/after_fork.rb5
-rw-r--r--lib/killjoy/async_consumer.rb30
-rw-r--r--lib/killjoy/cassandra_writer.rb6
6 files changed, 38 insertions, 30 deletions
diff --git a/Procfile b/Procfile
index b68ee8f..5779403 100644
--- a/Procfile
+++ b/Procfile
@@ -6,11 +6,11 @@ worker5: RMQ_SHARD=4 exe/killjoy
worker6: RMQ_SHARD=5 exe/killjoy
worker7: RMQ_SHARD=6 exe/killjoy
worker8: RMQ_SHARD=7 exe/killjoy
-worker9: RMQ_SHARD=8 exe/killjoy
-worker10: RMQ_SHARD=9 exe/killjoy
-worker11: RMQ_SHARD=10 exe/killjoy
-worker12: RMQ_SHARD=11 exe/killjoy
-worker13: RMQ_SHARD=12 exe/killjoy
-worker14: RMQ_SHARD=13 exe/killjoy
-worker15: RMQ_SHARD=14 exe/killjoy
-worker16: RMQ_SHARD=15 exe/killjoy
+worker9: ASYNC=true RMQ_SHARD=8 exe/killjoy
+worker10: ASYNC=true RMQ_SHARD=9 exe/killjoy
+worker11: ASYNC=true RMQ_SHARD=10 exe/killjoy
+worker12: ASYNC=true RMQ_SHARD=11 exe/killjoy
+worker13: ASYNC=true RMQ_SHARD=12 exe/killjoy
+worker14: ASYNC=true RMQ_SHARD=13 exe/killjoy
+worker15: ASYNC=true RMQ_SHARD=14 exe/killjoy
+worker16: ASYNC=true RMQ_SHARD=15 exe/killjoy
diff --git a/exe/killjoy b/exe/killjoy
index a9bdac5..0ad795a 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -11,6 +11,8 @@ require "statsd-ruby"
cpus = Facter.value('processors')['count'].to_i
+run_asynchronously = ENV["ASYNC"] == 'true'
+
Sneakers.configure({
amqp: ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost:5672"),
exchange: 'shard.killjoy',
@@ -22,7 +24,7 @@ Sneakers.configure({
#timeout_job_after: 5,
#workers: 1,
env: ENV.fetch('ENV', 'development'),
- ack: ENV["ASYNC"] != "true",
+ ack: run_asynchronously == false,
durable: true,
prefetch: cpus,
threads: cpus,
@@ -31,4 +33,8 @@ Sneakers.configure({
}
})
Sneakers.logger.level = Logger::INFO
-Sneakers::Runner.new([ Killjoy::Consumer ]).run
+if run_asynchronously
+ Sneakers::Runner.new([ Killjoy::AsyncConsumer ]).run
+else
+ Sneakers::Runner.new([ Killjoy::Consumer ]).run
+end
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index 44cfbbe..4329cab 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -7,6 +7,7 @@ require "spank"
require "virtus"
require "killjoy/after_fork"
+require "killjoy/async_consumer"
require "killjoy/cassandra_db"
require "killjoy/cassandra_writer"
require "killjoy/consumer"
diff --git a/lib/killjoy/after_fork.rb b/lib/killjoy/after_fork.rb
index b9797a0..6585775 100644
--- a/lib/killjoy/after_fork.rb
+++ b/lib/killjoy/after_fork.rb
@@ -6,6 +6,11 @@ module Killjoy
Spank::IOC.bind_to(container)
Spank::IOC.resolve(:session).execute("select * from system.hints;")
end
+
+ Signal.trap("TERM") do
+ Spank::IOC.resolve(:session).close
+ Spank::IOC.resolve(:cluster).close
+ end
rescue => error
puts [error.message, error.backtrace].inspect
end
diff --git a/lib/killjoy/async_consumer.rb b/lib/killjoy/async_consumer.rb
index 7b0b7df..3cdbe06 100644
--- a/lib/killjoy/async_consumer.rb
+++ b/lib/killjoy/async_consumer.rb
@@ -1,20 +1,24 @@
module Killjoy
- class AyncConsumer
+ class AsyncConsumer
include Sneakers::Worker
from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
def work_with_params(raw_message, delivery_info, metadata)
- tag_closure = delivery_info.delivery_tag
message = JSON.parse(raw_message, symbolize_names: true)
+ writes = writers.map do |writer|
+ writer.write(message)
+ end
+ process(::Cassandra::Future.all(writes), delivery_info.delivery_tag)
+ end
- batch = batch_for(message)
- future = session.execute_async(batch)
+ private
- future.on_success do |values|
- channel.acknowledge(tag_closure, false)
+ def process(future, tag)
+ future.on_success do |rows|
+ channel.acknowledge(tag, false)
end
future.on_failure do |error|
- channel.reject(tag_closure, false)
+ channel.reject(tag, false)
end
end
@@ -29,17 +33,5 @@ module Killjoy
def writers
@writers ||= Spank::IOC.resolve_all(:writer)
end
-
- private
-
- def batch_for(json)
- session.batch do |batch|
- writers.each do |writer|
- writer.save(json) do |statement, parameters|
- batch.add(statement, parameters)
- end
- end
- end
- end
end
end
diff --git a/lib/killjoy/cassandra_writer.rb b/lib/killjoy/cassandra_writer.rb
index 45fc952..329ff70 100644
--- a/lib/killjoy/cassandra_writer.rb
+++ b/lib/killjoy/cassandra_writer.rb
@@ -9,7 +9,11 @@ module Killjoy
def write(message)
batch = batch_for(message)
- session.execute(batch)
+ if ENV['ASYNC'] == 'true'
+ session.execute_async(batch)
+ else
+ session.execute(batch)
+ end
end
private