|
|
|
@ -13,12 +13,13 @@ type channelManager struct { |
|
|
|
url string |
|
|
|
channel *amqp.Channel |
|
|
|
connection *amqp.Connection |
|
|
|
config amqp.Config |
|
|
|
amqpConfig amqp.Config |
|
|
|
channelMux *sync.RWMutex |
|
|
|
notifyCancelOrClose chan error |
|
|
|
reconnectInterval time.Duration |
|
|
|
} |
|
|
|
|
|
|
|
func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) { |
|
|
|
func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { |
|
|
|
conn, ch, err := getNewChannel(url, conf) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
@ -30,8 +31,9 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage |
|
|
|
connection: conn, |
|
|
|
channel: ch, |
|
|
|
channelMux: &sync.RWMutex{}, |
|
|
|
config: conf, |
|
|
|
amqpConfig: conf, |
|
|
|
notifyCancelOrClose: make(chan error), |
|
|
|
reconnectInterval: reconnectInterval, |
|
|
|
} |
|
|
|
go chManager.startNotifyCancelOrClosed() |
|
|
|
return &chManager, nil |
|
|
|
@ -50,8 +52,8 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe |
|
|
|
} |
|
|
|
|
|
|
|
// startNotifyCancelOrClosed listens on the channel's cancelled and closed
|
|
|
|
// notifiers. When it detects a problem, it attempts to reconnect with an exponential
|
|
|
|
// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose
|
|
|
|
// notifiers. When it detects a problem, it attempts to reconnect.
|
|
|
|
// Once reconnected, it sends an error back on the manager's notifyCancelOrClose
|
|
|
|
// channel
|
|
|
|
func (chManager *channelManager) startNotifyCancelOrClosed() { |
|
|
|
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) |
|
|
|
@ -62,7 +64,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { |
|
|
|
// If the connection close is triggered by the Server, a reconnection takes place
|
|
|
|
if err != nil && err.Server { |
|
|
|
chManager.logger.Printf("attempting to reconnect to amqp server after close") |
|
|
|
chManager.reconnectWithBackoff() |
|
|
|
chManager.reconnectLoop() |
|
|
|
chManager.logger.Printf("successfully reconnected to amqp server after close") |
|
|
|
chManager.notifyCancelOrClose <- err |
|
|
|
} |
|
|
|
@ -74,20 +76,17 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { |
|
|
|
} |
|
|
|
case err := <-notifyCancelChan: |
|
|
|
chManager.logger.Printf("attempting to reconnect to amqp server after cancel") |
|
|
|
chManager.reconnectWithBackoff() |
|
|
|
chManager.reconnectLoop() |
|
|
|
chManager.logger.Printf("successfully reconnected to amqp server after cancel") |
|
|
|
chManager.notifyCancelOrClose <- errors.New(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// reconnectWithBackoff continuously attempts to reconnect with an
|
|
|
|
// exponential backoff strategy
|
|
|
|
func (chManager *channelManager) reconnectWithBackoff() { |
|
|
|
backoffTime := time.Second |
|
|
|
// reconnectLoop continuously attempts to reconnect
|
|
|
|
func (chManager *channelManager) reconnectLoop() { |
|
|
|
for { |
|
|
|
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime) |
|
|
|
time.Sleep(backoffTime) |
|
|
|
backoffTime *= 2 |
|
|
|
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) |
|
|
|
time.Sleep(chManager.reconnectInterval) |
|
|
|
err := chManager.reconnect() |
|
|
|
if err != nil { |
|
|
|
chManager.logger.Printf("error reconnecting to amqp server: %v", err) |
|
|
|
@ -101,7 +100,7 @@ func (chManager *channelManager) reconnectWithBackoff() { |
|
|
|
func (chManager *channelManager) reconnect() error { |
|
|
|
chManager.channelMux.Lock() |
|
|
|
defer chManager.channelMux.Unlock() |
|
|
|
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) |
|
|
|
newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|