Browse Source

Merge branch 'wagslane:main' into switch/new-upstream

pull/38/head
Aurélien Perrier 4 years ago
committed by GitHub
parent
commit
e51929ae8e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 8 deletions
  1. +26
    -5
      consume.go
  2. +2
    -3
      examples/consumer/main.go

+ 26
- 5
consume.go View File

@ -7,6 +7,21 @@ import (
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
) )
// Action is an action that occurs after processed this delivery
type Action int
// Handler defines the handler of each Delivery and return Action
type Handler func(d Delivery) (action Action)
const (
// Ack default ack this msg after you have successfully processed this delivery.
Ack Action = iota
// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
NackDiscard
// NackRequeue deliver this message to a different consumer.
NackRequeue
)
// Consumer allows you to create and connect to queues for data consumption. // Consumer allows you to create and connect to queues for data consumption.
type Consumer struct { type Consumer struct {
chManager *channelManager chManager *channelManager
@ -69,7 +84,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
// The provided handler is called once for each message. If the provided queue doesn't exist, it // The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster // will be created on the cluster
func (consumer Consumer) StartConsuming( func (consumer Consumer) StartConsuming(
handler func(d Delivery) bool,
handler Handler,
queue string, queue string,
routingKeys []string, routingKeys []string,
optionFuncs ...func(*ConsumeOptions), optionFuncs ...func(*ConsumeOptions),
@ -136,7 +151,7 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
// startGoroutinesWithRetries attempts to start consuming on a channel // startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff // with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries( func (consumer Consumer) startGoroutinesWithRetries(
handler func(d Delivery) bool,
handler Handler,
queue string, queue string,
routingKeys []string, routingKeys []string,
consumeOptions ConsumeOptions, consumeOptions ConsumeOptions,
@ -164,7 +179,7 @@ func (consumer Consumer) startGoroutinesWithRetries(
// binds the queue to the routing key(s), and starts the goroutines // binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue // that will consume from the queue
func (consumer Consumer) startGoroutines( func (consumer Consumer) startGoroutines(
handler func(d Delivery) bool,
handler Handler,
queue string, queue string,
routingKeys []string, routingKeys []string,
consumeOptions ConsumeOptions, consumeOptions ConsumeOptions,
@ -246,12 +261,18 @@ func (consumer Consumer) startGoroutines(
handler(Delivery{msg}) handler(Delivery{msg})
continue continue
} }
if handler(Delivery{msg}) {
switch handler(Delivery{msg}) {
case Ack:
err := msg.Ack(false) err := msg.Ack(false)
if err != nil { if err != nil {
consumer.logger.Printf("can't ack message: %v", err) consumer.logger.Printf("can't ack message: %v", err)
} }
} else {
case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
}
case NackRequeue:
err := msg.Nack(false, true) err := msg.Nack(false, true)
if err != nil { if err != nil {
consumer.logger.Printf("can't nack message: %v", err) consumer.logger.Printf("can't nack message: %v", err)


+ 2
- 3
examples/consumer/main.go View File

@ -16,10 +16,9 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
err = consumer.StartConsuming( err = consumer.StartConsuming(
func(d rabbitmq.Delivery) bool {
func(d rabbitmq.Delivery) (action rabbitmq.Action) {
log.Printf("consumed: %v", string(d.Body)) log.Printf("consumed: %v", string(d.Body))
// true to ACK, false to NACK
return true
return
}, },
"my_queue", "my_queue",
[]string{"routing_key", "routing_key_2"}, []string{"routing_key", "routing_key_2"},


Loading…
Cancel
Save