|
|
package rabbitmq
|
|
|
|
|
|
import (
|
|
|
"time"
|
|
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
"github.com/wagslane/go-rabbitmq/internal/logger"
|
|
|
)
|
|
|
|
|
|
// getDefaultConsumerOptions describes the options that will be used when a value isn't provided
|
|
|
func getDefaultConsumerOptions(queueName string) ConsumerOptions {
|
|
|
return ConsumerOptions{
|
|
|
RabbitConsumerOptions: RabbitConsumerOptions{
|
|
|
Name: "",
|
|
|
AutoAck: false,
|
|
|
Exclusive: false,
|
|
|
NoWait: false,
|
|
|
NoLocal: false,
|
|
|
Args: Table{},
|
|
|
},
|
|
|
QueueOptions: QueueOptions{
|
|
|
Name: queueName,
|
|
|
Durable: false,
|
|
|
AutoDelete: false,
|
|
|
Exclusive: false,
|
|
|
NoWait: false,
|
|
|
Passive: false,
|
|
|
Args: Table{},
|
|
|
Declare: true,
|
|
|
},
|
|
|
ExchangeOptions: []ExchangeOptions{},
|
|
|
Concurrency: 1,
|
|
|
CloseGracefully: true,
|
|
|
Logger: stdDebugLogger{},
|
|
|
QOSPrefetch: 10,
|
|
|
QOSGlobal: false,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func getDefaultExchangeOptions() ExchangeOptions {
|
|
|
return ExchangeOptions{
|
|
|
Name: "",
|
|
|
Kind: amqp.ExchangeDirect,
|
|
|
Durable: false,
|
|
|
AutoDelete: false,
|
|
|
Internal: false,
|
|
|
NoWait: false,
|
|
|
Passive: false,
|
|
|
Args: Table{},
|
|
|
Declare: false,
|
|
|
Bindings: []Binding{},
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func getDefaultBindingOptions() BindingOptions {
|
|
|
return BindingOptions{
|
|
|
NoWait: false,
|
|
|
Args: Table{},
|
|
|
Declare: true,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// ConsumerOptions are used to describe how a new consumer will be created.
|
|
|
// If QueueOptions is not nil, the options will be used to declare a queue
|
|
|
// If ExchangeOptions is not nil, it will be used to declare an exchange
|
|
|
// If there are Bindings, the queue will be bound to them
|
|
|
type ConsumerOptions struct {
|
|
|
RabbitConsumerOptions RabbitConsumerOptions
|
|
|
QueueOptions QueueOptions
|
|
|
CloseGracefully bool
|
|
|
ExchangeOptions []ExchangeOptions
|
|
|
Concurrency int
|
|
|
Logger logger.Logger
|
|
|
QOSPrefetch int
|
|
|
QOSGlobal bool
|
|
|
}
|
|
|
|
|
|
// RabbitConsumerOptions are used to configure the consumer
|
|
|
// on the rabbit server
|
|
|
type RabbitConsumerOptions struct {
|
|
|
Name string
|
|
|
AutoAck bool
|
|
|
Exclusive bool
|
|
|
NoWait bool
|
|
|
NoLocal bool
|
|
|
Args Table
|
|
|
}
|
|
|
|
|
|
// QueueOptions are used to configure a queue.
|
|
|
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
|
|
|
// to a non-existent queue will cause RabbitMQ to throw an exception.
|
|
|
type QueueOptions struct {
|
|
|
Name string
|
|
|
Durable bool
|
|
|
AutoDelete bool
|
|
|
Exclusive bool
|
|
|
NoWait bool
|
|
|
Passive bool // if false, a missing queue will be created on the server
|
|
|
Args Table
|
|
|
Declare bool
|
|
|
}
|
|
|
|
|
|
// Binding describes the bhinding of a queue to a routing key on an exchange
|
|
|
type Binding struct {
|
|
|
RoutingKey string
|
|
|
BindingOptions
|
|
|
}
|
|
|
|
|
|
// BindingOptions describes the options a binding can have
|
|
|
type BindingOptions struct {
|
|
|
NoWait bool
|
|
|
Args Table
|
|
|
Declare bool
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueDurable ensures the queue is a durable queue
|
|
|
func WithConsumerOptionsQueueDurable(options *ConsumerOptions) {
|
|
|
options.QueueOptions.Durable = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue
|
|
|
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) {
|
|
|
options.QueueOptions.AutoDelete = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue
|
|
|
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) {
|
|
|
options.QueueOptions.Exclusive = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue
|
|
|
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) {
|
|
|
options.QueueOptions.NoWait = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueuePassive ensures the queue is a passive queue
|
|
|
func WithConsumerOptionsQueuePassive(options *ConsumerOptions) {
|
|
|
options.QueueOptions.Passive = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's
|
|
|
// existance upon startup
|
|
|
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) {
|
|
|
options.QueueOptions.Declare = false
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueArgs adds optional args to the queue
|
|
|
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.QueueOptions.Args = args
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func ensureExchangeOptions(options *ConsumerOptions) {
|
|
|
if len(options.ExchangeOptions) == 0 {
|
|
|
options.ExchangeOptions = append(options.ExchangeOptions, getDefaultExchangeOptions())
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeName sets the exchange name
|
|
|
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Name = name
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
|
|
|
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Kind = kind
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
|
|
|
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Durable = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
|
|
|
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].AutoDelete = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
|
|
|
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Internal = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
|
|
|
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].NoWait = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
|
|
|
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Declare = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
|
|
|
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Passive = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeArgs adds optional args to the exchange
|
|
|
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Args = args
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options
|
|
|
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{
|
|
|
RoutingKey: routingKey,
|
|
|
BindingOptions: getDefaultBindingOptions(),
|
|
|
})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options
|
|
|
// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to
|
|
|
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
|
|
|
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
ensureExchangeOptions(options)
|
|
|
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, binding)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be
|
|
|
// used if you want to to consume from multiple exchanges on the same consumer
|
|
|
func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.ExchangeOptions = append(options.ExchangeOptions, exchangeOptions)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that
|
|
|
// many goroutines will be spawned to run the provided handler on messages
|
|
|
func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.Concurrency = concurrency
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer
|
|
|
// if unset a random name will be given
|
|
|
func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.RabbitConsumerOptions.Name = consumerName
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
|
|
|
// if unset the default will be used (false)
|
|
|
func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.RabbitConsumerOptions.AutoAck = autoAck
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means
|
|
|
// the server will ensure that this is the sole consumer
|
|
|
// from this queue. When exclusive is false, the server will fairly distribute
|
|
|
// deliveries across multiple consumers.
|
|
|
func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) {
|
|
|
options.RabbitConsumerOptions.Exclusive = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means
|
|
|
// it does not wait for the server to confirm the request and
|
|
|
// immediately begin deliveries. If it is not possible to consume, a channel
|
|
|
// exception will be raised and the channel will be closed.
|
|
|
func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) {
|
|
|
options.RabbitConsumerOptions.NoWait = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsLogging uses a default logger that writes to std out
|
|
|
func WithConsumerOptionsLogging(options *ConsumerOptions) {
|
|
|
options.Logger = &stdDebugLogger{}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsLogger sets logging to a custom interface.
|
|
|
// Use WithConsumerOptionsLogging to just log to stdout.
|
|
|
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.Logger = log
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
|
|
|
// many messages will be fetched from the server in advance to help with throughput.
|
|
|
// This doesn't affect the handler, messages are still processed one at a time.
|
|
|
func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
options.QOSPrefetch = prefetchCount
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means
|
|
|
// these QOS settings apply to ALL existing and future
|
|
|
// consumers on all channels on the same connection
|
|
|
func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
|
|
|
options.QOSGlobal = true
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsForceShutdown tells the consumer to not wait for
|
|
|
// the handler to complete in consumer.Close
|
|
|
func WithConsumerOptionsForceShutdown(options *ConsumerOptions) {
|
|
|
options.CloseGracefully = false
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means
|
|
|
// multiple nodes in the cluster will have the messages distributed amongst them
|
|
|
// for higher reliability
|
|
|
func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
|
|
|
if options.QueueOptions.Args == nil {
|
|
|
options.QueueOptions.Args = Table{}
|
|
|
}
|
|
|
|
|
|
options.QueueOptions.Args["x-queue-type"] = "quorum"
|
|
|
}
|
|
|
|
|
|
// WithConsumerOptionsQueueMessageExpiration sets the message expiration (TTL) for all messages in the queue.
|
|
|
// This option defines how long a message can remain in the queue before it is discarded if not consumed.
|
|
|
// The TTL is specified as a time.Duration and will be converted to milliseconds for RabbitMQ.
|
|
|
// See https://www.rabbitmq.com/docs/ttl#per-queue-message-ttl
|
|
|
func WithConsumerOptionsQueueMessageExpiration(ttl time.Duration) func(*ConsumerOptions) {
|
|
|
return func(options *ConsumerOptions) {
|
|
|
if options.QueueOptions.Args == nil {
|
|
|
options.QueueOptions.Args = Table{}
|
|
|
}
|
|
|
options.QueueOptions.Args["x-message-ttl"] = ttl.Milliseconds()
|
|
|
}
|
|
|
}
|