diff --git a/consume.go b/consume.go index 37e828a..c0ce636 100644 --- a/consume.go +++ b/consume.go @@ -7,6 +7,20 @@ import ( "github.com/streadway/amqp" ) +// Action is an action that occurs after processed this delivery +type Action int + +type Handler func(d Delivery) (action Action) + +const ( + // Ack default ack this msg after you have successfully processed this delivery. + Ack Action = iota + // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. + NackDiscard + // NackRequeue deliver this message to a different consumer. + NackRequeue +) + // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { chManager *channelManager @@ -69,7 +83,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, optionFuncs ...func(*ConsumeOptions), @@ -136,7 +150,7 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -164,7 +178,7 @@ func (consumer Consumer) startGoroutinesWithRetries( // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue func (consumer Consumer) startGoroutines( - handler func(d Delivery) bool, + handler Handler, queue string, routingKeys []string, consumeOptions ConsumeOptions, @@ -246,12 +260,18 @@ func (consumer Consumer) startGoroutines( handler(Delivery{msg}) continue } - if handler(Delivery{msg}) { + switch handler(Delivery{msg}) { + case Ack: err := msg.Ack(false) if err != nil { consumer.logger.Printf("can't ack message: %v", err) } - } else { + case NackDiscard: + err := msg.Nack(false, false) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + case NackRequeue: err := msg.Nack(false, true) if err != nil { consumer.logger.Printf("can't nack message: %v", err) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index bcd7b2c..a0b19c1 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -16,10 +16,9 @@ func main() { log.Fatal(err) } err = consumer.StartConsuming( - func(d rabbitmq.Delivery) bool { + func(d rabbitmq.Delivery) (action rabbitmq.Action) { log.Printf("consumed: %v", string(d.Body)) - // true to ACK, false to NACK - return true + return }, "my_queue", []string{"routing_key", "routing_key_2"},