Browse Source

Move channel setup to a separate startNotify helper

Also add a very small buffer to the NotifyReturn channel
in case no one is listening on the other side.

I event thought about actually having a timeout when sending
to that channel, but it felt it could lead to loss of
events on non problematic situations, so let's see how this
works instead.
pull/29/head
Victor Elias 4 years ago
parent
commit
f8fddc8191
1 changed files with 14 additions and 14 deletions
  1. +14
    -14
      publish.go

+ 14
- 14
publish.go View File

@ -155,22 +155,10 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
returnChan := make(chan Return) returnChan := make(chan Return)
go func() { go func() {
setupNotifyChans := func() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return))
go func() {
for ret := range returnAMQPChan {
returnChan <- Return{ret}
}
}()
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}
setupNotifyChans()
publisher.startNotifyHandlers(returnChan)
for err := range publisher.chManager.notifyCancelOrClose { for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
setupNotifyChans()
publisher.startNotifyHandlers(returnChan)
} }
}() }()
@ -227,6 +215,18 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close() publisher.chManager.connection.Close()
} }
func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
returnChan <- Return{ret}
}
}()
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}
func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) { func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
// Listeners for active=true flow control. When true is sent to a listener, // Listeners for active=true flow control. When true is sent to a listener,
// publishing should pause until false is sent to listeners. // publishing should pause until false is sent to listeners.


Loading…
Cancel
Save