Browse Source

notify return safety

pull/35/head
wagslane 4 years ago
parent
commit
355fdf613d
3 changed files with 32 additions and 21 deletions
  1. +2
    -1
      examples/logger/main.go
  2. +2
    -1
      examples/publisher/main.go
  3. +28
    -19
      publish.go

+ 2
- 1
examples/logger/main.go View File

@ -18,7 +18,7 @@ func (c *customLogger) Printf(fmt string, args ...interface{}) {
func main() {
mylogger := &customLogger{}
publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
@ -37,6 +37,7 @@ func main() {
log.Fatal(err)
}
returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))


+ 2
- 1
examples/publisher/main.go View File

@ -8,7 +8,7 @@ import (
)
func main() {
publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
@ -27,6 +27,7 @@ func main() {
log.Fatal(err)
}
returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))


+ 28
- 19
publish.go View File

@ -99,7 +99,8 @@ func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
type Publisher struct {
chManager *channelManager
notifyReturnChan chan Return
notifyReturnChan chan Return
shouldNotifyReturn bool
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
@ -134,7 +135,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) {
options := &PublisherOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -145,26 +146,38 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
chManager, err := newChannelManager(url, config, options.Logger)
if err != nil {
return Publisher{}, nil, err
return Publisher{}, err
}
publisher := Publisher{
chManager: chManager,
notifyReturnChan: make(chan Return, 1),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
notifyReturnChan: make(chan Return),
shouldNotifyReturn: false,
}
go func() {
publisher.startNotifyHandlers()
go publisher.startNotifyFlowHandler()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.startNotifyHandlers()
go publisher.startNotifyFlowHandler()
if publisher.shouldNotifyReturn {
go publisher.startNotifyReturnHandler()
}
}
}()
return publisher, publisher.notifyReturnChan, nil
return publisher, nil
}
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
func (publisher *Publisher) NotifyReturn() <-chan Return {
publisher.shouldNotifyReturn = true
return publisher.notifyReturnChan
}
// Publish publishes the provided data to the given routing keys over the connection
@ -217,19 +230,8 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close()
}
func (publisher *Publisher) startNotifyHandlers() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
publisher.notifyReturnChan <- Return{ret}
}
}()
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}
func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
@ -248,3 +250,10 @@ func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Unlock()
}
}
func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
publisher.notifyReturnChan <- Return{ret}
}
}

Loading…
Cancel
Save