From f8fddc8191fba5b7898a765f8440e515700722f6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 12:51:08 -0300 Subject: [PATCH] Move channel setup to a separate startNotify helper Also add a very small buffer to the NotifyReturn channel in case no one is listening on the other side. I event thought about actually having a timeout when sending to that channel, but it felt it could lead to loss of events on non problematic situations, so let's see how this works instead. --- publish.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/publish.go b/publish.go index a78d35c..32733a8 100644 --- a/publish.go +++ b/publish.go @@ -155,22 +155,10 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher returnChan := make(chan Return) go func() { - setupNotifyChans := func() { - returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return)) - go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ret} - } - }() - - notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - go publisher.startNotifyFlowHandler(notifyFlowChan) - } - - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) } }() @@ -227,6 +215,18 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } +func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + go func() { + for ret := range returnAMQPChan { + returnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) +} + func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners.