From bb2743db109df15aff6f7482edd46a32fa36c3b4 Mon Sep 17 00:00:00 2001 From: mo khan Date: Sat, 30 Jan 2016 19:29:15 -0700 Subject: move rmq classes to rmq dir and install kafka. --- Gemfile | 1 + Gemfile.lock | 4 +- config/chef_apply.rb | 24 ++++++++--- lib/killjoy.rb | 6 +-- lib/killjoy/amqp_configuration.rb | 47 ---------------------- lib/killjoy/kafka/message_bus.rb | 14 +++++++ lib/killjoy/message.rb | 47 ---------------------- lib/killjoy/message_bus.rb | 75 ----------------------------------- lib/killjoy/rmq/amqp_configuration.rb | 47 ++++++++++++++++++++++ lib/killjoy/rmq/message.rb | 47 ++++++++++++++++++++++ lib/killjoy/rmq/message_bus.rb | 75 +++++++++++++++++++++++++++++++++++ lib/killjoy/tasks/rabbitmq.rake | 2 +- 12 files changed, 210 insertions(+), 179 deletions(-) delete mode 100644 lib/killjoy/amqp_configuration.rb create mode 100644 lib/killjoy/kafka/message_bus.rb delete mode 100644 lib/killjoy/message.rb delete mode 100644 lib/killjoy/message_bus.rb create mode 100644 lib/killjoy/rmq/amqp_configuration.rb create mode 100644 lib/killjoy/rmq/message.rb create mode 100644 lib/killjoy/rmq/message_bus.rb diff --git a/Gemfile b/Gemfile index 3b28844..2dd2698 100644 --- a/Gemfile +++ b/Gemfile @@ -12,6 +12,7 @@ gem "serverengine" gem "sinatra" gem "spank" gem "virtus" +gem "poseidon" group :test, :development do platform :ruby do diff --git a/Gemfile.lock b/Gemfile.lock index cd22f25..eb9c166 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -42,6 +42,7 @@ GEM minitest (5.8.2) mongo (2.1.2) bson (~> 3.0) + poseidon (0.0.5) pry (0.10.3) coderay (~> 1.1.0) method_source (~> 0.8.1) @@ -101,6 +102,7 @@ DEPENDENCIES foreman lz4-ruby mongo + poseidon pry pry-byebug rake @@ -113,4 +115,4 @@ DEPENDENCIES virtus BUNDLED WITH - 1.10.6 + 1.11.2 diff --git a/config/chef_apply.rb b/config/chef_apply.rb index b518828..2121a51 100644 --- a/config/chef_apply.rb +++ b/config/chef_apply.rb @@ -9,7 +9,7 @@ name=DataStax Repo for Apache Cassandra baseurl=http://rpm.datastax.com/community enabled=1 gpgcheck=0 -CONTENT + CONTENT end file "/etc/yum.repos.d/mongodb.repo" do content <<-SCRIPT @@ -18,7 +18,7 @@ name=MongoDB Repository baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/ enabled=1 gpgcheck=0 -SCRIPT + SCRIPT end execute "rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc" @@ -37,6 +37,20 @@ execute "yum install -y /tmp/rabbitmq-server-3.5.6-1.noarch.rpm" do not_if "sudo rabbitmqctl status | grep '{rabbit,' | grep '3.5.6'" end +remote_file "/tmp/kafka_2.11-0.9.0.0.tgz" do + source "http://apache.mirror.iweb.ca/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz" +end + +bash "install_kafka" do + cwd "/tmp" + code <<-BASH + tar -xzvf kafka_2.11-0.9.0.0.tgz + mv kafka_2.11-0.9.0.0 /opt/ + ln -s /opt/kafka_2.11-0.9.0.0 /opt/kafka + BASH + not_if { ::Dir.exist?("kafka_2.11-0.9.0.0") } +end + package "epel-release" execute "yum clean all" @@ -109,7 +123,7 @@ file "/etc/profile.d/rbenv.sh" do export RBENV_ROOT="/usr/local/rbenv" export PATH="/usr/local/rbenv/bin:$PATH" eval "$(rbenv init -)" -CONTENT + CONTENT end directory "/usr/local/rbenv/plugins" @@ -126,8 +140,8 @@ bash "install_ruby" do source /etc/profile.d/rbenv.sh rbenv install #{ruby_version} rbenv global #{ruby_version} -rbenv install jruby-9.0.3.0 -EOH +rbenv install jruby-9.0.5.0 + EOH end bash "install_bundler" do diff --git a/lib/killjoy.rb b/lib/killjoy.rb index d7478e5..bb71e0d 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -12,7 +12,6 @@ require "virtus" require "yaml" require "killjoy/after_fork" -require "killjoy/amqp_configuration" require "killjoy/cassandra/blocking_writes_consumer" require "killjoy/cassandra/database_cleaner" require "killjoy/cassandra/database_configuration" @@ -25,10 +24,11 @@ require "killjoy/cassandra/writer" require "killjoy/consumer" require "killjoy/log_line" require "killjoy/log_parser" -require "killjoy/message" -require "killjoy/message_bus" require "killjoy/mongo/consumer" require "killjoy/publisher" +require "killjoy/rmq/amqp_configuration" +require "killjoy/rmq/message" +require "killjoy/rmq/message_bus" require "killjoy/thread_pool" require "killjoy/version" diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/amqp_configuration.rb deleted file mode 100644 index 62262f8..0000000 --- a/lib/killjoy/amqp_configuration.rb +++ /dev/null @@ -1,47 +0,0 @@ -module Killjoy - class AMQPConfiguration - attr_reader :environment - - def initialize(environment: ENV.fetch("ENV", "development")) - @environment = environment - end - - def amqp_uri - configuration['amqp_uri'] - end - - def heartbeat - configuration['heartbeat'].to_i - end - - def prefetch - configuration['prefetch'].to_i - end - - def exchange - configuration['exchange'] - end - - def exchange_type - configuration['exchange_type'] - end - - def shards - configuration['shards'].to_i - end - - def to_hash - configuration - end - - private - - def configuration(file = "config/amqp.yml") - @configuration ||= YAML.load(expand_template(file))[environment] - end - - def expand_template(file) - ERB.new(File.read(file)).result(binding) - end - end -end diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb new file mode 100644 index 0000000..1672be2 --- /dev/null +++ b/lib/killjoy/kafka/message_bus.rb @@ -0,0 +1,14 @@ +module Killjoy + module Kafka + class MessageBus + def subscribe(consumer) + end + + def stop + end + + def publish(message) + end + end + end +end diff --git a/lib/killjoy/message.rb b/lib/killjoy/message.rb deleted file mode 100644 index 2aa9c7d..0000000 --- a/lib/killjoy/message.rb +++ /dev/null @@ -1,47 +0,0 @@ -module Killjoy - class Message - attr_reader :to_hash, :info, :channel - - def initialize(raw_message, info, channel) - @to_hash = JSON.parse(raw_message, symbolize_names: true) - @info = info - @channel = channel - @interceptors = { ack: [], reject: [] } - end - - def intercept(response_type, &block) - @interceptors[response_type] << block - end - - def process(future) - future.on_success do |rows| - ack! - end - future.on_failure do |error| - reject! - end - end - - def ack! - run_interceptors_for(:ack) - channel.acknowledge(info.delivery_tag, false) - end - - def reject!(requeue = false) - run_interceptors_for(:reject) - channel.reject(info.delivery_tag, requeue) - end - - def to_s - to_hash - end - - private - - def run_interceptors_for(response_type) - @interceptors[response_type].each do |interceptor| - interceptor.call - end - end - end -end diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb deleted file mode 100644 index 710e4cd..0000000 --- a/lib/killjoy/message_bus.rb +++ /dev/null @@ -1,75 +0,0 @@ -module Killjoy - class MessageBus - attr_reader :configuration - - def initialize(configuration = AMQPConfiguration.new) - @configuration = configuration - @subscriptions = Queue.new - @cpus = Facter.value('processors')['count'].to_i - end - - def subscribe(consumer) - options = { manual_ack: true, block: false } - queue = create_queue(consumer) - subscription = queue.subscribe(options) do |info, metadata, raw_message| - begin - message = Message.new(raw_message, info, channel) - if block_given? - yield message - else - consumer.work(message) - end - rescue => error - Killjoy.logger.error(error.message) - message.reject! - end - end - @subscriptions << subscription - end - - def stop - while @subscriptions.size > 0 - @subscriptions.deq.cancel - end - connection.close - end - - def publish(message) - message.publish_to(exchange) - end - - private - - def connection - @connection ||= Bunny.new( - configuration.amqp_uri, - heartbeat: configuration.heartbeat, - logger: Killjoy.logger - ).tap do |connection| - connection.start - end - end - - def channel - @channel ||= connection.create_channel(nil, @cpus).tap do |x| - x.prefetch(configuration.prefetch) - end - end - - def exchange - @exchange ||= channel.exchange( - configuration.exchange, - durable: true, - type: configuration.exchange_type - ) - end - - def create_queue(consumer) - queue = channel.queue(consumer.queue_name, exclusive: false, durable: true) - consumer.bindings.each do |binding| - queue.bind(exchange, routing_key: binding) - end - queue - end - end -end diff --git a/lib/killjoy/rmq/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb new file mode 100644 index 0000000..62262f8 --- /dev/null +++ b/lib/killjoy/rmq/amqp_configuration.rb @@ -0,0 +1,47 @@ +module Killjoy + class AMQPConfiguration + attr_reader :environment + + def initialize(environment: ENV.fetch("ENV", "development")) + @environment = environment + end + + def amqp_uri + configuration['amqp_uri'] + end + + def heartbeat + configuration['heartbeat'].to_i + end + + def prefetch + configuration['prefetch'].to_i + end + + def exchange + configuration['exchange'] + end + + def exchange_type + configuration['exchange_type'] + end + + def shards + configuration['shards'].to_i + end + + def to_hash + configuration + end + + private + + def configuration(file = "config/amqp.yml") + @configuration ||= YAML.load(expand_template(file))[environment] + end + + def expand_template(file) + ERB.new(File.read(file)).result(binding) + end + end +end diff --git a/lib/killjoy/rmq/message.rb b/lib/killjoy/rmq/message.rb new file mode 100644 index 0000000..2aa9c7d --- /dev/null +++ b/lib/killjoy/rmq/message.rb @@ -0,0 +1,47 @@ +module Killjoy + class Message + attr_reader :to_hash, :info, :channel + + def initialize(raw_message, info, channel) + @to_hash = JSON.parse(raw_message, symbolize_names: true) + @info = info + @channel = channel + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block + end + + def process(future) + future.on_success do |rows| + ack! + end + future.on_failure do |error| + reject! + end + end + + def ack! + run_interceptors_for(:ack) + channel.acknowledge(info.delivery_tag, false) + end + + def reject!(requeue = false) + run_interceptors_for(:reject) + channel.reject(info.delivery_tag, requeue) + end + + def to_s + to_hash + end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end + end +end diff --git a/lib/killjoy/rmq/message_bus.rb b/lib/killjoy/rmq/message_bus.rb new file mode 100644 index 0000000..710e4cd --- /dev/null +++ b/lib/killjoy/rmq/message_bus.rb @@ -0,0 +1,75 @@ +module Killjoy + class MessageBus + attr_reader :configuration + + def initialize(configuration = AMQPConfiguration.new) + @configuration = configuration + @subscriptions = Queue.new + @cpus = Facter.value('processors')['count'].to_i + end + + def subscribe(consumer) + options = { manual_ack: true, block: false } + queue = create_queue(consumer) + subscription = queue.subscribe(options) do |info, metadata, raw_message| + begin + message = Message.new(raw_message, info, channel) + if block_given? + yield message + else + consumer.work(message) + end + rescue => error + Killjoy.logger.error(error.message) + message.reject! + end + end + @subscriptions << subscription + end + + def stop + while @subscriptions.size > 0 + @subscriptions.deq.cancel + end + connection.close + end + + def publish(message) + message.publish_to(exchange) + end + + private + + def connection + @connection ||= Bunny.new( + configuration.amqp_uri, + heartbeat: configuration.heartbeat, + logger: Killjoy.logger + ).tap do |connection| + connection.start + end + end + + def channel + @channel ||= connection.create_channel(nil, @cpus).tap do |x| + x.prefetch(configuration.prefetch) + end + end + + def exchange + @exchange ||= channel.exchange( + configuration.exchange, + durable: true, + type: configuration.exchange_type + ) + end + + def create_queue(consumer) + queue = channel.queue(consumer.queue_name, exclusive: false, durable: true) + consumer.bindings.each do |binding| + queue.bind(exchange, routing_key: binding) + end + queue + end + end +end diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake index e18f9d5..ebd1500 100644 --- a/lib/killjoy/tasks/rabbitmq.rake +++ b/lib/killjoy/tasks/rabbitmq.rake @@ -2,7 +2,7 @@ namespace :rabbitmq do require 'active_support/core_ext/string' require 'erb' require 'yaml' - require_relative '../amqp_configuration' + require_relative '../rmq/amqp_configuration' desc 'setup rabbitmqadmin' task :setup do -- cgit v1.2.3 From 9c30095d0aa9cf21c0fc64527695b3df85dec476 Mon Sep 17 00:00:00 2001 From: mo khan Date: Sat, 30 Jan 2016 19:53:55 -0700 Subject: implement a kafka producer/consumer. --- Gemfile | 3 ++- exe/killjoy | 2 +- exe/killjoy-publisher | 3 ++- lib/killjoy.rb | 2 ++ lib/killjoy/kafka/message.rb | 32 ++++++++++++++++++++++++++++++++ lib/killjoy/kafka/message_bus.rb | 39 +++++++++++++++++++++++++++++++++++++++ lib/killjoy/worker.rb | 3 ++- 7 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 lib/killjoy/kafka/message.rb diff --git a/Gemfile b/Gemfile index 2dd2698..8f5675d 100644 --- a/Gemfile +++ b/Gemfile @@ -7,12 +7,13 @@ gem "facter" gem "foreman" gem "lz4-ruby" gem "mongo" +gem "poseidon" gem "rake" gem "serverengine" gem "sinatra" +gem "snappy" gem "spank" gem "virtus" -gem "poseidon" group :test, :development do platform :ruby do diff --git a/exe/killjoy b/exe/killjoy index c22e9e9..902bdb0 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -12,7 +12,7 @@ server = ServerEngine::Daemon.new(Killjoy::Server, Killjoy::Worker, {}) do pid_path: 'tmp/killjoy.pid', queue_shards: configuration.shards, worker_type: 'process', - workers: configuration.shards, + workers: 1, # configuration.shards, } end server.run diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher index 3d50a56..a969835 100755 --- a/exe/killjoy-publisher +++ b/exe/killjoy-publisher @@ -7,7 +7,8 @@ require "killjoy" log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log") parser = Killjoy::LogParser.new -message_bus = Killjoy::MessageBus.new +#message_bus = Killjoy::MessageBus.new +message_bus = Killjoy::Kafka::MessageBus.new Killjoy::Publisher.using(message_bus) do |publisher| log_file = File.join(Dir.pwd, log_file) lines = File.readlines(log_file) diff --git a/lib/killjoy.rb b/lib/killjoy.rb index bb71e0d..a6388ab 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -7,6 +7,7 @@ require "json" require "killjoy/nullable" require "logger" require "mongo" +require "poseidon" require "spank" require "virtus" require "yaml" @@ -22,6 +23,7 @@ require "killjoy/cassandra/query_builder" require "killjoy/cassandra/queryable" require "killjoy/cassandra/writer" require "killjoy/consumer" +require "killjoy/kafka/message_bus" require "killjoy/log_line" require "killjoy/log_parser" require "killjoy/mongo/consumer" diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb new file mode 100644 index 0000000..cd1f14e --- /dev/null +++ b/lib/killjoy/kafka/message.rb @@ -0,0 +1,32 @@ +module Killjoy + module Kafka + class Message + attr_reader :to_hash, :info, :channel + + def initialize(raw_message) + @to_hash = JSON.parse(raw_message, symbolize_names: true) + end + + def process(future) + future.on_success do |rows| + ack! + end + future.on_failure do |error| + reject! + end + end + + def ack! + puts "TODO:: ack!" + end + + def reject!(requeue = false) + puts "TODO:: reject!" + end + + def to_s + to_hash + end + end + end +end diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb index 1672be2..1dc8720 100644 --- a/lib/killjoy/kafka/message_bus.rb +++ b/lib/killjoy/kafka/message_bus.rb @@ -2,12 +2,51 @@ module Killjoy module Kafka class MessageBus def subscribe(consumer) + @thread = Thread.new do + consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, "killjoy_topic", 0, :earliest_offset) + + loop do + messages = consumer.fetch + messages.each do |raw_message| + begin + message = KafkaMessage.new(raw_message) + if block_given? + yield message + else + consumer.work(message) + end + rescue => error + Killjoy.logger.error(error.message) + message.reject! + end + end + end + end end def stop + @thread.terminate if @thread end def publish(message) + message.publish_to(exchange) + end + + private + + def exchange + @exchange ||= KafkaExchange.new + end + + class KafkaExchange + def initialize + @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer") + end + + def publish(json, options = {}) + puts "publishing #{json.inspect}" + @producer.send_messages([Poseidon::MessageToSend.new("killjoy_topic", json)]) + end end end end diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb index e32790a..09ba538 100644 --- a/lib/killjoy/worker.rb +++ b/lib/killjoy/worker.rb @@ -2,7 +2,8 @@ module Killjoy module Worker def initialize @mutex = ServerEngine::BlockingFlag.new - @message_bus = Killjoy::MessageBus.new + #@message_bus = Killjoy::MessageBus.new + @message_bus = Killjoy::Kafka::MessageBus.new end def run -- cgit v1.2.3 From 85bd0e5d520109f29011f6eff69e9d2f2e20af26 Mon Sep 17 00:00:00 2001 From: mo khan Date: Sat, 30 Jan 2016 21:48:40 -0700 Subject: attempt to update timing tests for kafka. --- Gemfile | 2 +- Rakefile | 4 ++++ exe/killjoy-timing | 6 ++++++ lib/killjoy.rb | 1 + lib/killjoy/experiments.rb | 21 +++++++++++++++++++-- lib/killjoy/kafka/message.rb | 21 ++++++++++++++++++--- lib/killjoy/kafka/message_bus.rb | 35 ++++++++++++++++++++++++++--------- 7 files changed, 75 insertions(+), 15 deletions(-) diff --git a/Gemfile b/Gemfile index 8f5675d..5d54fa5 100644 --- a/Gemfile +++ b/Gemfile @@ -11,7 +11,7 @@ gem "poseidon" gem "rake" gem "serverengine" gem "sinatra" -gem "snappy" +#gem "snappy" gem "spank" gem "virtus" diff --git a/Rakefile b/Rakefile index 1261793..287a352 100644 --- a/Rakefile +++ b/Rakefile @@ -6,3 +6,7 @@ import "lib/killjoy/tasks/rabbitmq.rake" RSpec::Core::RakeTask.new(:spec) task :default => :spec + +task :timing => ['rabbitmq:reset', 'mongo:drop', 'db:reset'] do + sh "exe/killjoy-timing" +end diff --git a/exe/killjoy-timing b/exe/killjoy-timing index bb48b60..6764710 100755 --- a/exe/killjoy-timing +++ b/exe/killjoy-timing @@ -25,6 +25,12 @@ Benchmark.ips do |x| x.report("mongo: writes") do experiments.mongo_writes end + x.report("kafka-mongo: writes") do + experiments.kafka_mongo_writes + end + x.report("kafka-cassandra: non blocking writes") do + experiments.kafka_cassandra_non_blocking_writes + end x.compare! end diff --git a/lib/killjoy.rb b/lib/killjoy.rb index a6388ab..576d90e 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -23,6 +23,7 @@ require "killjoy/cassandra/query_builder" require "killjoy/cassandra/queryable" require "killjoy/cassandra/writer" require "killjoy/consumer" +require "killjoy/kafka/message" require "killjoy/kafka/message_bus" require "killjoy/log_line" require "killjoy/log_parser" diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index 2977eb3..8bc4578 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -42,6 +42,22 @@ module Killjoy end end + def kafka_cassandra_non_blocking_writes + profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Cassandra::NonBlockingWritesConsumer.new(writers, shard) + end + end + end + + def kafka_mongo_writes + profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do + run(Kafka::MessageBus.new, 1) do |shard| + Mongo::Consumer.new(@mongo_client, shard) + end + end + end + private def profile(filename) @@ -54,9 +70,10 @@ module Killjoy end end - def run + def run( + message_bus = MessageBus.new, queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i - message_bus = MessageBus.new + ) publish_messages(message_bus) queue = Queue.new diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb index cd1f14e..c63b186 100644 --- a/lib/killjoy/kafka/message.rb +++ b/lib/killjoy/kafka/message.rb @@ -1,10 +1,15 @@ module Killjoy module Kafka class Message - attr_reader :to_hash, :info, :channel + attr_reader :to_hash def initialize(raw_message) @to_hash = JSON.parse(raw_message, symbolize_names: true) + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block end def process(future) @@ -17,16 +22,26 @@ module Killjoy end def ack! - puts "TODO:: ack!" + #puts "TODO:: ack!" + run_interceptors_for(:ack) end def reject!(requeue = false) - puts "TODO:: reject!" + #puts "TODO:: reject!" + run_interceptors_for(:reject) end def to_s to_hash end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end end end end diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb index 1dc8720..bf5fa08 100644 --- a/lib/killjoy/kafka/message_bus.rb +++ b/lib/killjoy/kafka/message_bus.rb @@ -1,15 +1,20 @@ module Killjoy module Kafka class MessageBus + def initialize(topic = "killjoy_topic") + @topic = topic + @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset) + Thread.abort_on_exception = true + end + def subscribe(consumer) @thread = Thread.new do - consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, "killjoy_topic", 0, :earliest_offset) - loop do - messages = consumer.fetch + messages = @kafka_consumer.fetch messages.each do |raw_message| begin - message = KafkaMessage.new(raw_message) + #puts raw_message.value.inspect + message = Message.new(raw_message.value) if block_given? yield message else @@ -25,7 +30,18 @@ module Killjoy end def stop - @thread.terminate if @thread + if @kafka_consumer + @kafka_consumer.close + else + puts 'no consumer' + end + if @thread + puts "KILL THE THREAD" + Thread.kill(@thread) + @thread = nil + else + puts 'no thread' + end end def publish(message) @@ -35,17 +51,18 @@ module Killjoy private def exchange - @exchange ||= KafkaExchange.new + @exchange ||= KafkaExchange.new(@topic) end class KafkaExchange - def initialize + def initialize(topic) + @topic = topic @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer") end def publish(json, options = {}) - puts "publishing #{json.inspect}" - @producer.send_messages([Poseidon::MessageToSend.new("killjoy_topic", json)]) + #puts "publishing #{json.inspect}" + @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)]) end end end -- cgit v1.2.3