Browse Source

fix notify publish

pull/138/head v0.12.4
wagslane 2 years ago
parent
commit
d6212f3470
1 changed files with 29 additions and 22 deletions
  1. +29
    -22
      publish.go

+ 29
- 22
publish.go View File

@ -104,6 +104,12 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
return nil, err
}
if options.ConfirmMode {
publisher.NotifyPublish(func(_ Confirmation) {
// set a blank handler to set the channel in confirm mode
})
}
go func() {
for err := range publisher.reconnectErrCh {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
@ -113,15 +119,11 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
publisher.options.Logger.Fatalf("publisher closing, unable to recover")
return
}
go publisher.startReturnHandler()
go publisher.startPublishHandler()
publisher.startReturnHandler()
publisher.startPublishHandler()
}
}()
if options.ConfirmMode {
publisher.NotifyPublish(func(_ Confirmation) {})
}
return publisher, nil
}
@ -210,6 +212,7 @@ func (publisher *Publisher) PublishWithContext(
// if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler
// or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned.
// This confirmation can be used to check if the message was actually published or wait for this to happen.
// If the publisher is not in confirm mode, the returned confirmation will always be nil.
func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
ctx context.Context,
data []byte,
@ -299,7 +302,7 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) {
publisher.handlerMux.Unlock()
if start {
go publisher.startReturnHandler()
publisher.startReturnHandler()
}
}
@ -308,12 +311,12 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) {
// publishers on the same connection keep that in mind
func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) {
publisher.handlerMux.Lock()
start := publisher.notifyPublishHandler == nil
shouldStart := publisher.notifyPublishHandler == nil
publisher.notifyPublishHandler = handler
publisher.handlerMux.Unlock()
if start {
go publisher.startPublishHandler()
if shouldStart {
publisher.startPublishHandler()
}
}
@ -325,10 +328,12 @@ func (publisher *Publisher) startReturnHandler() {
}
publisher.handlerMux.Unlock()
returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1))
for ret := range returns {
go publisher.notifyReturnHandler(Return{ret})
}
go func() {
returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1))
for ret := range returns {
go publisher.notifyReturnHandler(Return{ret})
}
}()
}
func (publisher *Publisher) startPublishHandler() {
@ -338,13 +343,15 @@ func (publisher *Publisher) startPublishHandler() {
return
}
publisher.handlerMux.Unlock()
publisher.chanManager.ConfirmSafe(false)
confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1))
for conf := range confirmationCh {
go publisher.notifyPublishHandler(Confirmation{
Confirmation: conf,
ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()),
})
}
go func() {
confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1))
for conf := range confirmationCh {
go publisher.notifyPublishHandler(Confirmation{
Confirmation: conf,
ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()),
})
}
}()
}

Loading…
Cancel
Save