diff --git a/Makefile b/Makefile index 517467f..f8c79c1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: test fmt vet lint staticcheck +all: test vet lint staticcheck test: go test ./... diff --git a/README.md b/README.md index b50b9aa..de32374 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,10 @@ go func() { See the [examples](examples) directory for more ideas. +## Stability + +Note that the API is currently in `v0`. I don't plan on any huge changes, but there may be some small breaking changes before we hit `v1`. + ## 💬 Contact [![Twitter Follow](https://img.shields.io/twitter/follow/wagslane.svg?label=Follow%20Wagslane&style=social)](https://twitter.com/intent/follow?screen_name=wagslane) diff --git a/channel.go b/channel.go index 06e77bf..d869db0 100644 --- a/channel.go +++ b/channel.go @@ -13,12 +13,13 @@ type channelManager struct { url string channel *amqp.Channel connection *amqp.Connection - config amqp.Config + amqpConfig amqp.Config channelMux *sync.RWMutex notifyCancelOrClose chan error + reconnectInterval time.Duration } -func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) { +func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { conn, ch, err := getNewChannel(url, conf) if err != nil { return nil, err @@ -30,8 +31,9 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage connection: conn, channel: ch, channelMux: &sync.RWMutex{}, - config: conf, + amqpConfig: conf, notifyCancelOrClose: make(chan error), + reconnectInterval: reconnectInterval, } go chManager.startNotifyCancelOrClosed() return &chManager, nil @@ -50,8 +52,8 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe } // startNotifyCancelOrClosed listens on the channel's cancelled and closed -// notifiers. When it detects a problem, it attempts to reconnect with an exponential -// backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) @@ -61,7 +63,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { case err := <-notifyCloseChan: if err != nil && err.Server { chManager.logger.Printf("attempting to reconnect to amqp server after close") - chManager.reconnectWithBackoff() + chManager.reconnectLoop() chManager.logger.Printf("successfully reconnected to amqp server after close") chManager.notifyCancelOrClose <- err } else if err != nil && err.Reason == "EOF" { @@ -74,22 +76,25 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { } else if err == nil { chManager.logger.Printf("amqp channel closed gracefully") } + if err != nil { + chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client") + } + if err == nil { + chManager.logger.Printf("amqp channel closed gracefully") + } case err := <-notifyCancelChan: chManager.logger.Printf("attempting to reconnect to amqp server after cancel") - chManager.reconnectWithBackoff() + chManager.reconnectLoop() chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } } -// reconnectWithBackoff continuously attempts to reconnect with an -// exponential backoff strategy -func (chManager *channelManager) reconnectWithBackoff() { - backoffTime := time.Second +// reconnectLoop continuously attempts to reconnect +func (chManager *channelManager) reconnectLoop() { for { - chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 + chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) + time.Sleep(chManager.reconnectInterval) err := chManager.reconnect() if err != nil { chManager.logger.Printf("error reconnecting to amqp server: %v", err) @@ -103,7 +108,7 @@ func (chManager *channelManager) reconnectWithBackoff() { func (chManager *channelManager) reconnect() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) + newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig) if err != nil { return err } diff --git a/consume.go b/consume.go index e470bc3..99203c4 100644 --- a/consume.go +++ b/consume.go @@ -32,8 +32,9 @@ type Consumer struct { // Logging set to true will enable the consumer to print to stdout // Logger specifies a custom Logger interface implementation overruling Logging. type ConsumerOptions struct { - Logging bool - Logger Logger + Logging bool + Logger Logger + ReconnectInterval time.Duration } // Delivery captures the fields for a previously delivered message resident in @@ -45,15 +46,16 @@ type Delivery struct { // NewConsumer returns a new Consumer connected to the given rabbitmq server func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { - options := &ConsumerOptions{} + options := &ConsumerOptions{ + Logging: true, + Logger: &stdLogger{}, + ReconnectInterval: time.Second * 5, + } for _, optionFunc := range optionFuncs { optionFunc(options) } - if options.Logger == nil { - options.Logger = &noLogger{} // default no logging - } - chManager, err := newChannelManager(url, config, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval) if err != nil { return Consumer{}, err } @@ -64,6 +66,14 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp return consumer, nil } +// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will +// attempt to reconnect to the rabbit server +func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ReconnectInterval = reconnectInterval + } +} + // WithConsumerOptionsLogging sets a logger to log to stdout func WithConsumerOptionsLogging(options *ConsumerOptions) { options.Logging = true @@ -107,13 +117,16 @@ func (consumer Consumer) StartConsuming( go func() { for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("gorabbit: successful recovery from: %v", err) - consumer.startGoroutinesWithRetries( + consumer.logger.Printf("successful recovery from: %v", err) + err = consumer.startGoroutines( handler, queue, routingKeys, *options, ) + if err != nil { + consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err) + } } }() return nil @@ -145,33 +158,6 @@ func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { consumer.chManager.channel.Cancel(consumerName, noWait) } -// startGoroutinesWithRetries attempts to start consuming on a channel -// with an exponential backoff -func (consumer Consumer) startGoroutinesWithRetries( - handler Handler, - queue string, - routingKeys []string, - consumeOptions ConsumeOptions, -) { - backoffTime := time.Second - for { - consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 - err := consumer.startGoroutines( - handler, - queue, - routingKeys, - consumeOptions, - ) - if err != nil { - consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) - continue - } - break - } -} - // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue diff --git a/examples/consumer/main.go b/examples/consumer/main.go index f2fbeef..d9e63cf 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -23,7 +23,7 @@ func main() { } // wait for server to acknowledge the cancel - noWait := false + const noWait = false defer consumer.Disconnect() defer consumer.StopConsuming(consumerName, noWait) diff --git a/go.mod b/go.mod index ebf3dba..51d17a0 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/wagslane/go-rabbitmq -go 1.16 +go 1.17 require github.com/rabbitmq/amqp091-go v1.3.0 diff --git a/logger.go b/logger.go index ebefc1f..920afb6 100644 --- a/logger.go +++ b/logger.go @@ -19,8 +19,3 @@ type stdLogger struct{} func (l stdLogger) Printf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) } - -// noLogger does not log at all, this is the default. -type noLogger struct{} - -func (l noLogger) Printf(format string, v ...interface{}) {} diff --git a/publish.go b/publish.go index 51d7449..3c7f7b3 100644 --- a/publish.go +++ b/publish.go @@ -3,6 +3,7 @@ package rabbitmq import ( "fmt" "sync" + "time" amqp "github.com/rabbitmq/amqp091-go" ) @@ -50,8 +51,17 @@ type Publisher struct { // PublisherOptions are used to describe a publisher's configuration. // Logging set to true will enable the consumer to print to stdout type PublisherOptions struct { - Logging bool - Logger Logger + Logging bool + Logger Logger + ReconnectInterval time.Duration +} + +// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will +// attempt to reconnect to the rabbit server +func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ReconnectInterval = reconnectInterval + } } // WithPublisherOptionsLogging sets logging to true on the consumer options @@ -75,15 +85,16 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { - options := &PublisherOptions{} + options := &PublisherOptions{ + Logging: true, + Logger: &stdLogger{}, + ReconnectInterval: time.Second * 5, + } for _, optionFunc := range optionFuncs { optionFunc(options) } - if options.Logger == nil { - options.Logger = &noLogger{} // default no logging - } - chManager, err := newChannelManager(url, config, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval) if err != nil { return nil, err } @@ -106,7 +117,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher func (publisher *Publisher) handleRestarts() { for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Printf("gorabbit: successful publisher recovery from: %v", err) + publisher.options.Logger.Printf("successful publisher recovery from: %v", err) go publisher.startNotifyFlowHandler() if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler()