Browse Source

add blocking support

pull/80/head
wagslane 4 years ago
parent
commit
6540e6b99a
2 changed files with 61 additions and 28 deletions
  1. +18
    -28
      publish.go
  2. +43
    -0
      publish_flow_block.go

+ 18
- 28
publish.go View File

@ -47,6 +47,9 @@ type Publisher struct {
disablePublishDueToFlow bool disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex disablePublishDueToFlowMux *sync.RWMutex
disablePublishDueToBlocked bool
disablePublishDueToBlockedMux *sync.RWMutex
options PublisherOptions options PublisherOptions
} }
@ -102,12 +105,14 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
} }
publisher := &Publisher{ publisher := &Publisher{
chManager: chManager,
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
options: *options,
notifyReturnChan: nil,
notifyPublishChan: nil,
chManager: chManager,
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
disablePublishDueToBlocked: false,
disablePublishDueToBlockedMux: &sync.RWMutex{},
options: *options,
notifyReturnChan: nil,
notifyPublishChan: nil,
} }
go publisher.startNotifyFlowHandler() go publisher.startNotifyFlowHandler()
@ -152,10 +157,16 @@ func (publisher *Publisher) Publish(
optionFuncs ...func(*PublishOptions), optionFuncs ...func(*PublishOptions),
) error { ) error {
publisher.disablePublishDueToFlowMux.RLock() publisher.disablePublishDueToFlowMux.RLock()
defer publisher.disablePublishDueToFlowMux.RUnlock()
if publisher.disablePublishDueToFlow { if publisher.disablePublishDueToFlow {
return fmt.Errorf("publishing blocked due to high flow on the server") return fmt.Errorf("publishing blocked due to high flow on the server")
} }
publisher.disablePublishDueToFlowMux.RUnlock()
publisher.disablePublishDueToBlockedMux.RLock()
defer publisher.disablePublishDueToBlockedMux.RUnlock()
if publisher.disablePublishDueToBlocked {
return fmt.Errorf("publishing blocked due to TCP block on the server")
}
options := &PublishOptions{} options := &PublishOptions{}
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
@ -204,27 +215,6 @@ func (publisher Publisher) Close() error {
return publisher.chManager.close() return publisher.chManager.close()
} }
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
// Listeners for active=true flow control. When true is sent to a listener,
// publishing should pause until false is sent to listeners.
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.options.Logger.Printf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Printf("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMux.Unlock()
}
}
func (publisher *Publisher) startNotifyReturnHandler() { func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh { for ret := range returnAMQPCh {


+ 43
- 0
publish_flow_block.go View File

@ -0,0 +1,43 @@
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.options.Logger.Printf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Printf("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMux.Unlock()
}
}
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.chManager.connection.NotifyBlocked(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMux.Lock()
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMux.Unlock()
for b := range blockings {
publisher.disablePublishDueToBlockedMux.Lock()
if b.Active {
publisher.options.Logger.Printf("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true
} else {
publisher.disablePublishDueToBlocked = false
publisher.options.Logger.Printf("resuming publishing due to TCP blocking from server")
}
publisher.disablePublishDueToBlockedMux.Unlock()
}
}

Loading…
Cancel
Save