summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-24 17:10:27 -0600
committermo khan <mo@mokhan.ca>2015-10-24 17:10:27 -0600
commit09f14adba048b0ae07e399910d3bd85e0ece2c0c (patch)
tree1d6912318c03003aaabefe0b1f3fc5d1d609fc1a
parent76115984087068e85896854b3b8017fe35b3dc84 (diff)
move experiments to a separate file.
-rwxr-xr-xexe/killjoy-timing73
-rw-r--r--lib/killjoy/experiments.rb71
2 files changed, 73 insertions, 71 deletions
diff --git a/exe/killjoy-timing b/exe/killjoy-timing
index acb3127..0ec5545 100755
--- a/exe/killjoy-timing
+++ b/exe/killjoy-timing
@@ -3,79 +3,10 @@
require "bundler/setup"
$LOAD_PATH << File.expand_path("../lib", File.dirname(__FILE__))
require "killjoy"
+require "killjoy/experiments"
require "benchmark/ips"
-class Experiments
- attr_reader :configuration, :messages_to_process, :writers, :lines
-
- def initialize
- Killjoy::AfterFork.new.call
- @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
- @writers = Spank::IOC.resolve_all(:writer)
- cpus = Facter.value('processors')['count'].to_i
- @configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
- heartbeat: 2,
- prefetch: cpus,
- }
- parser = Killjoy::LogParser.new
- log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
- @lines = File.readlines(log_file).take(messages_to_process).map do |x|
- parser.parse(x)
- end
- end
-
- def publish_messages
- Killjoy::Publisher.use do |publisher|
- lines.each do |line|
- publisher.publish(line)
- end
- end
- end
-
- def blocking_writes
- run(Killjoy::Consumer)
- end
-
- def non_blocking_writes
- run(Killjoy::AsyncConsumer)
- end
-
- private
-
- def run(consumer_class)
- publish_messages
-
- queue = Queue.new
- mutex = Mutex.new
- resource = ConditionVariable.new
- message_bus = Killjoy::MessageBus.new(configuration)
-
- 4.times do |shard|
- consumer = consumer_class.new(writers, shard)
- message_bus.run(consumer) do |message|
- message.intercept(:ack) do
- queue << message
- if queue.size == messages_to_process
- mutex.synchronize do
- resource.signal
- end
- end
- end
- consumer.work(message)
- end
- end
-
- mutex.synchronize do
- resource.wait(mutex)
- message_bus.stop
- end
- end
-end
-
-experiments = Experiments.new
+experiments = Killjoy::Experiments.new
Benchmark.ips do |x|
x.config(time: 5, warmup: 2)
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
new file mode 100644
index 0000000..2f490c9
--- /dev/null
+++ b/lib/killjoy/experiments.rb
@@ -0,0 +1,71 @@
+module Killjoy
+ class Experiments
+ attr_reader :configuration, :messages_to_process, :writers, :lines
+
+ def initialize
+ Killjoy::AfterFork.new.call
+ @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
+ @writers = Spank::IOC.resolve_all(:writer)
+ cpus = Facter.value('processors')['count'].to_i
+ @configuration = {
+ amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
+ exchange: 'killjoy',
+ exchange_type: 'x-modulus-hash',
+ heartbeat: 2,
+ prefetch: cpus,
+ }
+ parser = Killjoy::LogParser.new
+ log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
+ @lines = File.readlines(log_file).take(messages_to_process).map do |x|
+ parser.parse(x)
+ end
+ end
+
+ def publish_messages
+ Killjoy::Publisher.use do |publisher|
+ lines.each do |line|
+ publisher.publish(line)
+ end
+ end
+ end
+
+ def blocking_writes
+ run(Killjoy::Consumer)
+ end
+
+ def non_blocking_writes
+ run(Killjoy::AsyncConsumer)
+ end
+
+ private
+
+ def run(consumer_class)
+ publish_messages
+
+ queue = Queue.new
+ mutex = Mutex.new
+ resource = ConditionVariable.new
+ message_bus = Killjoy::MessageBus.new(configuration)
+
+ 4.times do |shard|
+ consumer = consumer_class.new(writers, shard)
+ message_bus.run(consumer) do |message|
+ message.intercept(:ack) do
+ queue << message
+ if queue.size == messages_to_process
+ mutex.synchronize do
+ resource.signal
+ end
+ end
+ end
+ consumer.work(message)
+ end
+ end
+
+ mutex.synchronize do
+ resource.wait(mutex)
+ message_bus.stop
+ end
+ end
+ end
+end