Browse Source

feat: add consume options to disable declaring of a queue

Signed-off-by: Tom Brouws <tom@brouws.nl>
pull/49/head
Tom Brouws 4 years ago
parent
commit
6f793db5ec
2 changed files with 24 additions and 13 deletions
  1. +15
    -13
      consume.go
  2. +9
    -0
      consume_options.go

+ 15
- 13
consume.go View File

@ -187,16 +187,18 @@ func (consumer Consumer) startGoroutines(
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
if consumeOptions.QueueDeclare {
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
}
}
if consumeOptions.BindingExchange != nil {
@ -205,7 +207,7 @@ func (consumer Consumer) startGoroutines(
return fmt.Errorf("binding to exchange but name not specified")
}
if exchange.Declare {
err = consumer.chManager.channel.ExchangeDeclare(
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
@ -219,7 +221,7 @@ func (consumer Consumer) startGoroutines(
}
}
for _, routingKey := range routingKeys {
err = consumer.chManager.channel.QueueBind(
err := consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
@ -232,7 +234,7 @@ func (consumer Consumer) startGoroutines(
}
}
err = consumer.chManager.channel.Qos(
err := consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,


+ 9
- 0
consume_options.go View File

@ -7,6 +7,7 @@ func getDefaultConsumeOptions() ConsumeOptions {
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueDeclare: true,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
@ -29,6 +30,7 @@ type ConsumeOptions struct {
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueDeclare bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
@ -103,6 +105,13 @@ func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and won't be
// declared at all.
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) {
options.QueueDeclare = false
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {


Loading…
Cancel
Save