|
|
|
@ -99,8 +99,7 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { |
|
|
|
type Publisher struct { |
|
|
|
chManager *channelManager |
|
|
|
|
|
|
|
notifyReturnChan chan Return |
|
|
|
shouldNotifyReturn bool |
|
|
|
notifyReturnChan chan Return |
|
|
|
|
|
|
|
disablePublishDueToFlow bool |
|
|
|
disablePublishDueToFlowMux *sync.RWMutex |
|
|
|
@ -154,17 +153,17 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher |
|
|
|
disablePublishDueToFlow: false, |
|
|
|
disablePublishDueToFlowMux: &sync.RWMutex{}, |
|
|
|
logger: options.Logger, |
|
|
|
notifyReturnChan: make(chan Return), |
|
|
|
shouldNotifyReturn: false, |
|
|
|
notifyReturnChan: nil, |
|
|
|
} |
|
|
|
|
|
|
|
go func() { |
|
|
|
go publisher.startNotifyFlowHandler() |
|
|
|
go publisher.startNotifyFlowHandler() |
|
|
|
|
|
|
|
// restart notifiers when cancel/close is triggered
|
|
|
|
go func() { |
|
|
|
for err := range publisher.chManager.notifyCancelOrClose { |
|
|
|
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) |
|
|
|
go publisher.startNotifyFlowHandler() |
|
|
|
if publisher.shouldNotifyReturn { |
|
|
|
if publisher.notifyReturnChan != nil { |
|
|
|
go publisher.startNotifyReturnHandler() |
|
|
|
} |
|
|
|
} |
|
|
|
@ -176,7 +175,8 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher |
|
|
|
// NotifyReturn registers a listener for basic.return methods.
|
|
|
|
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
|
|
|
|
func (publisher *Publisher) NotifyReturn() <-chan Return { |
|
|
|
publisher.shouldNotifyReturn = true |
|
|
|
publisher.notifyReturnChan = make(chan Return) |
|
|
|
go publisher.startNotifyReturnHandler() |
|
|
|
return publisher.notifyReturnChan |
|
|
|
} |
|
|
|
|
|
|
|
|