summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-25 21:25:53 -0600
committermo khan <mo@mokhan.ca>2015-10-25 21:25:53 -0600
commitfb823e995c0de74044328eb848f4ba30a784f8b4 (patch)
tree8e68ddf5548b83bd2bc4f70f773fad323c5bb054
parent33ad9f1ad97e2c349cb7652fad3f4dc6ff1078ba (diff)
try a custom thread pool then use bunny consumer pool.
-rwxr-xr-xexe/killjoy6
-rwxr-xr-xexe/killjoy-publisher10
-rw-r--r--lib/killjoy.rb6
-rw-r--r--lib/killjoy/experiments.rb13
-rw-r--r--lib/killjoy/message_bus.rb30
-rw-r--r--lib/killjoy/publisher.rb15
-rw-r--r--lib/killjoy/thread_pool.rb41
-rw-r--r--lib/killjoy/worker.rb7
8 files changed, 85 insertions, 43 deletions
diff --git a/exe/killjoy b/exe/killjoy
index ca446ad..1d083f1 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -6,6 +6,7 @@ require "killjoy"
require "killjoy/server"
cpus = Facter.value('processors')['count'].to_i
+shards = ENV.fetch("RMQ_SHARDS", 4).to_i
server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do
{
@@ -16,10 +17,9 @@ server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do
heartbeat: 2,
pid_path: 'tmp/killjoy.pid',
prefetch: cpus,
- queue_shards: ENV.fetch("RMQ_SHARDS", 4),
- run_asynchronously: ENV["ASYNC"] == 'true',
+ queue_shards: shards,
worker_type: 'process',
- workers: ENV.fetch("RMQ_SHARDS", 4),
+ workers: shards,
}
end
server.run
diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher
index 1ad42dc..d185dd4 100755
--- a/exe/killjoy-publisher
+++ b/exe/killjoy-publisher
@@ -7,7 +7,15 @@ require "killjoy"
log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")
parser = Killjoy::LogParser.new
-Killjoy::Publisher.using do |publisher|
+configuration = {
+ amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
+ exchange: 'killjoy',
+ exchange_type: 'x-modulus-hash',
+ heartbeat: 2,
+ prefetch: 8,
+}
+message_bus = Killjoy::MessageBus.new(configuration)
+Killjoy::Publisher.using(message_bus) do |publisher|
log_file = File.join(Dir.pwd, log_file)
lines = File.readlines(log_file)
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index 42498b5..170ed8a 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -23,6 +23,7 @@ require "killjoy/message"
require "killjoy/message_bus"
require "killjoy/publisher"
require "killjoy/query_builder"
+require "killjoy/thread_pool"
require "killjoy/version"
require "killjoy/startup"
@@ -30,8 +31,9 @@ require "killjoy/startup"
module Killjoy
def self.logger
if @logger.nil?
- Killjoy.logger = Logger.new(STDOUT)
- Killjoy.logger.level = Logger::WARN
+ logger = Logger.new(STDOUT)
+ logger.level = Logger::WARN
+ Killjoy.logger = logger
end
@logger
end
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 6abb9c9..246fab7 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -23,11 +23,10 @@ module Killjoy
end
end
- def publish_messages
- Killjoy::Publisher.using do |publisher|
- lines.each do |line|
- publisher.publish(line)
- end
+ def publish_messages(message_bus)
+ publisher = Killjoy::Publisher.new(message_bus)
+ lines.each do |line|
+ publisher.publish(line)
end
end
@@ -56,12 +55,12 @@ module Killjoy
end
def run(consumer_class)
- publish_messages
+ message_bus = Killjoy::MessageBus.new(configuration)
+ publish_messages(message_bus)
queue = Queue.new
mutex = Mutex.new
resource = ConditionVariable.new
- message_bus = Killjoy::MessageBus.new(configuration)
configuration[:queue_shards].times do |shard|
consumer = consumer_class.new(writers, shard)
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb
index 0365812..11b01b4 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/message_bus.rb
@@ -5,31 +5,29 @@ module Killjoy
def initialize(configuration)
@configuration = configuration
@subscriptions = Queue.new
+ @cpus = Facter.value('processors')['count'].to_i
end
def subscribe(consumer)
options = { manual_ack: true, block: false }
@subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message|
- Thread.new do
- begin
- message = Message.new(raw_message, info, channel)
- if block_given?
- yield message
- else
- consumer.work(message)
- end
- rescue
- message.reject! if message
- reject(info)
+ begin
+ message = Message.new(raw_message, info, channel)
+ if block_given?
+ yield message
+ else
+ consumer.work(message)
end
+ rescue
+ message.reject! if message
+ reject(info)
end
end
end
def stop
while @subscriptions.size > 0
- subscription = @subscriptions.deq
- subscription.cancel
+ @subscriptions.deq.cancel
end
connection.close
end
@@ -51,13 +49,13 @@ module Killjoy
end
def channel
- @channel ||= connection.create_channel.tap do |channel|
- channel.prefetch(configuration[:prefetch])
+ @channel ||= connection.create_channel(nil, @cpus).tap do |x|
+ x.prefetch(configuration[:prefetch])
end
end
def exchange
- @exchange ||= channel.exchange(
+ channel.exchange(
configuration[:exchange],
durable: true,
type: configuration[:exchange_type]
diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb
index f23ee78..b14c362 100644
--- a/lib/killjoy/publisher.rb
+++ b/lib/killjoy/publisher.rb
@@ -2,19 +2,12 @@ module Killjoy
class Publisher
attr_reader :message_bus
- def initialize(configuration)
- @message_bus = MessageBus.new(configuration)
+ def initialize(message_bus)
+ @message_bus = message_bus
end
- def self.using
- configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
- heartbeat: 2,
- prefetch: 8,
- }
- publisher = new(configuration)
+ def self.using(message_bus)
+ publisher = new(message_bus)
yield publisher
ensure
publisher.dispose
diff --git a/lib/killjoy/thread_pool.rb b/lib/killjoy/thread_pool.rb
new file mode 100644
index 0000000..61070f3
--- /dev/null
+++ b/lib/killjoy/thread_pool.rb
@@ -0,0 +1,41 @@
+module Killjoy
+ class ThreadPool
+ include Enumerable
+
+ def initialize(max = Facter.value('processors')['count'].to_i)
+ @threads = []
+ @jobs = Queue.new
+ Thread.abort_on_exception = true
+
+ max.times do |n|
+ @threads << Thread.new do
+ loop do
+ job = @jobs.deq
+ Killjoy.logger.debug("[#{Thread.current.object_id}] invoking job")
+ job.call
+ Killjoy.logger.debug("[#{Thread.current.object_id}] finish job")
+ end
+ end
+ end
+ end
+
+ def run(&block)
+ @jobs << block
+ Killjoy.logger.debug("[#{Thread.current.object_id}] queue up a job. count: #{@jobs.size}")
+ end
+
+ def stop
+ @jobs.clear
+ if block_given?
+ each do |thread|
+ yield thread
+ Thread.kill(thread)
+ end
+ end
+ end
+
+ def each(&block)
+ @threads.each(&block)
+ end
+ end
+end
diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb
index ba85146..0f16dac 100644
--- a/lib/killjoy/worker.rb
+++ b/lib/killjoy/worker.rb
@@ -13,9 +13,10 @@ module Killjoy
config[:queue_shards].times do |shard|
@message_bus.subscribe(Killjoy::Consumer.new(writers, shard))
end
- until @mutex.wait_for_set(config[:heartbeat])
- Killjoy.logger.debug("Heartbeat: [#{Thread.object_id}]")
- end
+ @mutex.wait
+ #until @mutex.wait_for_set(config[:heartbeat])
+ #Killjoy.logger.debug("Heartbeat: [#{Thread.current.object_id}]")
+ #end
end
def stop