From 306c6bb34b3f8f9f14b2d6c6587c68dcc1b58ac6 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Tue, 13 Jul 2021 13:18:44 -0300 Subject: [PATCH] Make notifyReturnChan a property from publisher It's the channel that never changes so we might as well keep a copy of it (even to consider closing it when the publisher is closed as well). Also add another little buffer. --- publish.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/publish.go b/publish.go index 32733a8..f189fab 100644 --- a/publish.go +++ b/publish.go @@ -99,6 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager + notifyReturnChan chan Return + disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -148,21 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, + notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnChan := make(chan Return) go func() { - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() } }() - return publisher, returnChan, nil + return publisher, publisher.notifyReturnChan, nil } // Publish publishes the provided data to the given routing keys over the connection @@ -215,11 +217,11 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { +func (publisher *Publisher) startNotifyHandlers() { returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) go func() { for ret := range returnAMQPChan { - returnChan <- Return{ret} + publisher.notifyReturnChan <- Return{ret} } }()