diff options
Diffstat (limited to 'lib/killjoy/rmq')
| -rw-r--r-- | lib/killjoy/rmq/amqp_configuration.rb | 47 | ||||
| -rw-r--r-- | lib/killjoy/rmq/message.rb | 47 | ||||
| -rw-r--r-- | lib/killjoy/rmq/message_bus.rb | 75 |
3 files changed, 169 insertions, 0 deletions
diff --git a/lib/killjoy/rmq/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb new file mode 100644 index 0000000..62262f8 --- /dev/null +++ b/lib/killjoy/rmq/amqp_configuration.rb @@ -0,0 +1,47 @@ +module Killjoy + class AMQPConfiguration + attr_reader :environment + + def initialize(environment: ENV.fetch("ENV", "development")) + @environment = environment + end + + def amqp_uri + configuration['amqp_uri'] + end + + def heartbeat + configuration['heartbeat'].to_i + end + + def prefetch + configuration['prefetch'].to_i + end + + def exchange + configuration['exchange'] + end + + def exchange_type + configuration['exchange_type'] + end + + def shards + configuration['shards'].to_i + end + + def to_hash + configuration + end + + private + + def configuration(file = "config/amqp.yml") + @configuration ||= YAML.load(expand_template(file))[environment] + end + + def expand_template(file) + ERB.new(File.read(file)).result(binding) + end + end +end diff --git a/lib/killjoy/rmq/message.rb b/lib/killjoy/rmq/message.rb new file mode 100644 index 0000000..2aa9c7d --- /dev/null +++ b/lib/killjoy/rmq/message.rb @@ -0,0 +1,47 @@ +module Killjoy + class Message + attr_reader :to_hash, :info, :channel + + def initialize(raw_message, info, channel) + @to_hash = JSON.parse(raw_message, symbolize_names: true) + @info = info + @channel = channel + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block + end + + def process(future) + future.on_success do |rows| + ack! + end + future.on_failure do |error| + reject! + end + end + + def ack! + run_interceptors_for(:ack) + channel.acknowledge(info.delivery_tag, false) + end + + def reject!(requeue = false) + run_interceptors_for(:reject) + channel.reject(info.delivery_tag, requeue) + end + + def to_s + to_hash + end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end + end +end diff --git a/lib/killjoy/rmq/message_bus.rb b/lib/killjoy/rmq/message_bus.rb new file mode 100644 index 0000000..710e4cd --- /dev/null +++ b/lib/killjoy/rmq/message_bus.rb @@ -0,0 +1,75 @@ +module Killjoy + class MessageBus + attr_reader :configuration + + def initialize(configuration = AMQPConfiguration.new) + @configuration = configuration + @subscriptions = Queue.new + @cpus = Facter.value('processors')['count'].to_i + end + + def subscribe(consumer) + options = { manual_ack: true, block: false } + queue = create_queue(consumer) + subscription = queue.subscribe(options) do |info, metadata, raw_message| + begin + message = Message.new(raw_message, info, channel) + if block_given? + yield message + else + consumer.work(message) + end + rescue => error + Killjoy.logger.error(error.message) + message.reject! + end + end + @subscriptions << subscription + end + + def stop + while @subscriptions.size > 0 + @subscriptions.deq.cancel + end + connection.close + end + + def publish(message) + message.publish_to(exchange) + end + + private + + def connection + @connection ||= Bunny.new( + configuration.amqp_uri, + heartbeat: configuration.heartbeat, + logger: Killjoy.logger + ).tap do |connection| + connection.start + end + end + + def channel + @channel ||= connection.create_channel(nil, @cpus).tap do |x| + x.prefetch(configuration.prefetch) + end + end + + def exchange + @exchange ||= channel.exchange( + configuration.exchange, + durable: true, + type: configuration.exchange_type + ) + end + + def create_queue(consumer) + queue = channel.queue(consumer.queue_name, exclusive: false, durable: true) + consumer.bindings.each do |binding| + queue.bind(exchange, routing_key: binding) + end + queue + end + end +end |
