|
|
|
@ -7,6 +7,20 @@ import ( |
|
|
|
"github.com/streadway/amqp" |
|
|
|
) |
|
|
|
|
|
|
|
// Action is an action that occurs after processed this delivery
|
|
|
|
type Action int |
|
|
|
|
|
|
|
type Handler func(d Delivery) (action Action) |
|
|
|
|
|
|
|
const ( |
|
|
|
// Ack default ack this msg after you have successfully processed this delivery.
|
|
|
|
Ack Action = iota |
|
|
|
// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
|
|
|
|
NackDiscard |
|
|
|
// NackRequeue deliver this message to a different consumer.
|
|
|
|
NackRequeue |
|
|
|
) |
|
|
|
|
|
|
|
// Consumer allows you to create and connect to queues for data consumption.
|
|
|
|
type Consumer struct { |
|
|
|
chManager *channelManager |
|
|
|
@ -69,7 +83,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { |
|
|
|
// The provided handler is called once for each message. If the provided queue doesn't exist, it
|
|
|
|
// will be created on the cluster
|
|
|
|
func (consumer Consumer) StartConsuming( |
|
|
|
handler func(d Delivery) bool, |
|
|
|
handler Handler, |
|
|
|
queue string, |
|
|
|
routingKeys []string, |
|
|
|
optionFuncs ...func(*ConsumeOptions), |
|
|
|
@ -136,7 +150,7 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { |
|
|
|
// startGoroutinesWithRetries attempts to start consuming on a channel
|
|
|
|
// with an exponential backoff
|
|
|
|
func (consumer Consumer) startGoroutinesWithRetries( |
|
|
|
handler func(d Delivery) bool, |
|
|
|
handler Handler, |
|
|
|
queue string, |
|
|
|
routingKeys []string, |
|
|
|
consumeOptions ConsumeOptions, |
|
|
|
@ -164,7 +178,7 @@ func (consumer Consumer) startGoroutinesWithRetries( |
|
|
|
// binds the queue to the routing key(s), and starts the goroutines
|
|
|
|
// that will consume from the queue
|
|
|
|
func (consumer Consumer) startGoroutines( |
|
|
|
handler func(d Delivery) bool, |
|
|
|
handler Handler, |
|
|
|
queue string, |
|
|
|
routingKeys []string, |
|
|
|
consumeOptions ConsumeOptions, |
|
|
|
@ -246,12 +260,18 @@ func (consumer Consumer) startGoroutines( |
|
|
|
handler(Delivery{msg}) |
|
|
|
continue |
|
|
|
} |
|
|
|
if handler(Delivery{msg}) { |
|
|
|
switch handler(Delivery{msg}) { |
|
|
|
case Ack: |
|
|
|
err := msg.Ack(false) |
|
|
|
if err != nil { |
|
|
|
consumer.logger.Printf("can't ack message: %v", err) |
|
|
|
} |
|
|
|
} else { |
|
|
|
case NackDiscard: |
|
|
|
err := msg.Nack(false, false) |
|
|
|
if err != nil { |
|
|
|
consumer.logger.Printf("can't nack message: %v", err) |
|
|
|
} |
|
|
|
case NackRequeue: |
|
|
|
err := msg.Nack(false, true) |
|
|
|
if err != nil { |
|
|
|
consumer.logger.Printf("can't nack message: %v", err) |
|
|
|
|