Browse Source

Merge branch 'wagslane:main' into fix_reconnection

pull/65/head
Fedor Ortyanov 4 years ago
committed by GitHub
parent
commit
41756d67f3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 67 deletions
  1. +1
    -1
      Makefile
  2. +4
    -0
      README.md
  3. +20
    -15
      channel.go
  4. +22
    -36
      consume.go
  5. +1
    -1
      examples/consumer/main.go
  6. +1
    -1
      go.mod
  7. +0
    -5
      logger.go
  8. +19
    -8
      publish.go

+ 1
- 1
Makefile View File

@ -1,4 +1,4 @@
all: test fmt vet lint staticcheck
all: test vet lint staticcheck
test:
go test ./...


+ 4
- 0
README.md View File

@ -132,6 +132,10 @@ go func() {
See the [examples](examples) directory for more ideas.
## Stability
Note that the API is currently in `v0`. I don't plan on any huge changes, but there may be some small breaking changes before we hit `v1`.
## 💬 Contact
[![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane)


+ 20
- 15
channel.go View File

@ -13,12 +13,13 @@ type channelManager struct {
url string
channel *amqp.Channel
connection *amqp.Connection
config amqp.Config
amqpConfig amqp.Config
channelMux *sync.RWMutex
notifyCancelOrClose chan error
reconnectInterval time.Duration
}
func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) {
func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
@ -30,8 +31,9 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage
connection: conn,
channel: ch,
channelMux: &sync.RWMutex{},
config: conf,
amqpConfig: conf,
notifyCancelOrClose: make(chan error),
reconnectInterval: reconnectInterval,
}
go chManager.startNotifyCancelOrClosed()
return &chManager, nil
@ -50,8 +52,8 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe
}
// startNotifyCancelOrClosed listens on the channel's cancelled and closed
// notifiers. When it detects a problem, it attempts to reconnect with an exponential
// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// notifiers. When it detects a problem, it attempts to reconnect.
// Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// channel
func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
@ -61,7 +63,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
case err := <-notifyCloseChan:
if err != nil && err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
} else if err != nil && err.Reason == "EOF" {
@ -74,22 +76,25 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
} else if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
}
if err != nil {
chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client")
}
if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff()
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
}
// reconnectWithBackoff continuously attempts to reconnect with an
// exponential backoff strategy
func (chManager *channelManager) reconnectWithBackoff() {
backoffTime := time.Second
// reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() {
for {
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect()
if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err)
@ -103,7 +108,7 @@ func (chManager *channelManager) reconnectWithBackoff() {
func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config)
newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig)
if err != nil {
return err
}


+ 22
- 36
consume.go View File

@ -32,8 +32,9 @@ type Consumer struct {
// Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct {
Logging bool
Logger Logger
Logging bool
Logger Logger
ReconnectInterval time.Duration
}
// Delivery captures the fields for a previously delivered message resident in
@ -45,15 +46,16 @@ type Delivery struct {
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
options := &ConsumerOptions{
Logging: true,
Logger: &stdLogger{},
ReconnectInterval: time.Second * 5,
}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, config, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval)
if err != nil {
return Consumer{}, err
}
@ -64,6 +66,14 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp
return consumer, nil
}
// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will
// attempt to reconnect to the rabbit server
func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ReconnectInterval = reconnectInterval
}
}
// WithConsumerOptionsLogging sets a logger to log to stdout
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
@ -107,13 +117,16 @@ func (consumer Consumer) StartConsuming(
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("gorabbit: successful recovery from: %v", err)
consumer.startGoroutinesWithRetries(
consumer.logger.Printf("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err)
}
}
}()
return nil
@ -145,33 +158,6 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
consumer.chManager.channel.Cancel(consumerName, noWait)
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(
handler Handler,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) {
backoffTime := time.Second
for {
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
consumeOptions,
)
if err != nil {
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
continue
}
break
}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue


+ 1
- 1
examples/consumer/main.go View File

@ -23,7 +23,7 @@ func main() {
}
// wait for server to acknowledge the cancel
noWait := false
const noWait = false
defer consumer.Disconnect()
defer consumer.StopConsuming(consumerName, noWait)


+ 1
- 1
go.mod View File

@ -1,5 +1,5 @@
module github.com/wagslane/go-rabbitmq
go 1.16
go 1.17
require github.com/rabbitmq/amqp091-go v1.3.0

+ 0
- 5
logger.go View File

@ -19,8 +19,3 @@ type stdLogger struct{}
func (l stdLogger) Printf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
// noLogger does not log at all, this is the default.
type noLogger struct{}
func (l noLogger) Printf(format string, v ...interface{}) {}

+ 19
- 8
publish.go View File

@ -3,6 +3,7 @@ package rabbitmq
import (
"fmt"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -50,8 +51,17 @@ type Publisher struct {
// PublisherOptions are used to describe a publisher's configuration.
// Logging set to true will enable the consumer to print to stdout
type PublisherOptions struct {
Logging bool
Logger Logger
Logging bool
Logger Logger
ReconnectInterval time.Duration
}
// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will
// attempt to reconnect to the rabbit server
func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ReconnectInterval = reconnectInterval
}
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
@ -75,15 +85,16 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
options := &PublisherOptions{}
options := &PublisherOptions{
Logging: true,
Logger: &stdLogger{},
ReconnectInterval: time.Second * 5,
}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, config, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
}
@ -106,7 +117,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Printf("gorabbit: successful publisher recovery from: %v", err)
publisher.options.Logger.Printf("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()


Loading…
Cancel
Save