From 6540e6b99ae0f4893df06972516b1b6a07f051ba Mon Sep 17 00:00:00 2001 From: wagslane Date: Tue, 10 May 2022 10:14:15 -0600 Subject: [PATCH] add blocking support --- publish.go | 46 +++++++++++++++++-------------------------- publish_flow_block.go | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 28 deletions(-) create mode 100644 publish_flow_block.go diff --git a/publish.go b/publish.go index 13db87a..4e3046f 100644 --- a/publish.go +++ b/publish.go @@ -47,6 +47,9 @@ type Publisher struct { disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex + disablePublishDueToBlocked bool + disablePublishDueToBlockedMux *sync.RWMutex + options PublisherOptions } @@ -102,12 +105,14 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio } publisher := &Publisher{ - chManager: chManager, - disablePublishDueToFlow: false, - disablePublishDueToFlowMux: &sync.RWMutex{}, - options: *options, - notifyReturnChan: nil, - notifyPublishChan: nil, + chManager: chManager, + disablePublishDueToFlow: false, + disablePublishDueToFlowMux: &sync.RWMutex{}, + disablePublishDueToBlocked: false, + disablePublishDueToBlockedMux: &sync.RWMutex{}, + options: *options, + notifyReturnChan: nil, + notifyPublishChan: nil, } go publisher.startNotifyFlowHandler() @@ -152,10 +157,16 @@ func (publisher *Publisher) Publish( optionFuncs ...func(*PublishOptions), ) error { publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() if publisher.disablePublishDueToFlow { return fmt.Errorf("publishing blocked due to high flow on the server") } - publisher.disablePublishDueToFlowMux.RUnlock() + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return fmt.Errorf("publishing blocked due to TCP block on the server") + } options := &PublishOptions{} for _, optionFunc := range optionFuncs { @@ -204,27 +215,6 @@ func (publisher Publisher) Close() error { return publisher.chManager.close() } -func (publisher *Publisher) startNotifyFlowHandler() { - notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) - publisher.disablePublishDueToFlowMux.Lock() - publisher.disablePublishDueToFlow = false - publisher.disablePublishDueToFlowMux.Unlock() - - // Listeners for active=true flow control. When true is sent to a listener, - // publishing should pause until false is sent to listeners. - for ok := range notifyFlowChan { - publisher.disablePublishDueToFlowMux.Lock() - if ok { - publisher.options.Logger.Printf("pausing publishing due to flow request from server") - publisher.disablePublishDueToFlow = true - } else { - publisher.disablePublishDueToFlow = false - publisher.options.Logger.Printf("resuming publishing due to flow request from server") - } - publisher.disablePublishDueToFlowMux.Unlock() - } -} - func (publisher *Publisher) startNotifyReturnHandler() { returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) for ret := range returnAMQPCh { diff --git a/publish_flow_block.go b/publish_flow_block.go new file mode 100644 index 0000000..9d70fe6 --- /dev/null +++ b/publish_flow_block.go @@ -0,0 +1,43 @@ +package rabbitmq + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +func (publisher *Publisher) startNotifyFlowHandler() { + notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlow = false + publisher.disablePublishDueToFlowMux.Unlock() + + for ok := range notifyFlowChan { + publisher.disablePublishDueToFlowMux.Lock() + if ok { + publisher.options.Logger.Printf("pausing publishing due to flow request from server") + publisher.disablePublishDueToFlow = true + } else { + publisher.disablePublishDueToFlow = false + publisher.options.Logger.Printf("resuming publishing due to flow request from server") + } + publisher.disablePublishDueToFlowMux.Unlock() + } +} + +func (publisher *Publisher) startNotifyBlockedHandler() { + blockings := publisher.chManager.connection.NotifyBlocked(make(chan amqp.Blocking)) + publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlocked = false + publisher.disablePublishDueToBlockedMux.Unlock() + + for b := range blockings { + publisher.disablePublishDueToBlockedMux.Lock() + if b.Active { + publisher.options.Logger.Printf("pausing publishing due to TCP blocking from server") + publisher.disablePublishDueToBlocked = true + } else { + publisher.disablePublishDueToBlocked = false + publisher.options.Logger.Printf("resuming publishing due to TCP blocking from server") + } + publisher.disablePublishDueToBlockedMux.Unlock() + } +}