summaryrefslogtreecommitdiff
path: root/lib/killjoy/message_bus.rb
blob: 05b4c2f40dd07da65f72c11445735e5a1be36ad7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
module Killjoy
  class MessageBus
    attr_reader :configuration

    def initialize(configuration)
      @configuration = configuration
      @subscriptions = Queue.new
      @cpus = Facter.value('processors')['count'].to_i
    end

    def subscribe(consumer)
      options = { manual_ack: true, block: false }
      @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message|
        begin
          message = Message.new(raw_message, info, channel)
          if block_given?
            yield message
          else
            consumer.work(message)
          end
        rescue => error
          Killjoy.logger.error(error.message)
          message.reject! if message
          reject(info)
        end
      end
    end

    def stop
      while @subscriptions.size > 0
        @subscriptions.deq.cancel
      end
      connection.close
    end

    def publish(message)
      message.publish_to(exchange)
    end

    private

    def connection
      @connection ||= Bunny.new(
        configuration[:amqp_uri],
        heartbeat: 2,
        logger: Killjoy.logger
      ).tap do |connection|
        connection.start
      end
    end

    def channel
      @channel ||= connection.create_channel(nil, @cpus).tap do |x|
        x.prefetch(@cpus)
      end
    end

    def exchange
      channel.exchange(
        configuration[:exchange],
        durable: true,
        type: configuration[:exchange_type]
      )
    end

    def create_queue(consumer)
      queue = channel.queue(consumer.queue_name, exclusive: false, durable: true)
      consumer.bindings.each do |binding|
        queue.bind(exchange, routing_key: binding)
      end
      queue
    end

    def reject(info, requeue = false)
      channel.reject(info.delivery_tag, requeue)
    end
  end
end