From aeee45d7e97eb6926f6a90bcf1a25304b3aaeb0a Mon Sep 17 00:00:00 2001 From: Henrik Hofmeister Date: Sun, 11 Feb 2024 16:20:07 +0100 Subject: [PATCH] Add support for any kind of declarations when creating publisher / consumer Note: Breaking Changes - Removes ConsumerOptions.QueueOptions is removed - Removes ConsumerOptions.ExchangeOptions is removed - Adds ConsumerOptions.QueueName as the queue to listen to - Adds slices Queues, Exchanges and Bindings to ConsumerOptions - Adds slices Queues, Exchanges and Bindings to PublisherOptions - Expands Binding to allow for exchange bindings --- consume.go | 20 ++- consumer_options.go | 178 ++++++++++++++++++-------- declare.go | 87 +++++++++++-- internal/channelmanager/safe_wraps.go | 16 +++ publish.go | 19 ++- publisher_options.go | 78 +++++++---- 6 files changed, 299 insertions(+), 99 deletions(-) diff --git a/consume.go b/consume.go index d1f802f..efd52d1 100644 --- a/consume.go +++ b/consume.go @@ -140,21 +140,19 @@ func (consumer *Consumer) startGoroutines( if err != nil { return fmt.Errorf("declare qos failed: %w", err) } - err = declareExchange(consumer.chanManager, options.ExchangeOptions) - if err != nil { - return fmt.Errorf("declare exchange failed: %w", err) - } - err = declareQueue(consumer.chanManager, options.QueueOptions) - if err != nil { - return fmt.Errorf("declare queue failed: %w", err) - } - err = declareBindings(consumer.chanManager, options) + + err = declareAll(consumer.chanManager, declareOptions{ + Queues: options.Queues, + Exchanges: options.Exchanges, + Bindings: options.Bindings, + }) + if err != nil { - return fmt.Errorf("declare bindings failed: %w", err) + return err } msgs, err := consumer.chanManager.ConsumeSafe( - options.QueueOptions.Name, + options.QueueName, options.RabbitConsumerOptions.Name, options.RabbitConsumerOptions.AutoAck, options.RabbitConsumerOptions.Exclusive, diff --git a/consumer_options.go b/consumer_options.go index 80f2979..b923448 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -16,26 +16,30 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { NoLocal: false, Args: Table{}, }, - QueueOptions: QueueOptions{ - Name: queueName, - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: true, + Queues: []QueueOptions{ + { + Name: queueName, + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, }, - ExchangeOptions: ExchangeOptions{ - Name: "", - Kind: amqp.ExchangeDirect, - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: false, + Exchanges: []ExchangeOptions{ + { + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, }, Bindings: []Binding{}, Concurrency: 1, @@ -59,8 +63,9 @@ func getDefaultBindingOptions() BindingOptions { // If there are Bindings, the queue will be bound to them type ConsumerOptions struct { RabbitConsumerOptions RabbitConsumerOptions - QueueOptions QueueOptions - ExchangeOptions ExchangeOptions + QueueName string + Queues []QueueOptions + Exchanges []ExchangeOptions Bindings []Binding Concurrency int Logger logger.Logger @@ -93,105 +98,126 @@ type QueueOptions struct { Declare bool } -// Binding describes the bhinding of a queue to a routing key on an exchange -type Binding struct { - RoutingKey string - BindingOptions -} - -// BindingOptions describes the options a binding can have -type BindingOptions struct { - NoWait bool - Args Table - Declare bool -} - // WithConsumerOptionsQueueDurable ensures the queue is a durable queue func WithConsumerOptionsQueueDurable(options *ConsumerOptions) { - options.QueueOptions.Durable = true + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.Durable = true + }) } // WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) { - options.QueueOptions.AutoDelete = true + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.AutoDelete = true + }) } // WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) { - options.QueueOptions.Exclusive = true + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.Exclusive = true + }) } // WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) { - options.QueueOptions.NoWait = true + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.NoWait = true + }) } // WithConsumerOptionsQueuePassive ensures the queue is a passive queue func WithConsumerOptionsQueuePassive(options *ConsumerOptions) { - options.QueueOptions.Passive = true + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.Passive = true + }) } // WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's // existance upon startup func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) { - options.QueueOptions.Declare = false + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.Declare = false + }) } // WithConsumerOptionsQueueArgs adds optional args to the queue func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.QueueOptions.Args = args + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + queueOptions.Args = args + }) } } // WithConsumerOptionsExchangeName sets the exchange name func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { - options.ExchangeOptions.Name = name + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Name = name + }) } } // WithConsumerOptionsExchangeKind ensures the queue is a durable queue func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { - options.ExchangeOptions.Kind = kind + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Kind = kind + }) } } // WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) { - options.ExchangeOptions.Durable = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Durable = true + }) } // WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) { - options.ExchangeOptions.AutoDelete = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.AutoDelete = true + }) } // WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) { - options.ExchangeOptions.Internal = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Internal = true + }) } // WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) { - options.ExchangeOptions.NoWait = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.NoWait = true + }) } // WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) { - options.ExchangeOptions.Declare = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Declare = true + }) } // WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange func WithConsumerOptionsExchangePassive(options *ConsumerOptions) { - options.ExchangeOptions.Passive = true + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Passive = true + }) } // WithConsumerOptionsExchangeArgs adds optional args to the exchange func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { return func(options *ConsumerOptions) { - options.ExchangeOptions.Args = args + WithSimpleExchangeOptions(options, func(exchangeOptions *ExchangeOptions) { + exchangeOptions.Args = args + }) } } @@ -287,9 +313,55 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { // multiple nodes in the cluster will have the messages distributed amongst them // for higher reliability func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) { - if options.QueueOptions.Args == nil { - options.QueueOptions.Args = Table{} + WithSimpleQueueOptions(options, func(queueOptions *QueueOptions) { + if queueOptions.Args == nil { + queueOptions.Args = Table{} + } + + queueOptions.Args["x-queue-type"] = "quorum" + }) +} + +// WithSimpleQueueOptions used for backwards compatibility +// Will set options on the first queue and ensure that queue exists +func WithSimpleQueueOptions(options *ConsumerOptions, handler func(queueOptions *QueueOptions)) { + if len(options.Queues) == 0 { + options.Queues = append(options.Queues, QueueOptions{}) + } + + handler(&options.Queues[0]) +} + +// WithSimpleExchangeOptions used for backwards compatibility +// Will set options on the first exchange and ensure that exchange exists +func WithSimpleExchangeOptions(options *ConsumerOptions, handler func(exchangeOptions *ExchangeOptions)) { + if len(options.Exchanges) == 0 { + options.Exchanges = append(options.Exchanges, ExchangeOptions{}) } - options.QueueOptions.Args["x-queue-type"] = "quorum" + handler(&options.Exchanges[0]) +} + +func WithConsumerQueue(queue QueueOptions) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Queues = []QueueOptions{queue} + } +} + +func WithConsumerQueues(queues []QueueOptions) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Queues = queues + } +} + +func WithConsumerBindings(bindings []Binding) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Bindings = bindings + } +} + +func WithConsumerExchanges(exchanges []ExchangeOptions) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Exchanges = exchanges + } } diff --git a/declare.go b/declare.go index 86abe85..627b5ba 100644 --- a/declare.go +++ b/declare.go @@ -1,9 +1,33 @@ package rabbitmq import ( + "fmt" "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) +type BindingDestinationType string + +const ( + BindingTypeQueue BindingDestinationType = "queue" + BindingTypeExchange BindingDestinationType = "exchange" +) + +// Binding describes the bhinding of a queue to a routing key on an exchange +type Binding struct { + BindingOptions + DestinationType BindingDestinationType + DestinationName string + RoutingKey string + ExchangeName string +} + +// BindingOptions describes the options a binding can have +type BindingOptions struct { + NoWait bool + Args Table + Declare bool +} + func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { if !options.Declare { return nil @@ -70,21 +94,64 @@ func declareExchange(chanManager *channelmanager.ChannelManager, options Exchang return nil } -func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error { - for _, binding := range options.Bindings { +func declareBindings(chanManager *channelmanager.ChannelManager, bindings []Binding) error { + for _, binding := range bindings { if !binding.Declare { continue } - err := chanManager.QueueBindSafe( - options.QueueOptions.Name, - binding.RoutingKey, - options.ExchangeOptions.Name, - binding.NoWait, - tableToAMQPTable(binding.Args), - ) + if binding.DestinationType == BindingTypeQueue { + err := chanManager.QueueBindSafe( + binding.DestinationName, + binding.RoutingKey, + binding.ExchangeName, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err + } + } + + if binding.DestinationType == BindingTypeExchange { + err := chanManager.ExchangeBindSafe( + binding.DestinationName, + binding.RoutingKey, + binding.ExchangeName, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err + } + } + + } + return nil +} + +type declareOptions struct { + Queues []QueueOptions + Exchanges []ExchangeOptions + Bindings []Binding +} + +func declareAll(chanManager *channelmanager.ChannelManager, options declareOptions) error { + for _, exchangeOptions := range options.Exchanges { + err := declareExchange(chanManager, exchangeOptions) if err != nil { - return err + return fmt.Errorf("declare exchange failed: %w", err) } } + for _, queueOptions := range options.Queues { + err := declareQueue(chanManager, queueOptions) + if err != nil { + return fmt.Errorf("declare queue failed: %w", err) + } + } + + err := declareBindings(chanManager, options.Bindings) + if err != nil { + return fmt.Errorf("declare bindings failed: %w", err) + } return nil } diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index b75a5f5..16c6aa9 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -121,6 +121,22 @@ func (chanManager *ChannelManager) QueueBindSafe( ) } +// ExchangeBindSafe safely wraps the (*amqp.Channel).ExchangeBind method +func (chanManager *ChannelManager) ExchangeBindSafe( + name string, key string, exchange string, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeBind( + name, + key, + exchange, + noWait, + args, + ) +} + // QosSafe safely wraps the (*amqp.Channel).Qos method func (chanManager *ChannelManager) QosSafe( prefetchCount int, prefetchSize int, global bool, diff --git a/publish.go b/publish.go index a58b48d..f2ecc3b 100644 --- a/publish.go +++ b/publish.go @@ -128,7 +128,16 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe } func (publisher *Publisher) startup() error { - err := declareExchange(publisher.chanManager, publisher.options.ExchangeOptions) + err := declareAll(publisher.chanManager, declareOptions{ + Queues: publisher.options.Queues, + Exchanges: publisher.options.Exchanges, + Bindings: publisher.options.Bindings, + }) + + if err != nil { + return err + } + if err != nil { return fmt.Errorf("declare exchange failed: %w", err) } @@ -167,7 +176,9 @@ func (publisher *Publisher) PublishWithContext( return fmt.Errorf("publishing blocked due to TCP block on the server") } - options := &PublishOptions{} + options := &PublishOptions{ + Exchange: publisher.options.ExchangeName, + } for _, optionFunc := range optionFuncs { optionFunc(options) } @@ -231,7 +242,9 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( return nil, fmt.Errorf("publishing blocked due to TCP block on the server") } - options := &PublishOptions{} + options := &PublishOptions{ + Exchange: publisher.options.ExchangeName, + } for _, optionFunc := range optionFuncs { optionFunc(options) } diff --git a/publisher_options.go b/publisher_options.go index eb283e4..aa2081d 100644 --- a/publisher_options.go +++ b/publisher_options.go @@ -5,24 +5,31 @@ import amqp "github.com/rabbitmq/amqp091-go" // PublisherOptions are used to describe a publisher's configuration. // Logger is a custom logging interface. type PublisherOptions struct { - ExchangeOptions ExchangeOptions - Logger Logger - ConfirmMode bool + ExchangeName string + Logger Logger + ConfirmMode bool + + // Declare these queues, exchanges, and bindings before publishing + Queues []QueueOptions + Exchanges []ExchangeOptions + Bindings []Binding } // getDefaultPublisherOptions describes the options that will be used when a value isn't provided func getDefaultPublisherOptions() PublisherOptions { return PublisherOptions{ - ExchangeOptions: ExchangeOptions{ - Name: "", - Kind: amqp.ExchangeDirect, - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: false, + Exchanges: []ExchangeOptions{ + { + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, }, Logger: stdDebugLogger{}, ConfirmMode: false, @@ -46,51 +53,54 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // WithPublisherOptionsExchangeName sets the exchange name func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { return func(options *PublisherOptions) { - options.ExchangeOptions.Name = name + if options.Exchanges[0].Name == "" { + options.Exchanges[0].Name = name + } + options.ExchangeName = name } } // WithPublisherOptionsExchangeKind ensures the queue is a durable queue func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) { return func(options *PublisherOptions) { - options.ExchangeOptions.Kind = kind + options.Exchanges[0].Kind = kind } } // WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange func WithPublisherOptionsExchangeDurable(options *PublisherOptions) { - options.ExchangeOptions.Durable = true + options.Exchanges[0].Durable = true } // WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) { - options.ExchangeOptions.AutoDelete = true + options.Exchanges[0].AutoDelete = true } // WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange func WithPublisherOptionsExchangeInternal(options *PublisherOptions) { - options.ExchangeOptions.Internal = true + options.Exchanges[0].Internal = true } // WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) { - options.ExchangeOptions.NoWait = true + options.Exchanges[0].NoWait = true } // WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) { - options.ExchangeOptions.Declare = true + options.Exchanges[0].Declare = true } // WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange func WithPublisherOptionsExchangePassive(options *PublisherOptions) { - options.ExchangeOptions.Passive = true + options.Exchanges[0].Passive = true } // WithPublisherOptionsExchangeArgs adds optional args to the exchange func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { return func(options *PublisherOptions) { - options.ExchangeOptions.Args = args + options.Exchanges[0].Args = args } } @@ -99,3 +109,27 @@ func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { func WithPublisherOptionsConfirm(options *PublisherOptions) { options.ConfirmMode = true } + +func WithPublisherQueues(queues []QueueOptions) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Queues = queues + } +} + +func WithPublisherBindings(bindings []Binding) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Bindings = bindings + } +} + +func WithPublisherExchange(exchange ExchangeOptions) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Exchanges = []ExchangeOptions{exchange} + } +} + +func WithPublisherExchanges(exchanges []ExchangeOptions) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Exchanges = exchanges + } +}