diff --git a/publish.go b/publish.go index 32733a8..f189fab 100644 --- a/publish.go +++ b/publish.go @@ -99,6 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { type Publisher struct { chManager *channelManager + notifyReturnChan chan Return + disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -148,21 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher publisher := Publisher{ chManager: chManager, + notifyReturnChan: make(chan Return, 1), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, logger: options.Logger, } - returnChan := make(chan Return) go func() { - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - publisher.startNotifyHandlers(returnChan) + publisher.startNotifyHandlers() } }() - return publisher, returnChan, nil + return publisher, publisher.notifyReturnChan, nil } // Publish publishes the provided data to the given routing keys over the connection @@ -215,11 +217,11 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } -func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { +func (publisher *Publisher) startNotifyHandlers() { returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) go func() { for ret := range returnAMQPChan { - returnChan <- Return{ret} + publisher.notifyReturnChan <- Return{ret} } }()