You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

346 lines
12 KiB

package rabbitmq
import (
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// getDefaultConsumerOptions returns the ConsumerOptions that are used for a
// consumer of queueName when a value isn't provided. Fields that are not
// listed in the literals below keep their zero value (empty name, false flags).
func getDefaultConsumerOptions(queueName string) ConsumerOptions {
	consumerDefaults := RabbitConsumerOptions{
		Args: Table{},
	}
	queueDefaults := QueueOptions{
		Name:    queueName,
		Args:    Table{},
		Declare: true, // on by default; WithConsumerOptionsQueueNoDeclare turns it off
	}
	return ConsumerOptions{
		RabbitConsumerOptions: consumerDefaults,
		QueueOptions:          queueDefaults,
		ExchangeOptions:       []ExchangeOptions{},
		Concurrency:           1,
		CloseGracefully:       true,
		Logger:                stdDebugLogger{},
		QOSPrefetch:           10,
	}
}
// getDefaultExchangeOptions returns the ExchangeOptions that are used when a
// value isn't provided: an unnamed exchange of kind amqp.ExchangeDirect with
// an empty Args table and an empty Bindings slice. Every other field keeps its
// zero value, so in particular Declare is false.
func getDefaultExchangeOptions() ExchangeOptions {
	var defaults ExchangeOptions
	defaults.Kind = amqp.ExchangeDirect
	defaults.Args = Table{}
	defaults.Bindings = []Binding{}
	return defaults
}
// getDefaultBindingOptions returns the BindingOptions that are used when a
// value isn't provided: NoWait is false, Args is an empty Table and Declare
// is true.
func getDefaultBindingOptions() BindingOptions {
	defaults := BindingOptions{Args: Table{}}
	defaults.Declare = true
	return defaults
}
// ConsumerOptions are used to describe how a new consumer will be created.
// QueueOptions holds the settings of the queue that is consumed from,
// ExchangeOptions holds the exchanges the consumer works with, and the
// Bindings carried by each exchange describe how the queue is bound to it.
// The defaults are produced by getDefaultConsumerOptions.
type ConsumerOptions struct {
// RabbitConsumerOptions configures the consumer on the rabbit server.
RabbitConsumerOptions RabbitConsumerOptions
// QueueOptions configures the queue; its Declare flag defaults to true.
QueueOptions QueueOptions
// CloseGracefully defaults to true. WithConsumerOptionsForceShutdown sets it
// to false, which tells the consumer not to wait for the handler to
// complete in consumer.Close.
CloseGracefully bool
// ExchangeOptions is empty by default. The single-exchange option helpers
// operate on element 0 and create it on demand (see ensureExchangeOptions).
ExchangeOptions []ExchangeOptions
// Concurrency is the number of goroutines spawned to run the handler, as
// described on WithConsumerOptionsConcurrency. Defaults to 1.
Concurrency int
// Logger receives the log output; defaults to stdDebugLogger.
Logger logger.Logger
// QOSPrefetch is the prefetch count. Defaults to 10.
QOSPrefetch int
// QOSGlobal marks the QOS settings as global. Defaults to false.
QOSGlobal bool
}
// RabbitConsumerOptions are used to configure the consumer
// on the rabbit server. All flags default to false and Args defaults to an
// empty Table (see getDefaultConsumerOptions).
type RabbitConsumerOptions struct {
// Name is the name of this consumer on the server; if unset a random name
// will be given.
Name string
// AutoAck is the auto acknowledge property of the consumer.
AutoAck bool
// Exclusive asks the server to ensure this is the sole consumer from the
// queue.
Exclusive bool
// NoWait means the consumer does not wait for the server to confirm the
// request before deliveries begin.
NoWait bool
// NoLocal has no option helper in this file; it stays false unless set
// directly. NOTE(review): semantics are defined by the server/client
// library - confirm before relying on it.
NoLocal bool
// Args holds additional arguments for the consumer.
Args Table
}
// QueueOptions are used to configure a queue.
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
// to a non-existent queue will cause RabbitMQ to throw an exception.
// All flags except Declare default to false (see getDefaultConsumerOptions).
type QueueOptions struct {
// Name is the name of the queue.
Name string
// Durable marks the queue as a durable queue.
Durable bool
// AutoDelete marks the queue as an auto-delete queue.
AutoDelete bool
// Exclusive marks the queue as an exclusive queue.
Exclusive bool
// NoWait marks the queue as a no-wait queue.
NoWait bool
Passive bool // if false, a missing queue will be created on the server
// Args holds optional arguments for the queue, for example "x-queue-type"
// or "x-message-ttl" as set by the option helpers in this file.
Args Table
// Declare controls whether the queue's existence is declared upon startup.
// Defaults to true; WithConsumerOptionsQueueNoDeclare sets it to false.
Declare bool
}
// Binding describes the binding of a queue to a routing key on an exchange.
// The embedded BindingOptions carry the per-binding settings.
type Binding struct {
// RoutingKey is the routing key the queue is bound with.
RoutingKey string
BindingOptions
}
// BindingOptions describes the options a binding can have.
// getDefaultBindingOptions yields NoWait=false, an empty Args table and
// Declare=true; a BindingOptions built by hand starts from the zero value.
type BindingOptions struct {
// NoWait is the no-wait flag of the binding.
NoWait bool
// Args holds optional arguments for the binding.
Args Table
// Declare controls whether the binding is declared.
Declare bool
}
// WithConsumerOptionsQueueDurable marks the consumer's queue as durable by
// setting QueueOptions.Durable to true.
func WithConsumerOptionsQueueDurable(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.Durable = true
}
// WithConsumerOptionsQueueAutoDelete marks the consumer's queue as an
// auto-delete queue by setting QueueOptions.AutoDelete to true.
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.AutoDelete = true
}
// WithConsumerOptionsQueueExclusive marks the consumer's queue as an
// exclusive queue by setting QueueOptions.Exclusive to true.
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.Exclusive = true
}
// WithConsumerOptionsQueueNoWait marks the consumer's queue as a no-wait
// queue by setting QueueOptions.NoWait to true.
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.NoWait = true
}
// WithConsumerOptionsQueuePassive marks the consumer's queue as passive by
// setting QueueOptions.Passive to true. See QueueOptions for what a passive
// queue implies.
func WithConsumerOptionsQueuePassive(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.Passive = true
}
// WithConsumerOptionsQueueNoDeclare turns off the declaration of the queue's
// existence upon startup by setting QueueOptions.Declare to false.
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) {
	queue := &options.QueueOptions
	queue.Declare = false
}
// WithConsumerOptionsQueueArgs returns an option that replaces the queue's
// optional args with args. The table is stored as given, not copied.
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		queue := &options.QueueOptions
		queue.Args = args
	}
	return apply
}
// ensureExchangeOptions guarantees that options.ExchangeOptions has at least
// one element, so callers can safely index element 0. When the slice is empty
// the result of getDefaultExchangeOptions is appended; otherwise nothing
// changes.
func ensureExchangeOptions(options *ConsumerOptions) {
	if len(options.ExchangeOptions) > 0 {
		return
	}
	options.ExchangeOptions = append(options.ExchangeOptions, getDefaultExchangeOptions())
}
// WithConsumerOptionsExchangeName returns an option that sets the name of the
// first exchange, creating the default exchange options first if none exist.
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		ensureExchangeOptions(options)
		exchange := &options.ExchangeOptions[0]
		exchange.Name = name
	}
}
// WithConsumerOptionsExchangeKind returns an option that sets the kind of the
// first exchange, creating the default exchange options first if none exist.
// The default kind is amqp.ExchangeDirect.
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		ensureExchangeOptions(options)
		exchange := &options.ExchangeOptions[0]
		exchange.Kind = kind
	}
}
// WithConsumerOptionsExchangeDurable marks the first exchange as durable,
// creating the default exchange options first if none exist.
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.Durable = true
}
// WithConsumerOptionsExchangeAutoDelete marks the first exchange as an
// auto-delete exchange, creating the default exchange options first if none
// exist.
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.AutoDelete = true
}
// WithConsumerOptionsExchangeInternal marks the first exchange as an internal
// exchange, creating the default exchange options first if none exist.
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.Internal = true
}
// WithConsumerOptionsExchangeNoWait marks the first exchange as a no-wait
// exchange, creating the default exchange options first if none exist.
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.NoWait = true
}
// WithConsumerOptionsExchangeDeclare sets Declare to true on the first
// exchange so that the exchange's existence is declared, creating the default
// exchange options first if none exist. The default exchange options leave
// Declare set to false.
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.Declare = true
}
// WithConsumerOptionsExchangePassive marks the first exchange as a passive
// exchange, creating the default exchange options first if none exist.
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
	ensureExchangeOptions(options)
	exchange := &options.ExchangeOptions[0]
	exchange.Passive = true
}
// WithConsumerOptionsExchangeArgs returns an option that replaces the
// optional args of the first exchange with args, creating the default
// exchange options first if none exist. The table is stored as given, not
// copied.
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		ensureExchangeOptions(options)
		exchange := &options.ExchangeOptions[0]
		exchange.Args = args
	}
	return apply
}
// WithConsumerOptionsRoutingKey returns an option that binds the queue to
// routingKey on the first exchange using the default binding options (see
// getDefaultBindingOptions). The default exchange options are created first
// if none exist.
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		ensureExchangeOptions(options)
		exchange := &options.ExchangeOptions[0]
		binding := Binding{
			RoutingKey:     routingKey,
			BindingOptions: getDefaultBindingOptions(),
		}
		exchange.Bindings = append(exchange.Bindings, binding)
	}
}
// WithConsumerOptionsBinding returns an option that appends binding to the
// first exchange, which lets the binding options be chosen per binding. The
// binding is used exactly as passed in: no defaults are applied, so any field
// of BindingOptions that is not set stays at its zero value. To have the
// binding declared, for example, set Declare to true explicitly.
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		ensureExchangeOptions(options)
		exchange := &options.ExchangeOptions[0]
		exchange.Bindings = append(exchange.Bindings, binding)
	}
}
// WithConsumerOptionsExchangeOptions returns an option that appends a further
// exchange to the consumer. This should probably only be used if you want to
// consume from multiple exchanges on the same consumer. The value is appended
// as given; no defaults are filled in.
func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		exchanges := append(options.ExchangeOptions, exchangeOptions)
		options.ExchangeOptions = exchanges
	}
}
// WithConsumerOptionsConcurrency returns an option that sets the concurrency,
// i.e. how many goroutines will be spawned to run the provided handler on
// messages. The value is stored without validation.
func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		options.Concurrency = concurrency
	}
	return apply
}
// WithConsumerOptionsConsumerName returns an option that sets the name of
// this consumer on the server. If unset, a random name will be given.
func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		consumer := &options.RabbitConsumerOptions
		consumer.Name = consumerName
	}
}
// WithConsumerOptionsConsumerAutoAck returns an option that sets the auto
// acknowledge property of this consumer on the server. If unset, the default
// (false) is used.
func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) {
	return func(options *ConsumerOptions) {
		consumer := &options.RabbitConsumerOptions
		consumer.AutoAck = autoAck
	}
}
// WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which
// means the server will ensure that this is the sole consumer from this
// queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) {
	consumer := &options.RabbitConsumerOptions
	consumer.Exclusive = true
}
// WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and deliveries begin
// immediately. If it is not possible to consume, a channel exception will be
// raised and the channel will be closed.
func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) {
	consumer := &options.RabbitConsumerOptions
	consumer.NoWait = true
}
// WithConsumerOptionsLogging installs the default logger, which writes to
// std out, by storing a pointer to a fresh stdDebugLogger in options.Logger.
func WithConsumerOptionsLogging(options *ConsumerOptions) {
	defaultLogger := new(stdDebugLogger)
	options.Logger = defaultLogger
}
// WithConsumerOptionsLogger returns an option that sends logging to a custom
// logger.Logger implementation. Use WithConsumerOptionsLogging to just log to
// stdout.
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		options.Logger = log
	}
	return apply
}
// WithConsumerOptionsQOSPrefetch returns an option that sets the prefetch
// count, i.e. how many messages will be fetched from the server in advance to
// help with throughput. The handler itself is not affected by this setting.
// The default is 10 (see getDefaultConsumerOptions).
func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		options.QOSPrefetch = prefetchCount
	}
	return apply
}
// WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which
// means these QOS settings apply to ALL existing and future consumers on all
// channels on the same connection.
func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
	const global = true
	options.QOSGlobal = global
}
// WithConsumerOptionsForceShutdown tells the consumer not to wait for the
// handler to complete in consumer.Close, by setting CloseGracefully to false
// (the default is true).
func WithConsumerOptionsForceShutdown(options *ConsumerOptions) {
	const graceful = false
	options.CloseGracefully = graceful
}
// WithConsumerOptionsQueueQuorum sets the queue to the quorum type, which
// means multiple nodes in the cluster will have the messages distributed
// amongst them for higher reliability. It does so by writing the
// "x-queue-type" queue argument, allocating the Args table first when it is
// nil.
func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
	queue := &options.QueueOptions
	if queue.Args == nil {
		// Writing to a nil map panics, so make sure a table exists.
		queue.Args = Table{}
	}
	queue.Args["x-queue-type"] = "quorum"
}
// WithConsumerOptionsQueueMessageExpiration returns an option that sets the
// message expiration (TTL) for all messages in the queue, i.e. how long a
// message can remain in the queue before it is discarded if not consumed.
// The TTL is given as a time.Duration and stored in the "x-message-ttl" queue
// argument as a whole number of milliseconds (ttl.Milliseconds()).
// See https://www.rabbitmq.com/docs/ttl#per-queue-message-ttl
func WithConsumerOptionsQueueMessageExpiration(ttl time.Duration) func(*ConsumerOptions) {
	apply := func(options *ConsumerOptions) {
		queue := &options.QueueOptions
		if queue.Args == nil {
			// Writing to a nil map panics, so make sure a table exists.
			queue.Args = Table{}
		}
		queue.Args["x-message-ttl"] = ttl.Milliseconds()
	}
	return apply
}