From 5ed50624436948aeb7790b90936fa0f08fbd5434 Mon Sep 17 00:00:00 2001 From: NicklasWallgren Date: Fri, 27 May 2022 11:31:57 +0200 Subject: [PATCH] Option to create exchange in publisher --- .gitignore | 1 + consume.go | 27 +++++-------- consume_options.go | 50 +++++------------------ examples/logger/main.go | 2 +- examples/publisher/main.go | 2 +- exchange.go | 71 ++++++++++++++++++++++++++++++++ publish.go | 83 +++++++++++++++++++++++++++++++------- publish_options.go | 8 ---- 8 files changed, 161 insertions(+), 83 deletions(-) create mode 100644 exchange.go diff --git a/.gitignore b/.gitignore index 9f11b75..32901d9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea/ +TODO.md \ No newline at end of file diff --git a/consume.go b/consume.go index ede1e96..0348ccd 100644 --- a/consume.go +++ b/consume.go @@ -160,30 +160,21 @@ func (consumer Consumer) startGoroutines( } } - if consumeOptions.BindingExchange != nil { - exchange := consumeOptions.BindingExchange - if exchange.Name == "" { - return fmt.Errorf("binding to exchange but name not specified") + if consumeOptions.ExchangeOptions != nil { + exchangeOptions := consumeOptions.ExchangeOptions + if exchangeOptions.Name == "" { + return fmt.Errorf("binding to exchangeOptions but name not specified") } - if exchange.Declare { - err := consumer.chManager.channel.ExchangeDeclare( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.ExchangeArgs), - ) - if err != nil { - return err - } + + if err := declareOrVerifyExchange(consumeOptions.ExchangeOptions, consumer.chManager.channel); err != nil { + return err } + for _, routingKey := range routingKeys { err := consumer.chManager.channel.QueueBind( queue, routingKey, - exchange.Name, + exchangeOptions.Name, consumeOptions.BindingNoWait, tableToAMQPTable(consumeOptions.BindingArgs), ) diff --git a/consume_options.go b/consume_options.go index 0a66857..4605c26 100644 --- a/consume_options.go +++ b/consume_options.go @@ -9,9 +9,9 @@ func getDefaultConsumeOptions() ConsumeOptions { QueueNoWait: false, QueueDeclare: true, QueueArgs: nil, - BindingExchange: nil, BindingNoWait: false, BindingArgs: nil, + ExchangeOptions: nil, Concurrency: 1, QOSPrefetch: 0, QOSGlobal: false, @@ -32,9 +32,9 @@ type ConsumeOptions struct { QueueNoWait bool QueueDeclare bool QueueArgs Table - BindingExchange *BindingExchangeOptions BindingNoWait bool BindingArgs Table + ExchangeOptions *ExchangeOptions Concurrency int QOSPrefetch int QOSGlobal bool @@ -46,36 +46,6 @@ type ConsumeOptions struct { ConsumerArgs Table } -// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. -func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { - if options.BindingExchange == nil { - options.BindingExchange = &BindingExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - ExchangeArgs: nil, - Declare: true, - } - } - return options.BindingExchange -} - -// BindingExchangeOptions are used when binding to an exchange. -// it will verify the exchange is created before binding to it. -type BindingExchangeOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - ExchangeArgs Table - Declare bool -} - // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't // be destroyed when the server restarts. It must only be bound to durable exchanges func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { @@ -124,41 +94,41 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { // WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Name = name + getConsumerExchangeOptionsOrSetDefault(options).Name = name } } // WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Kind = kind + getConsumerExchangeOptionsOrSetDefault(options).Kind = kind } } // WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true + getConsumerExchangeOptionsOrSetDefault(options).Durable = true } // WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true + getConsumerExchangeOptionsOrSetDefault(options).AutoDelete = true } // WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true + getConsumerExchangeOptionsOrSetDefault(options).Internal = true } // WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true + getConsumerExchangeOptionsOrSetDefault(options).NoWait = true } // WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args + getConsumerExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } @@ -166,7 +136,7 @@ func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { // binding exchange. Use this setting if the exchange already exists and you don't need to declare // it on consumer start. func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Declare = false + getConsumerExchangeOptionsOrSetDefault(options).Declare = false } // WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound diff --git a/examples/logger/main.go b/examples/logger/main.go index ab23808..a7d28a0 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -35,6 +35,7 @@ func main() { publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", rabbitmq.Config{}, rabbitmq.WithPublisherOptionsLogger(mylogger), + rabbitmq.WithPublisherOptionsExchangeName("events"), ) if err != nil { log.Fatal(err) @@ -45,7 +46,6 @@ func main() { rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsMandatory, rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), ) if err != nil { log.Fatal(err) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 5ec6732..7a30844 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -15,6 +15,7 @@ func main() { publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", rabbitmq.Config{}, rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), ) if err != nil { log.Fatal(err) @@ -65,7 +66,6 @@ func main() { rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsMandatory, rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), ) if err != nil { log.Println(err) diff --git a/exchange.go b/exchange.go new file mode 100644 index 0000000..d58258a --- /dev/null +++ b/exchange.go @@ -0,0 +1,71 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +// ExchangeOptions are used when configuring or binding to an exchange. +// it will verify the exchange is created before binding to it. +type ExchangeOptions struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table + Declare bool +} + +// getConsumerExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values. +func getConsumerExchangeOptionsOrSetDefault(options *ConsumeOptions) *ExchangeOptions { + if options.ExchangeOptions == nil { + options.ExchangeOptions = getDefaultExchangeOptions() + } + return options.ExchangeOptions +} + +// getPublisherExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values. +func getPublisherExchangeOptionsOrSetDefault(options *PublisherOptions) *ExchangeOptions { + if options.ExchangeOptions == nil { + options.ExchangeOptions = getDefaultExchangeOptions() + } + return options.ExchangeOptions +} + +// getDefaultExchangeOptions returns pointer to the default Exchange options. +func getDefaultExchangeOptions() *ExchangeOptions { + return &ExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + Declare: true, + } +} + +// getDefaultExchangeOptions declares or verifies the existence of an exchange. +func declareOrVerifyExchange(exchangeOptions *ExchangeOptions, channel *amqp.Channel) error { + if exchangeOptions.Declare { + return channel.ExchangeDeclare( + exchangeOptions.Name, + exchangeOptions.Kind, + exchangeOptions.Durable, + exchangeOptions.AutoDelete, + exchangeOptions.Internal, + exchangeOptions.NoWait, + tableToAMQPTable(exchangeOptions.ExchangeArgs), + ) + } + + return channel.ExchangeDeclarePassive( + exchangeOptions.Name, + exchangeOptions.Kind, + exchangeOptions.Durable, + exchangeOptions.AutoDelete, + exchangeOptions.Internal, + exchangeOptions.NoWait, + tableToAMQPTable(exchangeOptions.ExchangeArgs), + ) +} diff --git a/publish.go b/publish.go index eea644c..cd3d544 100644 --- a/publish.go +++ b/publish.go @@ -58,6 +58,54 @@ type Publisher struct { type PublisherOptions struct { Logger Logger ReconnectInterval time.Duration + ExchangeOptions *ExchangeOptions +} + +// WithPublisherOptionsExchangeName returns a function that sets the exchange to publish to +func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithPublisherOptionsExchangeKind returns a function that sets the binding exchange kind/type +func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithPublisherOptionsExchangeDurable returns a function that sets the binding exchange durable flag +func WithPublisherOptionsExchangeDurable(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithPublisherOptionsExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithPublisherOptionsExchangeInternal returns a function that sets the binding exchange internal flag +func WithPublisherOptionsExchangeInternal(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithPublisherOptionsExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithPublisherOptionsExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { + return func(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).ExchangeArgs = args + } +} + +// WithPublisherOptionsExchangeDeclare returns a function that declares the binding exchange. +// Use this setting if you want the consumer to create the exchange on start. +func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) { + getPublisherExchangeOptionsOrSetDefault(options).Declare = true } // WithPublisherOptionsReconnectInterval sets the interval at which the publisher will @@ -91,6 +139,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio options := &PublisherOptions{ Logger: &stdDebugLogger{}, ReconnectInterval: time.Second * 5, + ExchangeOptions: getDefaultExchangeOptions(), } for _, optionFunc := range optionFuncs { optionFunc(options) @@ -112,6 +161,10 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio notifyPublishChan: nil, } + if err = declareOrVerifyExchange(publisher.options.ExchangeOptions, chManager.channel); err != nil { + return nil, err + } + go publisher.startNotifyFlowHandler() go publisher.startNotifyBlockedHandler() @@ -120,20 +173,6 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio return publisher, nil } -func (publisher *Publisher) handleRestarts() { - for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Infof("successful publisher recovery from: %v", err) - go publisher.startNotifyFlowHandler() - go publisher.startNotifyBlockedHandler() - if publisher.notifyReturnChan != nil { - go publisher.startNotifyReturnHandler() - } - if publisher.notifyPublishChan != nil { - publisher.startNotifyPublishHandler() - } - } -} - // NotifyReturn registers a listener for basic.return methods. // These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. func (publisher *Publisher) NotifyReturn() <-chan Return { @@ -194,7 +233,7 @@ func (publisher *Publisher) Publish( // Actual publish. err := publisher.chManager.channel.Publish( - options.Exchange, + publisher.options.ExchangeOptions.Name, routingKey, options.Mandatory, options.Immediate, @@ -214,6 +253,20 @@ func (publisher Publisher) Close() error { return publisher.chManager.close() } +func (publisher *Publisher) handleRestarts() { + for err := range publisher.chManager.notifyCancelOrClose { + publisher.options.Logger.Infof("successful publisher recovery from: %v", err) + go publisher.startNotifyFlowHandler() + go publisher.startNotifyBlockedHandler() + if publisher.notifyReturnChan != nil { + go publisher.startNotifyReturnHandler() + } + if publisher.notifyPublishChan != nil { + publisher.startNotifyPublishHandler() + } + } +} + func (publisher *Publisher) startNotifyReturnHandler() { returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) for ret := range returnAMQPCh { diff --git a/publish_options.go b/publish_options.go index 210ba42..9cedf29 100644 --- a/publish_options.go +++ b/publish_options.go @@ -6,7 +6,6 @@ import ( // PublishOptions are used to control how data is published type PublishOptions struct { - Exchange string // Mandatory fails to publish if there are no queues // bound to the routing key Mandatory bool @@ -43,13 +42,6 @@ type PublishOptions struct { Headers Table } -// WithPublishOptionsExchange returns a function that sets the exchange to publish to -func WithPublishOptionsExchange(exchange string) func(*PublishOptions) { - return func(options *PublishOptions) { - options.Exchange = exchange - } -} - // WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not // bound to the routing key a message will be sent back on the returns channel for you to handle func WithPublishOptionsMandatory(options *PublishOptions) {