summaryrefslogtreecommitdiff
path: root/lib/killjoy/message_bus.rb
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-11-07 15:14:40 -0700
committermo khan <mo@mokhan.ca>2015-11-07 15:14:40 -0700
commit7a73f5aec92f7390669f9836492fe9700e7421cb (patch)
treea6c78f2889dc47e5fe41645d167a2224558f27b8 /lib/killjoy/message_bus.rb
parentc6b1442c77238391a68d70b8478835441f8bb304 (diff)
move RMQ configuration to a yaml file.
Diffstat (limited to 'lib/killjoy/message_bus.rb')
-rw-r--r--lib/killjoy/message_bus.rb19
1 files changed, 8 insertions, 11 deletions
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb
index 05b4c2f..f60080a 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/message_bus.rb
@@ -2,7 +2,7 @@ module Killjoy
class MessageBus
attr_reader :configuration
- def initialize(configuration)
+ def initialize(configuration = AMQPConfiguration.new)
@configuration = configuration
@subscriptions = Queue.new
@cpus = Facter.value('processors')['count'].to_i
@@ -10,7 +10,8 @@ module Killjoy
def subscribe(consumer)
options = { manual_ack: true, block: false }
- @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message|
+ queue = create_queue(consumer)
+ subscription = queue.subscribe(options) do |info, metadata, raw_message|
begin
message = Message.new(raw_message, info, channel)
if block_given?
@@ -20,10 +21,10 @@ module Killjoy
end
rescue => error
Killjoy.logger.error(error.message)
- message.reject! if message
- reject(info)
+ message.reject!
end
end
+ @subscriptions << subscription
end
def stop
@@ -41,7 +42,7 @@ module Killjoy
def connection
@connection ||= Bunny.new(
- configuration[:amqp_uri],
+ configuration.amqp_uri,
heartbeat: 2,
logger: Killjoy.logger
).tap do |connection|
@@ -57,9 +58,9 @@ module Killjoy
def exchange
channel.exchange(
- configuration[:exchange],
+ configuration.exchange,
durable: true,
- type: configuration[:exchange_type]
+ type: configuration.exchange_type
)
end
@@ -70,9 +71,5 @@ module Killjoy
end
queue
end
-
- def reject(info, requeue = false)
- channel.reject(info.delivery_tag, requeue)
- end
end
end