Browse Source

Re-configure notify handlers after reconnection

We were probably missing those updates after a
reconnection happens, since we stayed subscribed
to the previous channel notifications instead.
pull/29/head
Victor Elias 4 years ago
parent
commit
19266cfffc
1 changed files with 18 additions and 15 deletions
  1. +18
    -15
      publish.go

+ 18
- 15
publish.go View File

@ -99,8 +99,6 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
type Publisher struct { type Publisher struct {
chManager *channelManager chManager *channelManager
notifyFlowChan chan bool
disablePublishDueToFlow bool disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex disablePublishDueToFlowMux *sync.RWMutex
@ -150,26 +148,31 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
publisher := Publisher{ publisher := Publisher{
chManager: chManager, chManager: chManager,
notifyFlowChan: make(chan bool),
disablePublishDueToFlow: false, disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{}, disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger, logger: options.Logger,
} }
returnAMQPChan := make(chan amqp.Return)
returnChan := make(chan Return) returnChan := make(chan Return)
returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan)
go func() { go func() {
for ret := range returnAMQPChan {
returnChan <- Return{
ret,
}
setupNotifyChans := func() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return))
go func() {
for ret := range returnAMQPChan {
returnChan <- Return{ret}
}
}()
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
} }
}()
publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan)
go publisher.startNotifyFlowHandler()
setupNotifyChans()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
setupNotifyChans()
}
}()
return publisher, returnChan, nil return publisher, returnChan, nil
} }
@ -224,10 +227,10 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close() publisher.chManager.connection.Close()
} }
func (publisher *Publisher) startNotifyFlowHandler() {
func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
// Listeners for active=true flow control. When true is sent to a listener, // Listeners for active=true flow control. When true is sent to a listener,
// publishing should pause until false is sent to listeners. // publishing should pause until false is sent to listeners.
for ok := range publisher.notifyFlowChan {
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlowMux.Lock()
if ok { if ok {
publisher.logger.Printf("pausing publishing due to flow request from server") publisher.logger.Printf("pausing publishing due to flow request from server")


Loading…
Cancel
Save