From 19266cfffc18eeb3b1add0c26ed20dc96e62a770 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 12:49:48 -0300 Subject: [PATCH] Re-configure notify handlers after reconnection We were probably missing those updates after a reconnection happens, since we stayed subscribed to the previous channel notifications instead. --- publish.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/publish.go b/publish.go index 846e662..a78d35c 100644 --- a/publish.go +++ b/publish.go @@ -99,8 +99,6 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager - notifyFlowChan chan bool - disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -150,26 +148,31 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, - notifyFlowChan: make(chan bool), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnAMQPChan := make(chan amqp.Return) returnChan := make(chan Return) - returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan) go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ - ret, - } + setupNotifyChans := func() { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return)) + go func() { + for ret := range returnAMQPChan { + returnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) } - }() - - publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan) - go publisher.startNotifyFlowHandler() + setupNotifyChans() + for err := range publisher.chManager.notifyCancelOrClose { + publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) + setupNotifyChans() + } + }() return publisher, returnChan, nil } @@ -224,10 +227,10 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyFlowHandler() { +func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners. - for ok := range publisher.notifyFlowChan { + for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { publisher.logger.Printf("pausing publishing due to flow request from server")