You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

278 lines
7.7 KiB

package rabbitmq
import (
"context"
"errors"
"fmt"
"sync"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
)
// Action tells the consumer how to acknowledge a delivery once its Handler
// has returned. The defined values are Ack, NackDiscard, NackRequeue and
// Manual.
type Action int
// Handler processes a single Delivery and returns the Action the consumer
// should apply to that delivery.
type Handler func(d Delivery) (action Action)
const (
// Ack acknowledges the message after the delivery has been processed
// successfully. It is the zero value of Action.
Ack Action = iota
// NackDiscard negatively acknowledges the message without requeueing it:
// the message will be dropped or delivered to a server configured
// dead-letter queue.
NackDiscard
// NackRequeue negatively acknowledges the message with requeueing, so that
// it can be delivered to a different consumer.
NackRequeue
// Manual leaves message acknowledgement to the user, using the msg.Ack()
// method. The consumer itself neither acks nor nacks the delivery.
Manual
)
// Consumer allows you to create and connect to queues for data consumption.
// Create one with NewConsumer, start it with Run and stop it with Close or
// CloseWithContext.
type Consumer struct {
// chanManager owns the channel used for QoS, declarations and consuming.
chanManager *channelmanager.ChannelManager
// reconnectErrCh delivers the error behind each reconnect; Run restarts the
// consuming goroutines for every value received.
reconnectErrCh <-chan error
// closeConnectionToManagerCh is signalled during cleanup to end the
// reconnect subscription obtained from the channel manager.
closeConnectionToManagerCh chan<- struct{}
// options holds the resolved consumer options (defaults plus overrides).
options ConsumerOptions
// handlerMu is read-locked around every handler invocation and write-locked
// when waiting for in-flight handlers to finish.
handlerMu *sync.RWMutex
// isClosedMu guards isClosed; it is also held while goroutines are started.
isClosedMu *sync.RWMutex
// isClosed is set to true once the consumer's resources are cleaned up.
isClosed bool
}
// Delivery wraps amqp.Delivery, which captures the fields for a previously
// delivered message resident in a queue to be delivered by the server to a
// consumer from Channel.Consume or Channel.Get. It is the value handed to a
// Handler.
type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer for the given queue, built on top of
// conn.
//
// The options start from getDefaultConsumerOptions(queue); every optionFunc
// is then applied in order. A dedicated channel manager is created from the
// connection manager of conn and the consumer subscribes to its reconnect
// notifications. Nothing is consumed until Run is called.
//
// An error is returned when conn is nil, when conn has no connection manager,
// or when the channel manager cannot be created.
func NewConsumer(
	conn *Conn,
	queue string,
	optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
	defaultOptions := getDefaultConsumerOptions(queue)
	options := &defaultOptions
	for _, optionFunc := range optionFuncs {
		optionFunc(options)
	}

	// Guard against a nil connection: dereferencing it below would panic
	// instead of giving the caller an error to handle.
	if conn == nil {
		return nil, errors.New("connection can't be nil")
	}
	if conn.connectionManager == nil {
		return nil, errors.New("connection manager can't be nil")
	}

	chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
	if err != nil {
		return nil, err
	}
	reconnectErrCh, closeCh := chanManager.NotifyReconnect()

	consumer := &Consumer{
		chanManager:                chanManager,
		reconnectErrCh:             reconnectErrCh,
		closeConnectionToManagerCh: closeCh,
		options:                    *options,
		handlerMu:                  &sync.RWMutex{},
		isClosedMu:                 &sync.RWMutex{},
		isClosed:                   false,
	}
	return consumer, nil
}
// Run starts consuming and keeps the consumer alive across reconnects. Do not
// reuse the consumer for anything other than to close it.
//
// The supplied handler is invoked under a read lock on handlerMu; when that
// lock cannot be taken the delivery is answered with NackRequeue and the
// handler is not called.
//
// Run blocks until the reconnect channel is closed, in which case it returns
// nil. It returns an error if the initial start fails or if the goroutines
// cannot be restarted after a reconnect.
func (consumer *Consumer) Run(handler Handler) error {
	guarded := func(d Delivery) Action {
		if locked := consumer.handlerMu.TryRLock(); !locked {
			return NackRequeue
		}
		defer consumer.handlerMu.RUnlock()
		return handler(d)
	}

	if startErr := consumer.startGoroutines(guarded, consumer.options); startErr != nil {
		return startErr
	}

	// Every value on the reconnect channel means the channel was re-established
	// and the consuming goroutines have to be started again.
	for cause := range consumer.reconnectErrCh {
		consumer.options.Logger.Infof("successful consumer recovery from: %v", cause)
		if restartErr := consumer.startGoroutines(guarded, consumer.options); restartErr != nil {
			return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", restartErr)
		}
	}
	return nil
}
// Close releases the consumer's resources and stops consumption.
// By default it first waits, without a deadline, for the handler to finish
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
// Use CloseWithContext to bound that wait with a context.
// The connection manager itself stays open; only the subscription to it and
// the consuming goroutines are shut down.
// Only call once.
func (consumer *Consumer) Close() {
	opts := &consumer.options
	if opts.CloseGracefully {
		opts.Logger.Infof("waiting for handler to finish...")
		if waitErr := consumer.waitForHandlerCompletion(context.Background()); waitErr != nil {
			opts.Logger.Warnf("error while waiting for handler to finish: %v", waitErr)
		}
	}
	consumer.cleanupResources()
}
// cleanupResources marks the consumer as closed, closes its channel manager
// and signals the end of the reconnect subscription.
//
// It is idempotent: once the consumer has been marked closed, further calls
// return immediately. Without that guard a repeated close would close the
// channel manager again and start another sender goroutine on
// closeConnectionToManagerCh.
func (consumer *Consumer) cleanupResources() {
	consumer.isClosedMu.Lock()
	defer consumer.isClosedMu.Unlock()

	if consumer.isClosed {
		// Resources were already released by an earlier call.
		return
	}
	consumer.isClosed = true

	// close the channel so that rabbitmq server knows that the
	// consumer has been stopped.
	err := consumer.chanManager.Close()
	if err != nil {
		consumer.options.Logger.Warnf("error while closing the channel: %v", err)
	}

	consumer.options.Logger.Infof("closing consumer...")

	// The signal is sent from its own goroutine so that cleanup does not block
	// while holding isClosedMu if the receiver is not ready yet.
	go func() {
		consumer.closeConnectionToManagerCh <- struct{}{}
	}()
}
// CloseWithContext releases the consumer's resources and stops consumption.
// It first waits for the handler to finish
// (use WithConsumerOptionsForceShutdown option to disable this behavior);
// cancelling ctx aborts that wait, after which cleanup still proceeds.
// The connection manager itself stays open; only the subscription to it and
// the consuming goroutines are shut down.
// Only call once.
func (consumer *Consumer) CloseWithContext(ctx context.Context) {
	if graceful := consumer.options.CloseGracefully; graceful {
		if waitErr := consumer.waitForHandlerCompletion(ctx); waitErr != nil {
			consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", waitErr)
		}
	}
	consumer.cleanupResources()
}
// startGoroutines prepares the channel and launches the consuming goroutines.
//
// While holding isClosedMu it applies the QoS settings, declares every
// configured exchange, declares the queue, declares the bindings and starts
// consuming from the queue. It then spawns options.Concurrency goroutines
// that all read from the resulting delivery channel and pass each message to
// handler.
//
// The first failing step aborts the sequence; its error is returned (wrapped
// with the step name for the declaration steps, unwrapped for the consume
// step).
func (consumer *Consumer) startGoroutines(
	handler Handler,
	options ConsumerOptions,
) error {
	consumer.isClosedMu.Lock()
	defer consumer.isClosedMu.Unlock()

	manager := consumer.chanManager

	if err := manager.QosSafe(options.QOSPrefetch, 0, options.QOSGlobal); err != nil {
		return fmt.Errorf("declare qos failed: %w", err)
	}

	for _, exchange := range options.ExchangeOptions {
		if err := declareExchange(manager, exchange); err != nil {
			return fmt.Errorf("declare exchange failed: %w", err)
		}
	}

	if err := declareQueue(manager, options.QueueOptions); err != nil {
		return fmt.Errorf("declare queue failed: %w", err)
	}

	if err := declareBindings(manager, options); err != nil {
		return fmt.Errorf("declare bindings failed: %w", err)
	}

	rabbitOpts := options.RabbitConsumerOptions
	deliveries, err := manager.ConsumeSafe(
		options.QueueOptions.Name,
		rabbitOpts.Name,
		rabbitOpts.AutoAck,
		rabbitOpts.Exclusive,
		false, // no-local is not supported by RabbitMQ
		rabbitOpts.NoWait,
		tableToAMQPTable(rabbitOpts.Args),
	)
	if err != nil {
		return err
	}

	// All workers share the same delivery channel.
	for started := 0; started < options.Concurrency; started++ {
		go handlerGoroutine(consumer, deliveries, options, handler)
	}
	consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)
	return nil
}
// getIsClosed reports whether the consumer has been marked closed, reading
// the flag under a read lock on isClosedMu.
func (consumer *Consumer) getIsClosed() bool {
	consumer.isClosedMu.RLock()
	closed := consumer.isClosed
	consumer.isClosedMu.RUnlock()
	return closed
}
// handlerGoroutine is the body of one consuming worker. It reads deliveries
// from msgs until the channel is closed or the consumer is marked closed,
// hands each one to handler and applies the returned Action.
//
// With AutoAck enabled the handler's result is ignored. Otherwise Ack
// acknowledges the message, NackDiscard and NackRequeue negatively
// acknowledge it (without and with requeue respectively), and any other
// value, including Manual, performs no acknowledgement. Acknowledgement
// failures are logged and do not stop the worker.
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) {
	for msg := range msgs {
		if consumer.getIsClosed() {
			// The consumer was closed; stop without handling this message.
			break
		}

		delivery := Delivery{msg}
		if consumeOptions.RabbitConsumerOptions.AutoAck {
			handler(delivery)
			continue
		}

		switch action := handler(delivery); action {
		case Ack:
			if ackErr := msg.Ack(false); ackErr != nil {
				consumer.options.Logger.Errorf("can't ack message: %v", ackErr)
			}
		case NackDiscard, NackRequeue:
			requeue := action == NackRequeue
			if nackErr := msg.Nack(false, requeue); nackErr != nil {
				consumer.options.Logger.Errorf("can't nack message: %v", nackErr)
			}
		case Manual:
			// Acknowledgement is the handler's responsibility.
		}
	}
	consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}
// waitForHandlerCompletion blocks until the write lock on handlerMu can be
// taken or until ctx is done, whichever happens first.
//
// It returns nil once the lock was obtained and ctx.Err() when the context
// is already done on entry or finishes first. The lock is acquired in a
// helper goroutine and released right after signalling; if ctx wins, that
// goroutine keeps waiting in the background until it gets the lock.
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	idle := make(chan struct{})
	go func() {
		consumer.handlerMu.Lock()
		close(idle)
		consumer.handlerMu.Unlock()
	}()

	select {
	case <-idle:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}