From 733878b7517e42c71fe77062ccc14ae8e2f4fc1a Mon Sep 17 00:00:00 2001 From: wagslane Date: Thu, 31 Mar 2022 16:10:29 -0600 Subject: [PATCH] fix race --- publish.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/publish.go b/publish.go index 3b47959..70ad138 100644 --- a/publish.go +++ b/publish.go @@ -123,7 +123,7 @@ func (publisher *Publisher) handleRestarts() { go publisher.startNotifyReturnHandler() } if publisher.notifyPublishChan != nil { - go publisher.startNotifyPublishHandler() + publisher.startNotifyPublishHandler() } } } @@ -139,7 +139,7 @@ func (publisher *Publisher) NotifyReturn() <-chan Return { // NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option func (publisher *Publisher) NotifyPublish() <-chan Confirmation { publisher.notifyPublishChan = make(chan Confirmation) - go publisher.startNotifyPublishHandler() + publisher.startNotifyPublishHandler() return publisher.notifyPublishChan } @@ -235,8 +235,10 @@ func (publisher *Publisher) startNotifyReturnHandler() { func (publisher *Publisher) startNotifyPublishHandler() { publisher.chManager.channel.Confirm(false) - publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) - for conf := range publishAMQPCh { - publisher.notifyPublishChan <- Confirmation{conf} - } + go func() { + publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) + for conf := range publishAMQPCh { + publisher.notifyPublishChan <- Confirmation{conf} + } + }() }