From f27ea861853bdf857d95fe027a042e535cf90bb1 Mon Sep 17 00:00:00 2001 From: QiuzZz <39342407+Qiu-zzz@users.noreply.github.com> Date: Tue, 17 Aug 2021 16:12:34 +0800 Subject: [PATCH 1/3] an enum for consumer return value --- consume.go | 30 +++++++++++++++++++++++++----- examples/consumer/main.go | 5 ++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/consume.go b/consume.go index 37e828a..c0ce636 100644 --- a/consume.go +++ b/consume.go @@ -7,6 +7,20 @@ import ( "github.com/streadway/amqp" ) +// Action is an action that occurs after processed this delivery +type Action int + +type Handler func(d Delivery) (action Action) + +const ( + // Ack default ack this msg after you have successfully processed this delivery. + Ack Action = iota + // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. + NackDiscard + // NackRequeue deliver this message to a different consumer. + NackRequeue +) + // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { chManager *channelManager @@ -69,7 +83,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, optionFuncs ...func(*ConsumeOptions), @@ -136,7 +150,7 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -164,7 +178,7 @@ func (consumer Consumer) startGoroutinesWithRetries( // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue func (consumer Consumer) startGoroutines( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -246,12 +260,18 @@ func (consumer Consumer) startGoroutines( handler(Delivery{msg}) continue } - if handler(Delivery{msg}) { + switch handler(Delivery{msg}) { + case Ack: err := msg.Ack(false) if err != nil { consumer.logger.Printf("can't ack message: %v", err) } - } else { + case NackDiscard: + err := msg.Nack(false, false) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + case NackRequeue: err := msg.Nack(false, true) if err != nil { consumer.logger.Printf("can't nack message: %v", err) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index bcd7b2c..a0b19c1 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -16,10 +16,9 @@ func main() { log.Fatal(err) } err = consumer.StartConsuming( - func(d rabbitmq.Delivery) bool { + func(d rabbitmq.Delivery) (action rabbitmq.Action) { log.Printf("consumed: %v", string(d.Body)) - // true to ACK, false to NACK - return true + return }, "my_queue", []string{"routing_key", "routing_key_2"}, From 58fd376df4c28b8b5ce3c45901a2b9b14e5a7f94 Mon Sep 17 00:00:00 2001 From: QiuzZz <39342407+Qiu-zzz@users.noreply.github.com> Date: Tue, 17 Aug 2021 16:12:34 +0800 Subject: [PATCH 2/3] an enum for consumer return value --- consume.go | 31 ++++++++++++++++++++++++++----- examples/consumer/main.go | 5 ++--- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/consume.go b/consume.go index 37e828a..d10f024 100644 --- a/consume.go +++ b/consume.go @@ -7,6 +7,21 @@ import ( "github.com/streadway/amqp" ) +// Action is an action that occurs after processed this delivery +type Action int + +// Handler defines the handler of each Delivery and return Action +type Handler func(d Delivery) (action Action) + +const ( + // Ack default ack this msg after you have successfully processed this delivery. + Ack Action = iota + // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. + NackDiscard + // NackRequeue deliver this message to a different consumer. + NackRequeue +) + // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { chManager *channelManager @@ -69,7 +84,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, optionFuncs ...func(*ConsumeOptions), @@ -136,7 +151,7 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -164,7 +179,7 @@ func (consumer Consumer) startGoroutinesWithRetries( // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue func (consumer Consumer) startGoroutines( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -246,12 +261,18 @@ func (consumer Consumer) startGoroutines( handler(Delivery{msg}) continue } - if handler(Delivery{msg}) { + switch handler(Delivery{msg}) { + case Ack: err := msg.Ack(false) if err != nil { consumer.logger.Printf("can't ack message: %v", err) } - } else { + case NackDiscard: + err := msg.Nack(false, false) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + case NackRequeue: err := msg.Nack(false, true) if err != nil { consumer.logger.Printf("can't nack message: %v", err) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index bcd7b2c..a0b19c1 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -16,10 +16,9 @@ func main() { log.Fatal(err) } err = consumer.StartConsuming( - func(d rabbitmq.Delivery) bool { + func(d rabbitmq.Delivery) (action rabbitmq.Action) { log.Printf("consumed: %v", string(d.Body)) - // true to ACK, false to NACK - return true + return }, "my_queue", []string{"routing_key", "routing_key_2"}, From 08e83f54da4d3452ff90c6a86950325d8c581abf Mon Sep 17 00:00:00 2001 From: QiuzZz <39342407+Qiu-zzz@users.noreply.github.com> Date: Thu, 26 Aug 2021 09:01:04 +0800 Subject: [PATCH 3/3] style --- consume.go | 1 - 1 file changed, 1 deletion(-) diff --git a/consume.go b/consume.go index a966f64..d10f024 100644 --- a/consume.go +++ b/consume.go @@ -10,7 +10,6 @@ import ( // Action is an action that occurs after processed this delivery type Action int - // Handler defines the handler of each Delivery and return Action type Handler func(d Delivery) (action Action)