Browse Source

Merge pull request #31 from ckoehn/feature/add-option-to-skip-exchange-declaration

Add option to skip exchange declaration
pull/36/head
Lane Wagner 4 years ago
committed by GitHub
parent
commit
b05cdbe9c5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 12 deletions
  1. +15
    -12
      consume.go
  2. +9
    -0
      consume_options.go

+ 15
- 12
consume.go View File

@ -2,8 +2,9 @@ package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"time"
"github.com/streadway/amqp"
)
// Consumer allows you to create and connect to queues for data consumption.
@ -188,17 +189,19 @@ func (consumer Consumer) startGoroutines(
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
err = consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
if exchange.Declare {
err = consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
}
for _, routingKey := range routingKeys {
err = consumer.chManager.channel.QueueBind(


+ 9
- 0
consume_options.go View File

@ -55,6 +55,7 @@ func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExch
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
return options.BindingExchange
@ -70,6 +71,7 @@ type BindingExchangeOptions struct {
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
@ -151,6 +153,13 @@ func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
}
}
// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Declare = false
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {


Loading…
Cancel
Save