diff --git a/channel.go b/channel.go index 3acf0c1..61f0aa4 100644 --- a/channel.go +++ b/channel.go @@ -13,12 +13,13 @@ type channelManager struct { url string channel *amqp.Channel connection *amqp.Connection - config amqp.Config + amqpConfig amqp.Config channelMux *sync.RWMutex notifyCancelOrClose chan error + reconnectInterval time.Duration } -func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) { +func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { conn, ch, err := getNewChannel(url, conf) if err != nil { return nil, err @@ -30,8 +31,9 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage connection: conn, channel: ch, channelMux: &sync.RWMutex{}, - config: conf, + amqpConfig: conf, notifyCancelOrClose: make(chan error), + reconnectInterval: reconnectInterval, } go chManager.startNotifyCancelOrClosed() return &chManager, nil @@ -50,8 +52,8 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe } // startNotifyCancelOrClosed listens on the channel's cancelled and closed -// notifiers. When it detects a problem, it attempts to reconnect with an exponential -// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) @@ -62,7 +64,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { // If the connection close is triggered by the Server, a reconnection takes place if err != nil && err.Server { chManager.logger.Printf("attempting to reconnect to amqp server after close") - chManager.reconnectWithBackoff() + chManager.reconnectLoop() chManager.logger.Printf("successfully reconnected to amqp server after close") chManager.notifyCancelOrClose <- err } @@ -74,20 +76,17 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { } case err := <-notifyCancelChan: chManager.logger.Printf("attempting to reconnect to amqp server after cancel") - chManager.reconnectWithBackoff() + chManager.reconnectLoop() chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } } -// reconnectWithBackoff continuously attempts to reconnect with an -// exponential backoff strategy -func (chManager *channelManager) reconnectWithBackoff() { - backoffTime := time.Second +// reconnectLoop continuously attempts to reconnect +func (chManager *channelManager) reconnectLoop() { for { - chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 + chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) + time.Sleep(chManager.reconnectInterval) err := chManager.reconnect() if err != nil { chManager.logger.Printf("error reconnecting to amqp server: %v", err) @@ -101,7 +100,7 @@ func (chManager *channelManager) reconnectWithBackoff() { func (chManager *channelManager) reconnect() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) + newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig) if err != nil { return err } diff --git a/consume.go b/consume.go index baf82f3..99203c4 100644 --- a/consume.go +++ b/consume.go @@ -2,6 +2,7 @@ package rabbitmq import ( "fmt" + "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -31,8 +32,9 @@ type Consumer struct { // Logging set to true will enable the consumer to print to stdout // Logger specifies a custom Logger interface implementation overruling Logging. type ConsumerOptions struct { - Logging bool - Logger Logger + Logging bool + Logger Logger + ReconnectInterval time.Duration } // Delivery captures the fields for a previously delivered message resident in @@ -44,15 +46,16 @@ type Delivery struct { // NewConsumer returns a new Consumer connected to the given rabbitmq server func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { - options := &ConsumerOptions{} + options := &ConsumerOptions{ + Logging: true, + Logger: &stdLogger{}, + ReconnectInterval: time.Second * 5, + } for _, optionFunc := range optionFuncs { optionFunc(options) } - if options.Logger == nil { - options.Logger = &noLogger{} // default no logging - } - chManager, err := newChannelManager(url, config, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval) if err != nil { return Consumer{}, err } @@ -63,6 +66,14 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp return consumer, nil } +// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will +// attempt to reconnect to the rabbit server +func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ReconnectInterval = reconnectInterval + } +} + // WithConsumerOptionsLogging sets a logger to log to stdout func WithConsumerOptionsLogging(options *ConsumerOptions) { options.Logging = true