summaryrefslogtreecommitdiff
path: root/lib/killjoy/worker.rb
blob: 09ba538cb97418652a79aca33935d678c4313743 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
module Killjoy
  # Worker mixin: subscribes one consumer per queue shard to the message bus
  # and then waits on a flag in #run until #stop sets it.
  #
  # The including object must respond to +config+, and +config[:queue_shards]+
  # must respond to +times+ (it is used as the shard count).
  module Worker
    # Creates the flag that #run waits on and the message bus used by
    # #run and #stop.
    def initialize
      # NOTE: despite its name this is a ServerEngine::BlockingFlag, not a Mutex.
      @mutex = ServerEngine::BlockingFlag.new
      # Previously Killjoy::MessageBus.new; the Killjoy::Kafka bus is used now.
      @message_bus = Killjoy::Kafka::MessageBus.new
    end

    # Runs the after-fork hook, subscribes one consumer per shard index
    # (0...config[:queue_shards]) and then waits on the flag.
    def run
      after_fork

      writers = Spank::IOC.resolve_all(:writer)

      config[:queue_shards].times do |shard|
        consumer = Cassandra::NonBlockingWritesConsumer.new(writers, shard)
        @message_bus.subscribe(consumer)
      end
      @mutex.wait
    end

    # Stops the message bus and sets the flag that #run is waiting on.
    #
    # The flag is set from an +ensure+ clause so that an exception raised while
    # stopping the bus still propagates to the caller but cannot leave #run
    # waiting forever.
    def stop
      @message_bus.stop
    ensure
      @mutex.set!
    end

    private

    # Invokes the Killjoy::AfterFork hook.
    def after_fork
      Killjoy::AfterFork.new.call
    end
  end
end