From 0df88ac7e9af0ee5cd217e1c3a04729b02b8322c Mon Sep 17 00:00:00 2001 From: wagslane Date: Wed, 30 Nov 2022 09:49:30 -0700 Subject: [PATCH] connection mananger --- channel.go | 134 ------ config.go | 9 - connection.go | 110 +++++ connection_options.go | 67 +++ consume.go | 120 +++-- consume_options.go | 250 +++++++--- declare.go | 427 +++--------------- examples/consumer/main.go | 51 ++- examples/consumer_with_declare/.gitignore | 1 - examples/consumer_with_declare/main.go | 74 --- examples/logger/main.go | 14 +- examples/publisher/main.go | 42 +- exchange_options.go | 16 + .../connectionmanager/connection_manager.go | 160 +++++++ .../connection_manager_dispatch.go | 68 +++ internal/connectionmanager/safe_wraps.go | 210 +++++++++ internal/logger/logger.go | 12 + logger.go | 20 +- publish.go | 90 +--- publish_flow_block.go | 4 +- 20 files changed, 1081 insertions(+), 798 deletions(-) delete mode 100644 channel.go delete mode 100644 config.go create mode 100644 connection.go create mode 100644 connection_options.go delete mode 100644 examples/consumer_with_declare/.gitignore delete mode 100644 examples/consumer_with_declare/main.go create mode 100644 exchange_options.go create mode 100644 internal/connectionmanager/connection_manager.go create mode 100644 internal/connectionmanager/connection_manager_dispatch.go create mode 100644 internal/connectionmanager/safe_wraps.go create mode 100644 internal/logger/logger.go diff --git a/channel.go b/channel.go deleted file mode 100644 index e1226db..0000000 --- a/channel.go +++ /dev/null @@ -1,134 +0,0 @@ -package rabbitmq - -import ( - "errors" - "sync" - "time" - - amqp "github.com/rabbitmq/amqp091-go" -) - -type channelManager struct { - logger Logger - url string - channel *amqp.Channel - connection *amqp.Connection - amqpConfig Config - channelMux *sync.RWMutex - notifyCancelOrClose chan error - reconnectInterval time.Duration - reconnectionCount uint -} - -func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { - conn, ch, err := getNewChannel(url, conf) - if err != nil { - return nil, err - } - - chManager := channelManager{ - logger: log, - url: url, - connection: conn, - channel: ch, - channelMux: &sync.RWMutex{}, - amqpConfig: conf, - notifyCancelOrClose: make(chan error), - reconnectInterval: reconnectInterval, - } - go chManager.startNotifyCancelOrClosed() - return &chManager, nil -} - -func getNewChannel(url string, conf Config) (*amqp.Connection, *amqp.Channel, error) { - amqpConn, err := amqp.DialConfig(url, amqp.Config(conf)) - if err != nil { - return nil, nil, err - } - ch, err := amqpConn.Channel() - if err != nil { - return nil, nil, err - } - return amqpConn, ch, nil -} - -// startNotifyCancelOrClosed listens on the channel's cancelled and closed -// notifiers. When it detects a problem, it attempts to reconnect. -// Once reconnected, it sends an error back on the manager's notifyCancelOrClose -// channel -func (chManager *channelManager) startNotifyCancelOrClosed() { - notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) - notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1)) - select { - case err := <-notifyCloseChan: - if err != nil { - chManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) - chManager.reconnectLoop() - chManager.logger.Warnf("successfully reconnected to amqp server") - chManager.notifyCancelOrClose <- err - } - if err == nil { - chManager.logger.Infof("amqp channel closed gracefully") - } - case err := <-notifyCancelChan: - chManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) - chManager.reconnectLoop() - chManager.logger.Warnf("successfully reconnected to amqp server after cancel") - chManager.notifyCancelOrClose <- errors.New(err) - } -} - -// reconnectLoop continuously attempts to reconnect -func (chManager *channelManager) reconnectLoop() { - for { - chManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) - time.Sleep(chManager.reconnectInterval) - err := chManager.reconnect() - if err != nil { - chManager.logger.Errorf("error reconnecting to amqp server: %v", err) - } else { - chManager.reconnectionCount++ - go chManager.startNotifyCancelOrClosed() - return - } - } -} - -// reconnect safely closes the current channel and obtains a new one -func (chManager *channelManager) reconnect() error { - chManager.channelMux.Lock() - defer chManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig) - if err != nil { - return err - } - - if err = chManager.channel.Close(); err != nil { - chManager.logger.Warnf("error closing channel while reconnecting: %v", err) - } - - if err = chManager.connection.Close(); err != nil { - chManager.logger.Warnf("error closing connection while reconnecting: %v", err) - } - - chManager.connection = newConn - chManager.channel = newChannel - return nil -} - -// close safely closes the current channel and connection -func (chManager *channelManager) close() error { - chManager.channelMux.Lock() - defer chManager.channelMux.Unlock() - - err := chManager.channel.Close() - if err != nil { - return err - } - - err = chManager.connection.Close() - if err != nil { - return err - } - return nil -} diff --git a/config.go b/config.go deleted file mode 100644 index b55b986..0000000 --- a/config.go +++ /dev/null @@ -1,9 +0,0 @@ -package rabbitmq - -import amqp "github.com/rabbitmq/amqp091-go" - -// Config wraps amqp.Config -// Config is used in DialConfig and Open to specify the desired tuning -// parameters used during a connection open handshake. The negotiated tuning -// will be stored in the returned connection's Config field. -type Config amqp.Config diff --git a/connection.go b/connection.go new file mode 100644 index 0000000..e91c601 --- /dev/null +++ b/connection.go @@ -0,0 +1,110 @@ +package rabbitmq + +import ( + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" +) + +// Conn manages the connection to a rabbit cluster +// it is intended to be shared across publishers and consumers +type Conn struct { + connectionManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + notifyReturnChan chan Return + notifyPublishChan chan Confirmation + options ConnectionOptions +} + +// Config wraps amqp.Config +// Config is used in DialConfig and Open to specify the desired tuning +// parameters used during a connection open handshake. The negotiated tuning +// will be stored in the returned connection's Config field. +type Config amqp.Config + +// NewConn creates a new connection manager +func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) { + defaultOptions := getDefaultConnectionOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + if err != nil { + return nil, err + } + + err = manager.QosSafe( + options.QOSPrefetch, + 0, + options.QOSGlobal, + ) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := manager.NotifyReconnect() + conn := &Conn{ + connectionManager: manager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + notifyReturnChan: nil, + notifyPublishChan: nil, + options: *options, + } + + go conn.handleRestarts() + return conn, nil +} + +func (conn *Conn) handleRestarts() { + for err := range conn.reconnectErrCh { + conn.options.Logger.Infof("successful connection recovery from: %v", err) + go conn.startNotifyReturnHandler() + go conn.startNotifyPublishHandler() + } +} + +func (conn *Conn) startNotifyReturnHandler() { + if conn.notifyReturnChan == nil { + return + } + returnAMQPCh := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returnAMQPCh { + conn.notifyReturnChan <- Return{ret} + } +} + +func (conn *Conn) startNotifyPublishHandler() { + if conn.notifyPublishChan == nil { + return + } + conn.connectionManager.ConfirmSafe(false) + publishAMQPCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range publishAMQPCh { + conn.notifyPublishChan <- Confirmation{ + Confirmation: conf, + ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()), + } + } +} + +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (conn *Conn) NotifyReturn() <-chan Return { + conn.notifyReturnChan = make(chan Return) + go conn.startNotifyReturnHandler() + return conn.notifyReturnChan +} + +// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (conn *Conn) NotifyPublish() <-chan Confirmation { + conn.notifyPublishChan = make(chan Confirmation) + go conn.startNotifyPublishHandler() + return conn.notifyPublishChan +} diff --git a/connection_options.go b/connection_options.go new file mode 100644 index 0000000..b56194f --- /dev/null +++ b/connection_options.go @@ -0,0 +1,67 @@ +package rabbitmq + +import "time" + +// ConnectionOptions are used to describe how a new consumer will be created. +type ConnectionOptions struct { + QOSPrefetch int + QOSGlobal bool + ReconnectInterval time.Duration + Logger Logger + Config Config +} + +// getDefaultConnectionOptions describes the options that will be used when a value isn't provided +func getDefaultConnectionOptions() ConnectionOptions { + return ConnectionOptions{ + QOSPrefetch: 0, + QOSGlobal: false, + ReconnectInterval: time.Second * 5, + Logger: stdDebugLogger{}, + Config: Config{}, + } +} + +// WithConnectionOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConnectionOptionsQOSPrefetch(prefetchCount int) func(*ConnectionOptions) { + return func(options *ConnectionOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConnectionOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConnectionOptionsQOSGlobal(options *ConnectionOptions) { + options.QOSGlobal = true +} + +// WithConnectionOptionsReconnectInterval sets the reconnection interval +func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.ReconnectInterval = interval + } +} + +// WithConnectionOptionsLogging sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogging(options *ConnectionOptions) { + options.Logger = stdDebugLogger{} +} + +// WithConnectionOptionsLogger sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Logger = log + } +} + +// WithConnectionOptionsConfig sets the Config used in the connection +func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Config = cfg + } +} diff --git a/consume.go b/consume.go index 771edf4..44df665 100644 --- a/consume.go +++ b/consume.go @@ -1,10 +1,12 @@ package rabbitmq import ( + "errors" "fmt" - "time" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/wagslane/go-rabbitmq/internal/logger" ) // Action is an action that occurs after processed this delivery @@ -24,15 +26,16 @@ const ( // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { - chManager *channelManager - logger Logger + connManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + options ConsumerOptions } // ConsumerOptions are used to describe a consumer's configuration. // Logger specifies a custom Logger interface implementation. type ConsumerOptions struct { - Logger Logger - ReconnectInterval time.Duration + Logger logger.Logger } // Delivery captures the fields for a previously delivered message resident in @@ -43,32 +46,27 @@ type Delivery struct { } // NewConsumer returns a new Consumer connected to the given rabbitmq server -func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { +func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer, error) { options := &ConsumerOptions{ - Logger: &stdDebugLogger{}, - ReconnectInterval: time.Second * 5, + Logger: &stdDebugLogger{}, } for _, optionFunc := range optionFuncs { optionFunc(options) } - chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval) - if err != nil { - return Consumer{}, err - } - consumer := Consumer{ - chManager: chManager, - logger: options.Logger, + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") } - return consumer, nil -} + reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect() -// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will -// attempt to reconnect to the rabbit server -func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) { - return func(options *ConsumerOptions) { - options.ReconnectInterval = reconnectInterval + consumer := &Consumer{ + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, } + + return consumer, nil } // WithConsumerOptionsLogging uses a default logger that writes to std out @@ -78,7 +76,7 @@ func WithConsumerOptionsLogging(options *ConsumerOptions) { // WithConsumerOptionsLogger sets logging to a custom interface. // Use WithConsumerOptionsLogging to just log to stdout. -func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { +func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { return func(options *ConsumerOptions) { options.Logger = log } @@ -88,7 +86,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { // Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster -func (consumer Consumer) StartConsuming( +func (consumer *Consumer) StartConsuming( handler Handler, queue string, optionFuncs ...func(*ConsumeOptions), @@ -108,14 +106,14 @@ func (consumer Consumer) StartConsuming( } go func() { - for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Infof("successful recovery from: %v", err) + for err := range consumer.reconnectErrCh { + consumer.options.Logger.Infof("successful recovery from: %v", err) err = consumer.startGoroutines( handler, *options, ) if err != nil { - consumer.logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err) + consumer.options.Logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err) } } }() @@ -123,59 +121,57 @@ func (consumer Consumer) StartConsuming( } // Close cleans up resources and closes the consumer. -// The consumer is not safe for reuse -func (consumer Consumer) Close() error { - consumer.chManager.logger.Infof("closing consumer...") - return consumer.chManager.close() +// It does not close the connection manager, just the subscription +// to the connection manager +func (consumer *Consumer) Close() { + consumer.options.Logger.Infof("closing consumer...") + consumer.closeConnectionToManagerCh <- struct{}{} } // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue -func (consumer Consumer) startGoroutines( +func (consumer *Consumer) startGoroutines( handler Handler, - consumeOptions ConsumeOptions, + options ConsumeOptions, ) error { - err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions) + + err := declareExchange(consumer.connManager, options.ExchangeOptions) if err != nil { - return fmt.Errorf("declare failed: %w", err) + return fmt.Errorf("declare exchange failed: %w", err) } - - consumer.chManager.channelMux.RLock() - defer consumer.chManager.channelMux.RUnlock() - - err = consumer.chManager.channel.Qos( - consumeOptions.QOSPrefetch, - 0, - consumeOptions.QOSGlobal, - ) + err = declareQueue(consumer.connManager, options.QueueOptions) if err != nil { - return err + return fmt.Errorf("declare queue failed: %w", err) + } + err = declareBindings(consumer.connManager, options) + if err != nil { + return fmt.Errorf("declare bindings failed: %w", err) } - msgs, err := consumer.chManager.channel.Consume( - consumeOptions.QueueName, - consumeOptions.ConsumerName, - consumeOptions.ConsumerAutoAck, - consumeOptions.ConsumerExclusive, - consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ - consumeOptions.ConsumerNoWait, - tableToAMQPTable(consumeOptions.ConsumerArgs), + msgs, err := consumer.connManager.ConsumeSafe( + options.QueueOptions.Name, + options.RabbitConsumerOptions.Name, + options.RabbitConsumerOptions.AutoAck, + options.RabbitConsumerOptions.Exclusive, + false, // no-local is not supported by RabbitMQ + options.RabbitConsumerOptions.NoWait, + tableToAMQPTable(options.RabbitConsumerOptions.Args), ) if err != nil { return err } - for i := 0; i < consumeOptions.Concurrency; i++ { - go handlerGoroutine(consumer, msgs, consumeOptions, handler) + for i := 0; i < options.Concurrency; i++ { + go handlerGoroutine(consumer, msgs, options, handler) } - consumer.logger.Infof("Processing messages on %v goroutines", consumeOptions.Concurrency) + consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency) return nil } -func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) { +func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) { for msg := range msgs { - if consumeOptions.ConsumerAutoAck { + if consumeOptions.RabbitConsumerOptions.AutoAck { handler(Delivery{msg}) continue } @@ -183,19 +179,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio case Ack: err := msg.Ack(false) if err != nil { - consumer.logger.Errorf("can't ack message: %v", err) + consumer.options.Logger.Errorf("can't ack message: %v", err) } case NackDiscard: err := msg.Nack(false, false) if err != nil { - consumer.logger.Errorf("can't nack message: %v", err) + consumer.options.Logger.Errorf("can't nack message: %v", err) } case NackRequeue: err := msg.Nack(false, true) if err != nil { - consumer.logger.Errorf("can't nack message: %v", err) + consumer.options.Logger.Errorf("can't nack message: %v", err) } } } - consumer.logger.Infof("rabbit consumer goroutine closed") + consumer.options.Logger.Infof("rabbit consumer goroutine closed") } diff --git a/consume_options.go b/consume_options.go index 2ef2012..bc4fb85 100644 --- a/consume_options.go +++ b/consume_options.go @@ -1,83 +1,225 @@ package rabbitmq +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + // getDefaultConsumeOptions describes the options that will be used when a value isn't provided -func getDefaultConsumeOptions(queue string) ConsumeOptions { +func getDefaultConsumeOptions(queueName string) ConsumeOptions { return ConsumeOptions{ - QueueName: queue, - Concurrency: 1, - QOSPrefetch: 0, - QOSGlobal: false, - ConsumerName: "", - ConsumerAutoAck: false, - ConsumerExclusive: false, - ConsumerNoWait: false, - ConsumerNoLocal: false, - ConsumerArgs: nil, + RabbitConsumerOptions: RabbitConsumerOptions{ + Name: "", + AutoAck: false, + Exclusive: false, + NoWait: false, + NoLocal: false, + Args: Table{}, + }, + QueueOptions: QueueOptions{ + Name: queueName, + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, + Bindings: []Binding{}, + Concurrency: 1, + } +} + +func getDefaultBindingOptions() BindingOptions { + return BindingOptions{ + NoWait: false, + Args: Table{}, + Declare: true, } } // ConsumeOptions are used to describe how a new consumer will be created. +// If QueueOptions is not nil, the options will be used to declare a queue +// If ExchangeOptions is not nil, it will be used to declare an exchange +// If there are Bindings, the queue will be bound to them type ConsumeOptions struct { - DeclareOptions - QueueName string - Concurrency int - QOSPrefetch int - QOSGlobal bool - ConsumerName string - ConsumerAutoAck bool - ConsumerExclusive bool - ConsumerNoWait bool - ConsumerNoLocal bool - ConsumerArgs Table -} - -// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings -// before the consumer process starts. -func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) { + RabbitConsumerOptions RabbitConsumerOptions + QueueOptions QueueOptions + ExchangeOptions ExchangeOptions + Bindings []Binding + Concurrency int +} + +// RabbitConsumerOptions are used to configure the consumer +// on the rabbit server +type RabbitConsumerOptions struct { + Name string + AutoAck bool + Exclusive bool + NoWait bool + NoLocal bool + Args Table +} + +// QueueOptions are used to configure a queue. +// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect +// to a non-existent queue will cause RabbitMQ to throw an exception. +type QueueOptions struct { + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table + Declare bool +} + +// Binding describes the bhinding of a queue to a routing key on an exchange +type Binding struct { + RoutingKey string + BindingOptions +} + +// BindingOptions describes the options a binding can have +type BindingOptions struct { + NoWait bool + Args Table + Declare bool +} + +// WithConsumeOptionsQueueDurable ensures the queue is a durable queue +func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { + options.QueueOptions.Durable = true +} + +// WithConsumeOptionsQueueAutoDelete ensures the queue is an auto-delete queue +func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { + options.QueueOptions.AutoDelete = true +} + +// WithConsumeOptionsQueueExclusive ensures the queue is an exclusive queue +func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { + options.QueueOptions.Exclusive = true +} + +// WithConsumeOptionsQueueNoWait ensures the queue is a no-wait queue +func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { + options.QueueOptions.NoWait = true +} + +// WithConsumeOptionsQueuePassive ensures the queue is a passive queue +func WithConsumeOptionsQueuePassive(options *ConsumeOptions) { + options.QueueOptions.Passive = true +} + +// WithConsumeOptionsQueueNoDeclare will turn off the declaration of the queue's +// existance upon startup +func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) { + options.QueueOptions.Declare = false +} + +// WithConsumeOptionsQueueArgs adds optional args to the queue +func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.QueueOptions.Args = args + } +} + +// WithConsumeOptionsExchangeName sets the exchange name +func WithConsumeOptionsExchangeName(name string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ExchangeOptions.Name = name + } +} + +// WithConsumeOptionsExchangeKind ensures the queue is a durable queue +func WithConsumeOptionsExchangeKind(kind string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - for _, declareOption := range declareOptionsFuncs { - // If a queue was set to declare, ensure that the queue name is set. - if options.Queue != nil { - if options.Queue.Name == "" { - options.Queue.Name = options.QueueName - } - } + options.ExchangeOptions.Kind = kind + } +} - declareOption(&options.DeclareOptions) - } +// WithConsumeOptionsExchangeDurable ensures the exchange is a durable exchange +func WithConsumeOptionsExchangeDurable(options *ConsumeOptions) { + options.ExchangeOptions.Durable = true +} +// WithConsumeOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange +func WithConsumeOptionsExchangeAutoDelete(options *ConsumeOptions) { + options.ExchangeOptions.AutoDelete = true +} + +// WithConsumeOptionsExchangeInternal ensures the exchange is an internal exchange +func WithConsumeOptionsExchangeInternal(options *ConsumeOptions) { + options.ExchangeOptions.Internal = true +} + +// WithConsumeOptionsExchangeNoWait ensures the exchange is a no-wait exchange +func WithConsumeOptionsExchangeNoWait(options *ConsumeOptions) { + options.ExchangeOptions.NoWait = true +} + +// WithConsumeOptionsExchangeNoDeclare stops this library from declaring the exchanges existance +func WithConsumeOptionsExchangeNoDeclare(options *ConsumeOptions) { + options.ExchangeOptions.Declare = false +} + +// WithConsumeOptionsExchangePassive ensures the exchange is a passive exchange +func WithConsumeOptionsExchangePassive(options *ConsumeOptions) { + options.ExchangeOptions.Passive = true +} + +// WithConsumeOptionsExchangeArgs adds optional args to the exchange +func WithConsumeOptionsExchangeArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ExchangeOptions.Args = args } } -// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that -// many goroutines will be spawned to run the provided handler on messages -func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { +// WithConsumeOptionsDefaultBinding binds the queue to a routing key with the default binding options +func WithConsumeOptionsDefaultBinding(routingKey string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.Concurrency = concurrency + options.Bindings = append(options.Bindings, Binding{ + RoutingKey: routingKey, + BindingOptions: getDefaultBindingOptions(), + }) } } -// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that -// many messages will be fetched from the server in advance to help with throughput. -// This doesn't affect the handler, messages are still processed one at a time. -func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { +// WithConsumeOptionsBinding adds a new binding to the queue which allows you to set the binding options +// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to +// the zero value. If you want to declare your bindings for example, be sure to set Declare=true +func WithConsumeOptionsBinding(binding Binding) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.QOSPrefetch = prefetchCount + options.Bindings = append(options.Bindings, binding) } } -// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means -// these QOS settings apply to ALL existing and future -// consumers on all channels on the same connection -func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { - options.QOSGlobal = true +// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.Concurrency = concurrency + } } // WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer // if unset a random name will be given func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.ConsumerName = consumerName + options.RabbitConsumerOptions.Name = consumerName } } @@ -85,7 +227,7 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { // if unset the default will be used (false) func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.ConsumerAutoAck = autoAck + options.RabbitConsumerOptions.AutoAck = autoAck } } @@ -94,7 +236,7 @@ func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) { // from this queue. When exclusive is false, the server will fairly distribute // deliveries across multiple consumers. func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { - options.ConsumerExclusive = true + options.RabbitConsumerOptions.Exclusive = true } // WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means @@ -102,5 +244,5 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { // immediately begin deliveries. If it is not possible to consume, a channel // exception will be raised and the channel will be closed. func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { - options.ConsumerNoWait = true + options.RabbitConsumerOptions.NoWait = true } diff --git a/declare.go b/declare.go index 84d7fe4..2eeda1a 100644 --- a/declare.go +++ b/declare.go @@ -1,373 +1,90 @@ package rabbitmq -import "fmt" - -// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like. -type DeclareOptions struct { - Queue *QueueOptions - Exchange *ExchangeOptions - Bindings []Binding -} - -// QueueOptions are used to configure a queue. -// If the Passive flag is set the client will only check if the queue exists on the server -// and that the settings match, no creation attempt will be made. -type QueueOptions struct { - Name string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Passive bool // if false, a missing queue will be created on the server - Args Table -} - -// ExchangeOptions are used to configure an exchange. -// If the Passive flag is set the client will only check if the exchange exists on the server -// and that the settings match, no creation attempt will be made. -type ExchangeOptions struct { - Name string - Kind string // possible values: empty string for default exchange or direct, topic, fanout - Durable bool - AutoDelete bool - Internal bool - NoWait bool - Passive bool // if false, a missing exchange will be created on the server - Args Table -} - -// BindingOption are used to configure a queue bindings. -type BindingOption struct { - NoWait bool - Args Table -} - -// Binding describes a queue binding to a specific exchange. -type Binding struct { - BindingOption - QueueName string - ExchangeName string - RoutingKey string -} - -// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options. -// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set. -func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) { - if o.Queue == nil || o.Exchange == nil { - return // nothing to set... - } - - if o.Queue.Name == "" { - return // nothing to set... - } - - for _, routingKey := range routingKeys { - o.Bindings = append(o.Bindings, Binding{ - QueueName: o.Queue.Name, - ExchangeName: o.Exchange.Name, - RoutingKey: routingKey, - BindingOption: opt, - }) - } -} - -// handleDeclare handles the queue, exchange and binding declare process on the server. -// If there are no options set, no actions will be executed. -func handleDeclare(chManager *channelManager, options DeclareOptions) error { - chManager.channelMux.RLock() - defer chManager.channelMux.RUnlock() - - // bind queue - if options.Queue != nil { - queue := options.Queue - if queue.Name == "" { - return fmt.Errorf("missing queue name") - } - if queue.Passive { - _, err := chManager.channel.QueueDeclarePassive( - queue.Name, - queue.Durable, - queue.AutoDelete, - queue.Exclusive, - queue.NoWait, - tableToAMQPTable(queue.Args), - ) - if err != nil { - return err - } - } else { - _, err := chManager.channel.QueueDeclare( - queue.Name, - queue.Durable, - queue.AutoDelete, - queue.Exclusive, - queue.NoWait, - tableToAMQPTable(queue.Args), - ) - if err != nil { - return err - } - } - } - - // bind exchange - if options.Exchange != nil { - exchange := options.Exchange - if exchange.Name == "" { - return fmt.Errorf("missing exchange name") - } - if exchange.Passive { - err := chManager.channel.ExchangeDeclarePassive( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.Args), - ) - if err != nil { - return err - } - } else { - err := chManager.channel.ExchangeDeclare( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.Args), - ) - if err != nil { - return err - } - } - } - - // handle binding of queues to exchange - for _, binding := range options.Bindings { - err := chManager.channel.QueueBind( - binding.QueueName, // name of the queue - binding.RoutingKey, // bindingKey - binding.ExchangeName, // sourceExchange - binding.NoWait, // noWait - tableToAMQPTable(binding.Args), // arguments +import ( + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" +) + +func declareQueue(connManager *connectionmanager.ConnectionManager, options QueueOptions) error { + if !options.Declare { + return nil + } + if options.Passive { + _, err := connManager.QueueDeclarePassiveSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), ) if err != nil { return err } + return nil + } + _, err := connManager.QueueDeclareSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err } - return nil } -// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options. -// If no exchange options are set yet, new options with default values will be defined. -func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions { - if options.Exchange == nil { - options.Exchange = &ExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Args: nil, - Passive: false, - } +func declareExchange(connManager *connectionmanager.ConnectionManager, options ExchangeOptions) error { + if !options.Declare { + return nil } - return options.Exchange -} - -// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options. -// If no queue options are set yet, new options with default values will be defined. -func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions { - if options.Queue == nil { - options.Queue = &QueueOptions{ - Name: "", - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Passive: false, - Args: nil, + if options.Passive { + err := connManager.ExchangeDeclarePassiveSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err } + return nil + } + err := connManager.ExchangeDeclareSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err } - return options.Queue -} - -// region general-options - -// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed. -// Only the settings will be validated if the queue already exists on the server. -// Matching settings will result in no action, different settings will result in an error. -// If the 'Passive' property is set to false, a missing queue will be created on the server. -func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) { - return func(options *DeclareOptions) { - options.Queue = settings - } -} - -// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed. -// Only the settings will be validated if the exchange already exists on the server. -// Matching settings will result in no action, different settings will result in an error. -// If the 'Passive' property is set to false, a missing exchange will be created on the server. -func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) { - return func(options *DeclareOptions) { - options.Exchange = settings - } -} - -// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed. -// Only the settings will be validated if one of the bindings already exists on the server. -// Matching settings will result in no action, different settings will result in an error. -// If the 'Passive' property is set to false, missing bindings will be created on the server. -func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) { - return func(options *DeclareOptions) { - options.Bindings = bindings - } -} - -// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ -// actions are being executed. -// This function must be called after the queue and exchange declaration settings have been set, -// otherwise this function has no effect. -func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) { - return func(options *DeclareOptions) { - options.SetBindings(routingKeys, BindingOption{}) - } -} - -// endregion general-options - -// region single-options - -// WithDeclareQueueName returns a function that sets the queue name. -func WithDeclareQueueName(name string) func(*DeclareOptions) { - return func(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).Name = name - } -} - -// WithDeclareQueueDurable sets the queue to durable, which means it won't -// be destroyed when the server restarts. It must only be bound to durable exchanges. -func WithDeclareQueueDurable(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).Durable = true -} - -// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will -// be deleted when there are no more consumers on it. -func WithDeclareQueueAutoDelete(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).AutoDelete = true -} - -// WithDeclareQueueExclusive sets the queue to exclusive, which means -// it's are only accessible by the connection that declares it and -// will be deleted when the connection closes. Channels on other connections -// will receive an error when attempting to declare, bind, consume, purge or -// delete a queue with the same name. -func WithDeclareQueueExclusive(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).Exclusive = true -} - -// WithDeclareQueueNoWait sets the queue to nowait, which means -// the queue will assume to be declared on the server. A channel -// exception will arrive if the conditions are met for existing queues -// or attempting to modify an existing queue from a different connection. -func WithDeclareQueueNoWait(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).NoWait = true -} - -// WithDeclareQueueNoDeclare sets the queue to no declare, which means -// the queue will be assumed to be declared on the server, and thus only will be validated. -func WithDeclareQueueNoDeclare(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).Passive = true -} - -// WithDeclareQueueArgs returns a function that sets the queue arguments. -func WithDeclareQueueArgs(args Table) func(*DeclareOptions) { - return func(options *DeclareOptions) { - getQueueOptionsOrSetDefault(options).Args = args - } -} - -// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes -// in the cluster will have the messages distributed amongst them for higher reliability. -func WithDeclareQueueQuorum(options *DeclareOptions) { - queue := getQueueOptionsOrSetDefault(options) - if queue.Args == nil { - queue.Args = Table{} - } - queue.Args["x-queue-type"] = "quorum" -} - -// WithDeclareExchangeName returns a function that sets the exchange name. -func WithDeclareExchangeName(name string) func(*DeclareOptions) { - return func(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Name = name - } -} - -// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type. -func WithDeclareExchangeKind(kind string) func(*DeclareOptions) { - return func(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Kind = kind - } -} - -// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag. -func WithDeclareExchangeDurable(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Durable = true -} - -// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag. -func WithDeclareExchangeAutoDelete(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).AutoDelete = true -} - -// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag. -func WithDeclareExchangeInternal(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Internal = true -} - -// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag. -func WithDeclareExchangeNoWait(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).NoWait = true -} - -// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments -// that are specific to the server's implementation of the exchange. -func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) { - return func(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Args = args - } -} - -// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the -// binding exchange. Use this setting if the exchange already exists and you don't need to declare -// it on consumer start. -func WithDeclareExchangeNoDeclare(options *DeclareOptions) { - getExchangeOptionsOrSetDefault(options).Passive = true -} - -// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound -// the channel will not be closed with an error. -// This function must be called after bindings have been defined, otherwise it has no effect. -func WithDeclareBindingNoWait(options *DeclareOptions) { - for i := range options.Bindings { - options.Bindings[i].NoWait = true - } + return nil } -// WithDeclareBindingArgs sets the arguments of the bindings to args. -// This function must be called after bindings have been defined, otherwise it has no effect. -func WithDeclareBindingArgs(args Table) func(*DeclareOptions) { - return func(options *DeclareOptions) { - for i := range options.Bindings { - options.Bindings[i].Args = args +func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumeOptions) error { + for _, binding := range options.Bindings { + if !binding.Declare { + continue + } + err := connManager.QueueBindSafe( + options.QueueOptions.Name, + binding.RoutingKey, + options.ExchangeOptions.Name, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err } } + return nil } - -// endregion single-options diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 42e0f11..8705cab 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -13,19 +13,22 @@ import ( var consumerName = "example" func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + consumer, err := rabbitmq.NewConsumer( - "amqp://guest:guest@localhost", rabbitmq.Config{}, + conn, rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) } - defer func() { - err := consumer.Close() - if err != nil { - log.Fatal(err) - } - }() + defer consumer.Close() err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { @@ -34,13 +37,35 @@ func main() { return rabbitmq.Ack }, "my_queue", - rabbitmq.WithConsumeOptionsConcurrency(10), + rabbitmq.WithConsumeOptionsConcurrency(2), rabbitmq.WithConsumeOptionsConsumerName(consumerName), - rabbitmq.WithConsumeDeclareOptions( - // creates a the queue if it doesn't exist yet - rabbitmq.WithDeclareQueueDurable, - rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"my_routing_key"}), - ), + rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"), + rabbitmq.WithConsumeOptionsExchangeName("events"), + ) + if err != nil { + log.Fatal(err) + } + + consumer2, err := rabbitmq.NewConsumer( + conn, + rabbitmq.WithConsumerOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer consumer2.Close() + + err = consumer2.StartConsuming( + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed 2: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue_2", + rabbitmq.WithConsumeOptionsConcurrency(2), + rabbitmq.WithConsumeOptionsConsumerName("consumer3"), + rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"), + rabbitmq.WithConsumeOptionsExchangeName("events"), ) if err != nil { log.Fatal(err) diff --git a/examples/consumer_with_declare/.gitignore b/examples/consumer_with_declare/.gitignore deleted file mode 100644 index e8842b8..0000000 --- a/examples/consumer_with_declare/.gitignore +++ /dev/null @@ -1 +0,0 @@ -consumer_with_declare diff --git a/examples/consumer_with_declare/main.go b/examples/consumer_with_declare/main.go deleted file mode 100644 index 02540db..0000000 --- a/examples/consumer_with_declare/main.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "log" - "os" - "os/signal" - "syscall" - - rabbitmq "github.com/wagslane/go-rabbitmq" -) - -var consumerName = "example_with_declare" - -func main() { - consumer, err := rabbitmq.NewConsumer( - "amqp://guest:guest@localhost", rabbitmq.Config{}, - rabbitmq.WithConsumerOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer func() { - err := consumer.Close() - if err != nil { - log.Fatal(err) - } - }() - - err = consumer.StartConsuming( - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, - "my_queue", - rabbitmq.WithConsumeDeclareOptions( - rabbitmq.WithDeclareQueueDurable, - rabbitmq.WithDeclareQueueQuorum, - rabbitmq.WithDeclareExchangeName("events"), - rabbitmq.WithDeclareExchangeKind("topic"), - rabbitmq.WithDeclareExchangeDurable, - rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings - rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings - { - QueueName: "my_queue", - ExchangeName: "events", - RoutingKey: "a_custom_key", - }, - }), - ), - rabbitmq.WithConsumeOptionsConsumerName(consumerName), - ) - if err != nil { - log.Fatal(err) - } - - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() - - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") -} diff --git a/examples/logger/main.go b/examples/logger/main.go index ab23808..52686a0 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -32,8 +32,16 @@ func (l errorLogger) Tracef(format string, v ...interface{}) {} func main() { mylogger := &errorLogger{} + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + publisher, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", rabbitmq.Config{}, + conn, rabbitmq.WithPublisherOptionsLogger(mylogger), ) if err != nil { @@ -41,7 +49,7 @@ func main() { } err = publisher.Publish( []byte("hello, world"), - []string{"routing_key"}, + []string{"my_routing_key"}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsMandatory, rabbitmq.WithPublishOptionsPersistentDelivery, @@ -51,7 +59,7 @@ func main() { log.Fatal(err) } - returns := publisher.NotifyReturn() + returns := conn.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 5ec6732..98baebd 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -12,28 +12,39 @@ import ( ) func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } publisher, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", rabbitmq.Config{}, + conn, rabbitmq.WithPublisherOptionsLogging, ) if err != nil { log.Fatal(err) } - defer func() { - err := publisher.Close() - if err != nil { - log.Fatal(err) - } - }() + defer publisher.Close() + + publisher2, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer publisher2.Close() - returns := publisher.NotifyReturn() + returns := conn.NotifyReturn() go func() { for r := range returns { log.Printf("message returned from server: %s", string(r.Body)) } }() - confirmations := publisher.NotifyPublish() + confirmations := conn.NotifyPublish() go func() { for c := range confirmations { log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) @@ -61,7 +72,18 @@ func main() { case <-ticker.C: err = publisher.Publish( []byte("hello, world"), - []string{"routing_key"}, + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + err = publisher2.Publish( + []byte("hello, world 2"), + []string{"my_routing_key"}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsMandatory, rabbitmq.WithPublishOptionsPersistentDelivery, diff --git a/exchange_options.go b/exchange_options.go new file mode 100644 index 0000000..cf5e255 --- /dev/null +++ b/exchange_options.go @@ -0,0 +1,16 @@ +package rabbitmq + +// ExchangeOptions are used to configure an exchange. +// If the Passive flag is set the client will only check if the exchange exists on the server +// and that the settings match, no creation attempt will be made. +type ExchangeOptions struct { + Name string + Kind string // possible values: empty string for default exchange or direct, topic, fanout + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Passive bool // if false, a missing exchange will be created on the server + Args Table + Declare bool +} diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go new file mode 100644 index 0000000..93e3f07 --- /dev/null +++ b/internal/connectionmanager/connection_manager.go @@ -0,0 +1,160 @@ +package connectionmanager + +import ( + "errors" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/logger" +) + +// ConnectionManager - +type ConnectionManager struct { + logger logger.Logger + url string + channel *amqp.Channel + connection *amqp.Connection + amqpConfig amqp.Config + channelMux *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher +} + +// NewConnectionManager creates a new connection manager +func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { + conn, ch, err := getNewChannel(url, conf) + if err != nil { + return nil, err + } + + connManager := ConnectionManager{ + logger: log, + url: url, + connection: conn, + channel: ch, + channelMux: &sync.RWMutex{}, + amqpConfig: conf, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: newDispatcher(), + } + go connManager.startNotifyCancelOrClosed() + return &connManager, nil +} + +func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) { + amqpConn, err := amqp.DialConfig(url, amqp.Config(conf)) + if err != nil { + return nil, nil, err + } + ch, err := amqpConn.Channel() + if err != nil { + return nil, nil, err + } + return amqpConn, ch, nil +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (connManager *ConnectionManager) startNotifyCancelOrClosed() { + notifyCloseChan := connManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := connManager.channel.NotifyCancel(make(chan string, 1)) + select { + case err := <-notifyCloseChan: + if err != nil { + connManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) + connManager.reconnectLoop() + connManager.logger.Warnf("successfully reconnected to amqp server") + connManager.dispatcher.dispatch(err) + } + if err == nil { + connManager.logger.Infof("amqp channel closed gracefully") + } + case err := <-notifyCancelChan: + connManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) + connManager.reconnectLoop() + connManager.logger.Warnf("successfully reconnected to amqp server after cancel") + connManager.dispatcher.dispatch(errors.New(err)) + } +} + +// GetReconnectionCount - +func (connManager *ConnectionManager) GetReconnectionCount() uint { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + return connManager.reconnectionCount +} + +func (connManager *ConnectionManager) incrementReconnectionCount() { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (connManager *ConnectionManager) reconnectLoop() { + for { + connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.reconnectInterval) + time.Sleep(connManager.reconnectInterval) + err := connManager.reconnect() + if err != nil { + connManager.logger.Errorf("error reconnecting to amqp server: %v", err) + } else { + connManager.incrementReconnectionCount() + go connManager.startNotifyCancelOrClosed() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (connManager *ConnectionManager) reconnect() error { + connManager.channelMux.Lock() + defer connManager.channelMux.Unlock() + newConn, newChannel, err := getNewChannel(connManager.url, connManager.amqpConfig) + if err != nil { + return err + } + + if err = connManager.channel.Close(); err != nil { + connManager.logger.Warnf("error closing channel while reconnecting: %v", err) + } + + if err = connManager.connection.Close(); err != nil { + connManager.logger.Warnf("error closing connection while reconnecting: %v", err) + } + + connManager.connection = newConn + connManager.channel = newChannel + return nil +} + +// close safely closes the current channel and connection +func (connManager *ConnectionManager) close() error { + connManager.logger.Infof("closing connection manager...") + connManager.channelMux.Lock() + defer connManager.channelMux.Unlock() + + err := connManager.channel.Close() + if err != nil { + return err + } + + err = connManager.connection.Close() + if err != nil { + return err + } + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnect to the server +func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return connManager.dispatcher.addSubscriber() +} diff --git a/internal/connectionmanager/connection_manager_dispatch.go b/internal/connectionmanager/connection_manager_dispatch.go new file mode 100644 index 0000000..5ab0133 --- /dev/null +++ b/internal/connectionmanager/connection_manager_dispatch.go @@ -0,0 +1,68 @@ +package connectionmanager + +import ( + "log" + "math" + "math/rand" + "sync" + "time" +) + +type dispatcher struct { + subscribers map[int]dispatchSubscriber + subscribersMux *sync.Mutex +} + +type dispatchSubscriber struct { + notifyCancelOrCloseChan chan error + closeCh <-chan struct{} +} + +func newDispatcher() *dispatcher { + return &dispatcher{ + subscribers: make(map[int]dispatchSubscriber), + subscribersMux: &sync.Mutex{}, + } +} + +func (d *dispatcher) dispatch(err error) error { + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + for _, subscriber := range d.subscribers { + select { + case <-time.After(time.Second * 5): + log.Println("Unexpected rabbitmq error: timeout in dispatch") + case subscriber.notifyCancelOrCloseChan <- err: + } + } + return nil +} + +func (d *dispatcher) addSubscriber() (<-chan error, chan<- struct{}) { + const maxRand = math.MaxInt64 + const minRand = 0 + id := rand.Intn(maxRand-minRand) + minRand + + closeCh := make(chan struct{}) + notifyCancelOrCloseChan := make(chan error) + + d.subscribersMux.Lock() + d.subscribers[id] = dispatchSubscriber{ + notifyCancelOrCloseChan: notifyCancelOrCloseChan, + closeCh: closeCh, + } + d.subscribersMux.Unlock() + + go func(id int) { + <-closeCh + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + sub, ok := d.subscribers[id] + if !ok { + return + } + close(sub.notifyCancelOrCloseChan) + delete(d.subscribers, id) + }(id) + return notifyCancelOrCloseChan, closeCh +} diff --git a/internal/connectionmanager/safe_wraps.go b/internal/connectionmanager/safe_wraps.go new file mode 100644 index 0000000..fc18ed6 --- /dev/null +++ b/internal/connectionmanager/safe_wraps.go @@ -0,0 +1,210 @@ +package connectionmanager + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConsumeSafe safely wraps the (*amqp.Channel).Consume method +func (connManager *ConnectionManager) ConsumeSafe( + queue, + consumer string, + autoAck, + exclusive, + noLocal, + noWait bool, + args amqp.Table, +) (<-chan amqp.Delivery, error) { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.Consume( + queue, + consumer, + autoAck, + exclusive, + noLocal, + noWait, + args, + ) +} + +// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method +func (connManager *ConnectionManager) QueueDeclarePassiveSafe( + name string, + durable bool, + autoDelete bool, + exclusive bool, + noWait bool, + args amqp.Table, +) (amqp.Queue, error) { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.QueueDeclarePassive( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method +func (connManager *ConnectionManager) QueueDeclareSafe( + name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, +) (amqp.Queue, error) { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.QueueDeclare( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method +func (connManager *ConnectionManager) ExchangeDeclarePassiveSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.ExchangeDeclarePassive( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method +func (connManager *ConnectionManager) ExchangeDeclareSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.ExchangeDeclare( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method +func (connManager *ConnectionManager) QueueBindSafe( + name string, key string, exchange string, noWait bool, args amqp.Table, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.QueueBind( + name, + key, + exchange, + noWait, + args, + ) +} + +// QosSafe safely wraps the (*amqp.Channel).Qos method +func (connManager *ConnectionManager) QosSafe( + prefetchCount int, prefetchSize int, global bool, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.Qos( + prefetchCount, + prefetchSize, + global, + ) +} + +// PublishSafe safely wraps the (*amqp.Channel).Publish method +func (connManager *ConnectionManager) PublishSafe( + exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.Publish( + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method +func (connManager *ConnectionManager) NotifyReturnSafe( + c chan amqp.Return, +) chan amqp.Return { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.NotifyReturn( + c, + ) +} + +// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method +func (connManager *ConnectionManager) ConfirmSafe( + noWait bool, +) error { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.Confirm( + noWait, + ) +} + +// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method +func (connManager *ConnectionManager) NotifyPublishSafe( + confirm chan amqp.Confirmation, +) chan amqp.Confirmation { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.NotifyPublish( + confirm, + ) +} + +// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method +func (connManager *ConnectionManager) NotifyFlowSafe( + c chan bool, +) chan bool { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.channel.NotifyFlow( + c, + ) +} + +// NotifyBlockedSafe safely wraps the (*amqp.Connection).NotifyBlocked method +func (connManager *ConnectionManager) NotifyBlockedSafe( + receiver chan amqp.Blocking, +) chan amqp.Blocking { + connManager.channelMux.RLock() + defer connManager.channelMux.RUnlock() + + return connManager.connection.NotifyBlocked( + receiver, + ) +} diff --git a/internal/logger/logger.go b/internal/logger/logger.go new file mode 100644 index 0000000..8c11b7d --- /dev/null +++ b/internal/logger/logger.go @@ -0,0 +1,12 @@ +package logger + +// Logger is describes a logging structure. It can be set using +// WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). +type Logger interface { + Fatalf(string, ...interface{}) + Errorf(string, ...interface{}) + Warnf(string, ...interface{}) + Infof(string, ...interface{}) + Debugf(string, ...interface{}) + Tracef(string, ...interface{}) +} diff --git a/logger.go b/logger.go index fb9ac97..2c3f231 100644 --- a/logger.go +++ b/logger.go @@ -3,42 +3,42 @@ package rabbitmq import ( "fmt" "log" + + "github.com/wagslane/go-rabbitmq/internal/logger" ) -// Logger is the interface to send logs to. It can be set using +// Logger is describes a logging structure. It can be set using // WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). -type Logger interface { - Fatalf(string, ...interface{}) - Errorf(string, ...interface{}) - Warnf(string, ...interface{}) - Infof(string, ...interface{}) - Debugf(string, ...interface{}) - Tracef(string, ...interface{}) -} +type Logger logger.Logger const loggingPrefix = "gorabbit" -// stdDebugLogger logs to stdout up to the `DebugF` level type stdDebugLogger struct{} +// Fatalf - func (l stdDebugLogger) Fatalf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...) } +// Errorf - func (l stdDebugLogger) Errorf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) } +// Warnf - func (l stdDebugLogger) Warnf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) } +// Infof - func (l stdDebugLogger) Infof(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) } +// Debugf - func (l stdDebugLogger) Debugf(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) } +// Tracef - func (l stdDebugLogger) Tracef(format string, v ...interface{}) {} diff --git a/publish.go b/publish.go index eea644c..680e403 100644 --- a/publish.go +++ b/publish.go @@ -1,11 +1,12 @@ package rabbitmq import ( + "errors" "fmt" "sync" - "time" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // DeliveryMode. Transient means higher throughput but messages will not be @@ -39,10 +40,9 @@ type Confirmation struct { // Publisher allows you to publish messages safely across an open connection type Publisher struct { - chManager *channelManager - - notifyReturnChan chan Return - notifyPublishChan chan Confirmation + connManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex @@ -56,16 +56,7 @@ type Publisher struct { // PublisherOptions are used to describe a publisher's configuration. // Logger is a custom logging interface. type PublisherOptions struct { - Logger Logger - ReconnectInterval time.Duration -} - -// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will -// attempt to reconnect to the rabbit server -func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions) { - return func(options *PublisherOptions) { - options.ReconnectInterval = reconnectInterval - } + Logger Logger } // WithPublisherOptionsLogging sets logging to true on the consumer options @@ -87,29 +78,27 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { +func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { options := &PublisherOptions{ - Logger: &stdDebugLogger{}, - ReconnectInterval: time.Second * 5, + Logger: &stdDebugLogger{}, } for _, optionFunc := range optionFuncs { optionFunc(options) } - chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval) - if err != nil { - return nil, err + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") } - + reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect() publisher := &Publisher{ - chManager: chManager, + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, disablePublishDueToBlocked: false, disablePublishDueToBlockedMux: &sync.RWMutex{}, options: *options, - notifyReturnChan: nil, - notifyPublishChan: nil, } go publisher.startNotifyFlowHandler() @@ -121,34 +110,13 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio } func (publisher *Publisher) handleRestarts() { - for err := range publisher.chManager.notifyCancelOrClose { + for err := range publisher.reconnectErrCh { publisher.options.Logger.Infof("successful publisher recovery from: %v", err) go publisher.startNotifyFlowHandler() go publisher.startNotifyBlockedHandler() - if publisher.notifyReturnChan != nil { - go publisher.startNotifyReturnHandler() - } - if publisher.notifyPublishChan != nil { - publisher.startNotifyPublishHandler() - } } } -// NotifyReturn registers a listener for basic.return methods. -// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. -func (publisher *Publisher) NotifyReturn() <-chan Return { - publisher.notifyReturnChan = make(chan Return) - go publisher.startNotifyReturnHandler() - return publisher.notifyReturnChan -} - -// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option -func (publisher *Publisher) NotifyPublish() <-chan Confirmation { - publisher.notifyPublishChan = make(chan Confirmation) - publisher.startNotifyPublishHandler() - return publisher.notifyPublishChan -} - // Publish publishes the provided data to the given routing keys over the connection func (publisher *Publisher) Publish( data []byte, @@ -193,7 +161,7 @@ func (publisher *Publisher) Publish( message.AppId = options.AppID // Actual publish. - err := publisher.chManager.channel.Publish( + err := publisher.connManager.PublishSafe( options.Exchange, routingKey, options.Mandatory, @@ -209,27 +177,7 @@ func (publisher *Publisher) Publish( // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use -func (publisher Publisher) Close() error { - publisher.chManager.logger.Infof("closing publisher...") - return publisher.chManager.close() -} - -func (publisher *Publisher) startNotifyReturnHandler() { - returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1)) - for ret := range returnAMQPCh { - publisher.notifyReturnChan <- Return{ret} - } -} - -func (publisher *Publisher) startNotifyPublishHandler() { - publisher.chManager.channel.Confirm(false) - go func() { - publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) - for conf := range publishAMQPCh { - publisher.notifyPublishChan <- Confirmation{ - Confirmation: conf, - ReconnectionCount: int(publisher.chManager.reconnectionCount), - } - } - }() +func (publisher *Publisher) Close() { + publisher.options.Logger.Infof("closing publisher...") + publisher.closeConnectionToManagerCh <- struct{}{} } diff --git a/publish_flow_block.go b/publish_flow_block.go index 6e27868..0bb82c1 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -5,7 +5,7 @@ import ( ) func (publisher *Publisher) startNotifyFlowHandler() { - notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool)) + notifyFlowChan := publisher.connManager.NotifyFlowSafe(make(chan bool)) publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlowMux.Unlock() @@ -24,7 +24,7 @@ func (publisher *Publisher) startNotifyFlowHandler() { } func (publisher *Publisher) startNotifyBlockedHandler() { - blockings := publisher.chManager.connection.NotifyBlocked(make(chan amqp.Blocking)) + blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking)) publisher.disablePublishDueToBlockedMux.Lock() publisher.disablePublishDueToBlocked = false publisher.disablePublishDueToBlockedMux.Unlock()