From 0d0b74c7e902f3f8f760032756b2b8ca52952aa3 Mon Sep 17 00:00:00 2001 From: Victor Castillo <> Date: Tue, 8 Jun 2021 22:23:28 -0500 Subject: [PATCH 1/2] Add function WithConsumeOptionsConsumerAutoAck to change the auto acknowledge property, when creating a consumer --- consume_options.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/consume_options.go b/consume_options.go index f6168f8..1c61dbc 100644 --- a/consume_options.go +++ b/consume_options.go @@ -189,6 +189,15 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { } } + +// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer +// if unset the default will be used (false) +func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ConsumerAutoAck = autoAck + } +} + // WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means // the server will ensure that this is the sole consumer // from this queue. When exclusive is false, the server will fairly distribute From e946a187e5be8799c0e6adcd6be09a79a80c72c0 Mon Sep 17 00:00:00 2001 From: Victor Castillo <> Date: Tue, 8 Jun 2021 23:11:14 -0500 Subject: [PATCH 2/2] Added the ability to stop the consumers frmo receiving messages, but not from acknolwedging them --- consume.go | 28 +++++++++++++++++++++++----- consume_options.go | 1 - 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/consume.go b/consume.go index 393fa24..15f243d 100644 --- a/consume.go +++ b/consume.go @@ -2,9 +2,8 @@ package rabbitmq import ( "fmt" - "time" - "github.com/streadway/amqp" + "time" ) // Consumer allows you to create and connect to queues for data consumption. @@ -107,13 +106,32 @@ func (consumer Consumer) StartConsuming( return nil } -// StopConsuming stops the consumption of messages. -// The consumer should be discarded as it's not safe for re-use -func (consumer Consumer) StopConsuming() { +// Disconnect disconnects both the channel and the connection. +// This method doesn't throw a reconnect, and should be used when finishing a program. +// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior +// such as messages being processed, but not being acknowledged, thus being requeued by the broker +func (consumer Consumer) Disconnect() { consumer.chManager.channel.Close() consumer.chManager.connection.Close() } +// StopConsuming stops the consumption of messages. +// The consumer should be discarded as it's not safe for re-use. +// This method sends a basic.cancel notification. +// The consumerName is the name or delivery tag of the amqp consumer we want to cancel. +// When noWait is true, do not wait for the server to acknowledge the cancel. +// Only use this when you are certain there are no deliveries in flight that +// require an acknowledgment, otherwise they will arrive and be dropped in the +// client without an ack, and will not be redelivered to other consumers. +// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation +// it's imperative for you to set the name when creating the consumer, if you want to use this function later +// a simple uuid4 should do the trick, since it should be unique. +// If you start many consumers, you should store the name of the consumers when creating them, such that you can +// use them in a for to stop all the consumers. +func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { + consumer.chManager.channel.Cancel(consumerName, noWait) +} + // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( diff --git a/consume_options.go b/consume_options.go index 1c61dbc..e8eb1bb 100644 --- a/consume_options.go +++ b/consume_options.go @@ -189,7 +189,6 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { } } - // WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer // if unset the default will be used (false) func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {