diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-25 21:25:53 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-25 21:25:53 -0600 |
| commit | fb823e995c0de74044328eb848f4ba30a784f8b4 (patch) | |
| tree | 8e68ddf5548b83bd2bc4f70f773fad323c5bb054 | |
| parent | 33ad9f1ad97e2c349cb7652fad3f4dc6ff1078ba (diff) | |
try a custom thread pool then use bunny consumer pool.
| -rwxr-xr-x | exe/killjoy | 6 | ||||
| -rwxr-xr-x | exe/killjoy-publisher | 10 | ||||
| -rw-r--r-- | lib/killjoy.rb | 6 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 13 | ||||
| -rw-r--r-- | lib/killjoy/message_bus.rb | 30 | ||||
| -rw-r--r-- | lib/killjoy/publisher.rb | 15 | ||||
| -rw-r--r-- | lib/killjoy/thread_pool.rb | 41 | ||||
| -rw-r--r-- | lib/killjoy/worker.rb | 7 |
8 files changed, 85 insertions, 43 deletions
diff --git a/exe/killjoy b/exe/killjoy index ca446ad..1d083f1 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -6,6 +6,7 @@ require "killjoy" require "killjoy/server" cpus = Facter.value('processors')['count'].to_i +shards = ENV.fetch("RMQ_SHARDS", 4).to_i server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do { @@ -16,10 +17,9 @@ server = ServerEngine.create(Killjoy::Server, Killjoy::Worker) do heartbeat: 2, pid_path: 'tmp/killjoy.pid', prefetch: cpus, - queue_shards: ENV.fetch("RMQ_SHARDS", 4), - run_asynchronously: ENV["ASYNC"] == 'true', + queue_shards: shards, worker_type: 'process', - workers: ENV.fetch("RMQ_SHARDS", 4), + workers: shards, } end server.run diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher index 1ad42dc..d185dd4 100755 --- a/exe/killjoy-publisher +++ b/exe/killjoy-publisher @@ -7,7 +7,15 @@ require "killjoy" log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log") parser = Killjoy::LogParser.new -Killjoy::Publisher.using do |publisher| +configuration = { + amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), + exchange: 'killjoy', + exchange_type: 'x-modulus-hash', + heartbeat: 2, + prefetch: 8, +} +message_bus = Killjoy::MessageBus.new(configuration) +Killjoy::Publisher.using(message_bus) do |publisher| log_file = File.join(Dir.pwd, log_file) lines = File.readlines(log_file) diff --git a/lib/killjoy.rb b/lib/killjoy.rb index 42498b5..170ed8a 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -23,6 +23,7 @@ require "killjoy/message" require "killjoy/message_bus" require "killjoy/publisher" require "killjoy/query_builder" +require "killjoy/thread_pool" require "killjoy/version" require "killjoy/startup" @@ -30,8 +31,9 @@ require "killjoy/startup" module Killjoy def self.logger if @logger.nil? - Killjoy.logger = Logger.new(STDOUT) - Killjoy.logger.level = Logger::WARN + logger = Logger.new(STDOUT) + logger.level = Logger::WARN + Killjoy.logger = logger end @logger end diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 6abb9c9..246fab7 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -23,11 +23,10 @@ module Killjoy end end - def publish_messages - Killjoy::Publisher.using do |publisher| - lines.each do |line| - publisher.publish(line) - end + def publish_messages(message_bus) + publisher = Killjoy::Publisher.new(message_bus) + lines.each do |line| + publisher.publish(line) end end @@ -56,12 +55,12 @@ module Killjoy end def run(consumer_class) - publish_messages + message_bus = Killjoy::MessageBus.new(configuration) + publish_messages(message_bus) queue = Queue.new mutex = Mutex.new resource = ConditionVariable.new - message_bus = Killjoy::MessageBus.new(configuration) configuration[:queue_shards].times do |shard| consumer = consumer_class.new(writers, shard) diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb index 0365812..11b01b4 100644 --- a/lib/killjoy/message_bus.rb +++ b/lib/killjoy/message_bus.rb @@ -5,31 +5,29 @@ module Killjoy def initialize(configuration) @configuration = configuration @subscriptions = Queue.new + @cpus = Facter.value('processors')['count'].to_i end def subscribe(consumer) options = { manual_ack: true, block: false } @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message| - Thread.new do - begin - message = Message.new(raw_message, info, channel) - if block_given? - yield message - else - consumer.work(message) - end - rescue - message.reject! if message - reject(info) + begin + message = Message.new(raw_message, info, channel) + if block_given? + yield message + else + consumer.work(message) end + rescue + message.reject! if message + reject(info) end end end def stop while @subscriptions.size > 0 - subscription = @subscriptions.deq - subscription.cancel + @subscriptions.deq.cancel end connection.close end @@ -51,13 +49,13 @@ module Killjoy end def channel - @channel ||= connection.create_channel.tap do |channel| - channel.prefetch(configuration[:prefetch]) + @channel ||= connection.create_channel(nil, @cpus).tap do |x| + x.prefetch(configuration[:prefetch]) end end def exchange - @exchange ||= channel.exchange( + channel.exchange( configuration[:exchange], durable: true, type: configuration[:exchange_type] diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb index f23ee78..b14c362 100644 --- a/lib/killjoy/publisher.rb +++ b/lib/killjoy/publisher.rb @@ -2,19 +2,12 @@ module Killjoy class Publisher attr_reader :message_bus - def initialize(configuration) - @message_bus = MessageBus.new(configuration) + def initialize(message_bus) + @message_bus = message_bus end - def self.using - configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', - heartbeat: 2, - prefetch: 8, - } - publisher = new(configuration) + def self.using(message_bus) + publisher = new(message_bus) yield publisher ensure publisher.dispose diff --git a/lib/killjoy/thread_pool.rb b/lib/killjoy/thread_pool.rb new file mode 100644 index 0000000..61070f3 --- /dev/null +++ b/lib/killjoy/thread_pool.rb @@ -0,0 +1,41 @@ +module Killjoy + class ThreadPool + include Enumerable + + def initialize(max = Facter.value('processors')['count'].to_i) + @threads = [] + @jobs = Queue.new + Thread.abort_on_exception = true + + max.times do |n| + @threads << Thread.new do + loop do + job = @jobs.deq + Killjoy.logger.debug("[#{Thread.current.object_id}] invoking job") + job.call + Killjoy.logger.debug("[#{Thread.current.object_id}] finish job") + end + end + end + end + + def run(&block) + @jobs << block + Killjoy.logger.debug("[#{Thread.current.object_id}] queue up a job. count: #{@jobs.size}") + end + + def stop + @jobs.clear + if block_given? + each do |thread| + yield thread + Thread.kill(thread) + end + end + end + + def each(&block) + @threads.each(&block) + end + end +end diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb index ba85146..0f16dac 100644 --- a/lib/killjoy/worker.rb +++ b/lib/killjoy/worker.rb @@ -13,9 +13,10 @@ module Killjoy config[:queue_shards].times do |shard| @message_bus.subscribe(Killjoy::Consumer.new(writers, shard)) end - until @mutex.wait_for_set(config[:heartbeat]) - Killjoy.logger.debug("Heartbeat: [#{Thread.object_id}]") - end + @mutex.wait + #until @mutex.wait_for_set(config[:heartbeat]) + #Killjoy.logger.debug("Heartbeat: [#{Thread.current.object_id}]") + #end end def stop |
