From f6219786b7b9bcf382d44f710df8b75e34995ab7 Mon Sep 17 00:00:00 2001 From: xmapst Date: Wed, 31 May 2023 12:09:55 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=88=A0=E9=99=A4Fatal=202.=E6=9A=B4?= =?UTF-8?q?=E9=9C=B2channel=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- channel.go | 31 ++ channel_options.go | 28 + connection.go | 64 +-- consume.go | 319 ++++++----- consumer_options.go | 252 ++++----- declare.go | 154 +++--- examples/channel/.gitignore | 1 + examples/channel/main.go | 38 ++ examples/consumer/main.go | 96 ++-- examples/logger/main.go | 74 ++- examples/multiconsumer/main.go | 124 ++--- examples/multipublisher/main.go | 184 +++---- examples/publisher/main.go | 128 ++--- examples/publisher_confirm/main.go | 152 +++--- go.mod | 2 +- go.sum | 3 + internal/channelmanager/channel_manager.go | 202 +++---- .../connectionmanager/connection_manager.go | 174 +++--- internal/logger/logger.go | 11 +- logger.go | 19 +- publish.go | 511 +++++++++--------- table.go | 10 +- .../rabbitmq/amqp091-go/CHANGELOG.md | 34 ++ .../rabbitmq/amqp091-go/CONTRIBUTING.md | 18 +- .../github.com/rabbitmq/amqp091-go/Makefile | 5 + .../github.com/rabbitmq/amqp091-go/channel.go | 14 +- .../rabbitmq/amqp091-go/confirms.go | 23 +- .../rabbitmq/amqp091-go/connection.go | 49 +- .../rabbitmq/amqp091-go/consumers.go | 27 + .../github.com/rabbitmq/amqp091-go/types.go | 5 + vendor/modules.txt | 2 +- 31 files changed, 1500 insertions(+), 1254 deletions(-) create mode 100644 channel.go create mode 100644 channel_options.go create mode 100644 examples/channel/.gitignore create mode 100644 examples/channel/main.go diff --git a/channel.go b/channel.go new file mode 100644 index 0000000..84aa9f1 --- /dev/null +++ b/channel.go @@ -0,0 +1,31 @@ +package rabbitmq + +import ( + "errors" + + "github.com/wagslane/go-rabbitmq/internal/channelmanager" +) + +type Channel struct { + *channelmanager.ChannelManager +} + +// NewChannel returns a new channel to the cluster. +func NewChannel(conn *Conn, optionFuncs ...func(*ChannelOptions)) (*Channel, error) { + defaultOptions := getDefaultChannelOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") + } + channel, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + return &Channel{ + channel, + }, nil +} diff --git a/channel_options.go b/channel_options.go new file mode 100644 index 0000000..617fd41 --- /dev/null +++ b/channel_options.go @@ -0,0 +1,28 @@ +package rabbitmq + +// ChannelOptions are used to describe a channel's configuration. +// Logger is a custom logging interface. +type ChannelOptions struct { + Logger Logger +} + +// getDefaultChannelOptions describes the options that will be used when a value isn't provided +func getDefaultChannelOptions() ChannelOptions { + return ChannelOptions{ + Logger: stdDebugLogger{}, + } +} + +// WithChannelOptionsLogging sets logging to true on the channel options +// and sets the +func WithChannelOptionsLogging(options *ChannelOptions) { + options.Logger = &stdDebugLogger{} +} + +// WithChannelOptionsLogger sets logging to a custom interface. +// Use WithChannelOptionsLogging to just log to stdout. +func WithChannelOptionsLogger(log Logger) func(options *ChannelOptions) { + return func(options *ChannelOptions) { + options.Logger = log + } +} diff --git a/connection.go b/connection.go index 97b8bb4..8ffeafa 100644 --- a/connection.go +++ b/connection.go @@ -1,18 +1,18 @@ package rabbitmq import ( - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // Conn manages the connection to a rabbit cluster // it is intended to be shared across publishers and consumers type Conn struct { - connectionManager *connectionmanager.ConnectionManager - reconnectErrCh <-chan error - closeConnectionToManagerCh chan<- struct{} + connectionManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} - options ConnectionOptions + options ConnectionOptions } // Config wraps amqp.Config @@ -23,39 +23,39 @@ type Config amqp.Config // NewConn creates a new connection manager func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) { - defaultOptions := getDefaultConnectionOptions() - options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - - manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) - if err != nil { - return nil, err - } - - reconnectErrCh, closeCh := manager.NotifyReconnect() - conn := &Conn{ - connectionManager: manager, - reconnectErrCh: reconnectErrCh, - closeConnectionToManagerCh: closeCh, - options: *options, - } - - go conn.handleRestarts() - return conn, nil + defaultOptions := getDefaultConnectionOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := manager.NotifyReconnect() + conn := &Conn{ + connectionManager: manager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, + } + + go conn.handleRestarts() + return conn, nil } func (conn *Conn) handleRestarts() { - for err := range conn.reconnectErrCh { - conn.options.Logger.Infof("successful connection recovery from: %v", err) - } + for err := range conn.reconnectErrCh { + conn.options.Logger.Infof("successful connection recovery from: %v", err) + } } // Close closes the connection, it's not safe for re-use. // You should also close any consumers and publishers before // closing the connection func (conn *Conn) Close() error { - conn.closeConnectionToManagerCh <- struct{}{} - return conn.connectionManager.Close() + conn.closeConnectionToManagerCh <- struct{}{} + return conn.connectionManager.Close() } diff --git a/consume.go b/consume.go index d1f802f..f436c23 100644 --- a/consume.go +++ b/consume.go @@ -1,12 +1,12 @@ package rabbitmq import ( - "errors" - "fmt" - "sync" + "errors" + "fmt" + "sync" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) // Action is an action that occurs after processed this delivery @@ -16,92 +16,91 @@ type Action int type Handler func(d Delivery) (action Action) const ( - // Ack default ack this msg after you have successfully processed this delivery. - Ack Action = iota - // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. - NackDiscard - // NackRequeue deliver this message to a different consumer. - NackRequeue - // Message acknowledgement is left to the user using the msg.Ack() method - Manual + // Ack default ack this msg after you have successfully processed this delivery. + Ack Action = iota + // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. + NackDiscard + // NackRequeue deliver this message to a different consumer. + NackRequeue + // Message acknowledgement is left to the user using the msg.Ack() method + Manual ) // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { - chanManager *channelmanager.ChannelManager - reconnectErrCh <-chan error - closeConnectionToManagerCh chan<- struct{} - options ConsumerOptions + chanManager *channelmanager.ChannelManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + options ConsumerOptions - isClosedMux *sync.RWMutex - isClosed bool + isClosedMux *sync.RWMutex + isClosed bool } // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. type Delivery struct { - amqp.Delivery + amqp.Delivery } // NewConsumer returns a new Consumer connected to the given rabbitmq server // it also starts consuming on the given connection with automatic reconnection handling // Do not reuse the returned consumer for anything other than to close it func NewConsumer( - conn *Conn, - handler Handler, - queue string, - optionFuncs ...func(*ConsumerOptions), + conn *Conn, + handler Handler, + queue string, + optionFuncs ...func(*ConsumerOptions), ) (*Consumer, error) { - defaultOptions := getDefaultConsumerOptions(queue) - options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - - if conn.connectionManager == nil { - return nil, errors.New("connection manager can't be nil") - } - - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) - if err != nil { - return nil, err - } - reconnectErrCh, closeCh := chanManager.NotifyReconnect() - - consumer := &Consumer{ - chanManager: chanManager, - reconnectErrCh: reconnectErrCh, - closeConnectionToManagerCh: closeCh, - options: *options, - isClosedMux: &sync.RWMutex{}, - isClosed: false, - } - - err = consumer.startGoroutines( - handler, - *options, - ) - if err != nil { - return nil, err - } - - go func() { - for err := range consumer.reconnectErrCh { - consumer.options.Logger.Infof("successful consumer recovery from: %v", err) - err = consumer.startGoroutines( - handler, - *options, - ) - if err != nil { - consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err) - consumer.options.Logger.Fatalf("consumer closing, unable to recover") - return - } - } - }() - - return consumer, nil + defaultOptions := getDefaultConsumerOptions(queue) + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") + } + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + reconnectErrCh, closeCh := chanManager.NotifyReconnect() + + consumer := &Consumer{ + chanManager: chanManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, + isClosedMux: &sync.RWMutex{}, + isClosed: false, + } + + err = consumer.startGoroutines( + handler, + *options, + ) + if err != nil { + return nil, err + } + + go func() { + for err = range consumer.reconnectErrCh { + consumer.options.Logger.Infof("successful consumer recovery from: %v", err) + err = consumer.startGoroutines( + handler, + *options, + ) + if err != nil { + consumer.options.Logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err) + return + } + } + }() + + return consumer, nil } // Close cleans up resources and closes the consumer. @@ -109,104 +108,104 @@ func NewConsumer( // to the connection manager and the consuming goroutines. // Only call once. func (consumer *Consumer) Close() { - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() - consumer.isClosed = true - // close the channel so that rabbitmq server knows that the - // consumer has been stopped. - err := consumer.chanManager.Close() - if err != nil { - consumer.options.Logger.Warnf("error while closing the channel: %v", err) - } - - consumer.options.Logger.Infof("closing consumer...") - go func() { - consumer.closeConnectionToManagerCh <- struct{}{} - }() + consumer.isClosedMux.Lock() + defer consumer.isClosedMux.Unlock() + consumer.isClosed = true + // close the channel so that rabbitmq server knows that the + // consumer has been stopped. + err := consumer.chanManager.Close() + if err != nil { + consumer.options.Logger.Warnf("error while closing the channel: %v", err) + } + + consumer.options.Logger.Infof("closing consumer...") + go func() { + consumer.closeConnectionToManagerCh <- struct{}{} + }() } // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue func (consumer *Consumer) startGoroutines( - handler Handler, - options ConsumerOptions, + handler Handler, + options ConsumerOptions, ) error { - err := consumer.chanManager.QosSafe( - options.QOSPrefetch, - 0, - options.QOSGlobal, - ) - if err != nil { - return fmt.Errorf("declare qos failed: %w", err) - } - err = declareExchange(consumer.chanManager, options.ExchangeOptions) - if err != nil { - return fmt.Errorf("declare exchange failed: %w", err) - } - err = declareQueue(consumer.chanManager, options.QueueOptions) - if err != nil { - return fmt.Errorf("declare queue failed: %w", err) - } - err = declareBindings(consumer.chanManager, options) - if err != nil { - return fmt.Errorf("declare bindings failed: %w", err) - } - - msgs, err := consumer.chanManager.ConsumeSafe( - options.QueueOptions.Name, - options.RabbitConsumerOptions.Name, - options.RabbitConsumerOptions.AutoAck, - options.RabbitConsumerOptions.Exclusive, - false, // no-local is not supported by RabbitMQ - options.RabbitConsumerOptions.NoWait, - tableToAMQPTable(options.RabbitConsumerOptions.Args), - ) - if err != nil { - return err - } - - for i := 0; i < options.Concurrency; i++ { - go handlerGoroutine(consumer, msgs, options, handler) - } - consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency) - return nil + err := consumer.chanManager.QosSafe( + options.QOSPrefetch, + 0, + options.QOSGlobal, + ) + if err != nil { + return fmt.Errorf("declare qos failed: %w", err) + } + err = declareExchange(consumer.chanManager, options.ExchangeOptions) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + err = declareQueue(consumer.chanManager, options.QueueOptions) + if err != nil { + return fmt.Errorf("declare queue failed: %w", err) + } + err = declareBindings(consumer.chanManager, options) + if err != nil { + return fmt.Errorf("declare bindings failed: %w", err) + } + + msgs, err := consumer.chanManager.ConsumeSafe( + options.QueueOptions.Name, + options.RabbitConsumerOptions.Name, + options.RabbitConsumerOptions.AutoAck, + options.RabbitConsumerOptions.Exclusive, + false, // no-local is not supported by RabbitMQ + options.RabbitConsumerOptions.NoWait, + tableToAMQPTable(options.RabbitConsumerOptions.Args), + ) + if err != nil { + return err + } + + for i := 0; i < options.Concurrency; i++ { + go handlerGoroutine(consumer, msgs, options, handler) + } + consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency) + return nil } func (consumer *Consumer) getIsClosed() bool { - consumer.isClosedMux.RLock() - defer consumer.isClosedMux.RUnlock() - return consumer.isClosed + consumer.isClosedMux.RLock() + defer consumer.isClosedMux.RUnlock() + return consumer.isClosed } func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) { - for msg := range msgs { - if consumer.getIsClosed() { - break - } - - if consumeOptions.RabbitConsumerOptions.AutoAck { - handler(Delivery{msg}) - continue - } - - switch handler(Delivery{msg}) { - case Ack: - err := msg.Ack(false) - if err != nil { - consumer.options.Logger.Errorf("can't ack message: %v", err) - } - case NackDiscard: - err := msg.Nack(false, false) - if err != nil { - consumer.options.Logger.Errorf("can't nack message: %v", err) - } - case NackRequeue: - err := msg.Nack(false, true) - if err != nil { - consumer.options.Logger.Errorf("can't nack message: %v", err) - } - } - } - consumer.options.Logger.Infof("rabbit consumer goroutine closed") + for msg := range msgs { + if consumer.getIsClosed() { + break + } + + if consumeOptions.RabbitConsumerOptions.AutoAck { + handler(Delivery{msg}) + continue + } + + switch handler(Delivery{msg}) { + case Ack: + err := msg.Ack(false) + if err != nil { + consumer.options.Logger.Errorf("can't ack message: %v", err) + } + case NackDiscard: + err := msg.Nack(false, false) + if err != nil { + consumer.options.Logger.Errorf("can't nack message: %v", err) + } + case NackRequeue: + err := msg.Nack(false, true) + if err != nil { + consumer.options.Logger.Errorf("can't nack message: %v", err) + } + } + } + consumer.options.Logger.Infof("rabbit consumer goroutine closed") } diff --git a/consumer_options.go b/consumer_options.go index 80f2979..24ce681 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -1,56 +1,56 @@ package rabbitmq import ( - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/logger" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/logger" ) // getDefaultConsumerOptions describes the options that will be used when a value isn't provided func getDefaultConsumerOptions(queueName string) ConsumerOptions { - return ConsumerOptions{ - RabbitConsumerOptions: RabbitConsumerOptions{ - Name: "", - AutoAck: false, - Exclusive: false, - NoWait: false, - NoLocal: false, - Args: Table{}, - }, - QueueOptions: QueueOptions{ - Name: queueName, - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: true, - }, - ExchangeOptions: ExchangeOptions{ - Name: "", - Kind: amqp.ExchangeDirect, - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: false, - }, - Bindings: []Binding{}, - Concurrency: 1, - Logger: stdDebugLogger{}, - QOSPrefetch: 10, - QOSGlobal: false, - } + return ConsumerOptions{ + RabbitConsumerOptions: RabbitConsumerOptions{ + Name: "", + AutoAck: false, + Exclusive: false, + NoWait: false, + NoLocal: false, + Args: Table{}, + }, + QueueOptions: QueueOptions{ + Name: queueName, + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, + Bindings: []Binding{}, + Concurrency: 1, + Logger: stdDebugLogger{}, + QOSPrefetch: 10, + QOSGlobal: false, + } } func getDefaultBindingOptions() BindingOptions { - return BindingOptions{ - NoWait: false, - Args: Table{}, - Declare: true, - } + return BindingOptions{ + NoWait: false, + Args: Table{}, + Declare: true, + } } // ConsumerOptions are used to describe how a new consumer will be created. @@ -58,184 +58,184 @@ func getDefaultBindingOptions() BindingOptions { // If ExchangeOptions is not nil, it will be used to declare an exchange // If there are Bindings, the queue will be bound to them type ConsumerOptions struct { - RabbitConsumerOptions RabbitConsumerOptions - QueueOptions QueueOptions - ExchangeOptions ExchangeOptions - Bindings []Binding - Concurrency int - Logger logger.Logger - QOSPrefetch int - QOSGlobal bool + RabbitConsumerOptions RabbitConsumerOptions + QueueOptions QueueOptions + ExchangeOptions ExchangeOptions + Bindings []Binding + Concurrency int + Logger logger.Logger + QOSPrefetch int + QOSGlobal bool } // RabbitConsumerOptions are used to configure the consumer // on the rabbit server type RabbitConsumerOptions struct { - Name string - AutoAck bool - Exclusive bool - NoWait bool - NoLocal bool - Args Table + Name string + AutoAck bool + Exclusive bool + NoWait bool + NoLocal bool + Args Table } // QueueOptions are used to configure a queue. // A passive queue is assumed by RabbitMQ to already exist, and attempting to connect // to a non-existent queue will cause RabbitMQ to throw an exception. type QueueOptions struct { - Name string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Passive bool // if false, a missing queue will be created on the server - Args Table - Declare bool + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table + Declare bool } // Binding describes the bhinding of a queue to a routing key on an exchange type Binding struct { - RoutingKey string - BindingOptions + RoutingKey string + BindingOptions } // BindingOptions describes the options a binding can have type BindingOptions struct { - NoWait bool - Args Table - Declare bool + NoWait bool + Args Table + Declare bool } // WithConsumerOptionsQueueDurable ensures the queue is a durable queue func WithConsumerOptionsQueueDurable(options *ConsumerOptions) { - options.QueueOptions.Durable = true + options.QueueOptions.Durable = true } // WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) { - options.QueueOptions.AutoDelete = true + options.QueueOptions.AutoDelete = true } // WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) { - options.QueueOptions.Exclusive = true + options.QueueOptions.Exclusive = true } // WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) { - options.QueueOptions.NoWait = true + options.QueueOptions.NoWait = true } // WithConsumerOptionsQueuePassive ensures the queue is a passive queue func WithConsumerOptionsQueuePassive(options *ConsumerOptions) { - options.QueueOptions.Passive = true + options.QueueOptions.Passive = true } // WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's // existance upon startup func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) { - options.QueueOptions.Declare = false + options.QueueOptions.Declare = false } // WithConsumerOptionsQueueArgs adds optional args to the queue func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.QueueOptions.Args = args - } + return func(options *ConsumerOptions) { + options.QueueOptions.Args = args + } } // WithConsumerOptionsExchangeName sets the exchange name func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.ExchangeOptions.Name = name - } + return func(options *ConsumerOptions) { + options.ExchangeOptions.Name = name + } } // WithConsumerOptionsExchangeKind ensures the queue is a durable queue func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.ExchangeOptions.Kind = kind - } + return func(options *ConsumerOptions) { + options.ExchangeOptions.Kind = kind + } } // WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) { - options.ExchangeOptions.Durable = true + options.ExchangeOptions.Durable = true } // WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) { - options.ExchangeOptions.AutoDelete = true + options.ExchangeOptions.AutoDelete = true } // WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) { - options.ExchangeOptions.Internal = true + options.ExchangeOptions.Internal = true } // WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) { - options.ExchangeOptions.NoWait = true + options.ExchangeOptions.NoWait = true } // WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) { - options.ExchangeOptions.Declare = true + options.ExchangeOptions.Declare = true } // WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange func WithConsumerOptionsExchangePassive(options *ConsumerOptions) { - options.ExchangeOptions.Passive = true + options.ExchangeOptions.Passive = true } // WithConsumerOptionsExchangeArgs adds optional args to the exchange func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.ExchangeOptions.Args = args - } + return func(options *ConsumerOptions) { + options.ExchangeOptions.Args = args + } } // WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Bindings = append(options.Bindings, Binding{ - RoutingKey: routingKey, - BindingOptions: getDefaultBindingOptions(), - }) - } + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, Binding{ + RoutingKey: routingKey, + BindingOptions: getDefaultBindingOptions(), + }) + } } // WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options // on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to // the zero value. If you want to declare your bindings for example, be sure to set Declare=true func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Bindings = append(options.Bindings, binding) - } + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, binding) + } } // WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that // many goroutines will be spawned to run the provided handler on messages func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Concurrency = concurrency - } + return func(options *ConsumerOptions) { + options.Concurrency = concurrency + } } // WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer // if unset a random name will be given func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.RabbitConsumerOptions.Name = consumerName - } + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.Name = consumerName + } } // WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer // if unset the default will be used (false) func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.RabbitConsumerOptions.AutoAck = autoAck - } + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.AutoAck = autoAck + } } // WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means @@ -243,7 +243,7 @@ func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) { // from this queue. When exclusive is false, the server will fairly distribute // deliveries across multiple consumers. func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) { - options.RabbitConsumerOptions.Exclusive = true + options.RabbitConsumerOptions.Exclusive = true } // WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means @@ -251,45 +251,45 @@ func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) { // immediately begin deliveries. If it is not possible to consume, a channel // exception will be raised and the channel will be closed. func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) { - options.RabbitConsumerOptions.NoWait = true + options.RabbitConsumerOptions.NoWait = true } // WithConsumerOptionsLogging uses a default logger that writes to std out func WithConsumerOptionsLogging(options *ConsumerOptions) { - options.Logger = &stdDebugLogger{} + options.Logger = &stdDebugLogger{} } // WithConsumerOptionsLogger sets logging to a custom interface. // Use WithConsumerOptionsLogging to just log to stdout. func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Logger = log - } + return func(options *ConsumerOptions) { + options.Logger = log + } } // WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that // many messages will be fetched from the server in advance to help with throughput. // This doesn't affect the handler, messages are still processed one at a time. func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) { - return func(options *ConsumerOptions) { - options.QOSPrefetch = prefetchCount - } + return func(options *ConsumerOptions) { + options.QOSPrefetch = prefetchCount + } } // WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means // these QOS settings apply to ALL existing and future // consumers on all channels on the same connection func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { - options.QOSGlobal = true + options.QOSGlobal = true } // WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means // multiple nodes in the cluster will have the messages distributed amongst them // for higher reliability func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) { - if options.QueueOptions.Args == nil { - options.QueueOptions.Args = Table{} - } + if options.QueueOptions.Args == nil { + options.QueueOptions.Args = Table{} + } - options.QueueOptions.Args["x-queue-type"] = "quorum" + options.QueueOptions.Args["x-queue-type"] = "quorum" } diff --git a/declare.go b/declare.go index 86abe85..5062902 100644 --- a/declare.go +++ b/declare.go @@ -1,90 +1,90 @@ package rabbitmq import ( - "github.com/wagslane/go-rabbitmq/internal/channelmanager" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { - if !options.Declare { - return nil - } - if options.Passive { - _, err := chanManager.QueueDeclarePassiveSafe( - options.Name, - options.Durable, - options.AutoDelete, - options.Exclusive, - options.NoWait, - tableToAMQPTable(options.Args), - ) - if err != nil { - return err - } - return nil - } - _, err := chanManager.QueueDeclareSafe( - options.Name, - options.Durable, - options.AutoDelete, - options.Exclusive, - options.NoWait, - tableToAMQPTable(options.Args), - ) - if err != nil { - return err - } - return nil + if !options.Declare { + return nil + } + if options.Passive { + _, err := chanManager.QueueDeclarePassiveSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil + } + _, err := chanManager.QueueDeclareSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil } func declareExchange(chanManager *channelmanager.ChannelManager, options ExchangeOptions) error { - if !options.Declare { - return nil - } - if options.Passive { - err := chanManager.ExchangeDeclarePassiveSafe( - options.Name, - options.Kind, - options.Durable, - options.AutoDelete, - options.Internal, - options.NoWait, - tableToAMQPTable(options.Args), - ) - if err != nil { - return err - } - return nil - } - err := chanManager.ExchangeDeclareSafe( - options.Name, - options.Kind, - options.Durable, - options.AutoDelete, - options.Internal, - options.NoWait, - tableToAMQPTable(options.Args), - ) - if err != nil { - return err - } - return nil + if !options.Declare { + return nil + } + if options.Passive { + err := chanManager.ExchangeDeclarePassiveSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil + } + err := chanManager.ExchangeDeclareSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil } func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error { - for _, binding := range options.Bindings { - if !binding.Declare { - continue - } - err := chanManager.QueueBindSafe( - options.QueueOptions.Name, - binding.RoutingKey, - options.ExchangeOptions.Name, - binding.NoWait, - tableToAMQPTable(binding.Args), - ) - if err != nil { - return err - } - } - return nil + for _, binding := range options.Bindings { + if !binding.Declare { + continue + } + err := chanManager.QueueBindSafe( + options.QueueOptions.Name, + binding.RoutingKey, + options.ExchangeOptions.Name, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err + } + } + return nil } diff --git a/examples/channel/.gitignore b/examples/channel/.gitignore new file mode 100644 index 0000000..6dca835 --- /dev/null +++ b/examples/channel/.gitignore @@ -0,0 +1 @@ +channel diff --git a/examples/channel/main.go b/examples/channel/main.go new file mode 100644 index 0000000..0a3cbf4 --- /dev/null +++ b/examples/channel/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "log" + + "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq" +) + +func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + channel, err := rabbitmq.NewChannel(conn) + if err != nil { + log.Fatal(err) + } + defer channel.Close() + + table := amqp091.Table{ + "x-dead-letter-exchange": "events", + "x-dead-letter-routing-key": "my_routing_key", + } + _, err = channel.QueueDeclareSafe("re_my_routing_key", true, false, false, false, table) + if err != nil { + log.Fatal(err) + } + err = channel.QueueBindSafe("re_my_queue", "re_my_routing_key", "events", false, table) + if err != nil { + log.Fatal(err) + } +} diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 7c68733..4eea8f3 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -1,56 +1,56 @@ package main import ( - "fmt" - "log" - "os" - "os/signal" - "syscall" + "fmt" + "log" + "os" + "os/signal" + "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - consumer, err := rabbitmq.NewConsumer( - conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, - "my_queue", - rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), - rabbitmq.WithConsumerOptionsExchangeName("events"), - rabbitmq.WithConsumerOptionsExchangeDeclare, - ) - if err != nil { - log.Fatal(err) - } - defer consumer.Close() - - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() - - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + consumer, err := rabbitmq.NewConsumer( + conn, + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + rabbitmq.WithConsumerOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer consumer.Close() + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") } diff --git a/examples/logger/main.go b/examples/logger/main.go index 8620e4c..cecd06c 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -1,22 +1,18 @@ package main import ( - "context" - "log" + "context" + "log" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) // errorLogger is used in WithPublisherOptionsLogger to create a custom logger // that only logs ERROR and FATAL log levels type errorLogger struct{} -func (l errorLogger) Fatalf(format string, v ...interface{}) { - log.Printf("mylogger: "+format, v...) -} - func (l errorLogger) Errorf(format string, v ...interface{}) { - log.Printf("mylogger: "+format, v...) + log.Printf("mylogger: "+format, v...) } func (l errorLogger) Warnf(format string, v ...interface{}) { @@ -31,38 +27,38 @@ func (l errorLogger) Debugf(format string, v ...interface{}) { func (l errorLogger) Tracef(format string, v ...interface{}) {} func main() { - mylogger := &errorLogger{} + mylogger := &errorLogger{} - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() - publisher, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogger(mylogger), - ) - if err != nil { - log.Fatal(err) - } - err = publisher.PublishWithContext( - context.Background(), - []byte("hello, world"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Fatal(err) - } + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogger(mylogger), + ) + if err != nil { + log.Fatal(err) + } + err = publisher.PublishWithContext( + context.Background(), + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Fatal(err) + } - publisher.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) } diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 571af8b..d00b791 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -1,76 +1,76 @@ package main import ( - "fmt" - "log" - "os" - "os/signal" - "syscall" + "fmt" + "log" + "os" + "os/signal" + "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() - consumer, err := rabbitmq.NewConsumer( - conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, - "my_queue", - rabbitmq.WithConsumerOptionsConcurrency(2), - rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), - rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), - rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key_2"), - rabbitmq.WithConsumerOptionsExchangeName("events"), - ) - if err != nil { - log.Fatal(err) - } - defer consumer.Close() + consumer, err := rabbitmq.NewConsumer( + conn, + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumerOptionsConcurrency(2), + rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key_2"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + ) + if err != nil { + log.Fatal(err) + } + defer consumer.Close() - consumer2, err := rabbitmq.NewConsumer( - conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed 2: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, - "my_queue", - rabbitmq.WithConsumerOptionsConcurrency(2), - rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), - rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), - rabbitmq.WithConsumerOptionsExchangeName("events"), - ) - if err != nil { - log.Fatal(err) - } - defer consumer2.Close() + consumer2, err := rabbitmq.NewConsumer( + conn, + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed 2: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumerOptionsConcurrency(2), + rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + ) + if err != nil { + log.Fatal(err) + } + defer consumer2.Close() - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") } diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 5121a3f..ea67406 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -1,111 +1,111 @@ package main import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "syscall" - "time" + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() - publisher, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogging, - rabbitmq.WithPublisherOptionsExchangeName("events"), - rabbitmq.WithPublisherOptionsExchangeDeclare, - ) - if err != nil { - log.Fatal(err) - } - defer publisher.Close() + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer publisher.Close() - publisher.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) - publisher.NotifyPublish(func(c rabbitmq.Confirmation) { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - }) + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) - publisher2, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogging, - rabbitmq.WithPublisherOptionsExchangeName("events"), - rabbitmq.WithPublisherOptionsExchangeDeclare, - ) - if err != nil { - log.Fatal(err) - } - defer publisher2.Close() + publisher2, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer publisher2.Close() - publisher2.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) + publisher2.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) - publisher2.NotifyPublish(func(c rabbitmq.Confirmation) { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - }) + publisher2.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() - fmt.Println("awaiting signal") + fmt.Println("awaiting signal") - ticker := time.NewTicker(time.Second) - for { - select { - case <-ticker.C: - err = publisher.PublishWithContext( - context.Background(), - []byte("hello, world"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Println(err) - } - err = publisher2.PublishWithContext( - context.Background(), - []byte("hello, world 2"), - []string{"my_routing_key_2"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Println(err) - } - case <-done: - fmt.Println("stopping publisher") - return - } - } + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + err = publisher.PublishWithContext( + context.Background(), + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + err = publisher2.PublishWithContext( + context.Background(), + []byte("hello, world 2"), + []string{"my_routing_key_2"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + case <-done: + fmt.Println("stopping publisher") + return + } + } } diff --git a/examples/publisher/main.go b/examples/publisher/main.go index d07cc27..89ffe37 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -1,80 +1,80 @@ package main import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "syscall" - "time" + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() - publisher, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogging, - rabbitmq.WithPublisherOptionsExchangeName("events"), - rabbitmq.WithPublisherOptionsExchangeDeclare, - ) - if err != nil { - log.Fatal(err) - } - defer publisher.Close() + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer publisher.Close() - publisher.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) - publisher.NotifyPublish(func(c rabbitmq.Confirmation) { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - }) + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() - fmt.Println("awaiting signal") + fmt.Println("awaiting signal") - ticker := time.NewTicker(time.Second) - for { - select { - case <-ticker.C: - err = publisher.PublishWithContext( - context.Background(), - []byte("hello, world"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Println(err) - } - case <-done: - fmt.Println("stopping publisher") - return - } - } + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + err = publisher.PublishWithContext( + context.Background(), + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + case <-done: + fmt.Println("stopping publisher") + return + } + } } diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go index f5aecaf..a026541 100644 --- a/examples/publisher_confirm/main.go +++ b/examples/publisher_confirm/main.go @@ -1,91 +1,91 @@ package main import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "syscall" - "time" + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() - publisher, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogging, - rabbitmq.WithPublisherOptionsExchangeName("events"), - rabbitmq.WithPublisherOptionsExchangeDeclare, - rabbitmq.WithPublisherOptionsConfirm, - ) - if err != nil { - log.Fatal(err) - } - defer publisher.Close() + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + rabbitmq.WithPublisherOptionsConfirm, + ) + if err != nil { + log.Fatal(err) + } + defer publisher.Close() - publisher.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true - }() + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() - fmt.Println("awaiting signal") + fmt.Println("awaiting signal") - ticker := time.NewTicker(time.Second) - for { - select { - case <-ticker.C: - confirms, err := publisher.PublishWithDeferredConfirmWithContext( - context.Background(), - []byte("hello, world"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Println(err) - continue - } else if len(confirms) == 0 || confirms[0] == nil { - fmt.Println("message publishing not confirmed") - continue - } - fmt.Println("message published") - ok, err := confirms[0].WaitContext(context.Background()) - if err != nil { - log.Println(err) - } - if ok { - fmt.Println("message publishing confirmed") - } else { - fmt.Println("message publishing not confirmed") - } - case <-done: - fmt.Println("stopping publisher") - return - } - } + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + confirms, err := publisher.PublishWithDeferredConfirmWithContext( + context.Background(), + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + continue + } else if len(confirms) == 0 || confirms[0] == nil { + fmt.Println("message publishing not confirmed") + continue + } + fmt.Println("message published") + ok, err := confirms[0].WaitContext(context.Background()) + if err != nil { + log.Println(err) + } + if ok { + fmt.Println("message publishing confirmed") + } else { + fmt.Println("message publishing not confirmed") + } + case <-done: + fmt.Println("stopping publisher") + return + } + } } diff --git a/go.mod b/go.mod index 3c21622..6ea76dd 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.7.0 +require github.com/rabbitmq/amqp091-go v1.8.1 diff --git a/go.sum b/go.sum index e627709..70f2ba7 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= +github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -13,6 +15,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..42bbfcf 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -1,58 +1,58 @@ package channelmanager import ( - "errors" - "sync" - "time" - - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" + "errors" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/wagslane/go-rabbitmq/internal/dispatcher" + "github.com/wagslane/go-rabbitmq/internal/logger" ) // ChannelManager - type ChannelManager struct { - logger logger.Logger - channel *amqp.Channel - connManager *connectionmanager.ConnectionManager - channelMux *sync.RWMutex - reconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMux *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher } // NewChannelManager creates a new connection manager func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { - ch, err := getNewChannel(connManager) - if err != nil { - return nil, err - } - - chanManager := ChannelManager{ - logger: log, - connManager: connManager, - channel: ch, - channelMux: &sync.RWMutex{}, - reconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), - } - go chanManager.startNotifyCancelOrClosed() - return &chanManager, nil + ch, err := getNewChannel(connManager) + if err != nil { + return nil, err + } + + chanManager := ChannelManager{ + logger: log, + connManager: connManager, + channel: ch, + channelMux: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go chanManager.startNotifyCancelOrClosed() + return &chanManager, nil } func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Channel, error) { - conn := connManager.CheckoutConnection() - defer connManager.CheckinConnection() - - ch, err := conn.Channel() - if err != nil { - return nil, err - } - return ch, nil + conn := connManager.CheckoutConnection() + defer connManager.CheckinConnection() + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + return ch, nil } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -60,90 +60,90 @@ func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Chan // Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (chanManager *ChannelManager) startNotifyCancelOrClosed() { - notifyCloseChan := chanManager.channel.NotifyClose(make(chan *amqp.Error, 1)) - notifyCancelChan := chanManager.channel.NotifyCancel(make(chan string, 1)) - - select { - case err := <-notifyCloseChan: - if err != nil { - chanManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) - chanManager.reconnectLoop() - chanManager.logger.Warnf("successfully reconnected to amqp server") - chanManager.dispatcher.Dispatch(err) - } - if err == nil { - chanManager.logger.Infof("amqp channel closed gracefully") - } - case err := <-notifyCancelChan: - chanManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) - chanManager.reconnectLoop() - chanManager.logger.Warnf("successfully reconnected to amqp server after cancel") - chanManager.dispatcher.Dispatch(errors.New(err)) - } + notifyCloseChan := chanManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chanManager.channel.NotifyCancel(make(chan string, 1)) + + select { + case err := <-notifyCloseChan: + if err != nil { + chanManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) + chanManager.reconnectLoop() + chanManager.logger.Warnf("successfully reconnected to amqp server") + _ = chanManager.dispatcher.Dispatch(err) + } + if err == nil { + chanManager.logger.Infof("amqp channel closed gracefully") + } + case err := <-notifyCancelChan: + chanManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) + chanManager.reconnectLoop() + chanManager.logger.Warnf("successfully reconnected to amqp server after cancel") + _ = chanManager.dispatcher.Dispatch(errors.New(err)) + } } // GetReconnectionCount - func (chanManager *ChannelManager) GetReconnectionCount() uint { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() - return chanManager.reconnectionCount + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + return chanManager.reconnectionCount } func (chanManager *ChannelManager) incrementReconnectionCount() { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() - chanManager.reconnectionCount++ + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCount++ } // reconnectLoop continuously attempts to reconnect func (chanManager *ChannelManager) reconnectLoop() { - for { - chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval) - time.Sleep(chanManager.reconnectInterval) - err := chanManager.reconnect() - if err != nil { - chanManager.logger.Errorf("error reconnecting to amqp server: %v", err) - } else { - chanManager.incrementReconnectionCount() - go chanManager.startNotifyCancelOrClosed() - return - } - } + for { + chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval) + time.Sleep(chanManager.reconnectInterval) + err := chanManager.reconnect() + if err != nil { + chanManager.logger.Errorf("error reconnecting to amqp server: %v", err) + } else { + chanManager.incrementReconnectionCount() + go chanManager.startNotifyCancelOrClosed() + return + } + } } // reconnect safely closes the current channel and obtains a new one func (chanManager *ChannelManager) reconnect() error { - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() - newChannel, err := getNewChannel(chanManager.connManager) - if err != nil { - return err - } - - if err = chanManager.channel.Close(); err != nil { - chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) - } - - chanManager.channel = newChannel - return nil + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + newChannel, err := getNewChannel(chanManager.connManager) + if err != nil { + return err + } + + if err = chanManager.channel.Close(); err != nil { + chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) + } + + chanManager.channel = newChannel + return nil } // Close safely closes the current channel and connection func (chanManager *ChannelManager) Close() error { - chanManager.logger.Infof("closing channel manager...") - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.logger.Infof("closing channel manager...") + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() - err := chanManager.channel.Close() - if err != nil { - return err - } + err := chanManager.channel.Close() + if err != nil { + return err + } - return nil + return nil } // NotifyReconnect adds a new subscriber that will receive error messages whenever // the connection manager has successfully reconnect to the server func (chanManager *ChannelManager) NotifyReconnect() (<-chan error, chan<- struct{}) { - return chanManager.dispatcher.AddSubscriber() + return chanManager.dispatcher.AddSubscriber() } diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index fce1f2b..b10ae1f 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -1,76 +1,76 @@ package connectionmanager import ( - "sync" - "time" + "sync" + "time" - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/dispatcher" + "github.com/wagslane/go-rabbitmq/internal/logger" ) // ConnectionManager - type ConnectionManager struct { - logger logger.Logger - url string - connection *amqp.Connection - amqpConfig amqp.Config - connectionMux *sync.RWMutex - ReconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + url string + connection *amqp.Connection + amqpConfig amqp.Config + connectionMux *sync.RWMutex + ReconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher } // NewConnectionManager creates a new connection manager func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { - conn, err := amqp.DialConfig(url, amqp.Config(conf)) - if err != nil { - return nil, err - } - connManager := ConnectionManager{ - logger: log, - url: url, - connection: conn, - amqpConfig: conf, - connectionMux: &sync.RWMutex{}, - ReconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), - } - go connManager.startNotifyClose() - return &connManager, nil + conn, err := amqp.DialConfig(url, amqp.Config(conf)) + if err != nil { + return nil, err + } + connManager := ConnectionManager{ + logger: log, + url: url, + connection: conn, + amqpConfig: conf, + connectionMux: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go connManager.startNotifyClose() + return &connManager, nil } // Close safely closes the current channel and connection func (connManager *ConnectionManager) Close() error { - connManager.logger.Infof("closing connection manager...") - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() - - err := connManager.connection.Close() - if err != nil { - return err - } - return nil + connManager.logger.Infof("closing connection manager...") + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + + err := connManager.connection.Close() + if err != nil { + return err + } + return nil } // NotifyReconnect adds a new subscriber that will receive error messages whenever // the connection manager has successfully reconnected to the server func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { - return connManager.dispatcher.AddSubscriber() + return connManager.dispatcher.AddSubscriber() } // CheckoutConnection - func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { - connManager.connectionMux.RLock() - return connManager.connection + connManager.connectionMux.RLock() + return connManager.connection } // CheckinConnection - func (connManager *ConnectionManager) CheckinConnection() { - connManager.connectionMux.RUnlock() + connManager.connectionMux.RUnlock() } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -78,62 +78,62 @@ func (connManager *ConnectionManager) CheckinConnection() { // Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel func (connManager *ConnectionManager) startNotifyClose() { - notifyCloseChan := connManager.connection.NotifyClose(make(chan *amqp.Error, 1)) - - err := <-notifyCloseChan - if err != nil { - connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err) - connManager.reconnectLoop() - connManager.logger.Warnf("successfully reconnected to amqp server") - connManager.dispatcher.Dispatch(err) - } - if err == nil { - connManager.logger.Infof("amqp connection closed gracefully") - } + notifyCloseChan := connManager.connection.NotifyClose(make(chan *amqp.Error, 1)) + + err := <-notifyCloseChan + if err != nil { + connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err) + connManager.reconnectLoop() + connManager.logger.Warnf("successfully reconnected to amqp server") + _ = connManager.dispatcher.Dispatch(err) + } + if err == nil { + connManager.logger.Infof("amqp connection closed gracefully") + } } // GetReconnectionCount - func (connManager *ConnectionManager) GetReconnectionCount() uint { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() - return connManager.reconnectionCount + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + return connManager.reconnectionCount } func (connManager *ConnectionManager) incrementReconnectionCount() { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() - connManager.reconnectionCount++ + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCount++ } // reconnectLoop continuously attempts to reconnect func (connManager *ConnectionManager) reconnectLoop() { - for { - connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval) - time.Sleep(connManager.ReconnectInterval) - err := connManager.reconnect() - if err != nil { - connManager.logger.Errorf("error reconnecting to amqp server: %v", err) - } else { - connManager.incrementReconnectionCount() - go connManager.startNotifyClose() - return - } - } + for { + connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval) + time.Sleep(connManager.ReconnectInterval) + err := connManager.reconnect() + if err != nil { + connManager.logger.Errorf("error reconnecting to amqp server: %v", err) + } else { + connManager.incrementReconnectionCount() + go connManager.startNotifyClose() + return + } + } } // reconnect safely closes the current channel and obtains a new one func (connManager *ConnectionManager) reconnect() error { - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() - newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) - if err != nil { - return err - } - - if err = connManager.connection.Close(); err != nil { - connManager.logger.Warnf("error closing connection while reconnecting: %v", err) - } - - connManager.connection = newConn - return nil + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) + if err != nil { + return err + } + + if err = connManager.connection.Close(); err != nil { + connManager.logger.Warnf("error closing connection while reconnecting: %v", err) + } + + connManager.connection = newConn + return nil } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 8c11b7d..6fd5ea0 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -3,10 +3,9 @@ package logger // Logger is describes a logging structure. It can be set using // WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). type Logger interface { - Fatalf(string, ...interface{}) - Errorf(string, ...interface{}) - Warnf(string, ...interface{}) - Infof(string, ...interface{}) - Debugf(string, ...interface{}) - Tracef(string, ...interface{}) + Errorf(string, ...interface{}) + Warnf(string, ...interface{}) + Infof(string, ...interface{}) + Debugf(string, ...interface{}) + Tracef(string, ...interface{}) } diff --git a/logger.go b/logger.go index 2c3f231..3ffa3ba 100644 --- a/logger.go +++ b/logger.go @@ -1,10 +1,10 @@ package rabbitmq import ( - "fmt" - "log" + "fmt" + "log" - "github.com/wagslane/go-rabbitmq/internal/logger" + "github.com/wagslane/go-rabbitmq/internal/logger" ) // Logger is describes a logging structure. It can be set using @@ -15,29 +15,24 @@ const loggingPrefix = "gorabbit" type stdDebugLogger struct{} -// Fatalf - -func (l stdDebugLogger) Fatalf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...) -} - // Errorf - func (l stdDebugLogger) Errorf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) + log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) } // Warnf - func (l stdDebugLogger) Warnf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) + log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) } // Infof - func (l stdDebugLogger) Infof(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) + log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) } // Debugf - func (l stdDebugLogger) Debugf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) + log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) } // Tracef - diff --git a/publish.go b/publish.go index 59526d7..0123aaa 100644 --- a/publish.go +++ b/publish.go @@ -1,14 +1,14 @@ package rabbitmq import ( - "context" - "errors" - "fmt" - "sync" - - amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "context" + "errors" + "fmt" + "sync" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // DeliveryMode. Transient means higher throughput but messages will not be @@ -21,43 +21,43 @@ import ( // delivery modes specific to custom queue implementations are not enumerated // here. const ( - Transient uint8 = amqp.Transient - Persistent uint8 = amqp.Persistent + Transient uint8 = amqp.Transient + Persistent uint8 = amqp.Persistent ) // Return captures a flattened struct of fields returned by the server when a // Publishing is unable to be delivered either due to the `mandatory` flag set // and no route found, or `immediate` flag set and no free consumer. type Return struct { - amqp.Return + amqp.Return } // Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. // Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag // is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness type Confirmation struct { - amqp.Confirmation - ReconnectionCount int + amqp.Confirmation + ReconnectionCount int } // Publisher allows you to publish messages safely across an open connection type Publisher struct { - chanManager *channelmanager.ChannelManager - connManager *connectionmanager.ConnectionManager - reconnectErrCh <-chan error - closeConnectionToManagerCh chan<- struct{} + chanManager *channelmanager.ChannelManager + connManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} - disablePublishDueToFlow bool - disablePublishDueToFlowMux *sync.RWMutex + disablePublishDueToFlow bool + disablePublishDueToFlowMux *sync.RWMutex - disablePublishDueToBlocked bool - disablePublishDueToBlockedMux *sync.RWMutex + disablePublishDueToBlocked bool + disablePublishDueToBlockedMux *sync.RWMutex - handlerMux *sync.Mutex - notifyReturnHandler func(r Return) - notifyPublishHandler func(p Confirmation) + handlerMux *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) - options PublisherOptions + options PublisherOptions } type PublisherConfirmation []*amqp.DeferredConfirmation @@ -68,142 +68,141 @@ type PublisherConfirmation []*amqp.DeferredConfirmation // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { - defaultOptions := getDefaultPublisherOptions() - options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - - if conn.connectionManager == nil { - return nil, errors.New("connection manager can't be nil") - } - - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) - if err != nil { - return nil, err - } - - reconnectErrCh, closeCh := chanManager.NotifyReconnect() - publisher := &Publisher{ - chanManager: chanManager, - connManager: conn.connectionManager, - reconnectErrCh: reconnectErrCh, - closeConnectionToManagerCh: closeCh, - disablePublishDueToFlow: false, - disablePublishDueToFlowMux: &sync.RWMutex{}, - disablePublishDueToBlocked: false, - disablePublishDueToBlockedMux: &sync.RWMutex{}, - handlerMux: &sync.Mutex{}, - notifyReturnHandler: nil, - notifyPublishHandler: nil, - options: *options, - } - - err = publisher.startup() - if err != nil { - return nil, err - } - - go func() { - for err := range publisher.reconnectErrCh { - publisher.options.Logger.Infof("successful publisher recovery from: %v", err) - err := publisher.startup() - if err != nil { - publisher.options.Logger.Fatalf("error on startup for publisher after cancel or close: %v", err) - publisher.options.Logger.Fatalf("publisher closing, unable to recover") - return - } - go publisher.startReturnHandler() - go publisher.startPublishHandler() - } - }() - - if options.ConfirmMode { - publisher.NotifyPublish(func(_ Confirmation) {}) - } - - return publisher, nil + defaultOptions := getDefaultPublisherOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") + } + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := chanManager.NotifyReconnect() + publisher := &Publisher{ + chanManager: chanManager, + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + disablePublishDueToFlow: false, + disablePublishDueToFlowMux: &sync.RWMutex{}, + disablePublishDueToBlocked: false, + disablePublishDueToBlockedMux: &sync.RWMutex{}, + handlerMux: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, + options: *options, + } + + err = publisher.startup() + if err != nil { + return nil, err + } + + go func() { + for err := range publisher.reconnectErrCh { + publisher.options.Logger.Infof("successful publisher recovery from: %v", err) + err := publisher.startup() + if err != nil { + publisher.options.Logger.Errorf("error on startup for publisher after cancel or close: %v", err) + return + } + go publisher.startReturnHandler() + go publisher.startPublishHandler() + } + }() + + if options.ConfirmMode { + publisher.NotifyPublish(func(_ Confirmation) {}) + } + + return publisher, nil } func (publisher *Publisher) startup() error { - err := declareExchange(publisher.chanManager, publisher.options.ExchangeOptions) - if err != nil { - return fmt.Errorf("declare exchange failed: %w", err) - } - go publisher.startNotifyFlowHandler() - go publisher.startNotifyBlockedHandler() - return nil + err := declareExchange(publisher.chanManager, publisher.options.ExchangeOptions) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + go publisher.startNotifyFlowHandler() + go publisher.startNotifyBlockedHandler() + return nil } /* Publish publishes the provided data to the given routing keys over the connection. */ func (publisher *Publisher) Publish( - data []byte, - routingKeys []string, - optionFuncs ...func(*PublishOptions), + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), ) error { - return publisher.PublishWithContext(context.Background(), data, routingKeys, optionFuncs...) + return publisher.PublishWithContext(context.Background(), data, routingKeys, optionFuncs...) } // PublishWithContext publishes the provided data to the given routing keys over the connection. func (publisher *Publisher) PublishWithContext( - ctx context.Context, - data []byte, - routingKeys []string, - optionFuncs ...func(*PublishOptions), + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), ) error { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() - if publisher.disablePublishDueToFlow { - return fmt.Errorf("publishing blocked due to high flow on the server") - } - - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() - if publisher.disablePublishDueToBlocked { - return fmt.Errorf("publishing blocked due to TCP block on the server") - } - - options := &PublishOptions{} - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - if options.DeliveryMode == 0 { - options.DeliveryMode = Transient - } - - for _, routingKey := range routingKeys { - message := amqp.Publishing{} - message.ContentType = options.ContentType - message.DeliveryMode = options.DeliveryMode - message.Body = data - message.Headers = tableToAMQPTable(options.Headers) - message.Expiration = options.Expiration - message.ContentEncoding = options.ContentEncoding - message.Priority = options.Priority - message.CorrelationId = options.CorrelationID - message.ReplyTo = options.ReplyTo - message.MessageId = options.MessageID - message.Timestamp = options.Timestamp - message.Type = options.Type - message.UserId = options.UserID - message.AppId = options.AppID - - // Actual publish. - err := publisher.chanManager.PublishWithContextSafe( - ctx, - options.Exchange, - routingKey, - options.Mandatory, - options.Immediate, - message, - ) - if err != nil { - return err - } - } - return nil + publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() + if publisher.disablePublishDueToFlow { + return fmt.Errorf("publishing blocked due to high flow on the server") + } + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return fmt.Errorf("publishing blocked due to TCP block on the server") + } + + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient + } + + for _, routingKey := range routingKeys { + message := amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID + + // Actual publish. + err := publisher.chanManager.PublishWithContextSafe( + ctx, + options.Exchange, + routingKey, + options.Mandatory, + options.Immediate, + message, + ) + if err != nil { + return err + } + } + return nil } // PublishWithContext publishes the provided data to the given routing keys over the connection. @@ -211,81 +210,81 @@ func (publisher *Publisher) PublishWithContext( // or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned. // This confirmation can be used to check if the message was actually published or wait for this to happen. func (publisher *Publisher) PublishWithDeferredConfirmWithContext( - ctx context.Context, - data []byte, - routingKeys []string, - optionFuncs ...func(*PublishOptions), + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), ) (PublisherConfirmation, error) { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() - if publisher.disablePublishDueToFlow { - return nil, fmt.Errorf("publishing blocked due to high flow on the server") - } - - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() - if publisher.disablePublishDueToBlocked { - return nil, fmt.Errorf("publishing blocked due to TCP block on the server") - } - - options := &PublishOptions{} - for _, optionFunc := range optionFuncs { - optionFunc(options) - } - if options.DeliveryMode == 0 { - options.DeliveryMode = Transient - } - - var deferredConfirmations []*amqp.DeferredConfirmation - - for _, routingKey := range routingKeys { - message := amqp.Publishing{} - message.ContentType = options.ContentType - message.DeliveryMode = options.DeliveryMode - message.Body = data - message.Headers = tableToAMQPTable(options.Headers) - message.Expiration = options.Expiration - message.ContentEncoding = options.ContentEncoding - message.Priority = options.Priority - message.CorrelationId = options.CorrelationID - message.ReplyTo = options.ReplyTo - message.MessageId = options.MessageID - message.Timestamp = options.Timestamp - message.Type = options.Type - message.UserId = options.UserID - message.AppId = options.AppID - - // Actual publish. - conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe( - ctx, - options.Exchange, - routingKey, - options.Mandatory, - options.Immediate, - message, - ) - if err != nil { - return nil, err - } - deferredConfirmations = append(deferredConfirmations, conf) - } - return deferredConfirmations, nil + publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() + if publisher.disablePublishDueToFlow { + return nil, fmt.Errorf("publishing blocked due to high flow on the server") + } + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return nil, fmt.Errorf("publishing blocked due to TCP block on the server") + } + + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient + } + + var deferredConfirmations []*amqp.DeferredConfirmation + + for _, routingKey := range routingKeys { + message := amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID + + // Actual publish. + conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe( + ctx, + options.Exchange, + routingKey, + options.Mandatory, + options.Immediate, + message, + ) + if err != nil { + return nil, err + } + deferredConfirmations = append(deferredConfirmations, conf) + } + return deferredConfirmations, nil } // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use // Only call Close() once func (publisher *Publisher) Close() { - // close the channel so that rabbitmq server knows that the - // publisher has been stopped. - err := publisher.chanManager.Close() - if err != nil { - publisher.options.Logger.Warnf("error while closing the channel: %v", err) - } - publisher.options.Logger.Infof("closing publisher...") - go func() { - publisher.closeConnectionToManagerCh <- struct{}{} - }() + // close the channel so that rabbitmq server knows that the + // publisher has been stopped. + err := publisher.chanManager.Close() + if err != nil { + publisher.options.Logger.Warnf("error while closing the channel: %v", err) + } + publisher.options.Logger.Infof("closing publisher...") + go func() { + publisher.closeConnectionToManagerCh <- struct{}{} + }() } // NotifyReturn registers a listener for basic.return methods. @@ -293,58 +292,58 @@ func (publisher *Publisher) Close() { // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyReturn(handler func(r Return)) { - publisher.handlerMux.Lock() - start := publisher.notifyReturnHandler == nil - publisher.notifyReturnHandler = handler - publisher.handlerMux.Unlock() - - if start { - go publisher.startReturnHandler() - } + publisher.handlerMux.Lock() + start := publisher.notifyReturnHandler == nil + publisher.notifyReturnHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startReturnHandler() + } } // NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { - publisher.handlerMux.Lock() - start := publisher.notifyPublishHandler == nil - publisher.notifyPublishHandler = handler - publisher.handlerMux.Unlock() - - if start { - go publisher.startPublishHandler() - } + publisher.handlerMux.Lock() + start := publisher.notifyPublishHandler == nil + publisher.notifyPublishHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startPublishHandler() + } } func (publisher *Publisher) startReturnHandler() { - publisher.handlerMux.Lock() - if publisher.notifyReturnHandler == nil { - publisher.handlerMux.Unlock() - return - } - publisher.handlerMux.Unlock() - - returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) - for ret := range returns { - go publisher.notifyReturnHandler(Return{ret}) - } + publisher.handlerMux.Lock() + if publisher.notifyReturnHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returns { + go publisher.notifyReturnHandler(Return{ret}) + } } func (publisher *Publisher) startPublishHandler() { - publisher.handlerMux.Lock() - if publisher.notifyPublishHandler == nil { - publisher.handlerMux.Unlock() - return - } - publisher.handlerMux.Unlock() - - publisher.chanManager.ConfirmSafe(false) - confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) - for conf := range confirmationCh { - go publisher.notifyPublishHandler(Confirmation{ - Confirmation: conf, - ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), - }) - } + publisher.handlerMux.Lock() + if publisher.notifyPublishHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + _ = publisher.chanManager.ConfirmSafe(false) + confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range confirmationCh { + go publisher.notifyPublishHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }) + } } diff --git a/table.go b/table.go index 1b351f4..54cf338 100644 --- a/table.go +++ b/table.go @@ -33,9 +33,9 @@ import amqp "github.com/rabbitmq/amqp091-go" type Table map[string]interface{} func tableToAMQPTable(table Table) amqp.Table { - new := amqp.Table{} - for k, v := range table { - new[k] = v - } - return new + newTable := amqp.Table{} + for k, v := range table { + newTable[k] = v + } + return newTable } diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md index 1165775..02523c2 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -1,5 +1,39 @@ # Changelog +## [v1.8.0](https://github.com/rabbitmq/amqp091-go/tree/v1.8.0) (2023-03-21) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.7.0...v1.8.0) + +**Closed issues:** + +- memory leak [\#179](https://github.com/rabbitmq/amqp091-go/issues/179) +- the publishWithContext interface will not return when it times out [\#178](https://github.com/rabbitmq/amqp091-go/issues/178) + +**Merged pull requests:** + +- Fix race condition on confirms [\#183](https://github.com/rabbitmq/amqp091-go/pull/183) ([calloway-jacob](https://github.com/calloway-jacob)) +- Add a CloseDeadline function to Connection [\#181](https://github.com/rabbitmq/amqp091-go/pull/181) ([Zerpet](https://github.com/Zerpet)) +- Fix memory leaks [\#180](https://github.com/rabbitmq/amqp091-go/pull/180) ([GXKe](https://github.com/GXKe)) +- Bump go.uber.org/goleak from 1.2.0 to 1.2.1 [\#177](https://github.com/rabbitmq/amqp091-go/pull/177) ([dependabot[bot]](https://github.com/apps/dependabot)) + +## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0) + +**Closed issues:** + +- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170) +- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167) +- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160) + +**Merged pull requests:** + +- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken)) +- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet)) +- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm)) +- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken)) +- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt)) + ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) diff --git a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md index ed1b971..ec86fe5 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md @@ -9,11 +9,13 @@ Here is the recommended workflow: 1. Run Static Checks 1. Run integration tests (see below) 1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) +1. Implement fixes +1. Commit your changes. Use a [good, descriptive, commit message][good-commit]. 1. Push to a branch (`git push -u origin my-new-feature`) 1. Submit a pull request +[good-commit]: https://cbea.ms/git-commit/ + ## Running Static Checks golangci-lint must be installed to run the static checks. See [installation @@ -43,6 +45,18 @@ The integration tests can be run via: make tests ``` +Some tests require access to `rabbitmqctl` CLI. Use the environment variable +`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests. + +If you have Docker available in your machine, you can run: + +```shell +make tests-docker +``` + +This target will start a RabbitMQ container, run the test suite with the environment +variable setup, and stop RabbitMQ container after a successful run. + All integration tests should use the `integrationConnection(...)` test helpers defined in `integration_test.go` to setup the integration environment and logging. diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile index 7342731..69e9e2b 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/Makefile +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -19,6 +19,11 @@ fmt: ## Run go fmt against code tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test go test -race -v -tags integration $(GO_TEST_FLAGS) +.PHONY: tests-docker +tests-docker: rabbitmq-server + RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS) + $(MAKE) stop-rabbitmq-server + .PHONY: check check: golangci-lint run ./... diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index 8ba9bab..ae6f2d1 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -1435,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex ch.m.Lock() defer ch.m.Unlock() + var dc *DeferredConfirmation + if ch.confirming { + dc = ch.confirms.publish() + } + if err := ch.send(&basicPublish{ Exchange: exchange, RoutingKey: key, @@ -1457,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex AppId: msg.AppId, }, }); err != nil { + if ch.confirming { + ch.confirms.unpublish() + } return nil, err } - if ch.confirming { - return ch.confirms.Publish(), nil - } - - return nil, nil + return dc, nil } /* diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index f9973b7..577e042 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) { } // Publish increments the publishing counter -func (c *confirms) Publish() *DeferredConfirmation { +func (c *confirms) publish() *DeferredConfirmation { c.publishedMut.Lock() defer c.publishedMut.Unlock() @@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation { return c.deferredConfirmations.Add(c.published) } +// unpublish decrements the publishing counter and removes the +// DeferredConfirmation. It must be called immediately after a publish fails. +func (c *confirms) unpublish() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + c.deferredConfirmations.remove(c.published) + c.published-- +} + // confirm confirms one publishing, increments the expecting delivery tag, and // removes bookkeeping for that delivery tag. func (c *confirms) confirm(confirmation Confirmation) { @@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { return dc } +// remove is only used to drop a tag whose publish failed +func (d *deferredConfirmations) remove(tag uint64) { + d.m.Lock() + defer d.m.Unlock() + dc, found := d.confirmations[tag] + if !found { + return + } + close(dc.done) + delete(d.confirmations, tag) +} + func (d *deferredConfirmations) Confirm(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index def2260..3d50d95 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -28,7 +28,7 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "AMQP 0.9.1 Client" - buildVersion = "1.6.0" + buildVersion = "1.8.1" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. @@ -399,12 +399,47 @@ func (c *Connection) Close() error { ) } +// CloseDeadline requests and waits for the response to close this AMQP connection. +// +// Accepts a deadline for waiting the server response. The deadline is passed +// to the low-level connection i.e. network socket. +// +// Regardless of the error returned, the connection is considered closed, and it +// should not be used after calling this function. +// +// In the event of an I/O timeout, connection-closed listeners are NOT informed. +// +// After returning from this call, all resources associated with this connection, +// including the underlying io, Channels, Notify listeners and Channel consumers +// will also be closed. +func (c *Connection) CloseDeadline(deadline time.Time) error { + if c.IsClosed() { + return ErrClosed + } + + defer c.shutdown(nil) + + err := c.setDeadline(deadline) + if err != nil { + return err + } + + return c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) +} + func (c *Connection) closeWith(err *Error) error { if c.IsClosed() { return ErrClosed } defer c.shutdown(err) + return c.call( &connectionClose{ ReplyCode: uint16(err.Code), @@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool { return atomic.LoadInt32(&c.closed) == 1 } +// setDeadline is a wrapper to type assert Connection.conn and set an I/O +// deadline in the underlying TCP connection socket, by calling +// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails, +// although this should never happen. +func (c *Connection) setDeadline(t time.Time) error { + con, ok := c.conn.(net.Conn) + if !ok { + return errInvalidTypeAssertion + } + return con.SetDeadline(t) +} + func (c *Connection) send(f frame) error { if c.IsClosed() { return ErrClosed diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index 8c23fad..c352fec 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { } case out <- *queue[0]: + /* + * https://github.com/rabbitmq/amqp091-go/issues/179 + * https://github.com/rabbitmq/amqp091-go/pull/180 + * + * Comment from @lars-t-hansen: + * + * Given Go's slice semantics, and barring any information + * available to the compiler that proves that queue is the only + * pointer to the memory it references, the only meaning that + * queue = queue[1:] can have is basically queue += sizeof(queue + * element), ie, it bumps a pointer. Looking at the generated + * code for a simple example (on ARM64 in this case) bears this + * out. So what we're left with is an array that we have a + * pointer into the middle of. When the GC traces this pointer, + * it too does not know whether the array has multiple + * referents, and so its only sensible choice is to find the + * beginning of the array, and if the array is not already + * visited, mark every element in it, including the "dead" + * pointer. + * + * (Depending on the program dynamics, an element may eventually + * be appended to the queue when the queue is at capacity, and + * in this case the live elements are copied into a new array + * and the old array is left to be GC'd eventually, along with + * the dead object. But that can take time.) + */ + queue[0] = nil queue = queue[1:] } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 427eefb..e8d8986 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -63,6 +63,11 @@ var ( ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ) +// internal errors used inside the library +var ( + errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} +) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8765d0d..3187b8c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v1.7.0 +# github.com/rabbitmq/amqp091-go v1.8.1 ## explicit; go 1.16 github.com/rabbitmq/amqp091-go