summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2015-11-07 15:14:40 -0700
committermo khan <mo@mokhan.ca>2015-11-07 15:14:40 -0700
commit7a73f5aec92f7390669f9836492fe9700e7421cb (patch)
treea6c78f2889dc47e5fe41645d167a2224558f27b8
parentc6b1442c77238391a68d70b8478835441f8bb304 (diff)
move RMQ configuration to a yaml file.
-rw-r--r--config/amqp.yml15
-rwxr-xr-xexe/killjoy10
-rwxr-xr-xexe/killjoy-publisher7
-rw-r--r--lib/killjoy.rb3
-rw-r--r--lib/killjoy/amqp_configuration.rb39
-rw-r--r--lib/killjoy/cassandra/database_configuration.rb3
-rw-r--r--lib/killjoy/experiments.rb7
-rw-r--r--lib/killjoy/message_bus.rb19
-rw-r--r--lib/killjoy/web.rb3
-rw-r--r--lib/killjoy/worker.rb2
10 files changed, 73 insertions, 35 deletions
diff --git a/config/amqp.yml b/config/amqp.yml
new file mode 100644
index 0000000..859cf94
--- /dev/null
+++ b/config/amqp.yml
@@ -0,0 +1,15 @@
+development:
+ amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>'
+ exchange: 'killjoy'
+ exchange_type: 'x-modulus-hash'
+ shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %>
+test:
+ amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>'
+ exchange: 'killjoy'
+ exchange_type: 'x-modulus-hash'
+ shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %>
+production:
+ amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>'
+ exchange: 'killjoy'
+ exchange_type: 'x-modulus-hash'
+ shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %>
diff --git a/exe/killjoy b/exe/killjoy
index 53985f6..c22e9e9 100755
--- a/exe/killjoy
+++ b/exe/killjoy
@@ -5,18 +5,14 @@ $LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__)))
require "killjoy"
require "killjoy/server"
-shards = ENV.fetch("RMQ_SHARDS", 4).to_i
-
+configuration = Killjoy::AMQPConfiguration.new
server = ServerEngine::Daemon.new(Killjoy::Server, Killjoy::Worker, {}) do
{
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
daemonize: false,
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
pid_path: 'tmp/killjoy.pid',
- queue_shards: shards,
+ queue_shards: configuration.shards,
worker_type: 'process',
- workers: shards,
+ workers: configuration.shards,
}
end
server.run
diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher
index 453ce0f..3d50a56 100755
--- a/exe/killjoy-publisher
+++ b/exe/killjoy-publisher
@@ -7,12 +7,7 @@ require "killjoy"
log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log")
parser = Killjoy::LogParser.new
-configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
-}
-message_bus = Killjoy::MessageBus.new(configuration)
+message_bus = Killjoy::MessageBus.new
Killjoy::Publisher.using(message_bus) do |publisher|
log_file = File.join(Dir.pwd, log_file)
lines = File.readlines(log_file)
diff --git a/lib/killjoy.rb b/lib/killjoy.rb
index d4f6e6f..d7478e5 100644
--- a/lib/killjoy.rb
+++ b/lib/killjoy.rb
@@ -1,6 +1,7 @@
require "active_support/core_ext/string"
require "bunny"
require "cassandra"
+require "erb"
require "facter"
require "json"
require "killjoy/nullable"
@@ -8,8 +9,10 @@ require "logger"
require "mongo"
require "spank"
require "virtus"
+require "yaml"
require "killjoy/after_fork"
+require "killjoy/amqp_configuration"
require "killjoy/cassandra/blocking_writes_consumer"
require "killjoy/cassandra/database_cleaner"
require "killjoy/cassandra/database_configuration"
diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/amqp_configuration.rb
new file mode 100644
index 0000000..87bfab3
--- /dev/null
+++ b/lib/killjoy/amqp_configuration.rb
@@ -0,0 +1,39 @@
+module Killjoy
+ class AMQPConfiguration
+ attr_reader :environment
+
+ def initialize(environment: ENV.fetch("ENV", "development"))
+ @environment = environment
+ end
+
+ def amqp_uri
+ configuration['amqp_uri']
+ end
+
+ def exchange
+ configuration['exchange']
+ end
+
+ def exchange_type
+ configuration['exchange_type']
+ end
+
+ def shards
+ configuration['shards'].to_i
+ end
+
+ def to_hash
+ configuration
+ end
+
+ private
+
+ def configuration(file = "config/amqp.yml")
+ @configuration ||= YAML.load(expand_template(file))[environment]
+ end
+
+ def expand_template(file)
+ ERB.new(File.read(file)).result(binding)
+ end
+ end
+end
diff --git a/lib/killjoy/cassandra/database_configuration.rb b/lib/killjoy/cassandra/database_configuration.rb
index 1361d02..da1a97c 100644
--- a/lib/killjoy/cassandra/database_configuration.rb
+++ b/lib/killjoy/cassandra/database_configuration.rb
@@ -1,6 +1,3 @@
-require 'erb'
-require 'yaml'
-
module Killjoy
module Cassandra
class DatabaseConfiguration
diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb
index d5d9f76..2977eb3 100644
--- a/lib/killjoy/experiments.rb
+++ b/lib/killjoy/experiments.rb
@@ -56,12 +56,7 @@ module Killjoy
def run
queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i
- configuration = {
- amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"),
- exchange: 'killjoy',
- exchange_type: 'x-modulus-hash',
- }
- message_bus = MessageBus.new(configuration)
+ message_bus = MessageBus.new
publish_messages(message_bus)
queue = Queue.new
diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb
index 05b4c2f..f60080a 100644
--- a/lib/killjoy/message_bus.rb
+++ b/lib/killjoy/message_bus.rb
@@ -2,7 +2,7 @@ module Killjoy
class MessageBus
attr_reader :configuration
- def initialize(configuration)
+ def initialize(configuration = AMQPConfiguration.new)
@configuration = configuration
@subscriptions = Queue.new
@cpus = Facter.value('processors')['count'].to_i
@@ -10,7 +10,8 @@ module Killjoy
def subscribe(consumer)
options = { manual_ack: true, block: false }
- @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message|
+ queue = create_queue(consumer)
+ subscription = queue.subscribe(options) do |info, metadata, raw_message|
begin
message = Message.new(raw_message, info, channel)
if block_given?
@@ -20,10 +21,10 @@ module Killjoy
end
rescue => error
Killjoy.logger.error(error.message)
- message.reject! if message
- reject(info)
+ message.reject!
end
end
+ @subscriptions << subscription
end
def stop
@@ -41,7 +42,7 @@ module Killjoy
def connection
@connection ||= Bunny.new(
- configuration[:amqp_uri],
+ configuration.amqp_uri,
heartbeat: 2,
logger: Killjoy.logger
).tap do |connection|
@@ -57,9 +58,9 @@ module Killjoy
def exchange
channel.exchange(
- configuration[:exchange],
+ configuration.exchange,
durable: true,
- type: configuration[:exchange_type]
+ type: configuration.exchange_type
)
end
@@ -70,9 +71,5 @@ module Killjoy
end
queue
end
-
- def reject(info, requeue = false)
- channel.reject(info.delivery_tag, requeue)
- end
end
end
diff --git a/lib/killjoy/web.rb b/lib/killjoy/web.rb
index d84be10..fb4fca0 100644
--- a/lib/killjoy/web.rb
+++ b/lib/killjoy/web.rb
@@ -10,6 +10,7 @@ Killjoy::Startup.new(Spank::Container.new).run do |container|
Spank::IOC.bind_to(container)
Spank::IOC.resolve(:session).execute("select * from system.hints;")
end
+message_bus = Killjoy::MessageBus.new
helpers do
def h(text)
@@ -38,7 +39,7 @@ get '/ping' do
url: request["PATH_INFO"],
user_agent: request["HTTP_USER_AGENT"],
)
- Killjoy::Publisher.new.publish(message)
+ Killjoy::Publisher.new(message_bus).publish(message)
"Hello World!"
end
diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb
index 822b635..e32790a 100644
--- a/lib/killjoy/worker.rb
+++ b/lib/killjoy/worker.rb
@@ -2,7 +2,7 @@ module Killjoy
module Worker
def initialize
@mutex = ServerEngine::BlockingFlag.new
- @message_bus = Killjoy::MessageBus.new(config)
+ @message_bus = Killjoy::MessageBus.new
end
def run