summaryrefslogtreecommitdiff
path: root/lib/killjoy/rmq/message_bus.rb
blob: 710e4cda14fe5da25cbb22356eb6ea88b04a2e3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
module Killjoy
  # Publishes messages to, and consumes messages from, an AMQP broker through
  # a lazily opened Bunny connection.
  #
  # The connection, channel and exchange are created on first use and
  # memoized; #stop tears them down and clears the memoized handles.
  class MessageBus
    # @return [Object] the configuration supplied at construction time; it is
    #   asked for amqp_uri, heartbeat, prefetch, exchange and exchange_type.
    attr_reader :configuration

    # @param configuration [Object] connection settings (defaults to a new
    #   AMQPConfiguration).
    def initialize(configuration = AMQPConfiguration.new)
      @configuration = configuration
      @subscriptions = Queue.new
      # Used as the consumer pool size of the channel. Clamp to at least one
      # so a missing/zero processor count never requests an empty pool.
      @cpus = [Facter.value('processors')['count'].to_i, 1].max
    end

    # Declares the consumer's queue, binds it, and starts a non-blocking,
    # manually acknowledged subscription on it.
    #
    # Each delivery is wrapped in a Message and handed to the given block, or
    # to consumer.work when no block is given. Any StandardError raised while
    # building or handling the message is logged and the delivery is rejected.
    #
    # @param consumer [Object] responds to queue_name and bindings, and to
    #   work(message) when no block is supplied.
    # @yieldparam message [Message] the wrapped delivery.
    def subscribe(consumer)
      options = { manual_ack: true, block: false }
      queue = create_queue(consumer)
      subscription = queue.subscribe(options) do |info, _metadata, raw_message|
        message = nil
        begin
          message = Message.new(raw_message, info, channel)
          if block_given?
            yield message
          else
            consumer.work(message)
          end
        rescue => error
          Killjoy.logger.error(error.message)
          if message
            message.reject!
          else
            # Message.new itself failed, so there is no wrapper to reject
            # through; reject the raw delivery (without requeueing) so it is
            # not left unacknowledged on the channel.
            channel.reject(info.delivery_tag, false)
          end
        end
      end
      @subscriptions << subscription
    end

    # Cancels every registered subscription and closes the connection if one
    # was opened. The connection is closed even when a cancellation raises.
    #
    # @return [nil]
    def stop
      cancel_subscriptions
    ensure
      close_connection
    end

    # @param message [Object] responds to publish_to(exchange).
    # @return [Object] whatever message.publish_to returns.
    def publish(message)
      message.publish_to(exchange)
    end

    private

    # Drains the subscription queue, cancelling each entry. Uses a
    # non-blocking pop so a concurrent drain can never leave this method
    # waiting on an empty queue.
    def cancel_subscriptions
      until @subscriptions.empty?
        subscription =
          begin
            @subscriptions.pop(true)
          rescue ThreadError
            break
          end
        subscription.cancel
      end
      nil
    end

    # Closes the connection only if one was actually established, then drops
    # the memoized connection, channel and exchange so stale (closed) handles
    # are never reused by a later call.
    def close_connection
      @connection.close if @connection
    ensure
      @connection = nil
      @channel = nil
      @exchange = nil
    end

    # Lazily opens and starts the Bunny connection.
    def connection
      @connection ||= Bunny.new(
        configuration.amqp_uri,
        heartbeat: configuration.heartbeat,
        logger: Killjoy.logger
      ).tap do |session|
        session.start
      end
    end

    # Lazily creates the channel (sized by @cpus) and applies the configured
    # prefetch limit.
    def channel
      @channel ||= connection.create_channel(nil, @cpus).tap do |x|
        x.prefetch(configuration.prefetch)
      end
    end

    # Lazily declares the durable exchange named in the configuration.
    def exchange
      @exchange ||= channel.exchange(
        configuration.exchange,
        durable: true,
        type: configuration.exchange_type
      )
    end

    # Declares the consumer's durable, non-exclusive queue and binds it to the
    # exchange once per routing key in consumer.bindings.
    #
    # @return [Object] the declared queue.
    def create_queue(consumer)
      queue = channel.queue(consumer.queue_name, exclusive: false, durable: true)
      consumer.bindings.each do |routing_key|
        queue.bind(exchange, routing_key: routing_key)
      end
      queue
    end
  end
end