diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-24 17:10:27 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-24 17:10:27 -0600 |
| commit | 09f14adba048b0ae07e399910d3bd85e0ece2c0c (patch) | |
| tree | 1d6912318c03003aaabefe0b1f3fc5d1d609fc1a | |
| parent | 76115984087068e85896854b3b8017fe35b3dc84 (diff) | |
move experiments to a separate file.
| -rwxr-xr-x | exe/killjoy-timing | 73 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 71 |
2 files changed, 73 insertions, 71 deletions
diff --git a/exe/killjoy-timing b/exe/killjoy-timing index acb3127..0ec5545 100755 --- a/exe/killjoy-timing +++ b/exe/killjoy-timing @@ -3,79 +3,10 @@ require "bundler/setup" $LOAD_PATH << File.expand_path("../lib", File.dirname(__FILE__)) require "killjoy" +require "killjoy/experiments" require "benchmark/ips" -class Experiments - attr_reader :configuration, :messages_to_process, :writers, :lines - - def initialize - Killjoy::AfterFork.new.call - @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i - @writers = Spank::IOC.resolve_all(:writer) - cpus = Facter.value('processors')['count'].to_i - @configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', - heartbeat: 2, - prefetch: cpus, - } - parser = Killjoy::LogParser.new - log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) - @lines = File.readlines(log_file).take(messages_to_process).map do |x| - parser.parse(x) - end - end - - def publish_messages - Killjoy::Publisher.use do |publisher| - lines.each do |line| - publisher.publish(line) - end - end - end - - def blocking_writes - run(Killjoy::Consumer) - end - - def non_blocking_writes - run(Killjoy::AsyncConsumer) - end - - private - - def run(consumer_class) - publish_messages - - queue = Queue.new - mutex = Mutex.new - resource = ConditionVariable.new - message_bus = Killjoy::MessageBus.new(configuration) - - 4.times do |shard| - consumer = consumer_class.new(writers, shard) - message_bus.run(consumer) do |message| - message.intercept(:ack) do - queue << message - if queue.size == messages_to_process - mutex.synchronize do - resource.signal - end - end - end - consumer.work(message) - end - end - - mutex.synchronize do - resource.wait(mutex) - message_bus.stop - end - end -end - -experiments = Experiments.new +experiments = Killjoy::Experiments.new Benchmark.ips do |x| x.config(time: 5, warmup: 2) diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb new file mode 100644 index 0000000..2f490c9 --- /dev/null +++ b/lib/killjoy/experiments.rb @@ -0,0 +1,71 @@ +module Killjoy + class Experiments + attr_reader :configuration, :messages_to_process, :writers, :lines + + def initialize + Killjoy::AfterFork.new.call + @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i + @writers = Spank::IOC.resolve_all(:writer) + cpus = Facter.value('processors')['count'].to_i + @configuration = { + amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), + exchange: 'killjoy', + exchange_type: 'x-modulus-hash', + heartbeat: 2, + prefetch: cpus, + } + parser = Killjoy::LogParser.new + log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) + @lines = File.readlines(log_file).take(messages_to_process).map do |x| + parser.parse(x) + end + end + + def publish_messages + Killjoy::Publisher.use do |publisher| + lines.each do |line| + publisher.publish(line) + end + end + end + + def blocking_writes + run(Killjoy::Consumer) + end + + def non_blocking_writes + run(Killjoy::AsyncConsumer) + end + + private + + def run(consumer_class) + publish_messages + + queue = Queue.new + mutex = Mutex.new + resource = ConditionVariable.new + message_bus = Killjoy::MessageBus.new(configuration) + + 4.times do |shard| + consumer = consumer_class.new(writers, shard) + message_bus.run(consumer) do |message| + message.intercept(:ack) do + queue << message + if queue.size == messages_to_process + mutex.synchronize do + resource.signal + end + end + end + consumer.work(message) + end + end + + mutex.synchronize do + resource.wait(mutex) + message_bus.stop + end + end + end +end |
