diff options
| author | mo khan <mokhan@users.noreply.github.com> | 2019-04-20 09:32:24 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-04-20 09:32:24 -0600 |
| commit | bc2274ce4183bee7523665bf8a905a584f20081a (patch) | |
| tree | b336f1e4c305a3d22a2177b5b8e14f2b68989348 /lib/killjoy/kafka/message.rb | |
| parent | a7cd8453ab61f3a7b6ab59cda9d051dc725cdead (diff) | |
| parent | 85bd0e5d520109f29011f6eff69e9d2f2e20af26 (diff) | |
Merge pull request #2 from mokhan/kafkamain
Kafka
Diffstat (limited to 'lib/killjoy/kafka/message.rb')
| -rw-r--r-- | lib/killjoy/kafka/message.rb | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/lib/killjoy/kafka/message.rb b/lib/killjoy/kafka/message.rb new file mode 100644 index 0000000..c63b186 --- /dev/null +++ b/lib/killjoy/kafka/message.rb @@ -0,0 +1,47 @@ +module Killjoy + module Kafka + class Message + attr_reader :to_hash + + def initialize(raw_message) + @to_hash = JSON.parse(raw_message, symbolize_names: true) + @interceptors = { ack: [], reject: [] } + end + + def intercept(response_type, &block) + @interceptors[response_type] << block + end + + def process(future) + future.on_success do |rows| + ack! + end + future.on_failure do |error| + reject! + end + end + + def ack! + #puts "TODO:: ack!" + run_interceptors_for(:ack) + end + + def reject!(requeue = false) + #puts "TODO:: reject!" + run_interceptors_for(:reject) + end + + def to_s + to_hash + end + + private + + def run_interceptors_for(response_type) + @interceptors[response_type].each do |interceptor| + interceptor.call + end + end + end + end +end |
