You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

201 lines
5.5 KiB

package rabbitmq
import (
"errors"
"fmt"
"sync"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
)
// Action is an action that occurs after processed this delivery
type Action int
// Handler defines the handler of each Delivery and return Action
type Handler func(d Delivery) (action Action)
const (
// Ack default ack this msg after you have successfully processed this delivery.
Ack Action = iota
// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
NackDiscard
// NackRequeue deliver this message to a different consumer.
NackRequeue
)
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chanManager *channelmanager.ChannelManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
options ConsumerOptions
isClosedMux *sync.RWMutex
isClosed bool
}
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
// it also starts consuming on the given connection with automatic reconnection handling
// Do do reuse the returned consumer for anything other than to close it
func NewConsumer(
conn *Conn,
handler Handler,
queue string,
optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
defaultOptions := getDefaultConsumerOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if conn.connectionManager == nil {
return nil, errors.New("connection manager can't be nil")
}
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
if err != nil {
return nil, err
}
reconnectErrCh, closeCh := chanManager.NotifyReconnect()
consumer := &Consumer{
chanManager: chanManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
isClosedMux: &sync.RWMutex{},
isClosed: false,
}
err = consumer.startGoroutines(
handler,
*options,
)
if err != nil {
return nil, err
}
go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,
)
if err != nil {
consumer.options.Logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err)
}
}
}()
return consumer, nil
}
// Close cleans up resources and closes the consumer.
// It does not close the connection manager, just the subscription
// to the connection manager and the consuming goroutines.
// Only call once.
func (consumer *Consumer) Close() {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosed = true
consumer.options.Logger.Infof("closing consumer...")
go func() {
consumer.closeConnectionToManagerCh <- struct{}{}
}()
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
func (consumer *Consumer) startGoroutines(
handler Handler,
options ConsumerOptions,
) error {
err := consumer.chanManager.QosSafe(
options.QOSPrefetch,
0,
options.QOSGlobal,
)
if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
err = declareExchange(consumer.chanManager, options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.chanManager, options)
if err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
}
msgs, err := consumer.chanManager.ConsumeSafe(
options.QueueOptions.Name,
options.RabbitConsumerOptions.Name,
options.RabbitConsumerOptions.AutoAck,
options.RabbitConsumerOptions.Exclusive,
false, // no-local is not supported by RabbitMQ
options.RabbitConsumerOptions.NoWait,
tableToAMQPTable(options.RabbitConsumerOptions.Args),
)
if err != nil {
return err
}
for i := 0; i < options.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, options, handler)
}
consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)
return nil
}
func (consumer *Consumer) getIsClosed() bool {
consumer.isClosedMux.RLock()
defer consumer.isClosedMux.RUnlock()
return consumer.isClosed
}
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) {
for msg := range msgs {
if consumer.getIsClosed() {
break
}
if consumeOptions.RabbitConsumerOptions.AutoAck {
handler(Delivery{msg})
continue
}
switch handler(Delivery{msg}) {
case Ack:
err := msg.Ack(false)
if err != nil {
consumer.options.Logger.Errorf("can't ack message: %v", err)
}
case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
case NackRequeue:
err := msg.Nack(false, true)
if err != nil {
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
}
}
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}