diff --git a/publish.go b/publish.go index 59526d7..a58b48d 100644 --- a/publish.go +++ b/publish.go @@ -104,6 +104,12 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe return nil, err } + if options.ConfirmMode { + publisher.NotifyPublish(func(_ Confirmation) { + // set a blank handler to set the channel in confirm mode + }) + } + go func() { for err := range publisher.reconnectErrCh { publisher.options.Logger.Infof("successful publisher recovery from: %v", err) @@ -113,15 +119,11 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe publisher.options.Logger.Fatalf("publisher closing, unable to recover") return } - go publisher.startReturnHandler() - go publisher.startPublishHandler() + publisher.startReturnHandler() + publisher.startPublishHandler() } }() - if options.ConfirmMode { - publisher.NotifyPublish(func(_ Confirmation) {}) - } - return publisher, nil } @@ -210,6 +212,7 @@ func (publisher *Publisher) PublishWithContext( // if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler // or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned. // This confirmation can be used to check if the message was actually published or wait for this to happen. +// If the publisher is not in confirm mode, the returned confirmation will always be nil. func (publisher *Publisher) PublishWithDeferredConfirmWithContext( ctx context.Context, data []byte, @@ -299,7 +302,7 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) { publisher.handlerMux.Unlock() if start { - go publisher.startReturnHandler() + publisher.startReturnHandler() } } @@ -308,12 +311,12 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) { // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { publisher.handlerMux.Lock() - start := publisher.notifyPublishHandler == nil + shouldStart := publisher.notifyPublishHandler == nil publisher.notifyPublishHandler = handler publisher.handlerMux.Unlock() - if start { - go publisher.startPublishHandler() + if shouldStart { + publisher.startPublishHandler() } } @@ -325,10 +328,12 @@ func (publisher *Publisher) startReturnHandler() { } publisher.handlerMux.Unlock() - returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) - for ret := range returns { - go publisher.notifyReturnHandler(Return{ret}) - } + go func() { + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returns { + go publisher.notifyReturnHandler(Return{ret}) + } + }() } func (publisher *Publisher) startPublishHandler() { @@ -338,13 +343,15 @@ func (publisher *Publisher) startPublishHandler() { return } publisher.handlerMux.Unlock() - publisher.chanManager.ConfirmSafe(false) - confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) - for conf := range confirmationCh { - go publisher.notifyPublishHandler(Confirmation{ - Confirmation: conf, - ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), - }) - } + + go func() { + confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range confirmationCh { + go publisher.notifyPublishHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }) + } + }() }