package rabbitmq
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
|
|
)
|
|
|
|
// Action is an action that occurs after processed this delivery
|
|
type Action int
|
|
|
|
// Handler defines the handler of each Delivery and return Action
|
|
type Handler func(d Delivery) (action Action)
|
|
|
|
const (
|
|
// Ack default ack this msg after you have successfully processed this delivery.
|
|
Ack Action = iota
|
|
// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
|
|
NackDiscard
|
|
// NackRequeue deliver this message to a different consumer.
|
|
NackRequeue
|
|
// Message acknowledgement is left to the user using the msg.Ack() method
|
|
Manual
|
|
)
|
|
|
|
// Consumer allows you to create and connect to queues for data consumption.
|
|
type Consumer struct {
|
|
chanManager *channelmanager.ChannelManager
|
|
reconnectErrCh <-chan error
|
|
closeConnectionToManagerCh chan<- struct{}
|
|
options ConsumerOptions
|
|
handlerMu *sync.RWMutex
|
|
|
|
isClosedMu *sync.RWMutex
|
|
isClosed bool
|
|
}
|
|
|
|
// Delivery captures the fields for a previously delivered message resident in
|
|
// a queue to be delivered by the server to a consumer from Channel.Consume or
|
|
// Channel.Get.
|
|
type Delivery struct {
|
|
amqp.Delivery
|
|
}
|
|
|
|
// NewConsumer returns a new Consumer connected to the given rabbitmq server.
|
|
func NewConsumer(
|
|
conn *Conn,
|
|
queue string,
|
|
optionFuncs ...func(*ConsumerOptions),
|
|
) (*Consumer, error) {
|
|
defaultOptions := getDefaultConsumerOptions(queue)
|
|
options := &defaultOptions
|
|
for _, optionFunc := range optionFuncs {
|
|
optionFunc(options)
|
|
}
|
|
|
|
if conn.connectionManager == nil {
|
|
return nil, errors.New("connection manager can't be nil")
|
|
}
|
|
|
|
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reconnectErrCh, closeCh := chanManager.NotifyReconnect()
|
|
|
|
consumer := &Consumer{
|
|
chanManager: chanManager,
|
|
reconnectErrCh: reconnectErrCh,
|
|
closeConnectionToManagerCh: closeCh,
|
|
options: *options,
|
|
handlerMu: &sync.RWMutex{},
|
|
isClosedMu: &sync.RWMutex{},
|
|
isClosed: false,
|
|
}
|
|
|
|
return consumer, nil
|
|
}
|
|
|
|
// Run starts consuming with automatic reconnection handling. Do not reuse the
|
|
// consumer for anything other than to close it.
|
|
func (consumer *Consumer) Run(handler Handler) error {
|
|
handlerWrapper := func(d Delivery) (action Action) {
|
|
if !consumer.handlerMu.TryRLock() {
|
|
return NackRequeue
|
|
}
|
|
defer consumer.handlerMu.RUnlock()
|
|
return handler(d)
|
|
}
|
|
|
|
err := consumer.startGoroutines(
|
|
handlerWrapper,
|
|
consumer.options,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for err := range consumer.reconnectErrCh {
|
|
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
|
|
err = consumer.startGoroutines(
|
|
handlerWrapper,
|
|
consumer.options,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close cleans up resources and closes the consumer.
|
|
// It waits for handler to finish before returning by default
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
// Use CloseWithContext to specify a context to cancel the handler completion.
|
|
// It does not close the connection manager, just the subscription
|
|
// to the connection manager and the consuming goroutines.
|
|
// Only call once.
|
|
func (consumer *Consumer) Close() {
|
|
if consumer.options.CloseGracefully {
|
|
consumer.options.Logger.Infof("waiting for handler to finish...")
|
|
err := consumer.waitForHandlerCompletion(context.Background())
|
|
if err != nil {
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
|
|
}
|
|
}
|
|
|
|
consumer.cleanupResources()
|
|
|
|
}
|
|
|
|
func (consumer *Consumer) cleanupResources() {
|
|
consumer.isClosedMu.Lock()
|
|
defer consumer.isClosedMu.Unlock()
|
|
consumer.isClosed = true
|
|
// close the channel so that rabbitmq server knows that the
|
|
// consumer has been stopped.
|
|
err := consumer.chanManager.Close()
|
|
if err != nil {
|
|
consumer.options.Logger.Warnf("error while closing the channel: %v", err)
|
|
}
|
|
|
|
consumer.options.Logger.Infof("closing consumer...")
|
|
go func() {
|
|
consumer.closeConnectionToManagerCh <- struct{}{}
|
|
}()
|
|
}
|
|
|
|
// CloseWithContext cleans up resources and closes the consumer.
|
|
// It waits for handler to finish before returning
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
// Use the context to cancel the handler completion.
|
|
// CloseWithContext does not close the connection manager, just the subscription
|
|
// to the connection manager and the consuming goroutines.
|
|
// Only call once.
|
|
func (consumer *Consumer) CloseWithContext(ctx context.Context) {
|
|
if consumer.options.CloseGracefully {
|
|
err := consumer.waitForHandlerCompletion(ctx)
|
|
if err != nil {
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
|
|
}
|
|
}
|
|
|
|
consumer.cleanupResources()
|
|
}
|
|
|
|
// startGoroutines declares the queue if it doesn't exist,
|
|
// binds the queue to the routing key(s), and starts the goroutines
|
|
// that will consume from the queue
|
|
func (consumer *Consumer) startGoroutines(
|
|
handler Handler,
|
|
options ConsumerOptions,
|
|
) error {
|
|
consumer.isClosedMu.Lock()
|
|
defer consumer.isClosedMu.Unlock()
|
|
err := consumer.chanManager.QosSafe(
|
|
options.QOSPrefetch,
|
|
0,
|
|
options.QOSGlobal,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("declare qos failed: %w", err)
|
|
}
|
|
for _, exchangeOption := range options.ExchangeOptions {
|
|
err = declareExchange(consumer.chanManager, exchangeOption)
|
|
if err != nil {
|
|
return fmt.Errorf("declare exchange failed: %w", err)
|
|
}
|
|
}
|
|
err = declareQueue(consumer.chanManager, options.QueueOptions)
|
|
if err != nil {
|
|
return fmt.Errorf("declare queue failed: %w", err)
|
|
}
|
|
err = declareBindings(consumer.chanManager, options)
|
|
if err != nil {
|
|
return fmt.Errorf("declare bindings failed: %w", err)
|
|
}
|
|
|
|
msgs, err := consumer.chanManager.ConsumeSafe(
|
|
options.QueueOptions.Name,
|
|
options.RabbitConsumerOptions.Name,
|
|
options.RabbitConsumerOptions.AutoAck,
|
|
options.RabbitConsumerOptions.Exclusive,
|
|
false, // no-local is not supported by RabbitMQ
|
|
options.RabbitConsumerOptions.NoWait,
|
|
tableToAMQPTable(options.RabbitConsumerOptions.Args),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := 0; i < options.Concurrency; i++ {
|
|
go handlerGoroutine(consumer, msgs, options, handler)
|
|
}
|
|
consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)
|
|
return nil
|
|
}
|
|
|
|
func (consumer *Consumer) getIsClosed() bool {
|
|
consumer.isClosedMu.RLock()
|
|
defer consumer.isClosedMu.RUnlock()
|
|
return consumer.isClosed
|
|
}
|
|
|
|
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) {
|
|
for msg := range msgs {
|
|
if consumer.getIsClosed() {
|
|
break
|
|
}
|
|
|
|
if consumeOptions.RabbitConsumerOptions.AutoAck {
|
|
handler(Delivery{msg})
|
|
continue
|
|
}
|
|
|
|
switch handler(Delivery{msg}) {
|
|
case Ack:
|
|
err := msg.Ack(false)
|
|
if err != nil {
|
|
consumer.options.Logger.Errorf("can't ack message: %v", err)
|
|
}
|
|
case NackDiscard:
|
|
err := msg.Nack(false, false)
|
|
if err != nil {
|
|
consumer.options.Logger.Errorf("can't nack message: %v", err)
|
|
}
|
|
case NackRequeue:
|
|
err := msg.Nack(false, true)
|
|
if err != nil {
|
|
consumer.options.Logger.Errorf("can't nack message: %v", err)
|
|
}
|
|
}
|
|
}
|
|
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
|
|
}
|
|
|
|
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
c := make(chan struct{})
|
|
go func() {
|
|
consumer.handlerMu.Lock()
|
|
defer consumer.handlerMu.Unlock()
|
|
close(c)
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-c:
|
|
return nil
|
|
}
|
|
}
|