summaryrefslogtreecommitdiff
path: root/lib/killjoy/rmq
diff options
context:
space:
mode:
authormo khan <mokhan@users.noreply.github.com>2019-04-20 09:32:24 -0600
committerGitHub <noreply@github.com>2019-04-20 09:32:24 -0600
commitbc2274ce4183bee7523665bf8a905a584f20081a (patch)
treeb336f1e4c305a3d22a2177b5b8e14f2b68989348 /lib/killjoy/rmq
parenta7cd8453ab61f3a7b6ab59cda9d051dc725cdead (diff)
parent85bd0e5d520109f29011f6eff69e9d2f2e20af26 (diff)
Merge pull request #2 from mokhan/kafkamain
Kafka
Diffstat (limited to 'lib/killjoy/rmq')
-rw-r--r--lib/killjoy/rmq/amqp_configuration.rb47
-rw-r--r--lib/killjoy/rmq/message.rb47
-rw-r--r--lib/killjoy/rmq/message_bus.rb75
3 files changed, 169 insertions, 0 deletions
diff --git a/lib/killjoy/rmq/amqp_configuration.rb b/lib/killjoy/rmq/amqp_configuration.rb
new file mode 100644
index 0000000..62262f8
--- /dev/null
+++ b/lib/killjoy/rmq/amqp_configuration.rb
@@ -0,0 +1,47 @@
+module Killjoy
+ class AMQPConfiguration
+ attr_reader :environment
+
+ def initialize(environment: ENV.fetch("ENV", "development"))
+ @environment = environment
+ end
+
+ def amqp_uri
+ configuration['amqp_uri']
+ end
+
+ def heartbeat
+ configuration['heartbeat'].to_i
+ end
+
+ def prefetch
+ configuration['prefetch'].to_i
+ end
+
+ def exchange
+ configuration['exchange']
+ end
+
+ def exchange_type
+ configuration['exchange_type']
+ end
+
+ def shards
+ configuration['shards'].to_i
+ end
+
+ def to_hash
+ configuration
+ end
+
+ private
+
+ def configuration(file = "config/amqp.yml")
+ @configuration ||= YAML.load(expand_template(file))[environment]
+ end
+
+ def expand_template(file)
+ ERB.new(File.read(file)).result(binding)
+ end
+ end
+end
diff --git a/lib/killjoy/rmq/message.rb b/lib/killjoy/rmq/message.rb
new file mode 100644
index 0000000..2aa9c7d
--- /dev/null
+++ b/lib/killjoy/rmq/message.rb
@@ -0,0 +1,47 @@
+module Killjoy
+ class Message
+ attr_reader :to_hash, :info, :channel
+
+ def initialize(raw_message, info, channel)
+ @to_hash = JSON.parse(raw_message, symbolize_names: true)
+ @info = info
+ @channel = channel
+ @interceptors = { ack: [], reject: [] }
+ end
+
+ def intercept(response_type, &block)
+ @interceptors[response_type] << block
+ end
+
+ def process(future)
+ future.on_success do |rows|
+ ack!
+ end
+ future.on_failure do |error|
+ reject!
+ end
+ end
+
+ def ack!
+ run_interceptors_for(:ack)
+ channel.acknowledge(info.delivery_tag, false)
+ end
+
+ def reject!(requeue = false)
+ run_interceptors_for(:reject)
+ channel.reject(info.delivery_tag, requeue)
+ end
+
+ def to_s
+ to_hash
+ end
+
+ private
+
+ def run_interceptors_for(response_type)
+ @interceptors[response_type].each do |interceptor|
+ interceptor.call
+ end
+ end
+ end
+end
diff --git a/lib/killjoy/rmq/message_bus.rb b/lib/killjoy/rmq/message_bus.rb
new file mode 100644
index 0000000..710e4cd
--- /dev/null
+++ b/lib/killjoy/rmq/message_bus.rb
@@ -0,0 +1,75 @@
+module Killjoy
+ class MessageBus
+ attr_reader :configuration
+
+ def initialize(configuration = AMQPConfiguration.new)
+ @configuration = configuration
+ @subscriptions = Queue.new
+ @cpus = Facter.value('processors')['count'].to_i
+ end
+
+ def subscribe(consumer)
+ options = { manual_ack: true, block: false }
+ queue = create_queue(consumer)
+ subscription = queue.subscribe(options) do |info, metadata, raw_message|
+ begin
+ message = Message.new(raw_message, info, channel)
+ if block_given?
+ yield message
+ else
+ consumer.work(message)
+ end
+ rescue => error
+ Killjoy.logger.error(error.message)
+ message.reject!
+ end
+ end
+ @subscriptions << subscription
+ end
+
+ def stop
+ while @subscriptions.size > 0
+ @subscriptions.deq.cancel
+ end
+ connection.close
+ end
+
+ def publish(message)
+ message.publish_to(exchange)
+ end
+
+ private
+
+ def connection
+ @connection ||= Bunny.new(
+ configuration.amqp_uri,
+ heartbeat: configuration.heartbeat,
+ logger: Killjoy.logger
+ ).tap do |connection|
+ connection.start
+ end
+ end
+
+ def channel
+ @channel ||= connection.create_channel(nil, @cpus).tap do |x|
+ x.prefetch(configuration.prefetch)
+ end
+ end
+
+ def exchange
+ @exchange ||= channel.exchange(
+ configuration.exchange,
+ durable: true,
+ type: configuration.exchange_type
+ )
+ end
+
+ def create_queue(consumer)
+ queue = channel.queue(consumer.queue_name, exclusive: false, durable: true)
+ consumer.bindings.each do |binding|
+ queue.bind(exchange, routing_key: binding)
+ end
+ queue
+ end
+ end
+end