diff options
| author | mo khan <mo@mokhan.ca> | 2015-11-07 15:14:40 -0700 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-11-07 15:14:40 -0700 |
| commit | 7a73f5aec92f7390669f9836492fe9700e7421cb (patch) | |
| tree | a6c78f2889dc47e5fe41645d167a2224558f27b8 /lib/killjoy/message_bus.rb | |
| parent | c6b1442c77238391a68d70b8478835441f8bb304 (diff) | |
move RMQ configuration to a yaml file.
Diffstat (limited to 'lib/killjoy/message_bus.rb')
| -rw-r--r-- | lib/killjoy/message_bus.rb | 19 |
1 files changed, 8 insertions, 11 deletions
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb index 05b4c2f..f60080a 100644 --- a/lib/killjoy/message_bus.rb +++ b/lib/killjoy/message_bus.rb @@ -2,7 +2,7 @@ module Killjoy class MessageBus attr_reader :configuration - def initialize(configuration) + def initialize(configuration = AMQPConfiguration.new) @configuration = configuration @subscriptions = Queue.new @cpus = Facter.value('processors')['count'].to_i @@ -10,7 +10,8 @@ module Killjoy def subscribe(consumer) options = { manual_ack: true, block: false } - @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message| + queue = create_queue(consumer) + subscription = queue.subscribe(options) do |info, metadata, raw_message| begin message = Message.new(raw_message, info, channel) if block_given? @@ -20,10 +21,10 @@ module Killjoy end rescue => error Killjoy.logger.error(error.message) - message.reject! if message - reject(info) + message.reject! end end + @subscriptions << subscription end def stop @@ -41,7 +42,7 @@ module Killjoy def connection @connection ||= Bunny.new( - configuration[:amqp_uri], + configuration.amqp_uri, heartbeat: 2, logger: Killjoy.logger ).tap do |connection| @@ -57,9 +58,9 @@ module Killjoy def exchange channel.exchange( - configuration[:exchange], + configuration.exchange, durable: true, - type: configuration[:exchange_type] + type: configuration.exchange_type ) end @@ -70,9 +71,5 @@ module Killjoy end queue end - - def reject(info, requeue = false) - channel.reject(info.delivery_tag, requeue) - end end end |
