Browse Source

Merge pull request #29 from victorges/fix/notify-chans-lifecycle

Fix notify channels lifecycle for publisher
pull/31/head v0.6.2
Lane Wagner 4 years ago
committed by GitHub
parent
commit
c7ae4f6482
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 32 deletions
  1. +3
    -16
      channel.go
  2. +25
    -16
      publish.go

+ 3
- 16
channel.go View File

@ -53,10 +53,9 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe
// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// channel
func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCloseChan := make(chan *amqp.Error)
notifyCloseChan = chManager.channel.NotifyClose(notifyCloseChan)
notifyCancelChan := make(chan string)
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1))
select {
case err := <-notifyCloseChan:
// If the connection close is triggered by the Server, a reconnection takes place
@ -72,18 +71,6 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
// these channels can be closed by amqp
select {
case <-notifyCloseChan:
default:
close(notifyCloseChan)
}
select {
case <-notifyCancelChan:
default:
close(notifyCancelChan)
}
}
// reconnectWithBackoff continuously attempts to reconnect with an


+ 25
- 16
publish.go View File

@ -99,7 +99,7 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
type Publisher struct {
chManager *channelManager
notifyFlowChan chan bool
notifyReturnChan chan Return
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
@ -150,28 +150,21 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
publisher := Publisher{
chManager: chManager,
notifyFlowChan: make(chan bool),
notifyReturnChan: make(chan Return, 1),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
}
returnAMQPChan := make(chan amqp.Return)
returnChan := make(chan Return)
returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan)
go func() {
for ret := range returnAMQPChan {
returnChan <- Return{
ret,
}
publisher.startNotifyHandlers()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.startNotifyHandlers()
}
}()
publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan)
go publisher.startNotifyFlowHandler()
return publisher, returnChan, nil
return publisher, publisher.notifyReturnChan, nil
}
// Publish publishes the provided data to the given routing keys over the connection
@ -224,10 +217,26 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close()
}
func (publisher *Publisher) startNotifyFlowHandler() {
func (publisher *Publisher) startNotifyHandlers() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
publisher.notifyReturnChan <- Return{ret}
}
}()
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}
func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
// Listeners for active=true flow control. When true is sent to a listener,
// publishing should pause until false is sent to listeners.
for ok := range publisher.notifyFlowChan {
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.logger.Printf("pausing publishing due to flow request from server")


Loading…
Cancel
Save