Browse Source

betting logging for closures

pull/63/head v0.7.1
wagslane 4 years ago
parent
commit
6ef45c80f6
3 changed files with 12 additions and 31 deletions
  1. +6
    -0
      channel.go
  2. +5
    -30
      consume.go
  3. +1
    -1
      publish.go

+ 6
- 0
channel.go View File

@ -66,6 +66,12 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
}
if err != nil {
chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client")
}
if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff()


+ 5
- 30
consume.go View File

@ -2,7 +2,6 @@ package rabbitmq
import (
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -107,13 +106,16 @@ func (consumer Consumer) StartConsuming(
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("gorabbit: successful recovery from: %v", err)
consumer.startGoroutinesWithRetries(
consumer.logger.Printf("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err)
}
}
}()
return nil
@ -145,33 +147,6 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
consumer.chManager.channel.Cancel(consumerName, noWait)
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(
handler Handler,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) {
backoffTime := time.Second
for {
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
consumeOptions,
)
if err != nil {
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
continue
}
break
}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue


+ 1
- 1
publish.go View File

@ -106,7 +106,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Printf("gorabbit: successful publisher recovery from: %v", err)
publisher.options.Logger.Printf("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()


Loading…
Cancel
Save