summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mokhan@users.noreply.github.com>2019-04-20 09:32:24 -0600
committerGitHub <noreply@github.com>2019-04-20 09:32:24 -0600
commitbc2274ce4183bee7523665bf8a905a584f20081a (patch)
treeb336f1e4c305a3d22a2177b5b8e14f2b68989348
parenta7cd8453ab61f3a7b6ab59cda9d051dc725cdead (diff)
parent85bd0e5d520109f29011f6eff69e9d2f2e20af26 (diff)
Merge pull request #2 from mokhan/kafkamain
Kafka
-rw-r--r--Gemfile2
-rw-r--r--Gemfile.lock4
-rw-r--r--Rakefile4
-rw-r--r--config/chef_apply.rb24
-rwxr-xr-xexe/killjoy2
-rwxr-xr-xexe/killjoy-publisher3
-rwxr-xr-xexe/killjoy-timing6
-rw-r--r--lib/killjoy.rb9
-rw-r--r--lib/killjoy/experiments.rb21
-rw-r--r--lib/killjoy/kafka/message.rb47
-rw-r--r--lib/killjoy/kafka/message_bus.rb70
-rw-r--r--lib/killjoy/rmq/amqp_configuration.rb (renamed from lib/killjoy/amqp_configuration.rb)0
-rw-r--r--lib/killjoy/rmq/message.rb (renamed from lib/killjoy/message.rb)0
-rw-r--r--lib/killjoy/rmq/message_bus.rb (renamed from lib/killjoy/message_bus.rb)0
-rw-r--r--lib/killjoy/tasks/rabbitmq.rake2
-rw-r--r--lib/killjoy/worker.rb3
16 files changed, 182 insertions, 15 deletions
diff --git a/Gemfile b/Gemfile
index 3b28844..5d54fa5 100644
--- a/Gemfile
+++ b/Gemfile
@@ -7,9 +7,11 @@ gem "facter"
gem "foreman"
gem "lz4-ruby"
gem "mongo"
+gem "poseidon"
gem "rake"
gem "serverengine"
gem "sinatra"
+#gem "snappy"
gem "spank"
gem "virtus"
diff --git a/Gemfile.lock b/Gemfile.lock
index cd22f25..eb9c166 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -42,6 +42,7 @@ GEM
minitest (5.8.2)
mongo (2.1.2)
bson (~> 3.0)
+ poseidon (0.0.5)
pry (0.10.3)
coderay (~> 1.1.0)
method_source (~> 0.8.1)
@@ -101,6 +102,7 @@ DEPENDENCIES
foreman
lz4-ruby
mongo
+ poseidon
pry
pry-byebug
rake
@@ -113,4 +115,4 @@ DEPENDENCIES
virtus
BUNDLED WITH
- 1.10.6
+ 1.11.2
diff --git a/Rakefile b/Rakefile
index 1261793..287a352 100644
--- a/Rakefile
+++ b/Rakefile
@@ -6,3 +6,7 @@ import "lib/killjoy/tasks/rabbitmq.rake"
RSpec::Core::RakeTask.new(:spec)
task :default => :spec
+
+task :timing => ['rabbitmq:reset', 'mongo:drop', 'db:reset'] do
+ sh "exe/killjoy-timing"
+end
diff --git a/config/chef_apply.rb b/config/chef_apply.rb
index b518828..2121a51 100644
--- a/config/chef_apply.rb
+++ b/config/chef_apply.rb
@@ -9,7 +9,7 @@ name=DataStax Repo for Apache Cassandra
baseurl=http://rpm.datastax.com/community
enabled=1
gpgcheck=0
-CONTENT
+ CONTENT
end
file "/etc/yum.repos.d/mongodb.repo" do
content <<-SCRIPT
@@ -18,7 +18,7 @@ name=MongoDB Repository
baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/
enabled=1
gpgcheck=0
-SCRIPT
+ SCRIPT
end
execute "rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc"
@@ -37,6 +37,20 @@ execute "yum install -y /tmp/rabbitmq-server-3.5.6-1.noarch.rpm" do
not_if "sudo rabbitmqctl status | grep '{rabbit,' | grep '3.5.6'"
end
+remote_file "/tmp/kafka_2.11-0.9.0.0.tgz" do
+ source "http://apache.mirror.iweb.ca/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz"
+end
+
+bash "install_kafka" do
+ cwd "/tmp"
+ code <<-BASH
+ tar -xzvf kafka_2.11-0.9.0.0.tgz
+ mv kafka_2.11-0.9.0.0 /opt/
+ ln -s /opt/kafka_2.11-0.9.0.0 /opt/kafka
+ BASH
+ not_if { ::Dir.exist?("kafka_2.11-0.9.0.0") }
+end
+
package "epel-release"
execute "yum clean all"
@@ -109,7 +123,7 @@ file "/etc/profile.d/rbenv.sh" do
export RBENV_ROOT="/usr/local/rbenv"
export PATH="/usr/local/rbenv/bin:$PATH"
eval "$(rbenv init -)"
-CONTENT
+ CONTENT
end
directory "/usr/local/rbenv/plugins"
@@ -126,8 +140,8 @@ bash "install_ruby" do
source /etc/profile.d/rbenv.sh
rbenv install #{ruby_version}
rbenv global #{ruby_version}
-rbenv install jruby-9.0.3.0
-EOH
+rbenv install jruby-9.0.5.0
+ EOH
end
bash "install_bundler" do
diff --git a/exe/killjoy b/exe/killjoy
index c22e9e9..902bdb0 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -12,7 +12,7 @@ server = ServerEngine::Daemon.new(Killjoy::Server, Killjoy::Worker, {}) do
pid_path: 'tmp/killjoy.pid',
queue_shards: configuration.shards,
worker_type: 'process',
- workers: configuration.shards,
+ workers: 1, # configuration.shards,
}
end
server.run
diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher
index 3d50a56..a969835 100755
--- a/exe/killjoy-publisher
+++ b/exe/killjoy-publisher
@@ -7,7 +7,8 @@ require "killjoy"
log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")
parser = Killjoy::LogParser.new
-message_bus = Killjoy::MessageBus.new
+#message_bus = Killjoy::MessageBus.new
+message_bus = Killjoy::Kafka::MessageBus.new
Killjoy::Publisher.using(message_bus) do |publisher|
log_file = File.join(Dir.pwd, log_file)
lines = File.readlines(log_file)
diff --git a/exe/killjoy-timing b/exe/killjoy-timing
index bb48b60..6764710 100755
--- a/exe/killjoy-timing
+++ b/exe/killjoy-timing
@@ -25,6 +25,12 @@ Benchmark.ips do |x|
x.report("mongo: writes") do
experiments.mongo_writes
end
+ x.report("kafka-mongo: writes") do
+ experiments.kafka_mongo_writes
+ end
+ x.report("kafka-cassandra: non blocking writes") do
+ experiments.kafka_cassandra_non_blocking_writes
+ end
x.compare!
end
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index d7478e5..576d90e 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -7,12 +7,12 @@ require "json"
require "killjoy/nullable"
require "logger"
require "mongo"
+require "poseidon"
require "spank"
require "virtus"
require "yaml"
require "killjoy/after_fork"
-require "killjoy/amqp_configuration"
require "killjoy/cassandra/blocking_writes_consumer"
require "killjoy/cassandra/database_cleaner"
require "killjoy/cassandra/database_configuration"
@@ -23,12 +23,15 @@ require "killjoy/cassandra/query_builder"
require "killjoy/cassandra/queryable"
require "killjoy/cassandra/writer"
require "killjoy/consumer"
+require "killjoy/kafka/message"
+require "killjoy/kafka/message_bus"
require "killjoy/log_line"
require "killjoy/log_parser"
-require "killjoy/message"
-require "killjoy/message_bus"
require "killjoy/mongo/consumer"
require "killjoy/publisher"
+require "killjoy/rmq/amqp_configuration"
+require "killjoy/rmq/message"
+require "killjoy/rmq/message_bus"
require "killjoy/thread_pool"
require "killjoy/version"
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index 2977eb3..8bc4578 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -42,6 +42,22 @@ module Killjoy
end
end
+ def kafka_cassandra_non_blocking_writes
+ profile('tmp/kafka-cassandra-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Cassandra::NonBlockingWritesConsumer.new(writers, shard)
+ end
+ end
+ end
+
+ def kafka_mongo_writes
+ profile('tmp/kafka-mongo-cpu-non-blocking-writes.dump') do
+ run(Kafka::MessageBus.new, 1) do |shard|
+ Mongo::Consumer.new(@mongo_client, shard)
+ end
+ end
+ end
+
private
def profile(filename)
@@ -54,9 +70,10 @@ module Killjoy
end
end
- def run
+ def run(
+ message_bus = MessageBus.new,
queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
- message_bus = MessageBus.new
+ )
publish_messages(message_bus)
queue = Queue.new
diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb
new file mode 100644
index 0000000..c63b186
--- /dev/null
+++ b/lib/killjoy/kafka/message.rb
@@ -0,0 +1,47 @@
+module Killjoy
+ module Kafka
+ class Message
+ attr_reader :to_hash
+
+ def initialize(raw_message)
+ @to_hash = JSON.parse(raw_message, symbolize_names: true)
+ @interceptors = { ack: [], reject: [] }
+ end
+
+ def intercept(response_type, &block)
+ @interceptors[response_type] << block
+ end
+
+ def process(future)
+ future.on_success do |rows|
+ ack!
+ end
+ future.on_failure do |error|
+ reject!
+ end
+ end
+
+ def ack!
+ #puts "TODO:: ack!"
+ run_interceptors_for(:ack)
+ end
+
+ def reject!(requeue = false)
+ #puts "TODO:: reject!"
+ run_interceptors_for(:reject)
+ end
+
+ def to_s
+ to_hash
+ end
+
+ private
+
+ def run_interceptors_for(response_type)
+ @interceptors[response_type].each do |interceptor|
+ interceptor.call
+ end
+ end
+ end
+ end
+end
diff --git a/lib/killjoy/kafka/message_bus.rb b/lib/killjoy/kafka/message_bus.rb
new file mode 100644
index 0000000..bf5fa08
--- /dev/null
+++ b/lib/killjoy/kafka/message_bus.rb
@@ -0,0 +1,70 @@
+module Killjoy
+ module Kafka
+ class MessageBus
+ def initialize(topic = "killjoy_topic")
+ @topic = topic
+ @kafka_consumer = Poseidon::PartitionConsumer.new("killjoy_consumer", "localhost", 9092, @topic, 0, :earliest_offset)
+ Thread.abort_on_exception = true
+ end
+
+ def subscribe(consumer)
+ @thread = Thread.new do
+ loop do
+ messages = @kafka_consumer.fetch
+ messages.each do |raw_message|
+ begin
+ #puts raw_message.value.inspect
+ message = Message.new(raw_message.value)
+ if block_given?
+ yield message
+ else
+ consumer.work(message)
+ end
+ rescue => error
+ Killjoy.logger.error(error.message)
+ message.reject!
+ end
+ end
+ end
+ end
+ end
+
+ def stop
+ if @kafka_consumer
+ @kafka_consumer.close
+ else
+ puts 'no consumer'
+ end
+ if @thread
+ puts "KILL THE THREAD"
+ Thread.kill(@thread)
+ @thread = nil
+ else
+ puts 'no thread'
+ end
+ end
+
+ def publish(message)
+ message.publish_to(exchange)
+ end
+
+ private
+
+ def exchange
+ @exchange ||= KafkaExchange.new(@topic)
+ end
+
+ class KafkaExchange
+ def initialize(topic)
+ @topic = topic
+ @producer = Poseidon::Producer.new(["localhost:9092"], "killjoy_producer")
+ end
+
+ def publish(json, options = {})
+ #puts "publishing #{json.inspect}"
+ @producer.send_messages([Poseidon::MessageToSend.new(@topic, json)])
+ end
+ end
+ end
+ end
+end
diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb
index 62262f8..62262f8 100644
--- a/lib/killjoy/amqp_configuration.rb
+++ b/lib/killjoy/rmq/amqp_configuration.rb
diff --git a/lib/killjoy/message.rb b/lib/killjoy/rmq/message.rb
index 2aa9c7d..2aa9c7d 100644
--- a/lib/killjoy/message.rb
+++ b/lib/killjoy/rmq/message.rb
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/rmq/message_bus.rb
index 710e4cd..710e4cd 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/rmq/message_bus.rb
diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake
index e18f9d5..ebd1500 100644
--- a/lib/killjoy/tasks/rabbitmq.rake
+++ b/lib/killjoy/tasks/rabbitmq.rake
@@ -2,7 +2,7 @@ namespace :rabbitmq do
require 'active_support/core_ext/string'
require 'erb'
require 'yaml'
- require_relative '../amqp_configuration'
+ require_relative '../rmq/amqp_configuration'
desc 'setup rabbitmqadmin'
task :setup do
diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb
index e32790a..09ba538 100644
--- a/lib/killjoy/worker.rb
+++ b/lib/killjoy/worker.rb
@@ -2,7 +2,8 @@ module Killjoy
module Worker
def initialize
@mutex = ServerEngine::BlockingFlag.new
- @message_bus = Killjoy::MessageBus.new
+ #@message_bus = Killjoy::MessageBus.new
+ @message_bus = Killjoy::Kafka::MessageBus.new
end
def run