summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-25 10:40:11 -0600
committermo khan <mo@mokhan.ca>2015-10-25 10:40:11 -0600
commit33ad9f1ad97e2c349cb7652fad3f4dc6ff1078ba (patch)
tree8b1f3f26699314780ea64ce32305744b5d0f7e78
parent3edf04d805c11d2fbe91d3e739c3d0764e65aae4 (diff)
rename run to subscribe.
-rwxr-xr-xexe/killjoy6
-rwxr-xr-xexe/killjoy-publisher9
-rw-r--r--lib/killjoy.rb2
-rw-r--r--lib/killjoy/experiments.rb7
-rw-r--r--lib/killjoy/message_bus.rb2
-rw-r--r--lib/killjoy/publisher.rb9
-rw-r--r--lib/killjoy/server.rb4
-rw-r--r--lib/killjoy/worker.rb2
8 files changed, 19 insertions, 22 deletions
diff --git a/exe/killjoy b/exe/killjoy
index ccfbffb..ca446ad 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -3,16 +3,10 @@
require "bundler/setup"
$LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__)))
require "killjoy"
-require "killjoy/worker"
require "killjoy/server"
-require "serverengine"
cpus = Facter.value('processors')['count'].to_i
-#Sneakers.configure({
-#threads: cpus,
-#})
-
server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do
{
amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher
index 881315e..1ad42dc 100755
--- a/exe/killjoy-publisher
+++ b/exe/killjoy-publisher
@@ -6,15 +6,8 @@ require "killjoy"
log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")
parser = Killjoy::LogParser.new
-configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
- heartbeat: 2,
- prefetch: 8,
-}
-Killjoy::Publisher.using(configuration) do |publisher|
+Killjoy::Publisher.using do |publisher|
log_file = File.join(Dir.pwd, log_file)
lines = File.readlines(log_file)
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index 7b6fdc8..42498b5 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -31,7 +31,7 @@ module Killjoy
def self.logger
if @logger.nil?
Killjoy.logger = Logger.new(STDOUT)
- Killjoy.logger.level = Logger::INFO
+ Killjoy.logger.level = Logger::WARN
end
@logger
end
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 9b6a607..6abb9c9 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -14,6 +14,7 @@ module Killjoy
exchange_type: 'x-modulus-hash',
heartbeat: 2,
prefetch: cpus,
+ queue_shards: ENV.fetch("RMQ_SHARDS", 4).to_i,
}
parser = Killjoy::LogParser.new
log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
@@ -23,7 +24,7 @@ module Killjoy
end
def publish_messages
- Killjoy::Publisher.use do |publisher|
+ Killjoy::Publisher.using do |publisher|
lines.each do |line|
publisher.publish(line)
end
@@ -62,9 +63,9 @@ module Killjoy
resource = ConditionVariable.new
message_bus = Killjoy::MessageBus.new(configuration)
- 4.times do |shard|
+ configuration[:queue_shards].times do |shard|
consumer = consumer_class.new(writers, shard)
- message_bus.run(consumer) do |message|
+ message_bus.subscribe(consumer) do |message|
message.intercept(:ack) do
queue << message
if queue.size == messages_to_process
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb
index 9e09c27..0365812 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/message_bus.rb
@@ -7,7 +7,7 @@ module Killjoy
@subscriptions = Queue.new
end
- def run(consumer)
+ def subscribe(consumer)
options = { manual_ack: true, block: false }
@subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message|
Thread.new do
diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb
index 865ad39..f23ee78 100644
--- a/lib/killjoy/publisher.rb
+++ b/lib/killjoy/publisher.rb
@@ -6,7 +6,14 @@ module Killjoy
@message_bus = MessageBus.new(configuration)
end
- def self.using(configuration)
+ def self.using
+ configuration = {
+ amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
+ exchange: 'killjoy',
+ exchange_type: 'x-modulus-hash',
+ heartbeat: 2,
+ prefetch: 8,
+ }
publisher = new(configuration)
yield publisher
ensure
diff --git a/lib/killjoy/server.rb b/lib/killjoy/server.rb
index 4aba07b..2d96c0f 100644
--- a/lib/killjoy/server.rb
+++ b/lib/killjoy/server.rb
@@ -1,7 +1,9 @@
+require "killjoy/worker"
+require "serverengine"
+
module Killjoy
module Server
def before_run
end
end
end
-
diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb
index fafe771..ba85146 100644
--- a/lib/killjoy/worker.rb
+++ b/lib/killjoy/worker.rb
@@ -11,7 +11,7 @@ module Killjoy
writers = Spank::IOC.resolve_all(:writer)
config[:queue_shards].times do |shard|
- @message_bus.run(Killjoy::Consumer.new(writers, shard))
+ @message_bus.subscribe(Killjoy::Consumer.new(writers, shard))
end
until @mutex.wait_for_set(config[:heartbeat])
Killjoy.logger.debug("Heartbeat: [#{Thread.object_id}]")