diff --git a/consume.go b/consume.go index b882fc7..467d5f6 100644 --- a/consume.go +++ b/consume.go @@ -187,16 +187,18 @@ func (consumer Consumer) startGoroutines( consumer.chManager.channelMux.RLock() defer consumer.chManager.channelMux.RUnlock() - _, err := consumer.chManager.channel.QueueDeclare( - queue, - consumeOptions.QueueDurable, - consumeOptions.QueueAutoDelete, - consumeOptions.QueueExclusive, - consumeOptions.QueueNoWait, - tableToAMQPTable(consumeOptions.QueueArgs), - ) - if err != nil { - return err + if consumeOptions.QueueDeclare { + _, err := consumer.chManager.channel.QueueDeclare( + queue, + consumeOptions.QueueDurable, + consumeOptions.QueueAutoDelete, + consumeOptions.QueueExclusive, + consumeOptions.QueueNoWait, + tableToAMQPTable(consumeOptions.QueueArgs), + ) + if err != nil { + return err + } } if consumeOptions.BindingExchange != nil { @@ -205,7 +207,7 @@ func (consumer Consumer) startGoroutines( return fmt.Errorf("binding to exchange but name not specified") } if exchange.Declare { - err = consumer.chManager.channel.ExchangeDeclare( + err := consumer.chManager.channel.ExchangeDeclare( exchange.Name, exchange.Kind, exchange.Durable, @@ -219,7 +221,7 @@ func (consumer Consumer) startGoroutines( } } for _, routingKey := range routingKeys { - err = consumer.chManager.channel.QueueBind( + err := consumer.chManager.channel.QueueBind( queue, routingKey, exchange.Name, @@ -232,7 +234,7 @@ func (consumer Consumer) startGoroutines( } } - err = consumer.chManager.channel.Qos( + err := consumer.chManager.channel.Qos( consumeOptions.QOSPrefetch, 0, consumeOptions.QOSGlobal, diff --git a/consume_options.go b/consume_options.go index fd226f6..0a66857 100644 --- a/consume_options.go +++ b/consume_options.go @@ -7,6 +7,7 @@ func getDefaultConsumeOptions() ConsumeOptions { QueueAutoDelete: false, QueueExclusive: false, QueueNoWait: false, + QueueDeclare: true, QueueArgs: nil, BindingExchange: nil, BindingNoWait: false, @@ -29,6 +30,7 @@ type ConsumeOptions struct { QueueAutoDelete bool QueueExclusive bool QueueNoWait bool + QueueDeclare bool QueueArgs Table BindingExchange *BindingExchangeOptions BindingNoWait bool @@ -103,6 +105,13 @@ func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { options.QueueNoWait = true } +// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means +// the queue will be assumed to be declared on the server, and won't be +// declared at all. +func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) { + options.QueueDeclare = false +} + // WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes // in the cluster will have the messages distributed amongst them for higher reliability func WithConsumeOptionsQuorum(options *ConsumeOptions) {