diff --git a/consume.go b/consume.go index 393fa24..15f243d 100644 --- a/consume.go +++ b/consume.go @@ -2,9 +2,8 @@ package rabbitmq import ( "fmt" - "time" - "github.com/streadway/amqp" + "time" ) // Consumer allows you to create and connect to queues for data consumption. @@ -107,13 +106,32 @@ func (consumer Consumer) StartConsuming( return nil } -// StopConsuming stops the consumption of messages. -// The consumer should be discarded as it's not safe for re-use -func (consumer Consumer) StopConsuming() { +// Disconnect disconnects both the channel and the connection. +// This method doesn't throw a reconnect, and should be used when finishing a program. +// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior +// such as messages being processed, but not being acknowledged, thus being requeued by the broker +func (consumer Consumer) Disconnect() { consumer.chManager.channel.Close() consumer.chManager.connection.Close() } +// StopConsuming stops the consumption of messages. +// The consumer should be discarded as it's not safe for re-use. +// This method sends a basic.cancel notification. +// The consumerName is the name or delivery tag of the amqp consumer we want to cancel. +// When noWait is true, do not wait for the server to acknowledge the cancel. +// Only use this when you are certain there are no deliveries in flight that +// require an acknowledgment, otherwise they will arrive and be dropped in the +// client without an ack, and will not be redelivered to other consumers. +// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation +// it's imperative for you to set the name when creating the consumer, if you want to use this function later +// a simple uuid4 should do the trick, since it should be unique. +// If you start many consumers, you should store the name of the consumers when creating them, such that you can +// use them in a for to stop all the consumers. +func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { + consumer.chManager.channel.Cancel(consumerName, noWait) +} + // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( diff --git a/consume_options.go b/consume_options.go index 1c61dbc..e8eb1bb 100644 --- a/consume_options.go +++ b/consume_options.go @@ -189,7 +189,6 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { } } - // WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer // if unset the default will be used (false) func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {