summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-31 09:02:30 -0600
committermo khan <mo@mokhan.ca>2015-10-31 09:02:30 -0600
commitafef14d30275ce249aeebf86b940149b56377cd7 (patch)
treeb1b5eb28205560de5c407e8a5a11a08abb9f2b23
parentc3992ed387caf69c52405262698b06224b039f9a (diff)
reduce complexity in experiments constructor.
-rw-r--r--lib/killjoy/experiments.rb28
1 files changed, 16 insertions, 12 deletions
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 04ff534..d5d9f76 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -7,18 +7,8 @@ module Killjoy
AfterFork.new.call
@messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
@writers = Spank::IOC.resolve_all(:writer)
- @configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
- queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i,
- }
@mongo_client = Spank::IOC.resolve(:mongo_client)
- parser = LogParser.new
- log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
- @lines = File.readlines(log_file).take(messages_to_process).map do |x|
- parser.parse(x)
- end
+ @lines = parse_log_lines(messages_to_process)
end
def publish_messages(message_bus)
@@ -65,6 +55,12 @@ module Killjoy
end
def run
+ queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
+ configuration = {
+ amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
+ exchange: 'killjoy',
+ exchange_type: 'x-modulus-hash',
+ }
message_bus = MessageBus.new(configuration)
publish_messages(message_bus)
@@ -72,7 +68,7 @@ module Killjoy
mutex = Mutex.new
resource = ConditionVariable.new
- configuration[:queue_shards].times do |shard|
+ queue_shards.times do |shard|
consumer = yield(shard)
message_bus.subscribe(consumer) do |message|
message.intercept(:ack) do
@@ -92,5 +88,13 @@ module Killjoy
message_bus.stop
end
end
+
+ def parse_log_lines(messages_to_process)
+ parser = LogParser.new
+ log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
+ File.readlines(log_file).take(messages_to_process).map do |x|
+ parser.parse(x)
+ end
+ end
end
end