Browse Source

Added the ability to stop the consumers frmo receiving messages, but not from acknolwedging them

pull/26/head
Victor Castillo 5 years ago
parent
commit
e946a187e5
2 changed files with 23 additions and 6 deletions
  1. +23
    -5
      consume.go
  2. +0
    -1
      consume_options.go

+ 23
- 5
consume.go View File

@ -2,9 +2,8 @@ package rabbitmq
import (
"fmt"
"time"
"github.com/streadway/amqp"
"time"
)
// Consumer allows you to create and connect to queues for data consumption.
@ -107,13 +106,32 @@ func (consumer Consumer) StartConsuming(
return nil
}
// StopConsuming stops the consumption of messages.
// The consumer should be discarded as it's not safe for re-use
func (consumer Consumer) StopConsuming() {
// Disconnect disconnects both the channel and the connection.
// This method doesn't throw a reconnect, and should be used when finishing a program.
// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior
// such as messages being processed, but not being acknowledged, thus being requeued by the broker
func (consumer Consumer) Disconnect() {
consumer.chManager.channel.Close()
consumer.chManager.connection.Close()
}
// StopConsuming stops the consumption of messages.
// The consumer should be discarded as it's not safe for re-use.
// This method sends a basic.cancel notification.
// The consumerName is the name or delivery tag of the amqp consumer we want to cancel.
// When noWait is true, do not wait for the server to acknowledge the cancel.
// Only use this when you are certain there are no deliveries in flight that
// require an acknowledgment, otherwise they will arrive and be dropped in the
// client without an ack, and will not be redelivered to other consumers.
// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation
// it's imperative for you to set the name when creating the consumer, if you want to use this function later
// a simple uuid4 should do the trick, since it should be unique.
// If you start many consumers, you should store the name of the consumers when creating them, such that you can
// use them in a for to stop all the consumers.
func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
consumer.chManager.channel.Cancel(consumerName, noWait)
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(


+ 0
- 1
consume_options.go View File

@ -189,7 +189,6 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
}
}
// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
// if unset the default will be used (false)
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {


Loading…
Cancel
Save