Browse Source

Add backoff duration option

pull/39/head
Erik Wolf 4 years ago
parent
commit
886e6255c0
3 changed files with 26 additions and 9 deletions
  1. +5
    -5
      channel.go
  2. +10
    -2
      consume.go
  3. +11
    -2
      publish.go

+ 5
- 5
channel.go View File

@ -10,6 +10,7 @@ import (
type channelManager struct {
logger Logger
backoff time.Duration
url string
channel *amqp.Channel
connection *amqp.Connection
@ -18,7 +19,7 @@ type channelManager struct {
notifyCancelOrClose chan error
}
func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) {
func newChannelManager(url string, conf amqp.Config, log Logger, backoff time.Duration) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
@ -26,6 +27,7 @@ func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManage
chManager := channelManager{
logger: log,
backoff: backoff,
url: url,
connection: conn,
channel: ch,
@ -77,11 +79,9 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
// reconnectWithBackoff continuously attempts to reconnect with an
// exponential backoff strategy
func (chManager *channelManager) reconnectWithBackoff() {
backoffTime := time.Second
for {
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.backoff)
time.Sleep(chManager.backoff)
err := chManager.reconnect()
if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err)


+ 10
- 2
consume.go View File

@ -34,6 +34,7 @@ type Consumer struct {
type ConsumerOptions struct {
Logging bool
Logger Logger
Backoff time.Duration
}
// Delivery captures the fields for a previously delivered message resident in
@ -45,7 +46,7 @@ type Delivery struct {
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
options := &ConsumerOptions{Backoff: 1 * time.Second}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
@ -53,7 +54,7 @@ func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOp
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, config, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger, options.Backoff)
if err != nil {
return Consumer{}, err
}
@ -79,6 +80,13 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
}
}
// WithConsumerOptionsBackoff sets the duration to wait until a new reconnection try.
func WithConsumerOptionsBackoff(backoff time.Duration) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Backoff = backoff
}
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it


+ 11
- 2
publish.go View File

@ -3,6 +3,7 @@ package rabbitmq
import (
"fmt"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -112,6 +113,7 @@ type Publisher struct {
type PublisherOptions struct {
Logging bool
Logger Logger
Backoff time.Duration
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
@ -129,13 +131,20 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
}
}
// WithPublisherOptionsBackoff sets the duration to wait until a new reconnection try.
func WithPublisherOptionsBackoff(backoff time.Duration) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Backoff = backoff
}
}
// NewPublisher returns a new publisher with an open channel to the cluster.
// If you plan to enforce mandatory or immediate publishing, those failures will be reported
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
options := &PublisherOptions{}
options := &PublisherOptions{Backoff: 1 * time.Second}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
@ -143,7 +152,7 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, config, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger, options.Backoff)
if err != nil {
return Publisher{}, nil, err
}


Loading…
Cancel
Save