diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-25 10:40:11 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-25 10:40:11 -0600 |
| commit | 33ad9f1ad97e2c349cb7652fad3f4dc6ff1078ba (patch) | |
| tree | 8b1f3f26699314780ea64ce32305744b5d0f7e78 | |
| parent | 3edf04d805c11d2fbe91d3e739c3d0764e65aae4 (diff) | |
rename run to subscribe.
| -rwxr-xr-x | exe/killjoy | 6 | ||||
| -rwxr-xr-x | exe/killjoy-publisher | 9 | ||||
| -rw-r--r-- | lib/killjoy.rb | 2 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 7 | ||||
| -rw-r--r-- | lib/killjoy/message_bus.rb | 2 | ||||
| -rw-r--r-- | lib/killjoy/publisher.rb | 9 | ||||
| -rw-r--r-- | lib/killjoy/server.rb | 4 | ||||
| -rw-r--r-- | lib/killjoy/worker.rb | 2 |
8 files changed, 19 insertions, 22 deletions
diff --git a/exe/killjoy b/exe/killjoy index ccfbffb..ca446ad 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -3,16 +3,10 @@ require "bundler/setup" $LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__))) require "killjoy" -require "killjoy/worker" require "killjoy/server" -require "serverengine" cpus = Facter.value('processors')['count'].to_i -#Sneakers.configure({ -#threads: cpus, -#}) - server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do { amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher index 881315e..1ad42dc 100755 --- a/exe/killjoy-publisher +++ b/exe/killjoy-publisher @@ -6,15 +6,8 @@ require "killjoy" log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log") parser = Killjoy::LogParser.new -configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', - heartbeat: 2, - prefetch: 8, -} -Killjoy::Publisher.using(configuration) do |publisher| +Killjoy::Publisher.using do |publisher| log_file = File.join(Dir.pwd, log_file) lines = File.readlines(log_file) diff --git a/lib/killjoy.rb b/lib/killjoy.rb index 7b6fdc8..42498b5 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -31,7 +31,7 @@ module Killjoy def self.logger if @logger.nil? Killjoy.logger = Logger.new(STDOUT) - Killjoy.logger.level = Logger::INFO + Killjoy.logger.level = Logger::WARN end @logger end diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 9b6a607..6abb9c9 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -14,6 +14,7 @@ module Killjoy exchange_type: 'x-modulus-hash', heartbeat: 2, prefetch: cpus, + queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i, } parser = Killjoy::LogParser.new log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")) @@ -23,7 +24,7 @@ module Killjoy end def publish_messages - Killjoy::Publisher.use do |publisher| + Killjoy::Publisher.using do |publisher| lines.each do |line| publisher.publish(line) end @@ -62,9 +63,9 @@ module Killjoy resource = ConditionVariable.new message_bus = Killjoy::MessageBus.new(configuration) - 4.times do |shard| + configuration[:queue_shards].times do |shard| consumer = consumer_class.new(writers, shard) - message_bus.run(consumer) do |message| + message_bus.subscribe(consumer) do |message| message.intercept(:ack) do queue << message if queue.size == messages_to_process diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb index 9e09c27..0365812 100644 --- a/lib/killjoy/message_bus.rb +++ b/lib/killjoy/message_bus.rb @@ -7,7 +7,7 @@ module Killjoy @subscriptions = Queue.new end - def run(consumer) + def subscribe(consumer) options = { manual_ack: true, block: false } @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message| Thread.new do diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb index 865ad39..f23ee78 100644 --- a/lib/killjoy/publisher.rb +++ b/lib/killjoy/publisher.rb @@ -6,7 +6,14 @@ module Killjoy @message_bus = MessageBus.new(configuration) end - def self.using(configuration) + def self.using + configuration = { + amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), + exchange: 'killjoy', + exchange_type: 'x-modulus-hash', + heartbeat: 2, + prefetch: 8, + } publisher = new(configuration) yield publisher ensure diff --git a/lib/killjoy/server.rb b/lib/killjoy/server.rb index 4aba07b..2d96c0f 100644 --- a/lib/killjoy/server.rb +++ b/lib/killjoy/server.rb @@ -1,7 +1,9 @@ +require "killjoy/worker" +require "serverengine" + module Killjoy module Server def before_run end end end - diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb index fafe771..ba85146 100644 --- a/lib/killjoy/worker.rb +++ b/lib/killjoy/worker.rb @@ -11,7 +11,7 @@ module Killjoy writers = Spank::IOC.resolve_all(:writer) config[:queue_shards].times do |shard| - @message_bus.run(Killjoy::Consumer.new(writers, shard)) + @message_bus.subscribe(Killjoy::Consumer.new(writers, shard)) end until @mutex.wait_for_set(config[:heartbeat]) Killjoy.logger.debug("Heartbeat: [#{Thread.object_id}]") |
