diff --git a/channel.go b/channel.go index 6b93f47..8cf2d60 100644 --- a/channel.go +++ b/channel.go @@ -10,6 +10,7 @@ import ( type channelManager struct { logger Logger + backoff time.Duration url string channel *amqp.Channel connection *amqp.Connection @@ -18,7 +19,7 @@ type channelManager struct { notifyCancelOrClose chan error } -func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) { +func newChannelManager(url string, conf amqp.Config, log Logger, backoff time.Duration) (*channelManager, error) { conn, ch, err := getNewChannel(url, conf) if err != nil { return nil, err @@ -26,6 +27,7 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage chManager := channelManager{ logger: log, + backoff: backoff, url: url, connection: conn, channel: ch, @@ -77,11 +79,9 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { // reconnectWithBackoff continuously attempts to reconnect with an // exponential backoff strategy func (chManager *channelManager) reconnectWithBackoff() { - backoffTime := time.Second for { - chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 + chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.backoff) + time.Sleep(chManager.backoff) err := chManager.reconnect() if err != nil { chManager.logger.Printf("error reconnecting to amqp server: %v", err) diff --git a/consume.go b/consume.go index b882fc7..c4b4c8d 100644 --- a/consume.go +++ b/consume.go @@ -34,6 +34,7 @@ type Consumer struct { type ConsumerOptions struct { Logging bool Logger Logger + Backoff time.Duration } // Delivery captures the fields for a previously delivered message resident in @@ -45,7 +46,7 @@ type Delivery struct { // NewConsumer returns a new Consumer connected to the given rabbitmq server func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { - options := &ConsumerOptions{} + options := &ConsumerOptions{Backoff: 1 * time.Second} for _, optionFunc := range optionFuncs { optionFunc(options) } @@ -53,7 +54,7 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp options.Logger = &noLogger{} // default no logging } - chManager, err := newChannelManager(url, config, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger, options.Backoff) if err != nil { return Consumer{}, err } @@ -79,6 +80,13 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { } } +// WithConsumerOptionsBackoff sets the duration to wait until a new reconnection try. +func WithConsumerOptionsBackoff(backoff time.Duration) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Backoff = backoff + } +} + // StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". // Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it diff --git a/publish.go b/publish.go index faf85e2..d7ff9c5 100644 --- a/publish.go +++ b/publish.go @@ -3,6 +3,7 @@ package rabbitmq import ( "fmt" "sync" + "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -112,6 +113,7 @@ type Publisher struct { type PublisherOptions struct { Logging bool Logger Logger + Backoff time.Duration } // WithPublisherOptionsLogging sets logging to true on the consumer options @@ -129,13 +131,20 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { } } +// WithPublisherOptionsBackoff sets the duration to wait until a new reconnection try. +func WithPublisherOptionsBackoff(backoff time.Duration) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Backoff = backoff + } +} + // NewPublisher returns a new publisher with an open channel to the cluster. // If you plan to enforce mandatory or immediate publishing, those failures will be reported // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { - options := &PublisherOptions{} + options := &PublisherOptions{Backoff: 1 * time.Second} for _, optionFunc := range optionFuncs { optionFunc(options) } @@ -143,7 +152,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher options.Logger = &noLogger{} // default no logging } - chManager, err := newChannelManager(url, config, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger, options.Backoff) if err != nil { return Publisher{}, nil, err }