Browse Source

fix race

pull/73/head
wagslane 4 years ago
parent
commit
733878b751
1 changed files with 8 additions and 6 deletions
  1. +8
    -6
      publish.go

+ 8
- 6
publish.go View File

@ -123,7 +123,7 @@ func (publisher *Publisher) handleRestarts() {
go publisher.startNotifyReturnHandler() go publisher.startNotifyReturnHandler()
} }
if publisher.notifyPublishChan != nil { if publisher.notifyPublishChan != nil {
go publisher.startNotifyPublishHandler()
publisher.startNotifyPublishHandler()
} }
} }
} }
@ -139,7 +139,7 @@ func (publisher *Publisher) NotifyReturn() <-chan Return {
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option // NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
func (publisher *Publisher) NotifyPublish() <-chan Confirmation { func (publisher *Publisher) NotifyPublish() <-chan Confirmation {
publisher.notifyPublishChan = make(chan Confirmation) publisher.notifyPublishChan = make(chan Confirmation)
go publisher.startNotifyPublishHandler()
publisher.startNotifyPublishHandler()
return publisher.notifyPublishChan return publisher.notifyPublishChan
} }
@ -235,8 +235,10 @@ func (publisher *Publisher) startNotifyReturnHandler() {
func (publisher *Publisher) startNotifyPublishHandler() { func (publisher *Publisher) startNotifyPublishHandler() {
publisher.chManager.channel.Confirm(false) publisher.chManager.channel.Confirm(false)
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
publisher.notifyPublishChan <- Confirmation{conf}
}
go func() {
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
publisher.notifyPublishChan <- Confirmation{conf}
}
}()
} }

Loading…
Cancel
Save