diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-31 09:02:30 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-31 09:02:30 -0600 |
| commit | afef14d30275ce249aeebf86b940149b56377cd7 (patch) | |
| tree | b1b5eb28205560de5c407e8a5a11a08abb9f2b23 | |
| parent | c3992ed387caf69c52405262698b06224b039f9a (diff) | |
reduce complexity in experiments constructor.
| -rw-r--r-- | lib/killjoy/experiments.rb | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 04ff534..d5d9f76 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -7,18 +7,8 @@ module Killjoy AfterFork.new.call @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i @writers = Spank::IOC.resolve_all(:writer) - @configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', - queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i, - } @mongo_client = Spank::IOC.resolve(:mongo_client) - parser = LogParser.new - log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) - @lines = File.readlines(log_file).take(messages_to_process).map do |x| - parser.parse(x) - end + @lines = parse_log_lines(messages_to_process) end def publish_messages(message_bus) @@ -65,6 +55,12 @@ module Killjoy end def run + queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i + configuration = { + amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), + exchange: 'killjoy', + exchange_type: 'x-modulus-hash', + } message_bus = MessageBus.new(configuration) publish_messages(message_bus) @@ -72,7 +68,7 @@ module Killjoy mutex = Mutex.new resource = ConditionVariable.new - configuration[:queue_shards].times do |shard| + queue_shards.times do |shard| consumer = yield(shard) message_bus.subscribe(consumer) do |message| message.intercept(:ack) do @@ -92,5 +88,13 @@ module Killjoy message_bus.stop end end + + def parse_log_lines(messages_to_process) + parser = LogParser.new + log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) + File.readlines(log_file).take(messages_to_process).map do |x| + parser.parse(x) + end + end end end |
