summaryrefslogtreecommitdiff
path: root/lib/killjoy/kafka/message.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/killjoy/kafka/message.rb')
-rw-r--r--lib/killjoy/kafka/message.rb47
1 files changed, 47 insertions, 0 deletions
diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb
new file mode 100644
index 0000000..c63b186
--- /dev/null
+++ b/lib/killjoy/kafka/message.rb
@@ -0,0 +1,47 @@
+module Killjoy
+ module Kafka
+ class Message
+ attr_reader :to_hash
+
+ def initialize(raw_message)
+ @to_hash = JSON.parse(raw_message, symbolize_names: true)
+ @interceptors = { ack: [], reject: [] }
+ end
+
+ def intercept(response_type, &block)
+ @interceptors[response_type] << block
+ end
+
+ def process(future)
+ future.on_success do |rows|
+ ack!
+ end
+ future.on_failure do |error|
+ reject!
+ end
+ end
+
+ def ack!
+ #puts "TODO:: ack!"
+ run_interceptors_for(:ack)
+ end
+
+ def reject!(requeue = false)
+ #puts "TODO:: reject!"
+ run_interceptors_for(:reject)
+ end
+
+ def to_s
+ to_hash
+ end
+
+ private
+
+ def run_interceptors_for(response_type)
+ @interceptors[response_type].each do |interceptor|
+ interceptor.call
+ end
+ end
+ end
+ end
+end