diff options
| author | mo khan <mo@mokhan.ca> | 2015-11-07 15:14:40 -0700 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2015-11-07 15:14:40 -0700 |
| commit | 7a73f5aec92f7390669f9836492fe9700e7421cb (patch) | |
| tree | a6c78f2889dc47e5fe41645d167a2224558f27b8 | |
| parent | c6b1442c77238391a68d70b8478835441f8bb304 (diff) | |
move RMQ configuration to a yaml file.
| -rw-r--r-- | config/amqp.yml | 15 | ||||
| -rwxr-xr-x | exe/killjoy | 10 | ||||
| -rwxr-xr-x | exe/killjoy-publisher | 7 | ||||
| -rw-r--r-- | lib/killjoy.rb | 3 | ||||
| -rw-r--r-- | lib/killjoy/amqp_configuration.rb | 39 | ||||
| -rw-r--r-- | lib/killjoy/cassandra/database_configuration.rb | 3 | ||||
| -rw-r--r-- | lib/killjoy/experiments.rb | 7 | ||||
| -rw-r--r-- | lib/killjoy/message_bus.rb | 19 | ||||
| -rw-r--r-- | lib/killjoy/web.rb | 3 | ||||
| -rw-r--r-- | lib/killjoy/worker.rb | 2 |
10 files changed, 73 insertions, 35 deletions
diff --git a/config/amqp.yml b/config/amqp.yml new file mode 100644 index 0000000..859cf94 --- /dev/null +++ b/config/amqp.yml @@ -0,0 +1,15 @@ +development: + amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>' + exchange: 'killjoy' + exchange_type: 'x-modulus-hash' + shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %> +test: + amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>' + exchange: 'killjoy' + exchange_type: 'x-modulus-hash' + shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %> +production: + amqp_uri: '<%= ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672") %>' + exchange: 'killjoy' + exchange_type: 'x-modulus-hash' + shards: <%= ENV.fetch("RMQ_SHARDS", 4).to_i %> diff --git a/exe/killjoy b/exe/killjoy index 53985f6..c22e9e9 100755 --- a/exe/killjoy +++ b/exe/killjoy @@ -5,18 +5,14 @@ $LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__))) require "killjoy" require "killjoy/server" -shards = ENV.fetch("RMQ_SHARDS", 4).to_i - +configuration = Killjoy::AMQPConfiguration.new server = ServerEngine::Daemon.new(Killjoy::Server, Killjoy::Worker, {}) do { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), daemonize: false, - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', pid_path: 'tmp/killjoy.pid', - queue_shards: shards, + queue_shards: configuration.shards, worker_type: 'process', - workers: shards, + workers: configuration.shards, } end server.run diff --git a/exe/killjoy-publisher b/exe/killjoy-publisher index 453ce0f..3d50a56 100755 --- a/exe/killjoy-publisher +++ b/exe/killjoy-publisher @@ -7,12 +7,7 @@ require "killjoy" log_file = ENV.fetch("LOG_FILE", "spec/fixtures/nginx.log") parser = Killjoy::LogParser.new -configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', -} -message_bus = Killjoy::MessageBus.new(configuration) +message_bus = Killjoy::MessageBus.new Killjoy::Publisher.using(message_bus) do |publisher| log_file = File.join(Dir.pwd, log_file) lines = File.readlines(log_file) diff --git a/lib/killjoy.rb b/lib/killjoy.rb index d4f6e6f..d7478e5 100644 --- a/lib/killjoy.rb +++ b/lib/killjoy.rb @@ -1,6 +1,7 @@ require "active_support/core_ext/string" require "bunny" require "cassandra" +require "erb" require "facter" require "json" require "killjoy/nullable" @@ -8,8 +9,10 @@ require "logger" require "mongo" require "spank" require "virtus" +require "yaml" require "killjoy/after_fork" +require "killjoy/amqp_configuration" require "killjoy/cassandra/blocking_writes_consumer" require "killjoy/cassandra/database_cleaner" require "killjoy/cassandra/database_configuration" diff --git a/lib/killjoy/amqp_configuration.rb b/lib/killjoy/amqp_configuration.rb new file mode 100644 index 0000000..87bfab3 --- /dev/null +++ b/lib/killjoy/amqp_configuration.rb @@ -0,0 +1,39 @@ +module Killjoy + class AMQPConfiguration + attr_reader :environment + + def initialize(environment: ENV.fetch("ENV", "development")) + @environment = environment + end + + def amqp_uri + configuration['amqp_uri'] + end + + def exchange + configuration['exchange'] + end + + def exchange_type + configuration['exchange_type'] + end + + def shards + configuration['shards'].to_i + end + + def to_hash + configuration + end + + private + + def configuration(file = "config/amqp.yml") + @configuration ||= YAML.load(expand_template(file))[environment] + end + + def expand_template(file) + ERB.new(File.read(file)).result(binding) + end + end +end diff --git a/lib/killjoy/cassandra/database_configuration.rb b/lib/killjoy/cassandra/database_configuration.rb index 1361d02..da1a97c 100644 --- a/lib/killjoy/cassandra/database_configuration.rb +++ b/lib/killjoy/cassandra/database_configuration.rb @@ -1,6 +1,3 @@ -require 'erb' -require 'yaml' - module Killjoy module Cassandra class DatabaseConfiguration diff --git a/lib/killjoy/experiments.rb b/lib/killjoy/experiments.rb index d5d9f76..2977eb3 100644 --- a/lib/killjoy/experiments.rb +++ b/lib/killjoy/experiments.rb @@ -56,12 +56,7 @@ module Killjoy def run queue_shards = ENV.fetch("RMQ_SHARDS", 4).to_i - configuration = { - amqp_uri: ENV.fetch("RABBITMQ_URL", "amqp://guest:guest@localhost:5672"), - exchange: 'killjoy', - exchange_type: 'x-modulus-hash', - } - message_bus = MessageBus.new(configuration) + message_bus = MessageBus.new publish_messages(message_bus) queue = Queue.new diff --git a/lib/killjoy/message_bus.rb b/lib/killjoy/message_bus.rb index 05b4c2f..f60080a 100644 --- a/lib/killjoy/message_bus.rb +++ b/lib/killjoy/message_bus.rb @@ -2,7 +2,7 @@ module Killjoy class MessageBus attr_reader :configuration - def initialize(configuration) + def initialize(configuration = AMQPConfiguration.new) @configuration = configuration @subscriptions = Queue.new @cpus = Facter.value('processors')['count'].to_i @@ -10,7 +10,8 @@ module Killjoy def subscribe(consumer) options = { manual_ack: true, block: false } - @subscriptions << create_queue(consumer).subscribe(options) do |info, metadata, raw_message| + queue = create_queue(consumer) + subscription = queue.subscribe(options) do |info, metadata, raw_message| begin message = Message.new(raw_message, info, channel) if block_given? @@ -20,10 +21,10 @@ module Killjoy end rescue => error Killjoy.logger.error(error.message) - message.reject! if message - reject(info) + message.reject! end end + @subscriptions << subscription end def stop @@ -41,7 +42,7 @@ module Killjoy def connection @connection ||= Bunny.new( - configuration[:amqp_uri], + configuration.amqp_uri, heartbeat: 2, logger: Killjoy.logger ).tap do |connection| @@ -57,9 +58,9 @@ module Killjoy def exchange channel.exchange( - configuration[:exchange], + configuration.exchange, durable: true, - type: configuration[:exchange_type] + type: configuration.exchange_type ) end @@ -70,9 +71,5 @@ module Killjoy end queue end - - def reject(info, requeue = false) - channel.reject(info.delivery_tag, requeue) - end end end diff --git a/lib/killjoy/web.rb b/lib/killjoy/web.rb index d84be10..fb4fca0 100644 --- a/lib/killjoy/web.rb +++ b/lib/killjoy/web.rb @@ -10,6 +10,7 @@ Killjoy::Startup.new(Spank::Container.new).run do |container| Spank::IOC.bind_to(container) Spank::IOC.resolve(:session).execute("select * from system.hints;") end +message_bus = Killjoy::MessageBus.new helpers do def h(text) @@ -38,7 +39,7 @@ get '/ping' do url: request["PATH_INFO"], user_agent: request["HTTP_USER_AGENT"], ) - Killjoy::Publisher.new.publish(message) + Killjoy::Publisher.new(message_bus).publish(message) "Hello World!" end diff --git a/lib/killjoy/worker.rb b/lib/killjoy/worker.rb index 822b635..e32790a 100644 --- a/lib/killjoy/worker.rb +++ b/lib/killjoy/worker.rb @@ -2,7 +2,7 @@ module Killjoy module Worker def initialize @mutex = ServerEngine::BlockingFlag.new - @message_bus = Killjoy::MessageBus.new(config) + @message_bus = Killjoy::MessageBus.new end def run |
