summaryrefslogtreecommitdiff
path: root/lib/killjoy/experiments.rb
blob: 8bc45781b920bc2c08808303a9649f9a7b13833d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
module Killjoy
  # Drives write-throughput experiments: parses nginx log lines, publishes
  # them onto a message bus, and consumes them with various writer backends
  # (Cassandra blocking/non-blocking, Mongo, via RabbitMQ or Kafka),
  # optionally capturing a StackProf CPU profile for each run.
  class Experiments
    attr_reader :configuration, :messages_to_process, :writers, :lines, :enable_profiler

    # @param enable_profiler [Boolean] when true (and not on JRuby), wrap each
    #   experiment in a StackProf CPU profile dumped to tmp/.
    def initialize(enable_profiler: false)
      @enable_profiler = enable_profiler
      AfterFork.new.call
      # MESSAGES env var overrides how many log lines are published/consumed.
      @messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
      @writers = Spank::IOC.resolve_all(:writer)
      @mongo_client = Spank::IOC.resolve(:mongo_client)
      @lines = parse_log_lines(messages_to_process)
    end

    # Publishes every parsed log line onto the given message bus.
    def publish_messages(message_bus)
      publisher = Publisher.new(message_bus)
      lines.each do |line|
        publisher.publish(line)
      end
    end

    # Experiment: Cassandra writes that block per message.
    def blocking_writes
      profile('tmp/cassandra-cpu-blocking-writes.dump') do
        run do |shard|
          Cassandra::BlockingWritesConsumer.new(writers, shard)
        end
      end
    end

    # Experiment: Cassandra writes issued asynchronously.
    def non_blocking_writes
      profile('tmp/cassandra-cpu-non-blocking-writes.dump') do
        run do |shard|
          Cassandra::NonBlockingWritesConsumer.new(writers, shard)
        end
      end
    end

    # Experiment: Mongo writes over the default (RabbitMQ) message bus.
    def mongo_writes
      profile('tmp/mongo-cpu-non-blocking-writes.dump') do
        run do |shard|
          Mongo::Consumer.new(@mongo_client, shard)
        end
      end
    end

    # Experiment: non-blocking Cassandra writes consumed from Kafka
    # (single shard — Kafka partitioning replaces queue sharding here).
    def kafka_cassandra_non_blocking_writes
      profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do
        run(Kafka::MessageBus.new, 1) do |shard|
          Cassandra::NonBlockingWritesConsumer.new(writers, shard)
        end
      end
    end

    # Experiment: Mongo writes consumed from Kafka (single shard).
    def kafka_mongo_writes
      profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do
        run(Kafka::MessageBus.new, 1) do |shard|
          Mongo::Consumer.new(@mongo_client, shard)
        end
      end
    end

    private

    # Runs the block under StackProf CPU profiling when enabled.
    # StackProf is a C extension, so profiling is skipped on JRuby.
    def profile(filename)
      if enable_profiler && RUBY_PLATFORM != "java"
        StackProf.run(mode: :cpu, out: filename) do
          yield
        end
      else
        yield
      end
    end

    # Publishes all messages, subscribes one consumer per shard (built by the
    # given block), and blocks until every message has been acked, then stops
    # the bus.
    #
    # @param message_bus [#subscribe, #stop] bus to publish/consume on.
    # @param queue_shards [Integer] number of consumer shards to spin up.
    def run(
      message_bus = MessageBus.new,
      queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
    )
      publish_messages(message_bus)

      queue = Queue.new
      mutex = Mutex.new
      resource = ConditionVariable.new

      queue_shards.times do |shard|
        consumer = yield(shard)
        message_bus.subscribe(consumer) do |message|
          message.intercept(:ack) do
            queue << message
            # Use >= (not ==): with several consumer threads pushing
            # concurrently, the size observed here can jump past the
            # threshold and an == check would never fire.
            if queue.size >= messages_to_process
              mutex.synchronize do
                resource.signal
              end
            end
          end
          consumer.work(message)
        end
      end

      mutex.synchronize do
        # Wait in a predicate loop: a bare wait misses any signal sent
        # before we start waiting (lost wakeup) and would hang forever if
        # all messages were acked before this thread reached wait.
        resource.wait(mutex) until queue.size >= messages_to_process
        message_bus.stop
      end
    end

    # Parses the first `messages_to_process` lines of the configured log file
    # (LOG_FILE env var, defaulting to the nginx fixture) into message objects.
    def parse_log_lines(messages_to_process)
      parser = LogParser.new
      log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
      File.readlines(log_file).take(messages_to_process).map do |x|
        parser.parse(x)
      end
    end
  end
end