From 73f54b2d6b7afefbf9f0c34461543efe638a5608 Mon Sep 17 00:00:00 2001 From: Brian Mori <52781411+brianmori@users.noreply.github.com> Date: Sun, 6 Jun 2021 21:51:31 +0200 Subject: [PATCH 1/4] adds the consume shutdown --- channel.go | 31 +++++++++++++++++++++---------- consume.go | 8 ++++++++ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/channel.go b/channel.go index 4524f5a..4d7183f 100644 --- a/channel.go +++ b/channel.go @@ -12,12 +12,13 @@ type channelManager struct { logger Logger url string channel *amqp.Channel + connection *amqp.Connection channelMux *sync.RWMutex notifyCancelOrClose chan error } func newChannelManager(url string, log Logger) (*channelManager, error) { - ch, err := getNewChannel(url) + conn, ch, err := getNewChannel(url) if err != nil { return nil, err } @@ -25,6 +26,7 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { chManager := channelManager{ logger: log, url: url, + connection: conn, channel: ch, channelMux: &sync.RWMutex{}, notifyCancelOrClose: make(chan error), @@ -33,16 +35,16 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { return &chManager, nil } -func getNewChannel(url string) (*amqp.Channel, error) { +func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) { amqpConn, err := amqp.Dial(url) if err != nil { - return nil, err + return nil, nil, err } ch, err := amqpConn.Channel() if err != nil { - return nil, err + return nil, nil, err } - return ch, err + return amqpConn, ch, err } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -56,10 +58,15 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) select { case err := <-notifyCloseChan: - chManager.logger.Printf("attempting to reconnect to amqp server after close") - chManager.reconnectWithBackoff() - chManager.logger.Printf("successfully reconnected to amqp server after close") - chManager.notifyCancelOrClose <- err + + // If the connection close is triggered by the Server, a reconnection takes place + if err.Server { + chManager.logger.Printf("attempting to reconnect to amqp server after close") + chManager.reconnectWithBackoff() + chManager.logger.Printf("successfully reconnected to amqp server after close") + chManager.notifyCancelOrClose <- err + } + case err := <-notifyCancelChan: chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.reconnectWithBackoff() @@ -101,11 +108,15 @@ func (chManager *channelManager) reconnectWithBackoff() { func (chManager *channelManager) reconnect() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - newChannel, err := getNewChannel(chManager.url) + newConn, newChannel, err := getNewChannel(chManager.url) if err != nil { return err } + chManager.channel.Close() + chManager.connection.Close() + + chManager.connection = newConn chManager.channel = newChannel go chManager.startNotifyCancelOrClosed() return nil diff --git a/consume.go b/consume.go index 1c22504..4003beb 100644 --- a/consume.go +++ b/consume.go @@ -312,6 +312,14 @@ func (consumer Consumer) StartConsuming( return nil } +// StopConsuming stop the consume of messages +func (consumer Consumer) StopConsuming() { + + consumer.chManager.channel.Close() + consumer.chManager.connection.Close() + +} + // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( From da13fca98714ef3472f233323a4db26758c81188 Mon Sep 17 00:00:00 2001 From: wagslane Date: Sun, 6 Jun 2021 14:48:26 -0600 Subject: [PATCH 2/4] refactor --- consume.go | 681 ++++++++++++++++----------------------------- consume_options.go | 206 ++++++++++++++ 2 files changed, 444 insertions(+), 443 deletions(-) create mode 100644 consume_options.go diff --git a/consume.go b/consume.go index 1c22504..99d8232 100644 --- a/consume.go +++ b/consume.go @@ -1,443 +1,238 @@ -package rabbitmq - -import ( - "fmt" - "time" - - "github.com/streadway/amqp" -) - -// Consumer allows you to create and connect to queues for data consumption. -type Consumer struct { - chManager *channelManager - logger Logger -} - -// ConsumerOptions are used to describe a consumer's configuration. -// Logging set to true will enable the consumer to print to stdout -// Logger specifies a custom Logger interface implementation overruling Logging. -type ConsumerOptions struct { - Logging bool - Logger Logger -} - -// Delivery captures the fields for a previously delivered message resident in -// a queue to be delivered by the server to a consumer from Channel.Consume or -// Channel.Get. -type Delivery struct { - amqp.Delivery -} - -// NewConsumer returns a new Consumer connected to the given rabbitmq server -func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { - options := &ConsumerOptions{} - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - if options.Logger == nil { - options.Logger = &noLogger{} // default no logging - } - - chManager, err := newChannelManager(url, options.Logger) - if err != nil { - return Consumer{}, err - } - consumer := Consumer{ - chManager: chManager, - logger: options.Logger, - } - return consumer, nil -} - -// WithConsumerOptionsLogging sets a logger to log to stdout -func WithConsumerOptionsLogging(options *ConsumerOptions) { - options.Logging = true - options.Logger = &stdLogger{} -} - -// WithConsumerOptionsLogger sets logging to a custom interface. -// Use WithConsumerOptionsLogging to just log to stdout. -func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Logging = true - options.Logger = log - } -} - -// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided -func getDefaultConsumeOptions() ConsumeOptions { - return ConsumeOptions{ - QueueDurable: false, - QueueAutoDelete: false, - QueueExclusive: false, - QueueNoWait: false, - QueueArgs: nil, - BindingExchange: nil, - BindingNoWait: false, - BindingArgs: nil, - Concurrency: 1, - QOSPrefetch: 0, - QOSGlobal: false, - ConsumerName: "", - ConsumerAutoAck: false, - ConsumerExclusive: false, - ConsumerNoWait: false, - ConsumerNoLocal: false, - ConsumerArgs: nil, - } -} - -// ConsumeOptions are used to describe how a new consumer will be created. -type ConsumeOptions struct { - QueueDurable bool - QueueAutoDelete bool - QueueExclusive bool - QueueNoWait bool - QueueArgs Table - BindingExchange *BindingExchangeOptions - BindingNoWait bool - BindingArgs Table - Concurrency int - QOSPrefetch int - QOSGlobal bool - ConsumerName string - ConsumerAutoAck bool - ConsumerExclusive bool - ConsumerNoWait bool - ConsumerNoLocal bool - ConsumerArgs Table -} - -// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. -func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { - if options.BindingExchange == nil { - options.BindingExchange = &BindingExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - ExchangeArgs: nil, - } - } - return options.BindingExchange -} - -// BindingExchangeOptions are used when binding to an exchange. -// it will verify the exchange is created before binding to it. -type BindingExchangeOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - ExchangeArgs Table -} - -// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't -// be destroyed when the server restarts. It must only be bound to durable exchanges -func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { - options.QueueDurable = true -} - -// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will -// be deleted when there are no more conusmers on it -func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { - options.QueueAutoDelete = true -} - -// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means -// it's are only accessible by the connection that declares it and -// will be deleted when the connection closes. Channels on other connections -// will receive an error when attempting to declare, bind, consume, purge or -// delete a queue with the same name. -func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { - options.QueueExclusive = true -} - -// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means -// the queue will assume to be declared on the server. A -// channel exception will arrive if the conditions are met for existing queues -// or attempting to modify an existing queue from a different connection. -func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { - options.QueueNoWait = true -} - -// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes -// in the cluster will have the messages distributed amongst them for higher reliability -func WithConsumeOptionsQuorum(options *ConsumeOptions) { - if options.QueueArgs == nil { - options.QueueArgs = Table{} - } - options.QueueArgs["x-queue-type"] = "quorum" -} - -// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to -func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Name = name - } -} - -// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type -func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Kind = kind - } -} - -// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true -} - -// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true -} - -// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true -} - -// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true -} - -// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange -func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args - } -} - -// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound -// the channel will not be closed with an error. -func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { - options.BindingNoWait = true -} - -// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that -// many goroutines will be spawned to run the provided handler on messages -func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.Concurrency = concurrency - } -} - -// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that -// many messages will be fetched from the server in advance to help with throughput. -// This doesn't affect the handler, messages are still processed one at a time. -func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.QOSPrefetch = prefetchCount - } -} - -// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means -// these QOS settings apply to ALL existing and future -// consumers on all channels on the same connection -func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { - options.QOSGlobal = true -} - -// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer -// if unset a random name will be given -func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.ConsumerName = consumerName - } -} - -// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means -// the server will ensure that this is the sole consumer -// from this queue. When exclusive is false, the server will fairly distribute -// deliveries across multiple consumers. -func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { - options.ConsumerExclusive = true -} - -// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means -// it does not wait for the server to confirm the request and -// immediately begin deliveries. If it is not possible to consume, a channel -// exception will be raised and the channel will be closed. -func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { - options.ConsumerNoWait = true -} - -// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". -// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). -// The provided handler is called once for each message. If the provided queue doesn't exist, it -// will be created on the cluster -func (consumer Consumer) StartConsuming( - handler func(d Delivery) bool, - queue string, - routingKeys []string, - optionFuncs ...func(*ConsumeOptions), -) error { - defaultOptions := getDefaultConsumeOptions() - options := &ConsumeOptions{} - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - if options.Concurrency < 1 { - options.Concurrency = defaultOptions.Concurrency - } - - err := consumer.startGoroutines( - handler, - queue, - routingKeys, - *options, - ) - if err != nil { - return err - } - - go func() { - for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) - consumer.startGoroutinesWithRetries( - handler, - queue, - routingKeys, - *options, - ) - } - }() - return nil -} - -// startGoroutinesWithRetries attempts to start consuming on a channel -// with an exponential backoff -func (consumer Consumer) startGoroutinesWithRetries( - handler func(d Delivery) bool, - queue string, - routingKeys []string, - consumeOptions ConsumeOptions, -) { - backoffTime := time.Second - for { - consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) - time.Sleep(backoffTime) - backoffTime *= 2 - err := consumer.startGoroutines( - handler, - queue, - routingKeys, - consumeOptions, - ) - if err != nil { - consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) - continue - } - break - } -} - -// startGoroutines declares the queue if it doesn't exist, -// binds the queue to the routing key(s), and starts the goroutines -// that will consume from the queue -func (consumer Consumer) startGoroutines( - handler func(d Delivery) bool, - queue string, - routingKeys []string, - consumeOptions ConsumeOptions, -) error { - consumer.chManager.channelMux.RLock() - defer consumer.chManager.channelMux.RUnlock() - - _, err := consumer.chManager.channel.QueueDeclare( - queue, - consumeOptions.QueueDurable, - consumeOptions.QueueAutoDelete, - consumeOptions.QueueExclusive, - consumeOptions.QueueNoWait, - tableToAMQPTable(consumeOptions.QueueArgs), - ) - if err != nil { - return err - } - - if consumeOptions.BindingExchange != nil { - exchange := consumeOptions.BindingExchange - if exchange.Name == "" { - return fmt.Errorf("binding to exchange but name not specified") - } - err = consumer.chManager.channel.ExchangeDeclare( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.ExchangeArgs), - ) - if err != nil { - return err - } - for _, routingKey := range routingKeys { - err = consumer.chManager.channel.QueueBind( - queue, - routingKey, - exchange.Name, - consumeOptions.BindingNoWait, - tableToAMQPTable(consumeOptions.BindingArgs), - ) - if err != nil { - return err - } - } - } - - err = consumer.chManager.channel.Qos( - consumeOptions.QOSPrefetch, - 0, - consumeOptions.QOSGlobal, - ) - if err != nil { - return err - } - - msgs, err := consumer.chManager.channel.Consume( - queue, - consumeOptions.ConsumerName, - consumeOptions.ConsumerAutoAck, - consumeOptions.ConsumerExclusive, - consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ - consumeOptions.ConsumerNoWait, - tableToAMQPTable(consumeOptions.ConsumerArgs), - ) - if err != nil { - return err - } - - for i := 0; i < consumeOptions.Concurrency; i++ { - go func() { - for msg := range msgs { - if consumeOptions.ConsumerAutoAck { - handler(Delivery{msg}) - continue - } - if handler(Delivery{msg}) { - err := msg.Ack(false) - if err != nil { - consumer.logger.Printf("can't ack message: %v", err) - } - } else { - err := msg.Nack(false, true) - if err != nil { - consumer.logger.Printf("can't nack message: %v", err) - } - } - } - consumer.logger.Printf("rabbit consumer goroutine closed") - }() - } - consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) - return nil -} +package rabbitmq + +import ( + "fmt" + "time" + + "github.com/streadway/amqp" +) + +// Consumer allows you to create and connect to queues for data consumption. +type Consumer struct { + chManager *channelManager + logger Logger +} + +// ConsumerOptions are used to describe a consumer's configuration. +// Logging set to true will enable the consumer to print to stdout +// Logger specifies a custom Logger interface implementation overruling Logging. +type ConsumerOptions struct { + Logging bool + Logger Logger +} + +// Delivery captures the fields for a previously delivered message resident in +// a queue to be delivered by the server to a consumer from Channel.Consume or +// Channel.Get. +type Delivery struct { + amqp.Delivery +} + +// NewConsumer returns a new Consumer connected to the given rabbitmq server +func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { + options := &ConsumerOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.Logger == nil { + options.Logger = &noLogger{} // default no logging + } + + chManager, err := newChannelManager(url, options.Logger) + if err != nil { + return Consumer{}, err + } + consumer := Consumer{ + chManager: chManager, + logger: options.Logger, + } + return consumer, nil +} + +// WithConsumerOptionsLogging sets a logger to log to stdout +func WithConsumerOptionsLogging(options *ConsumerOptions) { + options.Logging = true + options.Logger = &stdLogger{} +} + +// WithConsumerOptionsLogger sets logging to a custom interface. +// Use WithConsumerOptionsLogging to just log to stdout. +func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Logging = true + options.Logger = log + } +} + +// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". +// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). +// The provided handler is called once for each message. If the provided queue doesn't exist, it +// will be created on the cluster +func (consumer Consumer) StartConsuming( + handler func(d Delivery) bool, + queue string, + routingKeys []string, + optionFuncs ...func(*ConsumeOptions), +) error { + defaultOptions := getDefaultConsumeOptions() + options := &ConsumeOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.Concurrency < 1 { + options.Concurrency = defaultOptions.Concurrency + } + + err := consumer.startGoroutines( + handler, + queue, + routingKeys, + *options, + ) + if err != nil { + return err + } + + go func() { + for err := range consumer.chManager.notifyCancelOrClose { + consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) + consumer.startGoroutinesWithRetries( + handler, + queue, + routingKeys, + *options, + ) + } + }() + return nil +} + +// startGoroutinesWithRetries attempts to start consuming on a channel +// with an exponential backoff +func (consumer Consumer) startGoroutinesWithRetries( + handler func(d Delivery) bool, + queue string, + routingKeys []string, + consumeOptions ConsumeOptions, +) { + backoffTime := time.Second + for { + consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) + time.Sleep(backoffTime) + backoffTime *= 2 + err := consumer.startGoroutines( + handler, + queue, + routingKeys, + consumeOptions, + ) + if err != nil { + consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) + continue + } + break + } +} + +// startGoroutines declares the queue if it doesn't exist, +// binds the queue to the routing key(s), and starts the goroutines +// that will consume from the queue +func (consumer Consumer) startGoroutines( + handler func(d Delivery) bool, + queue string, + routingKeys []string, + consumeOptions ConsumeOptions, +) error { + consumer.chManager.channelMux.RLock() + defer consumer.chManager.channelMux.RUnlock() + + _, err := consumer.chManager.channel.QueueDeclare( + queue, + consumeOptions.QueueDurable, + consumeOptions.QueueAutoDelete, + consumeOptions.QueueExclusive, + consumeOptions.QueueNoWait, + tableToAMQPTable(consumeOptions.QueueArgs), + ) + if err != nil { + return err + } + + if consumeOptions.BindingExchange != nil { + exchange := consumeOptions.BindingExchange + if exchange.Name == "" { + return fmt.Errorf("binding to exchange but name not specified") + } + err = consumer.chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.ExchangeArgs), + ) + if err != nil { + return err + } + for _, routingKey := range routingKeys { + err = consumer.chManager.channel.QueueBind( + queue, + routingKey, + exchange.Name, + consumeOptions.BindingNoWait, + tableToAMQPTable(consumeOptions.BindingArgs), + ) + if err != nil { + return err + } + } + } + + err = consumer.chManager.channel.Qos( + consumeOptions.QOSPrefetch, + 0, + consumeOptions.QOSGlobal, + ) + if err != nil { + return err + } + + msgs, err := consumer.chManager.channel.Consume( + queue, + consumeOptions.ConsumerName, + consumeOptions.ConsumerAutoAck, + consumeOptions.ConsumerExclusive, + consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ + consumeOptions.ConsumerNoWait, + tableToAMQPTable(consumeOptions.ConsumerArgs), + ) + if err != nil { + return err + } + + for i := 0; i < consumeOptions.Concurrency; i++ { + go func() { + for msg := range msgs { + if consumeOptions.ConsumerAutoAck { + handler(Delivery{msg}) + continue + } + if handler(Delivery{msg}) { + err := msg.Ack(false) + if err != nil { + consumer.logger.Printf("can't ack message: %v", err) + } + } else { + err := msg.Nack(false, true) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + } + } + consumer.logger.Printf("rabbit consumer goroutine closed") + }() + } + consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) + return nil +} diff --git a/consume_options.go b/consume_options.go new file mode 100644 index 0000000..be00355 --- /dev/null +++ b/consume_options.go @@ -0,0 +1,206 @@ +package rabbitmq + +// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided +func getDefaultConsumeOptions() ConsumeOptions { + return ConsumeOptions{ + QueueDurable: false, + QueueAutoDelete: false, + QueueExclusive: false, + QueueNoWait: false, + QueueArgs: nil, + BindingExchange: nil, + BindingNoWait: false, + BindingArgs: nil, + Concurrency: 1, + QOSPrefetch: 0, + QOSGlobal: false, + ConsumerName: "", + ConsumerAutoAck: false, + ConsumerExclusive: false, + ConsumerNoWait: false, + ConsumerNoLocal: false, + ConsumerArgs: nil, + } +} + +// ConsumeOptions are used to describe how a new consumer will be created. +type ConsumeOptions struct { + QueueDurable bool + QueueAutoDelete bool + QueueExclusive bool + QueueNoWait bool + QueueArgs Table + BindingExchange *BindingExchangeOptions + BindingNoWait bool + BindingArgs Table + Concurrency int + QOSPrefetch int + QOSGlobal bool + ConsumerName string + ConsumerAutoAck bool + ConsumerExclusive bool + ConsumerNoWait bool + ConsumerNoLocal bool + ConsumerArgs Table +} + +// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. +func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { + if options.BindingExchange == nil { + options.BindingExchange = &BindingExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + } + } + return options.BindingExchange +} + +// BindingExchangeOptions are used when binding to an exchange. +// it will verify the exchange is created before binding to it. +type BindingExchangeOptions struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table +} + +// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't +// be destroyed when the server restarts. It must only be bound to durable exchanges +func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { + options.QueueDurable = true +} + +// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will +// be deleted when there are no more conusmers on it +func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { + options.QueueAutoDelete = true +} + +// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means +// it's are only accessible by the connection that declares it and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { + options.QueueExclusive = true +} + +// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means +// the queue will assume to be declared on the server. A +// channel exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { + options.QueueNoWait = true +} + +// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes +// in the cluster will have the messages distributed amongst them for higher reliability +func WithConsumeOptionsQuorum(options *ConsumeOptions) { + if options.QueueArgs == nil { + options.QueueArgs = Table{} + } + options.QueueArgs["x-queue-type"] = "quorum" +} + +// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to +func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type +func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag +func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag +func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args + } +} + +// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound +// the channel will not be closed with an error. +func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { + options.BindingNoWait = true +} + +// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.Concurrency = concurrency + } +} + +// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { + options.QOSGlobal = true +} + +// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer +// if unset a random name will be given +func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ConsumerName = consumerName + } +} + +// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means +// the server will ensure that this is the sole consumer +// from this queue. When exclusive is false, the server will fairly distribute +// deliveries across multiple consumers. +func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { + options.ConsumerExclusive = true +} + +// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means +// it does not wait for the server to confirm the request and +// immediately begin deliveries. If it is not possible to consume, a channel +// exception will be raised and the channel will be closed. +func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { + options.ConsumerNoWait = true +} From ef822fbb59567de6b529143a97732ee37b18bd2a Mon Sep 17 00:00:00 2001 From: lane-c-wagner Date: Tue, 8 Jun 2021 08:43:47 -0600 Subject: [PATCH 3/4] check for nil --- channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channel.go b/channel.go index d63b1c7..2d1340e 100644 --- a/channel.go +++ b/channel.go @@ -59,7 +59,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { select { case err := <-notifyCloseChan: // If the connection close is triggered by the Server, a reconnection takes place - if err.Server { + if err != nil && err.Server { chManager.logger.Printf("attempting to reconnect to amqp server after close") chManager.reconnectWithBackoff() chManager.logger.Printf("successfully reconnected to amqp server after close") From 2559f048e62de516f039ed7721a156d80d4943db Mon Sep 17 00:00:00 2001 From: wagslane Date: Tue, 8 Jun 2021 18:49:29 -0600 Subject: [PATCH 4/4] lf --- consume_options.go | 412 ++++++++++++++++++++++----------------------- 1 file changed, 206 insertions(+), 206 deletions(-) diff --git a/consume_options.go b/consume_options.go index be00355..f6168f8 100644 --- a/consume_options.go +++ b/consume_options.go @@ -1,206 +1,206 @@ -package rabbitmq - -// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided -func getDefaultConsumeOptions() ConsumeOptions { - return ConsumeOptions{ - QueueDurable: false, - QueueAutoDelete: false, - QueueExclusive: false, - QueueNoWait: false, - QueueArgs: nil, - BindingExchange: nil, - BindingNoWait: false, - BindingArgs: nil, - Concurrency: 1, - QOSPrefetch: 0, - QOSGlobal: false, - ConsumerName: "", - ConsumerAutoAck: false, - ConsumerExclusive: false, - ConsumerNoWait: false, - ConsumerNoLocal: false, - ConsumerArgs: nil, - } -} - -// ConsumeOptions are used to describe how a new consumer will be created. -type ConsumeOptions struct { - QueueDurable bool - QueueAutoDelete bool - QueueExclusive bool - QueueNoWait bool - QueueArgs Table - BindingExchange *BindingExchangeOptions - BindingNoWait bool - BindingArgs Table - Concurrency int - QOSPrefetch int - QOSGlobal bool - ConsumerName string - ConsumerAutoAck bool - ConsumerExclusive bool - ConsumerNoWait bool - ConsumerNoLocal bool - ConsumerArgs Table -} - -// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. -func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { - if options.BindingExchange == nil { - options.BindingExchange = &BindingExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - ExchangeArgs: nil, - } - } - return options.BindingExchange -} - -// BindingExchangeOptions are used when binding to an exchange. -// it will verify the exchange is created before binding to it. -type BindingExchangeOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - ExchangeArgs Table -} - -// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't -// be destroyed when the server restarts. It must only be bound to durable exchanges -func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { - options.QueueDurable = true -} - -// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will -// be deleted when there are no more conusmers on it -func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { - options.QueueAutoDelete = true -} - -// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means -// it's are only accessible by the connection that declares it and -// will be deleted when the connection closes. Channels on other connections -// will receive an error when attempting to declare, bind, consume, purge or -// delete a queue with the same name. -func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { - options.QueueExclusive = true -} - -// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means -// the queue will assume to be declared on the server. A -// channel exception will arrive if the conditions are met for existing queues -// or attempting to modify an existing queue from a different connection. -func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { - options.QueueNoWait = true -} - -// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes -// in the cluster will have the messages distributed amongst them for higher reliability -func WithConsumeOptionsQuorum(options *ConsumeOptions) { - if options.QueueArgs == nil { - options.QueueArgs = Table{} - } - options.QueueArgs["x-queue-type"] = "quorum" -} - -// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to -func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Name = name - } -} - -// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type -func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Kind = kind - } -} - -// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true -} - -// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true -} - -// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true -} - -// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true -} - -// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange -func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args - } -} - -// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound -// the channel will not be closed with an error. -func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { - options.BindingNoWait = true -} - -// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that -// many goroutines will be spawned to run the provided handler on messages -func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.Concurrency = concurrency - } -} - -// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that -// many messages will be fetched from the server in advance to help with throughput. -// This doesn't affect the handler, messages are still processed one at a time. -func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.QOSPrefetch = prefetchCount - } -} - -// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means -// these QOS settings apply to ALL existing and future -// consumers on all channels on the same connection -func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { - options.QOSGlobal = true -} - -// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer -// if unset a random name will be given -func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.ConsumerName = consumerName - } -} - -// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means -// the server will ensure that this is the sole consumer -// from this queue. When exclusive is false, the server will fairly distribute -// deliveries across multiple consumers. -func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { - options.ConsumerExclusive = true -} - -// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means -// it does not wait for the server to confirm the request and -// immediately begin deliveries. If it is not possible to consume, a channel -// exception will be raised and the channel will be closed. -func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { - options.ConsumerNoWait = true -} +package rabbitmq + +// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided +func getDefaultConsumeOptions() ConsumeOptions { + return ConsumeOptions{ + QueueDurable: false, + QueueAutoDelete: false, + QueueExclusive: false, + QueueNoWait: false, + QueueArgs: nil, + BindingExchange: nil, + BindingNoWait: false, + BindingArgs: nil, + Concurrency: 1, + QOSPrefetch: 0, + QOSGlobal: false, + ConsumerName: "", + ConsumerAutoAck: false, + ConsumerExclusive: false, + ConsumerNoWait: false, + ConsumerNoLocal: false, + ConsumerArgs: nil, + } +} + +// ConsumeOptions are used to describe how a new consumer will be created. +type ConsumeOptions struct { + QueueDurable bool + QueueAutoDelete bool + QueueExclusive bool + QueueNoWait bool + QueueArgs Table + BindingExchange *BindingExchangeOptions + BindingNoWait bool + BindingArgs Table + Concurrency int + QOSPrefetch int + QOSGlobal bool + ConsumerName string + ConsumerAutoAck bool + ConsumerExclusive bool + ConsumerNoWait bool + ConsumerNoLocal bool + ConsumerArgs Table +} + +// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. +func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { + if options.BindingExchange == nil { + options.BindingExchange = &BindingExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + } + } + return options.BindingExchange +} + +// BindingExchangeOptions are used when binding to an exchange. +// it will verify the exchange is created before binding to it. +type BindingExchangeOptions struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table +} + +// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't +// be destroyed when the server restarts. It must only be bound to durable exchanges +func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { + options.QueueDurable = true +} + +// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will +// be deleted when there are no more conusmers on it +func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { + options.QueueAutoDelete = true +} + +// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means +// it's are only accessible by the connection that declares it and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { + options.QueueExclusive = true +} + +// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means +// the queue will assume to be declared on the server. A +// channel exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { + options.QueueNoWait = true +} + +// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes +// in the cluster will have the messages distributed amongst them for higher reliability +func WithConsumeOptionsQuorum(options *ConsumeOptions) { + if options.QueueArgs == nil { + options.QueueArgs = Table{} + } + options.QueueArgs["x-queue-type"] = "quorum" +} + +// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to +func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type +func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag +func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag +func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args + } +} + +// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound +// the channel will not be closed with an error. +func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { + options.BindingNoWait = true +} + +// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.Concurrency = concurrency + } +} + +// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { + options.QOSGlobal = true +} + +// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer +// if unset a random name will be given +func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ConsumerName = consumerName + } +} + +// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means +// the server will ensure that this is the sole consumer +// from this queue. When exclusive is false, the server will fairly distribute +// deliveries across multiple consumers. +func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { + options.ConsumerExclusive = true +} + +// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means +// it does not wait for the server to confirm the request and +// immediately begin deliveries. If it is not possible to consume, a channel +// exception will be raised and the channel will be closed. +func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { + options.ConsumerNoWait = true +}