diff --git a/README.md b/README.md index dd0caff..7dc5465 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ -Supported by [Qvault](https://qvault.io) +Supported by [Boot.dev](https://boot.dev) [![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg) diff --git a/channel.go b/channel.go index 7f4479a..c2dba8d 100644 --- a/channel.go +++ b/channel.go @@ -17,6 +17,7 @@ type channelManager struct { channelMux *sync.RWMutex notifyCancelOrClose chan error reconnectInterval time.Duration + reconnectionCount uint } func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { @@ -86,6 +87,8 @@ func (chManager *channelManager) reconnectLoop() { if err != nil { chManager.logger.Printf("error reconnecting to amqp server: %v", err) } else { + chManager.reconnectionCount++ + go chManager.startNotifyCancelOrClosed() return } } @@ -105,7 +108,6 @@ func (chManager *channelManager) reconnect() error { chManager.connection = newConn chManager.channel = newChannel - go chManager.startNotifyCancelOrClosed() return nil } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index b6c1b13..21d3dad 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -27,9 +27,6 @@ func main() { } }() - // wait for server to acknowledge the cancel - const noWait = false - err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) diff --git a/publish.go b/publish.go index 3b480b1..13db87a 100644 --- a/publish.go +++ b/publish.go @@ -30,9 +30,11 @@ type Return struct { } // Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. -// Use NotifyPublish to consume these events. +// Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag +// is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness type Confirmation struct { amqp.Confirmation + ReconnectionCount int } // Publisher allows you to publish messages safely across an open connection @@ -235,7 +237,10 @@ func (publisher *Publisher) startNotifyPublishHandler() { go func() { publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) for conf := range publishAMQPCh { - publisher.notifyPublishChan <- Confirmation{conf} + publisher.notifyPublishChan <- Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chManager.reconnectionCount), + } } }() }