diff --git a/publish.go b/publish.go index a78d35c..32733a8 100644 --- a/publish.go +++ b/publish.go @@ -155,22 +155,10 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher returnChan := make(chan Return) go func() { - setupNotifyChans := func() { - returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return)) - go func() { - for ret := range returnAMQPChan { - returnChan <- Return{ret} - } - }() - - notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - go publisher.startNotifyFlowHandler(notifyFlowChan) - } - - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) for err := range publisher.chManager.notifyCancelOrClose { publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) - setupNotifyChans() + publisher.startNotifyHandlers(returnChan) } }() @@ -227,6 +215,18 @@ func (publisher Publisher) StopPublishing() { publisher.chManager.connection.Close() } +func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) { + returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) + go func() { + for ret := range returnAMQPChan { + returnChan <- Return{ret} + } + }() + + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + go publisher.startNotifyFlowHandler(notifyFlowChan) +} + func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { // Listeners for active=true flow control. When true is sent to a listener, // publishing should pause until false is sent to listeners.