diff --git a/channel.go b/channel.go index 3ced887..3f2c598 100644 --- a/channel.go +++ b/channel.go @@ -53,10 +53,9 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe // backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (chManager *channelManager) startNotifyCancelOrClosed() { - notifyCloseChan := make(chan *amqp.Error) - notifyCloseChan = chManager.channel.NotifyClose(notifyCloseChan) - notifyCancelChan := make(chan string) - notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) + notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1)) + select { case err := <-notifyCloseChan: // If the connection close is triggered by the Server, a reconnection takes place @@ -72,18 +71,6 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } - - // these channels can be closed by amqp - select { - case <-notifyCloseChan: - default: - close(notifyCloseChan) - } - select { - case <-notifyCancelChan: - default: - close(notifyCancelChan) - } } // reconnectWithBackoff continuously attempts to reconnect with an diff --git a/publish.go b/publish.go index 846e662..ee7cce3 100644 --- a/publish.go +++ b/publish.go @@ -99,7 +99,7 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyFlowChan chan bool + notifyReturnChan chan Return disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -150,28 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, - notifyFlowChan: make(chan bool), + notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnAMQPChan := make(chan amqp.Return) - returnChan := make(chan Return) - returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan) go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ - ret, - } + publisher.startNotifyHandlers() + for err := range publisher.chManager.notifyCancelOrClose { + publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) + publisher.startNotifyHandlers() } }() - publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan) - - go publisher.startNotifyFlowHandler() - - return publisher, returnChan, nil + return publisher, publisher.notifyReturnChan, nil } // Publish publishes the provided data to the given routing keys over the connection @@ -224,10 +217,26 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyFlowHandler() { +func (publisher *Publisher) startNotifyHandlers() { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + go func() { + for ret := range returnAMQPChan { + publisher.notifyReturnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) +} + +func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { + publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlow = false + publisher.disablePublishDueToFlowMux.Unlock() + // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners. - for ok := range publisher.notifyFlowChan { + for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { publisher.logger.Printf("pausing publishing due to flow request from server")