|
|
package rabbitmq
|
|
|
|
|
|
import (
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
)
|
|
|
|
|
|
func (publisher *Publisher) startNotifyFlowHandler() {
|
|
|
notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool))
|
|
|
publisher.disablePublishDueToFlowMu.Lock()
|
|
|
publisher.disablePublishDueToFlow = false
|
|
|
publisher.disablePublishDueToFlowMu.Unlock()
|
|
|
|
|
|
for ok := range notifyFlowChan {
|
|
|
publisher.disablePublishDueToFlowMu.Lock()
|
|
|
if ok {
|
|
|
publisher.options.Logger.Warnf("pausing publishing due to flow request from server")
|
|
|
publisher.disablePublishDueToFlow = true
|
|
|
} else {
|
|
|
publisher.disablePublishDueToFlow = false
|
|
|
publisher.options.Logger.Warnf("resuming publishing due to flow request from server")
|
|
|
}
|
|
|
publisher.disablePublishDueToFlowMu.Unlock()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (publisher *Publisher) startNotifyBlockedHandler() {
|
|
|
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
|
|
|
publisher.disablePublishDueToBlockedMu.Lock()
|
|
|
publisher.disablePublishDueToBlocked = false
|
|
|
publisher.disablePublishDueToBlockedMu.Unlock()
|
|
|
|
|
|
for b := range blockings {
|
|
|
publisher.disablePublishDueToBlockedMu.Lock()
|
|
|
if b.Active {
|
|
|
publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server")
|
|
|
publisher.disablePublishDueToBlocked = true
|
|
|
} else {
|
|
|
publisher.disablePublishDueToBlocked = false
|
|
|
publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server")
|
|
|
}
|
|
|
publisher.disablePublishDueToBlockedMu.Unlock()
|
|
|
}
|
|
|
}
|