Browse Source

Merge pull request #63 from wagslane/lw_reconnect

reconnect interval
pull/64/head
Lane Wagner 4 years ago
committed by GitHub
parent
commit
a0074abcb3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 22 deletions
  1. +14
    -15
      channel.go
  2. +18
    -7
      consume.go

+ 14
- 15
channel.go View File

@ -13,12 +13,13 @@ type channelManager struct {
url string url string
channel *amqp.Channel channel *amqp.Channel
connection *amqp.Connection connection *amqp.Connection
config amqp.Config
amqpConfig amqp.Config
channelMux *sync.RWMutex channelMux *sync.RWMutex
notifyCancelOrClose chan error notifyCancelOrClose chan error
reconnectInterval time.Duration
} }
func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) {
func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf) conn, ch, err := getNewChannel(url, conf)
if err != nil { if err != nil {
return nil, err return nil, err
@ -30,8 +31,9 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage
connection: conn, connection: conn,
channel: ch, channel: ch,
channelMux: &sync.RWMutex{}, channelMux: &sync.RWMutex{},
config: conf,
amqpConfig: conf,
notifyCancelOrClose: make(chan error), notifyCancelOrClose: make(chan error),
reconnectInterval: reconnectInterval,
} }
go chManager.startNotifyCancelOrClosed() go chManager.startNotifyCancelOrClosed()
return &chManager, nil return &chManager, nil
@ -50,8 +52,8 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe
} }
// startNotifyCancelOrClosed listens on the channel's cancelled and closed // startNotifyCancelOrClosed listens on the channel's cancelled and closed
// notifiers. When it detects a problem, it attempts to reconnect with an exponential
// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// notifiers. When it detects a problem, it attempts to reconnect.
// Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// channel // channel
func (chManager *channelManager) startNotifyCancelOrClosed() { func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
@ -62,7 +64,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
// If the connection close is triggered by the Server, a reconnection takes place // If the connection close is triggered by the Server, a reconnection takes place
if err != nil && err.Server { if err != nil && err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close") chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after close") chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err chManager.notifyCancelOrClose <- err
} }
@ -74,20 +76,17 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
} }
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff()
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err) chManager.notifyCancelOrClose <- errors.New(err)
} }
} }
// reconnectWithBackoff continuously attempts to reconnect with an
// exponential backoff strategy
func (chManager *channelManager) reconnectWithBackoff() {
backoffTime := time.Second
// reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() {
for { for {
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect() err := chManager.reconnect()
if err != nil { if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err) chManager.logger.Printf("error reconnecting to amqp server: %v", err)
@ -101,7 +100,7 @@ func (chManager *channelManager) reconnectWithBackoff() {
func (chManager *channelManager) reconnect() error { func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock() chManager.channelMux.Lock()
defer chManager.channelMux.Unlock() defer chManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config)
newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig)
if err != nil { if err != nil {
return err return err
} }


+ 18
- 7
consume.go View File

@ -2,6 +2,7 @@ package rabbitmq
import ( import (
"fmt" "fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
) )
@ -31,8 +32,9 @@ type Consumer struct {
// Logging set to true will enable the consumer to print to stdout // Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging. // Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct { type ConsumerOptions struct {
Logging bool
Logger Logger
Logging bool
Logger Logger
ReconnectInterval time.Duration
} }
// Delivery captures the fields for a previously delivered message resident in // Delivery captures the fields for a previously delivered message resident in
@ -44,15 +46,16 @@ type Delivery struct {
// NewConsumer returns a new Consumer connected to the given rabbitmq server // NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
options := &ConsumerOptions{
Logging: true,
Logger: &stdLogger{},
ReconnectInterval: time.Second * 5,
}
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
optionFunc(options) optionFunc(options)
} }
if options.Logger == nil {
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, config, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval)
if err != nil { if err != nil {
return Consumer{}, err return Consumer{}, err
} }
@ -63,6 +66,14 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp
return consumer, nil return consumer, nil
} }
// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will
// attempt to reconnect to the rabbit server
func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ReconnectInterval = reconnectInterval
}
}
// WithConsumerOptionsLogging sets a logger to log to stdout // WithConsumerOptionsLogging sets a logger to log to stdout
func WithConsumerOptionsLogging(options *ConsumerOptions) { func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true options.Logging = true


Loading…
Cancel
Save