summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Gemfile2
-rw-r--r--Rakefile4
-rwxr-xr-xexe/killjoy-timing6
-rw-r--r--lib/killjoy.rb1
-rw-r--r--lib/killjoy/experiments.rb21
-rw-r--r--lib/killjoy/kafka/message.rb21
-rw-r--r--lib/killjoy/kafka/message_bus.rb35
7 files changed, 75 insertions, 15 deletions
diff --git a/Gemfile b/Gemfile
index 8f5675d..5d54fa5 100644
--- a/Gemfile
+++ b/Gemfile
@@ -11,7 +11,7 @@ gem "poseidon"
gem "rake"
gem "serverengine"
gem "sinatra"
-gem "snappy"
+#gem "snappy"
gem "spank"
gem "virtus"
diff --git a/Rakefile b/Rakefile
index 1261793..287a352 100644
--- a/Rakefile
+++ b/Rakefile
@@ -6,3 +6,7 @@ import "lib/killjoy/tasks/rabbitmq.rake"
RSpec::Core::RakeTask.new(:spec)
task :default => :spec
+
+task :timing => ['rabbitmq:reset', 'mongo:drop', 'db:reset'] do
+ sh "exe/killjoy-timing"
+end
diff --git a/exe/killjoy-timing b/exe/killjoy-timing
index bb48b60..6764710 100755
--- a/exe/killjoy-timing
+++ b/exe/killjoy-timing
@@ -25,6 +25,12 @@ Benchmark.ips do |x|
x.report("mongo: writes") do
experiments.mongo_writes
end
+ x.report("kafka-mongo: writes") do
+ experiments.kafka_mongo_writes
+ end
+ x.report("kafka-cassandra: non blocking writes") do
+ experiments.kafka_cassandra_non_blocking_writes
+ end
x.compare!
end
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index a6388ab..576d90e 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -23,6 +23,7 @@ require "killjoy/cassandra/query_builder"
require "killjoy/cassandra/queryable"
require "killjoy/cassandra/writer"
require "killjoy/consumer"
+require "killjoy/kafka/message"
require "killjoy/kafka/message_bus"
require "killjoy/log_line"
require "killjoy/log_parser"
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 2977eb3..8bc4578 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -42,6 +42,22 @@ module Killjoy
end
end
+ def kafka_cassandra_non_blocking_writes
+ profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Cassandra::NonBlockingWritesConsumer.new(writers, shard)
+ end
+ end
+ end
+
+ def kafka_mongo_writes
+ profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Mongo::Consumer.new(@mongo_client, shard)
+ end
+ end
+ end
+
private
def profile(filename)
@@ -54,9 +70,10 @@ module Killjoy
end
end
- def run
+ def run(
+ message_bus = MessageBus.new,
queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
- message_bus = MessageBus.new
+ )
publish_messages(message_bus)
queue = Queue.new
diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb
index cd1f14e..c63b186 100644
--- a/lib/killjoy/kafka/message.rb
+++ b/lib/killjoy/kafka/message.rb
@@ -1,10 +1,15 @@
module Killjoy
module Kafka
class Message
- attr_reader :to_hash, :info, :channel
+ attr_reader :to_hash
def initialize(raw_message)
@to_hash = JSON.parse(raw_message, symbolize_names: true)
+ @interceptors = { ack: [], reject: [] }
+ end
+
+ def intercept(response_type, &block)
+ @interceptors[response_type] << block
end
def process(future)
@@ -17,16 +22,26 @@ module Killjoy
end
def ack!
- puts "TODO:: ack!"
+ #puts "TODO:: ack!"
+ run_interceptors_for(:ack)
end
def reject!(requeue = false)
- puts "TODO:: reject!"
+ #puts "TODO:: reject!"
+ run_interceptors_for(:reject)
end
def to_s
to_hash
end
+
+ private
+
+ def run_interceptors_for(response_type)
+ @interceptors[response_type].each do |interceptor|
+ interceptor.call
+ end
+ end
end
end
end
diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb
index 1dc8720..bf5fa08 100644
--- a/lib/killjoy/kafka/message_bus.rb
+++ b/lib/killjoy/kafka/message_bus.rb
@@ -1,15 +1,20 @@
module Killjoy
module Kafka
class MessageBus
+ def initialize(topic = "killjoy_topic")
+ @topic = topic
+ @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset)
+ Thread.abort_on_exception = true
+ end
+
def subscribe(consumer)
@thread = Thread.new do
- consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, "killjoy_topic", 0, :earliest_offset)
-
loop do
- messages = consumer.fetch
+ messages = @kafka_consumer.fetch
messages.each do |raw_message|
begin
- message = KafkaMessage.new(raw_message)
+ #puts raw_message.value.inspect
+ message = Message.new(raw_message.value)
if block_given?
yield message
else
@@ -25,7 +30,18 @@ module Killjoy
end
def stop
- @thread.terminate if @thread
+ if @kafka_consumer
+ @kafka_consumer.close
+ else
+ puts 'no consumer'
+ end
+ if @thread
+ puts "KILL THE THREAD"
+ Thread.kill(@thread)
+ @thread = nil
+ else
+ puts 'no thread'
+ end
end
def publish(message)
@@ -35,17 +51,18 @@ module Killjoy
private
def exchange
- @exchange ||= KafkaExchange.new
+ @exchange ||= KafkaExchange.new(@topic)
end
class KafkaExchange
- def initialize
+ def initialize(topic)
+ @topic = topic
@producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer")
end
def publish(json, options = {})
- puts "publishing #{json.inspect}"
- @producer.send_messages([Poseidon::MessageToSend.new("killjoy_topic", json)])
+ #puts "publishing #{json.inspect}"
+ @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)])
end
end
end