diff options
| author | mo khan <mo@mokhan.ca> | 2016-01-30 21:48:40 -0700 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2016-01-30 21:48:40 -0700 |
| commit | 85bd0e5d520109f29011f6eff69e9d2f2e20af26 (patch) | |
| tree | b336f1e4c305a3d22a2177b5b8e14f2b68989348 | |
| parent | 9c30095d0aa9cf21c0fc64527695b3df85dec476 (diff) | |
attempt to update timing tests for kafka.
| -rw-r--r-- | Gemfile | 2 | ||||
| -rw-r--r-- | Rakefile | 4 | ||||
| -rwxr-xr-x | exe/killjoy-timing | 6 | ||||
| -rw-r--r-- | lib/killjoy.rb | 1 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 21 | ||||
| -rw-r--r-- | lib/killjoy/kafka/message.rb | 21 | ||||
| -rw-r--r-- | lib/killjoy/kafka/message_bus.rb | 35 |
7 files changed, 75 insertions, 15 deletions
@@ -11,7 +11,7 @@ gem "poseidon" gem "rake" gem "serverengine" gem "sinatra" -gem "snappy" +#gem "snappy" gem "spank" gem "virtus" @@ -6,3 +6,7 @@ import "lib/killjoy/tasks/rabbitmq.rake" RSpec::Core::RakeTask.new(:spec) task :default => :spec + +task :timing => ['rabbitmq:reset', 'mongo:drop', 'db:reset'] do + sh "exe/killjoy-timing" +end diff --git a/exe/killjoy-timing b/exe/killjoy-timing index bb48b60..6764710 100755 --- a/exe/killjoy-timing +++ b/exe/killjoy-timing @@ -25,6 +25,12 @@ Benchmark.ips do |x| x.report("mongo: writes") do experiments.mongo_writes end + x.report("kafka-mongo: writes") do + experiments.kafka_mongo_writes + end + x.report("kafka-cassandra: non blocking writes") do + experiments.kafka_cassandra_non_blocking_writes + end x.compare! end diff --git a/lib/killjoy.rb b/lib/killjoy.rb index a6388ab..576d90e 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -23,6 +23,7 @@ require "killjoy/cassandra/query_builder" require "killjoy/cassandra/queryable" require "killjoy/cassandra/writer" require "killjoy/consumer" +require "killjoy/kafka/message" require "killjoy/kafka/message_bus" require "killjoy/log_line" require "killjoy/log_parser" diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 2977eb3..8bc4578 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -42,6 +42,22 @@ module Killjoy end end + def kafka_cassandra_non_blocking_writes + profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Cassandra::NonBlockingWritesConsumer.new(writers, shard) + end + end + end + + def kafka_mongo_writes + profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Mongo::Consumer.new(@mongo_client, shard) + end + end + end + private def profile(filename) @@ -54,9 +70,10 @@ module Killjoy end end - def run + def run( + message_bus = MessageBus.new, queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i - message_bus = MessageBus.new + ) publish_messages(message_bus) queue = Queue.new diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb index cd1f14e..c63b186 100644 --- a/lib/killjoy/kafka/message.rb +++ b/lib/killjoy/kafka/message.rb @@ -1,10 +1,15 @@ module Killjoy module Kafka class Message - attr_reader :to_hash, :info, :channel + attr_reader :to_hash def initialize(raw_message) @to_hash = JSON.parse(raw_message, symbolize_names: true) + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block end def process(future) @@ -17,16 +22,26 @@ module Killjoy end def ack! - puts "TODO:: ack!" + #puts "TODO:: ack!" + run_interceptors_for(:ack) end def reject!(requeue = false) - puts "TODO:: reject!" + #puts "TODO:: reject!" + run_interceptors_for(:reject) end def to_s to_hash end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end end end end diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb index 1dc8720..bf5fa08 100644 --- a/lib/killjoy/kafka/message_bus.rb +++ b/lib/killjoy/kafka/message_bus.rb @@ -1,15 +1,20 @@ module Killjoy module Kafka class MessageBus + def initialize(topic = "killjoy_topic") + @topic = topic + @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset) + Thread.abort_on_exception = true + end + def subscribe(consumer) @thread = Thread.new do - consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, "killjoy_topic", 0, :earliest_offset) - loop do - messages = consumer.fetch + messages = @kafka_consumer.fetch messages.each do |raw_message| begin - message = KafkaMessage.new(raw_message) + #puts raw_message.value.inspect + message = Message.new(raw_message.value) if block_given? yield message else @@ -25,7 +30,18 @@ module Killjoy end def stop - @thread.terminate if @thread + if @kafka_consumer + @kafka_consumer.close + else + puts 'no consumer' + end + if @thread + puts "KILL THE THREAD" + Thread.kill(@thread) + @thread = nil + else + puts 'no thread' + end end def publish(message) @@ -35,17 +51,18 @@ module Killjoy private def exchange - @exchange ||= KafkaExchange.new + @exchange ||= KafkaExchange.new(@topic) end class KafkaExchange - def initialize + def initialize(topic) + @topic = topic @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer") end def publish(json, options = {}) - puts "publishing #{json.inspect}" - @producer.send_messages([Poseidon::MessageToSend.new("killjoy_topic", json)]) + #puts "publishing #{json.inspect}" + @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)]) end end end |
