|
|
|
@ -12,12 +12,13 @@ type channelManager struct { |
|
|
|
logger Logger |
|
|
|
url string |
|
|
|
channel *amqp.Channel |
|
|
|
connection *amqp.Connection |
|
|
|
channelMux *sync.RWMutex |
|
|
|
notifyCancelOrClose chan error |
|
|
|
} |
|
|
|
|
|
|
|
func newChannelManager(url string, log Logger) (*channelManager, error) { |
|
|
|
ch, err := getNewChannel(url) |
|
|
|
conn, ch, err := getNewChannel(url) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
@ -25,6 +26,7 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { |
|
|
|
chManager := channelManager{ |
|
|
|
logger: log, |
|
|
|
url: url, |
|
|
|
connection: conn, |
|
|
|
channel: ch, |
|
|
|
channelMux: &sync.RWMutex{}, |
|
|
|
notifyCancelOrClose: make(chan error), |
|
|
|
@ -33,16 +35,16 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { |
|
|
|
return &chManager, nil |
|
|
|
} |
|
|
|
|
|
|
|
func getNewChannel(url string) (*amqp.Channel, error) { |
|
|
|
func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) { |
|
|
|
amqpConn, err := amqp.Dial(url) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
return nil, nil, err |
|
|
|
} |
|
|
|
ch, err := amqpConn.Channel() |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
return nil, nil, err |
|
|
|
} |
|
|
|
return ch, err |
|
|
|
return amqpConn, ch, err |
|
|
|
} |
|
|
|
|
|
|
|
// startNotifyCancelOrClosed listens on the channel's cancelled and closed
|
|
|
|
@ -56,10 +58,15 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { |
|
|
|
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) |
|
|
|
select { |
|
|
|
case err := <-notifyCloseChan: |
|
|
|
chManager.logger.Printf("attempting to reconnect to amqp server after close") |
|
|
|
chManager.reconnectWithBackoff() |
|
|
|
chManager.logger.Printf("successfully reconnected to amqp server after close") |
|
|
|
chManager.notifyCancelOrClose <- err |
|
|
|
|
|
|
|
// If the connection close is triggered by the Server, a reconnection takes place
|
|
|
|
if err.Server { |
|
|
|
chManager.logger.Printf("attempting to reconnect to amqp server after close") |
|
|
|
chManager.reconnectWithBackoff() |
|
|
|
chManager.logger.Printf("successfully reconnected to amqp server after close") |
|
|
|
chManager.notifyCancelOrClose <- err |
|
|
|
} |
|
|
|
|
|
|
|
case err := <-notifyCancelChan: |
|
|
|
chManager.logger.Printf("attempting to reconnect to amqp server after cancel") |
|
|
|
chManager.reconnectWithBackoff() |
|
|
|
@ -101,11 +108,15 @@ func (chManager *channelManager) reconnectWithBackoff() { |
|
|
|
func (chManager *channelManager) reconnect() error { |
|
|
|
chManager.channelMux.Lock() |
|
|
|
defer chManager.channelMux.Unlock() |
|
|
|
newChannel, err := getNewChannel(chManager.url) |
|
|
|
newConn, newChannel, err := getNewChannel(chManager.url) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
chManager.channel.Close() |
|
|
|
chManager.connection.Close() |
|
|
|
|
|
|
|
chManager.connection = newConn |
|
|
|
chManager.channel = newChannel |
|
|
|
go chManager.startNotifyCancelOrClosed() |
|
|
|
return nil |
|
|
|
|