From 47df0803de0bf0b8e1f8ee6a3d161a69ed577189 Mon Sep 17 00:00:00 2001 From: Amangeldy Date: Wed, 15 May 2024 14:55:59 +0500 Subject: [PATCH] Declare exchanges, queues, and bindings before running the consumer using the PreDeclare option. --- consume.go | 56 +++++++++++++++++++++++++++++++++------------ consumer_options.go | 12 ++++++---- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/consume.go b/consume.go index 4517c5d..37473d4 100644 --- a/consume.go +++ b/consume.go @@ -60,7 +60,11 @@ func NewConsumer( return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager( + conn.connectionManager, + options.Logger, + conn.connectionManager.ReconnectInterval, + ) if err != nil { return nil, err } @@ -75,6 +79,12 @@ func NewConsumer( isClosed: false, } + if options.PreDeclare { + if err := consumer.declareResources(*options); err != nil { + return nil, err + } + } + return consumer, nil } @@ -124,6 +134,27 @@ func (consumer *Consumer) Close() { }() } +// declareResources it's a helper function to declare exchanges, queues and bindings +// based on the options provided by the user. +// It returns an error if any of the declarations fail +func (consumer *Consumer) declareResources(options ConsumerOptions) error { + for _, exchangeOption := range options.ExchangeOptions { + if err := declareExchange(consumer.chanManager, exchangeOption); err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + } + + if err := declareQueue(consumer.chanManager, options.QueueOptions); err != nil { + return fmt.Errorf("declare queue failed: %w", err) + } + + if err := declareBindings(consumer.chanManager, options); err != nil { + return fmt.Errorf("declare bindings failed: %w", err) + } + + return nil +} + // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue @@ -139,19 +170,9 @@ func (consumer *Consumer) startGoroutines( if err != nil { return fmt.Errorf("declare qos failed: %w", err) } - for _, exchangeOption := range options.ExchangeOptions { - err = declareExchange(consumer.chanManager, exchangeOption) - if err != nil { - return fmt.Errorf("declare exchange failed: %w", err) - } - } - err = declareQueue(consumer.chanManager, options.QueueOptions) - if err != nil { - return fmt.Errorf("declare queue failed: %w", err) - } - err = declareBindings(consumer.chanManager, options) - if err != nil { - return fmt.Errorf("declare bindings failed: %w", err) + + if err := consumer.declareResources(options); err != nil { + return err } msgs, err := consumer.chanManager.ConsumeSafe( @@ -180,7 +201,12 @@ func (consumer *Consumer) getIsClosed() bool { return consumer.isClosed } -func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) { +func handlerGoroutine( + consumer *Consumer, + msgs <-chan amqp.Delivery, + consumeOptions ConsumerOptions, + handler Handler, +) { for msg := range msgs { if consumer.getIsClosed() { break diff --git a/consumer_options.go b/consumer_options.go index 7de85cb..64a66af 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -31,6 +31,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { Logger: stdDebugLogger{}, QOSPrefetch: 10, QOSGlobal: false, + PreDeclare: false, } } @@ -69,6 +70,7 @@ type ConsumerOptions struct { Logger logger.Logger QOSPrefetch int QOSGlobal bool + PreDeclare bool } // RabbitConsumerOptions are used to configure the consumer @@ -217,10 +219,12 @@ func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { return func(options *ConsumerOptions) { ensureExchangeOptions(options) - options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{ - RoutingKey: routingKey, - BindingOptions: getDefaultBindingOptions(), - }) + options.ExchangeOptions[0].Bindings = append( + options.ExchangeOptions[0].Bindings, Binding{ + RoutingKey: routingKey, + BindingOptions: getDefaultBindingOptions(), + }, + ) } }