summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/killjoy/cassandra/blocking_writes_consumer.rb7
-rw-r--r--lib/killjoy/cassandra/non_blocking_writes_consumer.rb7
-rw-r--r--lib/killjoy/consumer.rb5
-rw-r--r--lib/killjoy/experiments.rb25
-rw-r--r--lib/killjoy/mongo/consumer.rb6
5 files changed, 35 insertions, 15 deletions
diff --git a/lib/killjoy/cassandra/blocking_writes_consumer.rb b/lib/killjoy/cassandra/blocking_writes_consumer.rb
index 33cbb03..221d8af 100644
--- a/lib/killjoy/cassandra/blocking_writes_consumer.rb
+++ b/lib/killjoy/cassandra/blocking_writes_consumer.rb
@@ -3,6 +3,13 @@ require "killjoy/consumer"
module Killjoy
module Cassandra
class BlockingWritesConsumer < Killjoy::Consumer
+ attr_reader :writers
+
+ def initialize(writers, shard)
+ @writers = writers
+ super(shard)
+ end
+
def work(message)
writers.each do |writer|
writer.write(message.to_hash)
diff --git a/lib/killjoy/cassandra/non_blocking_writes_consumer.rb b/lib/killjoy/cassandra/non_blocking_writes_consumer.rb
index d62ae89..23aa24a 100644
--- a/lib/killjoy/cassandra/non_blocking_writes_consumer.rb
+++ b/lib/killjoy/cassandra/non_blocking_writes_consumer.rb
@@ -3,6 +3,13 @@ require "killjoy/consumer"
module Killjoy
module Cassandra
class NonBlockingWritesConsumer < Killjoy::Consumer
+ attr_reader :writers
+
+ def initialize(writers, shard)
+ @writers = writers
+ super(shard)
+ end
+
def work(message)
writes = writers.map do |writer|
writer.write(message.to_hash, async: true)
diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb
index 0bb3879..176e658 100644
--- a/lib/killjoy/consumer.rb
+++ b/lib/killjoy/consumer.rb
@@ -1,10 +1,9 @@
module Killjoy
class Consumer
- attr_reader :shard, :writers
+ attr_reader :shard
- def initialize(writers, shard)
+ def initialize(shard)
@shard = shard
- @writers = writers
end
def work(message)
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 4180d93..04ff534 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -4,7 +4,7 @@ module Killjoy
def initialize(enable_profiler: false)
@enable_profiler = enable_profiler
- Killjoy::AfterFork.new.call
+ AfterFork.new.call
@messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
@writers = Spank::IOC.resolve_all(:writer)
@configuration = {
@@ -13,7 +13,8 @@ module Killjoy
exchange_type: 'x-modulus-hash',
queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i,
}
- parser = Killjoy::LogParser.new
+ @mongo_client = Spank::IOC.resolve(:mongo_client)
+ parser = LogParser.new
log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
@lines = File.readlines(log_file).take(messages_to_process).map do |x|
parser.parse(x)
@@ -21,7 +22,7 @@ module Killjoy
end
def publish_messages(message_bus)
- publisher = Killjoy::Publisher.new(message_bus)
+ publisher = Publisher.new(message_bus)
lines.each do |line|
publisher.publish(line)
end
@@ -29,19 +30,25 @@ module Killjoy
def blocking_writes
profile('tmp/cassandra-cpu-blocking-writes.dump') do
- run(Killjoy::Cassandra::BlockingWritesConsumer)
+ run do |shard|
+ Cassandra::BlockingWritesConsumer.new(writers, shard)
+ end
end
end
def non_blocking_writes
profile('tmp/cassandra-cpu-non-blocking-writes.dump') do
- run(Killjoy::Cassandra::NonBlockingWritesConsumer)
+ run do |shard|
+ Cassandra::NonBlockingWritesConsumer.new(writers, shard)
+ end
end
end
def mongo_writes
profile('tmp/mongo-cpu-non-blocking-writes.dump') do
- run(Killjoy::Mongo::Consumer)
+ run do |shard|
+ Mongo::Consumer.new(@mongo_client, shard)
+ end
end
end
@@ -57,8 +64,8 @@ module Killjoy
end
end
- def run(consumer_class)
- message_bus = Killjoy::MessageBus.new(configuration)
+ def run
+ message_bus = MessageBus.new(configuration)
publish_messages(message_bus)
queue = Queue.new
@@ -66,7 +73,7 @@ module Killjoy
resource = ConditionVariable.new
configuration[:queue_shards].times do |shard|
- consumer = consumer_class.new(writers, shard)
+ consumer = yield(shard)
message_bus.subscribe(consumer) do |message|
message.intercept(:ack) do
queue << message
diff --git a/lib/killjoy/mongo/consumer.rb b/lib/killjoy/mongo/consumer.rb
index d5cc2ee..78de650 100644
--- a/lib/killjoy/mongo/consumer.rb
+++ b/lib/killjoy/mongo/consumer.rb
@@ -3,9 +3,9 @@ module Killjoy
class Consumer < Consumer
attr_reader :mongo_client
- def initialize(writers, shard)
- @mongo_client = Spank::IOC.resolve(:mongo_client)
- super(writers, shard)
+ def initialize(mongo_client, shard)
+ @mongo_client = mongo_client
+ super(shard)
end
def work(message)