diff options
| -rw-r--r-- | lib/killjoy/cassandra/blocking_writes_consumer.rb | 7 | ||||
| -rw-r--r-- | lib/killjoy/cassandra/non_blocking_writes_consumer.rb | 7 | ||||
| -rw-r--r-- | lib/killjoy/consumer.rb | 5 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 25 | ||||
| -rw-r--r-- | lib/killjoy/mongo/consumer.rb | 6 |
5 files changed, 35 insertions, 15 deletions
diff --git a/lib/killjoy/cassandra/blocking_writes_consumer.rb b/lib/killjoy/cassandra/blocking_writes_consumer.rb index 33cbb03..221d8af 100644 --- a/lib/killjoy/cassandra/blocking_writes_consumer.rb +++ b/lib/killjoy/cassandra/blocking_writes_consumer.rb @@ -3,6 +3,13 @@ require "killjoy/consumer" module Killjoy module Cassandra class BlockingWritesConsumer < Killjoy::Consumer + attr_reader :writers + + def initialize(writers, shard) + @writers = writers + super(shard) + end + def work(message) writers.each do |writer| writer.write(message.to_hash) diff --git a/lib/killjoy/cassandra/non_blocking_writes_consumer.rb b/lib/killjoy/cassandra/non_blocking_writes_consumer.rb index d62ae89..23aa24a 100644 --- a/lib/killjoy/cassandra/non_blocking_writes_consumer.rb +++ b/lib/killjoy/cassandra/non_blocking_writes_consumer.rb @@ -3,6 +3,13 @@ require "killjoy/consumer" module Killjoy module Cassandra class NonBlockingWritesConsumer < Killjoy::Consumer + attr_reader :writers + + def initialize(writers, shard) + @writers = writers + super(shard) + end + def work(message) writes = writers.map do |writer| writer.write(message.to_hash, async: true) diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb index 0bb3879..176e658 100644 --- a/lib/killjoy/consumer.rb +++ b/lib/killjoy/consumer.rb @@ -1,10 +1,9 @@ module Killjoy class Consumer - attr_reader :shard, :writers + attr_reader :shard - def initialize(writers, shard) + def initialize(shard) @shard = shard - @writers = writers end def work(message) diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 4180d93..04ff534 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -4,7 +4,7 @@ module Killjoy def initialize(enable_profiler: false) @enable_profiler = enable_profiler - Killjoy::AfterFork.new.call + AfterFork.new.call @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i @writers = Spank::IOC.resolve_all(:writer) @configuration = { @@ -13,7 +13,8 @@ module Killjoy exchange_type: 'x-modulus-hash', queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i, } - parser = Killjoy::LogParser.new + @mongo_client = Spank::IOC.resolve(:mongo_client) + parser = LogParser.new log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) @lines = File.readlines(log_file).take(messages_to_process).map do |x| parser.parse(x) @@ -21,7 +22,7 @@ module Killjoy end def publish_messages(message_bus) - publisher = Killjoy::Publisher.new(message_bus) + publisher = Publisher.new(message_bus) lines.each do |line| publisher.publish(line) end @@ -29,19 +30,25 @@ module Killjoy def blocking_writes profile('tmp/cassandra-cpu-blocking-writes.dump') do - run(Killjoy::Cassandra::BlockingWritesConsumer) + run do |shard| + Cassandra::BlockingWritesConsumer.new(writers, shard) + end end end def non_blocking_writes profile('tmp/cassandra-cpu-non-blocking-writes.dump') do - run(Killjoy::Cassandra::NonBlockingWritesConsumer) + run do |shard| + Cassandra::NonBlockingWritesConsumer.new(writers, shard) + end end end def mongo_writes profile('tmp/mongo-cpu-non-blocking-writes.dump') do - run(Killjoy::Mongo::Consumer) + run do |shard| + Mongo::Consumer.new(@mongo_client, shard) + end end end @@ -57,8 +64,8 @@ module Killjoy end end - def run(consumer_class) - message_bus = Killjoy::MessageBus.new(configuration) + def run + message_bus = MessageBus.new(configuration) publish_messages(message_bus) queue = Queue.new @@ -66,7 +73,7 @@ module Killjoy resource = ConditionVariable.new configuration[:queue_shards].times do |shard| - consumer = consumer_class.new(writers, shard) + consumer = yield(shard) message_bus.subscribe(consumer) do |message| message.intercept(:ack) do queue << message diff --git a/lib/killjoy/mongo/consumer.rb b/lib/killjoy/mongo/consumer.rb index d5cc2ee..78de650 100644 --- a/lib/killjoy/mongo/consumer.rb +++ b/lib/killjoy/mongo/consumer.rb @@ -3,9 +3,9 @@ module Killjoy class Consumer < Consumer attr_reader :mongo_client - def initialize(writers, shard) - @mongo_client = Spank::IOC.resolve(:mongo_client) - super(writers, shard) + def initialize(mongo_client, shard) + @mongo_client = mongo_client + super(shard) end def work(message) |
