Browse Source

Declare exchanges, queues, and bindings before running the consumer using the PreDeclare option.

pull/208/head
Amangeldy 2 years ago
parent
commit
47df0803de
Failed to extract signature
2 changed files with 49 additions and 19 deletions
  1. +41
    -15
      consume.go
  2. +8
    -4
      consumer_options.go

+ 41
- 15
consume.go View File

@ -60,7 +60,11 @@ func NewConsumer(
return nil, errors.New("connection manager can't be nil")
}
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
chanManager, err := channelmanager.NewChannelManager(
conn.connectionManager,
options.Logger,
conn.connectionManager.ReconnectInterval,
)
if err != nil {
return nil, err
}
@ -75,6 +79,12 @@ func NewConsumer(
isClosed: false,
}
if options.PreDeclare {
if err := consumer.declareResources(*options); err != nil {
return nil, err
}
}
return consumer, nil
}
@ -124,6 +134,27 @@ func (consumer *Consumer) Close() {
}()
}
// declareResources it's a helper function to declare exchanges, queues and bindings
// based on the options provided by the user.
// It returns an error if any of the declarations fail
func (consumer *Consumer) declareResources(options ConsumerOptions) error {
for _, exchangeOption := range options.ExchangeOptions {
if err := declareExchange(consumer.chanManager, exchangeOption); err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
}
if err := declareQueue(consumer.chanManager, options.QueueOptions); err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
if err := declareBindings(consumer.chanManager, options); err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
}
return nil
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
@ -139,19 +170,9 @@ func (consumer *Consumer) startGoroutines(
if err != nil {
return fmt.Errorf("declare qos failed: %w", err)
}
for _, exchangeOption := range options.ExchangeOptions {
err = declareExchange(consumer.chanManager, exchangeOption)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
}
err = declareQueue(consumer.chanManager, options.QueueOptions)
if err != nil {
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.chanManager, options)
if err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
if err := consumer.declareResources(options); err != nil {
return err
}
msgs, err := consumer.chanManager.ConsumeSafe(
@ -180,7 +201,12 @@ func (consumer *Consumer) getIsClosed() bool {
return consumer.isClosed
}
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) {
func handlerGoroutine(
consumer *Consumer,
msgs <-chan amqp.Delivery,
consumeOptions ConsumerOptions,
handler Handler,
) {
for msg := range msgs {
if consumer.getIsClosed() {
break


+ 8
- 4
consumer_options.go View File

@ -31,6 +31,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
PreDeclare: false,
}
}
@ -69,6 +70,7 @@ type ConsumerOptions struct {
Logger logger.Logger
QOSPrefetch int
QOSGlobal bool
PreDeclare bool
}
// RabbitConsumerOptions are used to configure the consumer
@ -217,10 +219,12 @@ func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
ensureExchangeOptions(options)
options.ExchangeOptions[0].Bindings = append(options.ExchangeOptions[0].Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
options.ExchangeOptions[0].Bindings = append(
options.ExchangeOptions[0].Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
},
)
}
}


Loading…
Cancel
Save