From 98f15aea034aba9ac51427798a1232c569673983 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Mon, 12 Jul 2021 21:44:56 -0300 Subject: [PATCH 1/5] Stop closing close and cancel channels The channel takes ownership of those channels when we call Notify*, so we should not be closing them or a panic could happen due to double closure or sending to a closed channel. --- channel.go | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/channel.go b/channel.go index 3ced887..3f2c598 100644 --- a/channel.go +++ b/channel.go @@ -53,10 +53,9 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe // backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (chManager *channelManager) startNotifyCancelOrClosed() { - notifyCloseChan := make(chan *amqp.Error) - notifyCloseChan = chManager.channel.NotifyClose(notifyCloseChan) - notifyCancelChan := make(chan string) - notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) + notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1)) + select { case err := <-notifyCloseChan: // If the connection close is triggered by the Server, a reconnection takes place @@ -72,18 +71,6 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } - - // these channels can be closed by amqp - select { - case <-notifyCloseChan: - default: - close(notifyCloseChan) - } - select { - case <-notifyCancelChan: - default: - close(notifyCancelChan) - } } // reconnectWithBackoff continuously attempts to reconnect with an From 19266cfffc18eeb3b1add0c26ed20dc96e62a770 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 12:49:48 -0300 Subject: [PATCH 2/5] Re-configure notify handlers after reconnection We were probably missing those updates after a reconnection happens, since we stayed subscribed to the previous channel notifications instead. --- publish.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/publish.go b/publish.go index 846e662..a78d35c 100644 --- a/publish.go +++ b/publish.go @@ -99,8 +99,6 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyFlowChan chan bool - disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -150,26 +148,31 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, - notifyFlowChan: make(chan bool), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnAMQPChan := make(chan amqp.Return) returnChan := make(chan Return) - returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan) go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ - ret, - } + setupNotifyChans := func() { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return)) + go func() { + for ret := range returnAMQPChan { + returnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) } - }() - - publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan) - go publisher.startNotifyFlowHandler() + setupNotifyChans() + for err := range publisher.chManager.notifyCancelOrClose { + publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) + setupNotifyChans() + } + }() return publisher, returnChan, nil } @@ -224,10 +227,10 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyFlowHandler() { +func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners. - for ok := range publisher.notifyFlowChan { + for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { publisher.logger.Printf("pausing publishing due to flow request from server") From f8fddc8191fba5b7898a765f8440e515700722f6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 12:51:08 -0300 Subject: [PATCH 3/5] Move channel setup to a separate startNotify helper Also add a very small buffer to the NotifyReturn channel in case no one is listening on the other side. I event thought about actually having a timeout when sending to that channel, but it felt it could lead to loss of events on non problematic situations, so let's see how this works instead. --- publish.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/publish.go b/publish.go index a78d35c..32733a8 100644 --- a/publish.go +++ b/publish.go @@ -155,22 +155,10 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher returnChan := make(chan Return) go func() { - setupNotifyChans := func() { - returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return)) - go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ret} - } - }() - - notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - go publisher.startNotifyFlowHandler(notifyFlowChan) - } - - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) } }() @@ -227,6 +215,18 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } +func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + go func() { + for ret := range returnAMQPChan { + returnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) +} + func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners. From 306c6bb34b3f8f9f14b2d6c6587c68dcc1b58ac6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 13:18:44 -0300 Subject: [PATCH 4/5] Make notifyReturnChan a property from publisher It's the channel that never changes so we might as well keep a copy of it (even to consider closing it when the publisher is closed as well). Also add another little buffer. --- publish.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/publish.go b/publish.go index 32733a8..f189fab 100644 --- a/publish.go +++ b/publish.go @@ -99,6 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager + notifyReturnChan chan Return + disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -148,21 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, + notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnChan := make(chan Return) go func() { - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() } }() - return publisher, returnChan, nil + return publisher, publisher.notifyReturnChan, nil } // Publish publishes the provided data to the given routing keys over the connection @@ -215,11 +217,11 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { +func (publisher *Publisher) startNotifyHandlers() { returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) go func() { for ret := range returnAMQPChan { - returnChan <- Return{ret} + publisher.notifyReturnChan <- Return{ret} } }() From eafe89237111fbec88fa05e136826211176382c0 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 13:21:18 -0300 Subject: [PATCH 5/5] Make sure we enable flow for new connections --- publish.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/publish.go b/publish.go index f189fab..ee7cce3 100644 --- a/publish.go +++ b/publish.go @@ -230,6 +230,10 @@ func (publisher *Publisher) startNotifyHandlers() { } func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { + publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlow = false + publisher.disablePublishDueToFlowMux.Unlock() + // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners. for ok := range notifyFlowChan {