summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-10-17 22:08:58 -0600
committermo khan <mo@mokhan.ca>2015-10-17 22:08:58 -0600
commiteab963bc3b57bad94573dd080c69b895c1aa15a7 (patch)
treeb7f5216fd597794fcd9faff1234881213d5ac3d3
parent991fca5de7e62cfa078869f29902c14c24c23678 (diff)
drop from 16 to 4 queues.
-rw-r--r--Procfile12
-rw-r--r--Vagrantfile1
-rwxr-xr-xexe/killjoy8
-rw-r--r--lib/killjoy/async_consumer.rb8
-rw-r--r--lib/killjoy/consumer.rb3
-rw-r--r--lib/killjoy/publisher.rb14
-rw-r--r--lib/killjoy/tasks/rabbitmq.rake4
7 files changed, 13 insertions, 37 deletions
diff --git a/Procfile b/Procfile
index 5779403..ea3d5c8 100644
--- a/Procfile
+++ b/Procfile
@@ -2,15 +2,3 @@ worker1: RMQ_SHARD=0 exe/killjoy
worker2: RMQ_SHARD=1 exe/killjoy
worker3: RMQ_SHARD=2 exe/killjoy
worker4: RMQ_SHARD=3 exe/killjoy
-worker5: RMQ_SHARD=4 exe/killjoy
-worker6: RMQ_SHARD=5 exe/killjoy
-worker7: RMQ_SHARD=6 exe/killjoy
-worker8: RMQ_SHARD=7 exe/killjoy
-worker9: ASYNC=true RMQ_SHARD=8 exe/killjoy
-worker10: ASYNC=true RMQ_SHARD=9 exe/killjoy
-worker11: ASYNC=true RMQ_SHARD=10 exe/killjoy
-worker12: ASYNC=true RMQ_SHARD=11 exe/killjoy
-worker13: ASYNC=true RMQ_SHARD=12 exe/killjoy
-worker14: ASYNC=true RMQ_SHARD=13 exe/killjoy
-worker15: ASYNC=true RMQ_SHARD=14 exe/killjoy
-worker16: ASYNC=true RMQ_SHARD=15 exe/killjoy
diff --git a/Vagrantfile b/Vagrantfile
index 483d826..6668d5a 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -11,6 +11,7 @@ Vagrant.configure(2) do |config|
config.vm.network "forwarded_port", guest: 5672, host: 5672
config.vm.network "forwarded_port", guest: 9042, host: 9042
config.vm.network "forwarded_port", guest: 9125, host: 9125
+ config.vm.network "forwarded_port", guest: 8888, host: 8888
config.vm.provision :chef_apply do |chef|
chef.recipe = File.read("config/chef_apply.rb")
end
diff --git a/exe/killjoy b/exe/killjoy
index 0ad795a..fc85db0 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -14,15 +14,15 @@ cpus = Facter.value('processors')['count'].to_i
run_asynchronously = ENV["ASYNC"] == 'true'
Sneakers.configure({
- amqp: ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'shard.killjoy',
+ amqp: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
+ exchange: 'killjoy',
exchange_type: 'x-modulus-hash',
daemonize: false,
log: STDOUT,
metrics: Sneakers::Metrics::LoggingMetrics.new,
#metrics: Sneakers::Metrics::StatsdMetrics.new(Statsd.new(ENV["STATSD_HOST"], 9125))
#timeout_job_after: 5,
- #workers: 1,
+ workers: 16,
env: ENV.fetch('ENV', 'development'),
ack: run_asynchronously == false,
durable: true,
@@ -32,7 +32,7 @@ Sneakers.configure({
after_fork: Killjoy::AfterFork.new,
}
})
-Sneakers.logger.level = Logger::INFO
+Sneakers.logger.level = Logger::WARN
if run_asynchronously
Sneakers::Runner.new([ Killjoy::AsyncConsumer ]).run
else
diff --git a/lib/killjoy/async_consumer.rb b/lib/killjoy/async_consumer.rb
index 3cdbe06..d3c0306 100644
--- a/lib/killjoy/async_consumer.rb
+++ b/lib/killjoy/async_consumer.rb
@@ -1,7 +1,7 @@
module Killjoy
class AsyncConsumer
include Sneakers::Worker
- from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
+ from_queue "sharding: killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
def work_with_params(raw_message, delivery_info, metadata)
message = JSON.parse(raw_message, symbolize_names: true)
@@ -15,9 +15,11 @@ module Killjoy
def process(future, tag)
future.on_success do |rows|
+ worker_trace("ACK: #{tag}")
channel.acknowledge(tag, false)
end
future.on_failure do |error|
+ worker_trace("NACK: #{tag}")
channel.reject(tag, false)
end
end
@@ -26,10 +28,6 @@ module Killjoy
@channel ||= @queue.instance_variable_get("@channel")
end
- def session
- @session ||= Spank::IOC.resolve(:session)
- end
-
def writers
@writers ||= Spank::IOC.resolve_all(:writer)
end
diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb
index 4023def..b274804 100644
--- a/lib/killjoy/consumer.rb
+++ b/lib/killjoy/consumer.rb
@@ -1,10 +1,9 @@
module Killjoy
class Consumer
include Sneakers::Worker
- from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
+ from_queue "sharding: killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}"
def work(raw_message)
- worker_trace("processing #{raw_message}")
message = JSON.parse(raw_message, symbolize_names: true)
writers.each do |writer|
writer.write(message)
diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb
index cbd6c11..6807a9a 100644
--- a/lib/killjoy/publisher.rb
+++ b/lib/killjoy/publisher.rb
@@ -2,7 +2,7 @@ module Killjoy
class Publisher
attr_reader :exchange_name, :exchange_type, :parser
- def initialize(exchange_name = "shard.killjoy", exchange_type = 'x-modulus-hash')
+ def initialize(exchange_name = "killjoy", exchange_type = 'x-modulus-hash')
@exchange_name = exchange_name
@exchange_type = exchange_type
@parser = LogParser.new
@@ -34,12 +34,7 @@ module Killjoy
end
def configuration
- {
- host: ENV.fetch("RMQ_HOST", "localhost"),
- password: ENV.fetch("RMQ_PASSWORD", "guest"),
- port: ENV.fetch("RMQ_PORT", 5672).to_i,
- username: ENV.fetch("RMQ_USERNAME", "guest"),
- }
+ ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672")
end
def channel
@@ -49,10 +44,5 @@ module Killjoy
def exchange
@exchange ||= channel.exchange(exchange_name, durable: true, type: exchange_type)
end
-
- #"http_status.ip.unix_timestamp"
- def routing_key_for(line)
- "#{line.http_status}.#{line.ipaddress}.#{line.timestamp}"
- end
end
end
diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake
index 194ee49..5890d34 100644
--- a/lib/killjoy/tasks/rabbitmq.rake
+++ b/lib/killjoy/tasks/rabbitmq.rake
@@ -16,8 +16,8 @@ namespace :rabbitmq do
desc "create sharded exchange."
task :create do
- sh "rabbitmqadmin declare exchange --vhost=/ name=shard.killjoy type=x-modulus-hash"
- sh "sudo rabbitmqctl set_policy killjoy-shard \"^shard.killjoy$\" '{\"shards-per-node\": 16, \"routing-key\": \"#\"}' --apply-to exchanges"
+ sh "rabbitmqadmin declare exchange --vhost=/ name=killjoy type=x-modulus-hash"
+ sh "sudo rabbitmqctl set_policy killjoy-shard \"^killjoy$\" '{\"shards-per-node\": 4, \"routing-key\": \"#\"}' --apply-to exchanges"
end
desc "delete sharded exchange."