1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
module Killjoy
class Experiments
attr_reader :configuration, :messages_to_process, :writers, :lines, :enable_profiler
def initialize(enable_profiler: false)
@enable_profiler = enable_profiler
AfterFork.new.call
@messages_to_process = ENV.fetch("MESSAGES", 1_000).to_i
@writers = Spank::IOC.resolve_all(:writer)
@mongo_client = Spank::IOC.resolve(:mongo_client)
@lines = parse_log_lines(messages_to_process)
end
def publish_messages(message_bus)
publisher = Publisher.new(message_bus)
lines.each do |line|
publisher.publish(line)
end
end
def blocking_writes
profile('tmp/cassandra-cpu-blocking-writes.dump') do
run do |shard|
Cassandra::BlockingWritesConsumer.new(writers, shard)
end
end
end
def non_blocking_writes
profile('tmp/cassandra-cpu-non-blocking-writes.dump') do
run do |shard|
Cassandra::NonBlockingWritesConsumer.new(writers, shard)
end
end
end
def mongo_writes
profile('tmp/mongo-cpu-non-blocking-writes.dump') do
run do |shard|
Mongo::Consumer.new(@mongo_client, shard)
end
end
end
def kafka_cassandra_non_blocking_writes
profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do
run(Kafka::MessageBus.new, 1) do |shard|
Cassandra::NonBlockingWritesConsumer.new(writers, shard)
end
end
end
def kafka_mongo_writes
profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do
run(Kafka::MessageBus.new, 1) do |shard|
Mongo::Consumer.new(@mongo_client, shard)
end
end
end
private
def profile(filename)
if enable_profiler && RUBY_PLATFORM != "java"
StackProf.run(mode: :cpu, out: filename) do
yield
end
else
yield
end
end
def run(
message_bus = MessageBus.new,
queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
)
publish_messages(message_bus)
queue = Queue.new
mutex = Mutex.new
resource = ConditionVariable.new
queue_shards.times do |shard|
consumer = yield(shard)
message_bus.subscribe(consumer) do |message|
message.intercept(:ack) do
queue << message
if queue.size == messages_to_process
mutex.synchronize do
resource.signal
end
end
end
consumer.work(message)
end
end
mutex.synchronize do
resource.wait(mutex)
message_bus.stop
end
end
def parse_log_lines(messages_to_process)
parser = LogParser.new
log_file = File.join(Dir.pwd, ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log"))
File.readlines(log_file).take(messages_to_process).map do |x|
parser.parse(x)
end
end
end
end
|