diff --git a/channel.go b/channel.go index 4524f5a..4d7183f 100644 --- a/channel.go +++ b/channel.go @@ -12,12 +12,13 @@ type channelManager struct { logger Logger url string channel *amqp.Channel + connection *amqp.Connection channelMux *sync.RWMutex notifyCancelOrClose chan error } func newChannelManager(url string, log Logger) (*channelManager, error) { - ch, err := getNewChannel(url) + conn, ch, err := getNewChannel(url) if err != nil { return nil, err } @@ -25,6 +26,7 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { chManager := channelManager{ logger: log, url: url, + connection: conn, channel: ch, channelMux: &sync.RWMutex{}, notifyCancelOrClose: make(chan error), @@ -33,16 +35,16 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { return &chManager, nil } -func getNewChannel(url string) (*amqp.Channel, error) { +func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) { amqpConn, err := amqp.Dial(url) if err != nil { - return nil, err + return nil, nil, err } ch, err := amqpConn.Channel() if err != nil { - return nil, err + return nil, nil, err } - return ch, err + return amqpConn, ch, err } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -56,10 +58,15 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) select { case err := <-notifyCloseChan: - chManager.logger.Printf("attempting to reconnect to amqp server after close") - chManager.reconnectWithBackoff() - chManager.logger.Printf("successfully reconnected to amqp server after close") - chManager.notifyCancelOrClose <- err + + // If the connection close is triggered by the Server, a reconnection takes place + if err.Server { + chManager.logger.Printf("attempting to reconnect to amqp server after close") + chManager.reconnectWithBackoff() + chManager.logger.Printf("successfully reconnected to amqp server after close") + chManager.notifyCancelOrClose <- err + } + case err := <-notifyCancelChan: chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.reconnectWithBackoff() @@ -101,11 +108,15 @@ func (chManager *channelManager) reconnectWithBackoff() { func (chManager *channelManager) reconnect() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - newChannel, err := getNewChannel(chManager.url) + newConn, newChannel, err := getNewChannel(chManager.url) if err != nil { return err } + chManager.channel.Close() + chManager.connection.Close() + + chManager.connection = newConn chManager.channel = newChannel go chManager.startNotifyCancelOrClosed() return nil diff --git a/consume.go b/consume.go index 1c22504..4003beb 100644 --- a/consume.go +++ b/consume.go @@ -312,6 +312,14 @@ func (consumer Consumer) StartConsuming( return nil } +// StopConsuming stop the consume of messages +func (consumer Consumer) StopConsuming() { + + consumer.chManager.channel.Close() + consumer.chManager.connection.Close() + +} + // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries(