From 26cc90b71a689118e2c937dbde031a877fe9ead2 Mon Sep 17 00:00:00 2001 From: wagslane Date: Tue, 7 Dec 2021 13:34:20 -0700 Subject: [PATCH] fix publishing confirms --- channel.go | 12 +++++----- consume.go | 2 +- examples/publisher/main.go | 3 +-- publish.go | 46 ++++++++++++++++---------------------- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/channel.go b/channel.go index 42c3853..aee8515 100644 --- a/channel.go +++ b/channel.go @@ -93,18 +93,16 @@ func (chManager *channelManager) reconnectWithBackoff() { // reconnect safely closes the current channel and obtains a new one func (chManager *channelManager) reconnect() error { - err := chManager.close() - if err != nil { - return err - } - + chManager.channelMux.Lock() + defer chManager.channelMux.Unlock() newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) if err != nil { return err } - chManager.channelMux.Lock() - defer chManager.channelMux.Unlock() + chManager.channel.Close() + chManager.connection.Close() + chManager.connection = newConn chManager.channel = newChannel go chManager.startNotifyCancelOrClosed() diff --git a/consume.go b/consume.go index c8b9c75..77bca61 100644 --- a/consume.go +++ b/consume.go @@ -107,7 +107,7 @@ func (consumer Consumer) StartConsuming( go func() { for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) + consumer.logger.Printf("gorabbit: successful recovery from: %v", err) consumer.startGoroutinesWithRetries( handler, queue, diff --git a/examples/publisher/main.go b/examples/publisher/main.go index c89da82..0e222a3 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -16,7 +16,6 @@ func main() { publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogging, - rabbitmq.WithPublishOptionsConfirmPublishings, ) if err != nil { log.Fatal(err) @@ -64,7 +63,7 @@ func main() { rabbitmq.WithPublishOptionsExchange("events"), ) if err != nil { - log.Fatal(err) + log.Println(err) } case <-done: fmt.Println("stopping publisher") diff --git a/publish.go b/publish.go index ec464d0..51d7449 100644 --- a/publish.go +++ b/publish.go @@ -50,9 +50,8 @@ type Publisher struct { // PublisherOptions are used to describe a publisher's configuration. // Logging set to true will enable the consumer to print to stdout type PublisherOptions struct { - Logging bool - Logger Logger - ConfirmPublishings bool + Logging bool + Logger Logger } // WithPublisherOptionsLogging sets logging to true on the consumer options @@ -61,11 +60,6 @@ func WithPublisherOptionsLogging(options *PublisherOptions) { options.Logger = &stdLogger{} } -// WithPublishOptionsConfirmPublishings allows NotifyPublish to work -func WithPublishOptionsConfirmPublishings(options *PublisherOptions) { - options.ConfirmPublishings = true -} - // WithPublisherOptionsLogger sets logging to a custom interface. // Use WithPublisherOptionsLogging to just log to stdout. func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { @@ -80,7 +74,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) { +func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { options := &PublisherOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) @@ -91,10 +85,10 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher chManager, err := newChannelManager(url, config, options.Logger) if err != nil { - return Publisher{}, err + return nil, err } - publisher := Publisher{ + publisher := &Publisher{ chManager: chManager, disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, @@ -105,23 +99,24 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher go publisher.startNotifyFlowHandler() - // restart notifiers when cancel/close is triggered - go func() { - for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Printf("publish cancel/close handler triggered. err: %v", err) - go publisher.startNotifyFlowHandler() - if publisher.notifyReturnChan != nil { - go publisher.startNotifyReturnHandler() - } - if publisher.notifyPublishChan != nil && publisher.options.ConfirmPublishings { - go publisher.startNotifyPublishHandler() - } - } - }() + go publisher.handleRestarts() return publisher, nil } +func (publisher *Publisher) handleRestarts() { + for err := range publisher.chManager.notifyCancelOrClose { + publisher.options.Logger.Printf("gorabbit: successful publisher recovery from: %v", err) + go publisher.startNotifyFlowHandler() + if publisher.notifyReturnChan != nil { + go publisher.startNotifyReturnHandler() + } + if publisher.notifyPublishChan != nil { + go publisher.startNotifyPublishHandler() + } + } +} + // NotifyReturn registers a listener for basic.return methods. // These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. func (publisher *Publisher) NotifyReturn() <-chan Return { @@ -132,9 +127,6 @@ func (publisher *Publisher) NotifyReturn() <-chan Return { // NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option func (publisher *Publisher) NotifyPublish() <-chan Confirmation { - if !publisher.options.ConfirmPublishings { - return nil - } publisher.notifyPublishChan = make(chan Confirmation) go publisher.startNotifyPublishHandler() return publisher.notifyPublishChan