diff options
Diffstat (limited to 'lib/killjoy')
| -rw-r--r-- | lib/killjoy/experiments.rb | 21 | ||||
| -rw-r--r-- | lib/killjoy/kafka/message.rb | 47 | ||||
| -rw-r--r-- | lib/killjoy/kafka/message_bus.rb | 70 | ||||
| -rw-r--r-- | lib/killjoy/rmq/amqp_configuration.rb (renamed from lib/killjoy/amqp_configuration.rb) | 0 | ||||
| -rw-r--r-- | lib/killjoy/rmq/message.rb (renamed from lib/killjoy/message.rb) | 0 | ||||
| -rw-r--r-- | lib/killjoy/rmq/message_bus.rb (renamed from lib/killjoy/message_bus.rb) | 0 | ||||
| -rw-r--r-- | lib/killjoy/tasks/rabbitmq.rake | 2 | ||||
| -rw-r--r-- | lib/killjoy/worker.rb | 3 |
8 files changed, 139 insertions, 4 deletions
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 2977eb3..8bc4578 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -42,6 +42,22 @@ module Killjoy end end + def kafka_cassandra_non_blocking_writes + profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Cassandra::NonBlockingWritesConsumer.new(writers, shard) + end + end + end + + def kafka_mongo_writes + profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Mongo::Consumer.new(@mongo_client, shard) + end + end + end + private def profile(filename) @@ -54,9 +70,10 @@ module Killjoy end end - def run + def run( + message_bus = MessageBus.new, queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i - message_bus = MessageBus.new + ) publish_messages(message_bus) queue = Queue.new diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb new file mode 100644 index 0000000..c63b186 --- /dev/null +++ b/lib/killjoy/kafka/message.rb @@ -0,0 +1,47 @@ +module Killjoy + module Kafka + class Message + attr_reader :to_hash + + def initialize(raw_message) + @to_hash = JSON.parse(raw_message, symbolize_names: true) + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block + end + + def process(future) + future.on_success do |rows| + ack! + end + future.on_failure do |error| + reject! + end + end + + def ack! + #puts "TODO:: ack!" + run_interceptors_for(:ack) + end + + def reject!(requeue = false) + #puts "TODO:: reject!" + run_interceptors_for(:reject) + end + + def to_s + to_hash + end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end + end + end +end diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb new file mode 100644 index 0000000..bf5fa08 --- /dev/null +++ b/lib/killjoy/kafka/message_bus.rb @@ -0,0 +1,70 @@ +module Killjoy + module Kafka + class MessageBus + def initialize(topic = "killjoy_topic") + @topic = topic + @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset) + Thread.abort_on_exception = true + end + + def subscribe(consumer) + @thread = Thread.new do + loop do + messages = @kafka_consumer.fetch + messages.each do |raw_message| + begin + #puts raw_message.value.inspect + message = Message.new(raw_message.value) + if block_given? + yield message + else + consumer.work(message) + end + rescue => error + Killjoy.logger.error(error.message) + message.reject! + end + end + end + end + end + + def stop + if @kafka_consumer + @kafka_consumer.close + else + puts 'no consumer' + end + if @thread + puts "KILL THE THREAD" + Thread.kill(@thread) + @thread = nil + else + puts 'no thread' + end + end + + def publish(message) + message.publish_to(exchange) + end + + private + + def exchange + @exchange ||= KafkaExchange.new(@topic) + end + + class KafkaExchange + def initialize(topic) + @topic = topic + @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer") + end + + def publish(json, options = {}) + #puts "publishing #{json.inspect}" + @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)]) + end + end + end + end +end diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb index 62262f8..62262f8 100644 --- a/lib/killjoy/amqp_configuration.rb +++ b/lib/killjoy/rmq/amqp_configuration.rb diff --git a/lib/killjoy/message.rb b/lib/killjoy/rmq/message.rb index 2aa9c7d..2aa9c7d 100644 --- a/lib/killjoy/message.rb +++ b/lib/killjoy/rmq/message.rb diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/rmq/message_bus.rb index 710e4cd..710e4cd 100644 --- a/lib/killjoy/message_bus.rb +++ b/lib/killjoy/rmq/message_bus.rb diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake index e18f9d5..ebd1500 100644 --- a/lib/killjoy/tasks/rabbitmq.rake +++ b/lib/killjoy/tasks/rabbitmq.rake @@ -2,7 +2,7 @@ namespace :rabbitmq do require 'active_support/core_ext/string' require 'erb' require 'yaml' - require_relative '../amqp_configuration' + require_relative '../rmq/amqp_configuration' desc 'setup rabbitmqadmin' task :setup do diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb index e32790a..09ba538 100644 --- a/lib/killjoy/worker.rb +++ b/lib/killjoy/worker.rb @@ -2,7 +2,8 @@ module Killjoy module Worker def initialize @mutex = ServerEngine::BlockingFlag.new - @message_bus = Killjoy::MessageBus.new + #@message_bus = Killjoy::MessageBus.new + @message_bus = Killjoy::Kafka::MessageBus.new end def run |
