summaryrefslogtreecommitdiff
path: root/lib/killjoy
diff options
context:
space:
mode:
Diffstat (limited to 'lib/killjoy')
-rw-r--r--lib/killjoy/experiments.rb21
-rw-r--r--lib/killjoy/kafka/message.rb47
-rw-r--r--lib/killjoy/kafka/message_bus.rb70
-rw-r--r--lib/killjoy/rmq/amqp_configuration.rb (renamed from lib/killjoy/amqp_configuration.rb)0
-rw-r--r--lib/killjoy/rmq/message.rb (renamed from lib/killjoy/message.rb)0
-rw-r--r--lib/killjoy/rmq/message_bus.rb (renamed from lib/killjoy/message_bus.rb)0
-rw-r--r--lib/killjoy/tasks/rabbitmq.rake2
-rw-r--r--lib/killjoy/worker.rb3
8 files changed, 139 insertions, 4 deletions
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 2977eb3..8bc4578 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -42,6 +42,22 @@ module Killjoy
end
end
+ def kafka_cassandra_non_blocking_writes
+ profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Cassandra::NonBlockingWritesConsumer.new(writers, shard)
+ end
+ end
+ end
+
+ def kafka_mongo_writes
+ profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Mongo::Consumer.new(@mongo_client, shard)
+ end
+ end
+ end
+
private
def profile(filename)
@@ -54,9 +70,10 @@ module Killjoy
end
end
- def run
+ def run(
+ message_bus = MessageBus.new,
queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
- message_bus = MessageBus.new
+ )
publish_messages(message_bus)
queue = Queue.new
diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb
new file mode 100644
index 0000000..c63b186
--- /dev/null
+++ b/lib/killjoy/kafka/message.rb
@@ -0,0 +1,47 @@
+module Killjoy
+ module Kafka
+ class Message
+ attr_reader :to_hash
+
+ def initialize(raw_message)
+ @to_hash = JSON.parse(raw_message, symbolize_names: true)
+ @interceptors = { ack: [], reject: [] }
+ end
+
+ def intercept(response_type, &block)
+ @interceptors[response_type] << block
+ end
+
+ def process(future)
+ future.on_success do |rows|
+ ack!
+ end
+ future.on_failure do |error|
+ reject!
+ end
+ end
+
+ def ack!
+ #puts "TODO:: ack!"
+ run_interceptors_for(:ack)
+ end
+
+ def reject!(requeue = false)
+ #puts "TODO:: reject!"
+ run_interceptors_for(:reject)
+ end
+
+ def to_s
+ to_hash
+ end
+
+ private
+
+ def run_interceptors_for(response_type)
+ @interceptors[response_type].each do |interceptor|
+ interceptor.call
+ end
+ end
+ end
+ end
+end
diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb
new file mode 100644
index 0000000..bf5fa08
--- /dev/null
+++ b/lib/killjoy/kafka/message_bus.rb
@@ -0,0 +1,70 @@
+module Killjoy
+ module Kafka
+ class MessageBus
+ def initialize(topic = "killjoy_topic")
+ @topic = topic
+ @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset)
+ Thread.abort_on_exception = true
+ end
+
+ def subscribe(consumer)
+ @thread = Thread.new do
+ loop do
+ messages = @kafka_consumer.fetch
+ messages.each do |raw_message|
+ begin
+ #puts raw_message.value.inspect
+ message = Message.new(raw_message.value)
+ if block_given?
+ yield message
+ else
+ consumer.work(message)
+ end
+ rescue => error
+ Killjoy.logger.error(error.message)
+ message.reject!
+ end
+ end
+ end
+ end
+ end
+
+ def stop
+ if @kafka_consumer
+ @kafka_consumer.close
+ else
+ puts 'no consumer'
+ end
+ if @thread
+ puts "KILL THE THREAD"
+ Thread.kill(@thread)
+ @thread = nil
+ else
+ puts 'no thread'
+ end
+ end
+
+ def publish(message)
+ message.publish_to(exchange)
+ end
+
+ private
+
+ def exchange
+ @exchange ||= KafkaExchange.new(@topic)
+ end
+
+ class KafkaExchange
+ def initialize(topic)
+ @topic = topic
+ @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer")
+ end
+
+ def publish(json, options = {})
+ #puts "publishing #{json.inspect}"
+ @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)])
+ end
+ end
+ end
+ end
+end
diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb
index 62262f8..62262f8 100644
--- a/lib/killjoy/amqp_configuration.rb
+++ b/lib/killjoy/rmq/amqp_configuration.rb
diff --git a/lib/killjoy/message.rb b/lib/killjoy/rmq/message.rb
index 2aa9c7d..2aa9c7d 100644
--- a/lib/killjoy/message.rb
+++ b/lib/killjoy/rmq/message.rb
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/rmq/message_bus.rb
index 710e4cd..710e4cd 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/rmq/message_bus.rb
diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake
index e18f9d5..ebd1500 100644
--- a/lib/killjoy/tasks/rabbitmq.rake
+++ b/lib/killjoy/tasks/rabbitmq.rake
@@ -2,7 +2,7 @@ namespace :rabbitmq do
require 'active_support/core_ext/string'
require 'erb'
require 'yaml'
- require_relative '../amqp_configuration'
+ require_relative '../rmq/amqp_configuration'
desc 'setup rabbitmqadmin'
task :setup do
diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb
index e32790a..09ba538 100644
--- a/lib/killjoy/worker.rb
+++ b/lib/killjoy/worker.rb
@@ -2,7 +2,8 @@ module Killjoy
module Worker
def initialize
@mutex = ServerEngine::BlockingFlag.new
- @message_bus = Killjoy::MessageBus.new
+ #@message_bus = Killjoy::MessageBus.new
+ @message_bus = Killjoy::Kafka::MessageBus.new
end
def run