Browse Source

adds the consume shutdown

pull/18/head
Brian Mori 5 years ago
parent
commit
73f54b2d6b
2 changed files with 29 additions and 10 deletions
  1. +21
    -10
      channel.go
  2. +8
    -0
      consume.go

+ 21
- 10
channel.go View File

@ -12,12 +12,13 @@ type channelManager struct {
logger Logger logger Logger
url string url string
channel *amqp.Channel channel *amqp.Channel
connection *amqp.Connection
channelMux *sync.RWMutex channelMux *sync.RWMutex
notifyCancelOrClose chan error notifyCancelOrClose chan error
} }
func newChannelManager(url string, log Logger) (*channelManager, error) { func newChannelManager(url string, log Logger) (*channelManager, error) {
ch, err := getNewChannel(url)
conn, ch, err := getNewChannel(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -25,6 +26,7 @@ func newChannelManager(url string, log Logger) (*channelManager, error) {
chManager := channelManager{ chManager := channelManager{
logger: log, logger: log,
url: url, url: url,
connection: conn,
channel: ch, channel: ch,
channelMux: &sync.RWMutex{}, channelMux: &sync.RWMutex{},
notifyCancelOrClose: make(chan error), notifyCancelOrClose: make(chan error),
@ -33,16 +35,16 @@ func newChannelManager(url string, log Logger) (*channelManager, error) {
return &chManager, nil return &chManager, nil
} }
func getNewChannel(url string) (*amqp.Channel, error) {
func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.Dial(url) amqpConn, err := amqp.Dial(url)
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
ch, err := amqpConn.Channel() ch, err := amqpConn.Channel()
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
return ch, err
return amqpConn, ch, err
} }
// startNotifyCancelOrClosed listens on the channel's cancelled and closed // startNotifyCancelOrClosed listens on the channel's cancelled and closed
@ -56,10 +58,15 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
select { select {
case err := <-notifyCloseChan: case err := <-notifyCloseChan:
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
// If the connection close is triggered by the Server, a reconnection takes place
if err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
}
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff() chManager.reconnectWithBackoff()
@ -101,11 +108,15 @@ func (chManager *channelManager) reconnectWithBackoff() {
func (chManager *channelManager) reconnect() error { func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock() chManager.channelMux.Lock()
defer chManager.channelMux.Unlock() defer chManager.channelMux.Unlock()
newChannel, err := getNewChannel(chManager.url)
newConn, newChannel, err := getNewChannel(chManager.url)
if err != nil { if err != nil {
return err return err
} }
chManager.channel.Close() chManager.channel.Close()
chManager.connection.Close()
chManager.connection = newConn
chManager.channel = newChannel chManager.channel = newChannel
go chManager.startNotifyCancelOrClosed() go chManager.startNotifyCancelOrClosed()
return nil return nil


+ 8
- 0
consume.go View File

@ -312,6 +312,14 @@ func (consumer Consumer) StartConsuming(
return nil return nil
} }
// StopConsuming stop the consume of messages
func (consumer Consumer) StopConsuming() {
consumer.chManager.channel.Close()
consumer.chManager.connection.Close()
}
// startGoroutinesWithRetries attempts to start consuming on a channel // startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff // with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries( func (consumer Consumer) startGoroutinesWithRetries(


Loading…
Cancel
Save