Browse Source

Make notifyReturnChan a property from publisher

It's the channel that never changes so we might
as well keep a copy of it (even to consider closing
it when the publisher is closed as well).

Also add another little buffer.
pull/29/head
Victor Elias 4 years ago
parent
commit
306c6bb34b
1 changed files with 8 additions and 6 deletions
  1. +8
    -6
      publish.go

+ 8
- 6
publish.go View File

@ -99,6 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
type Publisher struct {
chManager *channelManager
notifyReturnChan chan Return
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
@ -148,21 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
publisher := Publisher{
chManager: chManager,
notifyReturnChan: make(chan Return, 1),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
}
returnChan := make(chan Return)
go func() {
publisher.startNotifyHandlers(returnChan)
publisher.startNotifyHandlers()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.startNotifyHandlers(returnChan)
publisher.startNotifyHandlers()
}
}()
return publisher, returnChan, nil
return publisher, publisher.notifyReturnChan, nil
}
// Publish publishes the provided data to the given routing keys over the connection
@ -215,11 +217,11 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close()
}
func (publisher *Publisher) startNotifyHandlers(returnChan chan Return) {
func (publisher *Publisher) startNotifyHandlers() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
returnChan <- Return{ret}
publisher.notifyReturnChan <- Return{ret}
}
}()


Loading…
Cancel
Save