diff options
| author | mo khan <mo@mokhan.ca> | 2015-10-17 22:08:58 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-10-17 22:08:58 -0600 |
| commit | eab963bc3b57bad94573dd080c69b895c1aa15a7 (patch) | |
| tree | b7f5216fd597794fcd9faff1234881213d5ac3d3 | |
| parent | 991fca5de7e62cfa078869f29902c14c24c23678 (diff) | |
drop from 16 to 4 queues.
| -rw-r--r-- | Procfile | 12 | ||||
| -rw-r--r-- | Vagrantfile | 1 | ||||
| -rwxr-xr-x | exe/killjoy | 8 | ||||
| -rw-r--r-- | lib/killjoy/async_consumer.rb | 8 | ||||
| -rw-r--r-- | lib/killjoy/consumer.rb | 3 | ||||
| -rw-r--r-- | lib/killjoy/publisher.rb | 14 | ||||
| -rw-r--r-- | lib/killjoy/tasks/rabbitmq.rake | 4 |
7 files changed, 13 insertions, 37 deletions
@@ -2,15 +2,3 @@ worker1: RMQ_SHARD=0 exe/killjoy worker2: RMQ_SHARD=1 exe/killjoy worker3: RMQ_SHARD=2 exe/killjoy worker4: RMQ_SHARD=3 exe/killjoy -worker5: RMQ_SHARD=4 exe/killjoy -worker6: RMQ_SHARD=5 exe/killjoy -worker7: RMQ_SHARD=6 exe/killjoy -worker8: RMQ_SHARD=7 exe/killjoy -worker9: ASYNC=true RMQ_SHARD=8 exe/killjoy -worker10: ASYNC=true RMQ_SHARD=9 exe/killjoy -worker11: ASYNC=true RMQ_SHARD=10 exe/killjoy -worker12: ASYNC=true RMQ_SHARD=11 exe/killjoy -worker13: ASYNC=true RMQ_SHARD=12 exe/killjoy -worker14: ASYNC=true RMQ_SHARD=13 exe/killjoy -worker15: ASYNC=true RMQ_SHARD=14 exe/killjoy -worker16: ASYNC=true RMQ_SHARD=15 exe/killjoy diff --git a/Vagrantfile b/Vagrantfile index 483d826..6668d5a 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -11,6 +11,7 @@ Vagrant.configure(2) do |config| config.vm.network "forwarded_port", guest: 5672, host: 5672 config.vm.network "forwarded_port", guest: 9042, host: 9042 config.vm.network "forwarded_port", guest: 9125, host: 9125 + config.vm.network "forwarded_port", guest: 8888, host: 8888 config.vm.provision :chef_apply do |chef| chef.recipe = File.read("config/chef_apply.rb") end diff --git a/exe/killjoy b/exe/killjoy index 0ad795a..fc85db0 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -14,15 +14,15 @@ cpus = Facter.value('processors')['count'].to_i run_asynchronously = ENV["ASYNC"] == 'true' Sneakers.configure({ - amqp: ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'shard.killjoy', + amqp: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), + exchange: 'killjoy', exchange_type: 'x-modulus-hash', daemonize: false, log: STDOUT, metrics: Sneakers::Metrics::LoggingMetrics.new, #metrics: Sneakers::Metrics::StatsdMetrics.new(Statsd.new(ENV["STATSD_HOST"], 9125)) #timeout_job_after: 5, - #workers: 1, + workers: 16, env: ENV.fetch('ENV', 'development'), ack: run_asynchronously == false, durable: true, @@ -32,7 +32,7 @@ Sneakers.configure({ after_fork: Killjoy::AfterFork.new, } }) -Sneakers.logger.level = Logger::INFO +Sneakers.logger.level = Logger::WARN if run_asynchronously Sneakers::Runner.new([ Killjoy::AsyncConsumer ]).run else diff --git a/lib/killjoy/async_consumer.rb b/lib/killjoy/async_consumer.rb index 3cdbe06..d3c0306 100644 --- a/lib/killjoy/async_consumer.rb +++ b/lib/killjoy/async_consumer.rb @@ -1,7 +1,7 @@ module Killjoy class AsyncConsumer include Sneakers::Worker - from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" + from_queue "sharding: killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" def work_with_params(raw_message, delivery_info, metadata) message = JSON.parse(raw_message, symbolize_names: true) @@ -15,9 +15,11 @@ module Killjoy def process(future, tag) future.on_success do |rows| + worker_trace("ACK: #{tag}") channel.acknowledge(tag, false) end future.on_failure do |error| + worker_trace("NACK: #{tag}") channel.reject(tag, false) end end @@ -26,10 +28,6 @@ module Killjoy @channel ||= @queue.instance_variable_get("@channel") end - def session - @session ||= Spank::IOC.resolve(:session) - end - def writers @writers ||= Spank::IOC.resolve_all(:writer) end diff --git a/lib/killjoy/consumer.rb b/lib/killjoy/consumer.rb index 4023def..b274804 100644 --- a/lib/killjoy/consumer.rb +++ b/lib/killjoy/consumer.rb @@ -1,10 +1,9 @@ module Killjoy class Consumer include Sneakers::Worker - from_queue "sharding: shard.killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" + from_queue "sharding: killjoy - rabbit@localhost - #{ENV.fetch("RMQ_SHARD", "1")}" def work(raw_message) - worker_trace("processing #{raw_message}") message = JSON.parse(raw_message, symbolize_names: true) writers.each do |writer| writer.write(message) diff --git a/lib/killjoy/publisher.rb b/lib/killjoy/publisher.rb index cbd6c11..6807a9a 100644 --- a/lib/killjoy/publisher.rb +++ b/lib/killjoy/publisher.rb @@ -2,7 +2,7 @@ module Killjoy class Publisher attr_reader :exchange_name, :exchange_type, :parser - def initialize(exchange_name = "shard.killjoy", exchange_type = 'x-modulus-hash') + def initialize(exchange_name = "killjoy", exchange_type = 'x-modulus-hash') @exchange_name = exchange_name @exchange_type = exchange_type @parser = LogParser.new @@ -34,12 +34,7 @@ module Killjoy end def configuration - { - host: ENV.fetch("RMQ_HOST", "localhost"), - password: ENV.fetch("RMQ_PASSWORD", "guest"), - port: ENV.fetch("RMQ_PORT", 5672).to_i, - username: ENV.fetch("RMQ_USERNAME", "guest"), - } + ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") end def channel @@ -49,10 +44,5 @@ module Killjoy def exchange @exchange ||= channel.exchange(exchange_name, durable: true, type: exchange_type) end - - #"http_status.ip.unix_timestamp" - def routing_key_for(line) - "#{line.http_status}.#{line.ipaddress}.#{line.timestamp}" - end end end diff --git a/lib/killjoy/tasks/rabbitmq.rake b/lib/killjoy/tasks/rabbitmq.rake index 194ee49..5890d34 100644 --- a/lib/killjoy/tasks/rabbitmq.rake +++ b/lib/killjoy/tasks/rabbitmq.rake @@ -16,8 +16,8 @@ namespace :rabbitmq do desc "create sharded exchange." task :create do - sh "rabbitmqadmin declare exchange --vhost=/ name=shard.killjoy type=x-modulus-hash" - sh "sudo rabbitmqctl set_policy killjoy-shard \"^shard.killjoy$\" '{\"shards-per-node\": 16, \"routing-key\": \"#\"}' --apply-to exchanges" + sh "rabbitmqadmin declare exchange --vhost=/ name=killjoy type=x-modulus-hash" + sh "sudo rabbitmqctl set_policy killjoy-shard \"^killjoy$\" '{\"shards-per-node\": 4, \"routing-key\": \"#\"}' --apply-to exchanges" end desc "delete sharded exchange." |
