1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
module Killjoy
class MessageBus
attr_reader :configuration
def initialize(configuration = AMQPConfiguration.new)
@configuration = configuration
@subscriptions = Queue.new
@cpus = Facter.value('processors')['count'].to_i
end
def subscribe(consumer)
options = { manual_ack: true, block: false }
queue = create_queue(consumer)
subscription = queue.subscribe(options) do |info, metadata, raw_message|
begin
message = Message.new(raw_message, info, channel)
if block_given?
yield message
else
consumer.work(message)
end
rescue => error
Killjoy.logger.error(error.message)
message.reject!
end
end
@subscriptions << subscription
end
def stop
while @subscriptions.size > 0
@subscriptions.deq.cancel
end
connection.close
end
def publish(message)
message.publish_to(exchange)
end
private
def connection
@connection ||= Bunny.new(
configuration.amqp_uri,
heartbeat: configuration.heartbeat,
logger: Killjoy.logger
).tap do |connection|
connection.start
end
end
def channel
@channel ||= connection.create_channel(nil, @cpus).tap do |x|
x.prefetch(configuration.prefetch)
end
end
def exchange
@exchange ||= channel.exchange(
configuration.exchange,
durable: true,
type: configuration.exchange_type
)
end
def create_queue(consumer)
queue = channel.queue(consumer.queue_name, exclusive: false, durable: true)
consumer.bindings.each do |binding|
queue.bind(exchange, routing_key: binding)
end
queue
end
end
end
|