diff --git a/channel.go b/channel.go index aee8515..3acf0c1 100644 --- a/channel.go +++ b/channel.go @@ -66,6 +66,12 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { chManager.logger.Printf("successfully reconnected to amqp server after close") chManager.notifyCancelOrClose <- err } + if err != nil { + chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client") + } + if err == nil { + chManager.logger.Printf("amqp channel closed gracefully") + } case err := <-notifyCancelChan: chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.reconnectWithBackoff() diff --git a/consume.go b/consume.go index e470bc3..baf82f3 100644 --- a/consume.go +++ b/consume.go @@ -2,7 +2,6 @@ package rabbitmq import ( "fmt" - "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -107,13 +106,16 @@ func (consumer Consumer) StartConsuming( go func() { for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("gorabbit: successful recovery from: %v", err) - consumer.startGoroutinesWithRetries( + consumer.logger.Printf("successful recovery from: %v", err) + err = consumer.startGoroutines( handler, queue, routingKeys, *options, ) + if err != nil { + consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err) + } } }() return nil @@ -145,33 +147,6 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { consumer.chManager.channel.Cancel(consumerName, noWait) } -// startGoroutinesWithRetries attempts to start consuming on a channel -// with an exponential backoff -func (consumer Consumer) startGoroutinesWithRetries( - handler Handler, - queue string, - routingKeys []string, - consumeOptions ConsumeOptions, -) { - backoffTime := time.Second - for { - consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 - err := consumer.startGoroutines( - handler, - queue, - routingKeys, - consumeOptions, - ) - if err != nil { - consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) - continue - } - break - } -} - // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue diff --git a/publish.go b/publish.go index 51d7449..f522062 100644 --- a/publish.go +++ b/publish.go @@ -106,7 +106,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher func (publisher *Publisher) handleRestarts() { for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Printf("gorabbit: successful publisher recovery from: %v", err) + publisher.options.Logger.Printf("successful publisher recovery from: %v", err) go publisher.startNotifyFlowHandler() if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler()