Browse Source

reconnectionCount

pull/73/head v0.8.1
wagslane 4 years ago
parent
commit
444180b142
4 changed files with 11 additions and 7 deletions
  1. +1
    -1
      README.md
  2. +3
    -1
      channel.go
  3. +0
    -3
      examples/consumer/main.go
  4. +7
    -2
      publish.go

+ 1
- 1
README.md View File

@ -2,7 +2,7 @@
Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐
Supported by [Qvault](https://qvault.io)
Supported by [Boot.dev](https://boot.dev)
[![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg) [![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg)


+ 3
- 1
channel.go View File

@ -17,6 +17,7 @@ type channelManager struct {
channelMux *sync.RWMutex channelMux *sync.RWMutex
notifyCancelOrClose chan error notifyCancelOrClose chan error
reconnectInterval time.Duration reconnectInterval time.Duration
reconnectionCount uint
} }
func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
@ -86,6 +87,8 @@ func (chManager *channelManager) reconnectLoop() {
if err != nil { if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err) chManager.logger.Printf("error reconnecting to amqp server: %v", err)
} else { } else {
chManager.reconnectionCount++
go chManager.startNotifyCancelOrClosed()
return return
} }
} }
@ -105,7 +108,6 @@ func (chManager *channelManager) reconnect() error {
chManager.connection = newConn chManager.connection = newConn
chManager.channel = newChannel chManager.channel = newChannel
go chManager.startNotifyCancelOrClosed()
return nil return nil
} }


+ 0
- 3
examples/consumer/main.go View File

@ -27,9 +27,6 @@ func main() {
} }
}() }()
// wait for server to acknowledge the cancel
const noWait = false
err = consumer.StartConsuming( err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action { func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body)) log.Printf("consumed: %v", string(d.Body))


+ 7
- 2
publish.go View File

@ -30,9 +30,11 @@ type Return struct {
} }
// Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. // Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag.
// Use NotifyPublish to consume these events.
// Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag
// is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness
type Confirmation struct { type Confirmation struct {
amqp.Confirmation amqp.Confirmation
ReconnectionCount int
} }
// Publisher allows you to publish messages safely across an open connection // Publisher allows you to publish messages safely across an open connection
@ -235,7 +237,10 @@ func (publisher *Publisher) startNotifyPublishHandler() {
go func() { go func() {
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh { for conf := range publishAMQPCh {
publisher.notifyPublishChan <- Confirmation{conf}
publisher.notifyPublishChan <- Confirmation{
Confirmation: conf,
ReconnectionCount: int(publisher.chManager.reconnectionCount),
}
} }
}() }()
} }

Loading…
Cancel
Save