You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

43 lines
1.4 KiB

package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool))
publisher.disablePublishDueToFlowMu.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMu.Unlock()
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMu.Lock()
if ok {
publisher.options.Logger.Warnf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Warnf("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMu.Unlock()
}
}
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMu.Lock()
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMu.Unlock()
for b := range blockings {
publisher.disablePublishDueToBlockedMu.Lock()
if b.Active {
publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true
} else {
publisher.disablePublishDueToBlocked = false
publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server")
}
publisher.disablePublishDueToBlockedMu.Unlock()
}
}