diff --git a/README.md b/README.md index 3d12e32..20578a8 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,9 @@ err = consumer.StartConsuming( }, "my_queue", // spawns 10 goroutines to handle incoming messages - rabbitmq.WithConsumeOptionsConcurrency(10), + rabbitmq.WithConsumerOptionsConcurrency(10), // assigns a name to this consumer on the cluster - rabbitmq.WithConsumeOptionsConsumerName(consumerName), + rabbitmq.WithConsumerOptionsConsumerName(consumerName), rabbitmq.WithConsumeDeclareOptions( // creates a durable queue named "my_queue" // if it doesn't exist yet diff --git a/connection.go b/connection.go index e91c601..ceadc52 100644 --- a/connection.go +++ b/connection.go @@ -1,6 +1,8 @@ package rabbitmq import ( + "sync" + amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) @@ -11,9 +13,12 @@ type Conn struct { connectionManager *connectionmanager.ConnectionManager reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} - notifyReturnChan chan Return - notifyPublishChan chan Confirmation - options ConnectionOptions + + handlerMux *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) + + options ConnectionOptions } // Config wraps amqp.Config @@ -49,8 +54,9 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) connectionManager: manager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, - notifyReturnChan: nil, - notifyPublishChan: nil, + handlerMux: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, options: *options, } @@ -61,50 +67,68 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) func (conn *Conn) handleRestarts() { for err := range conn.reconnectErrCh { conn.options.Logger.Infof("successful connection recovery from: %v", err) - go conn.startNotifyReturnHandler() - go conn.startNotifyPublishHandler() + go conn.startReturnHandler() + go conn.startPublishHandler() } } -func (conn *Conn) startNotifyReturnHandler() { - if conn.notifyReturnChan == nil { +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (conn *Conn) NotifyReturn(handler func(r Return)) { + conn.handlerMux.Lock() + conn.notifyReturnHandler = handler + conn.handlerMux.Unlock() + + go conn.startReturnHandler() +} + +// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (conn *Conn) NotifyPublish(handler func(p Confirmation)) { + conn.handlerMux.Lock() + conn.notifyPublishHandler = handler + conn.handlerMux.Unlock() + + go conn.startPublishHandler() +} + +func (conn *Conn) startReturnHandler() { + conn.handlerMux.Lock() + if conn.notifyReturnHandler == nil { return } - returnAMQPCh := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1)) - for ret := range returnAMQPCh { - conn.notifyReturnChan <- Return{ret} + conn.handlerMux.Unlock() + + returns := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returns { + go conn.notifyReturnHandler(Return{ret}) } } -func (conn *Conn) startNotifyPublishHandler() { - if conn.notifyPublishChan == nil { +func (conn *Conn) startPublishHandler() { + conn.handlerMux.Lock() + if conn.notifyPublishHandler == nil { return } + conn.handlerMux.Unlock() + conn.connectionManager.ConfirmSafe(false) - publishAMQPCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) - for conf := range publishAMQPCh { - conn.notifyPublishChan <- Confirmation{ + confirmationCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range confirmationCh { + go conn.notifyPublishHandler(Confirmation{ Confirmation: conf, ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()), - } + }) } } -// NotifyReturn registers a listener for basic.return methods. -// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. -// These notifications are shared across an entire connection, so if you're creating multiple -// publishers on the same connection keep that in mind -func (conn *Conn) NotifyReturn() <-chan Return { - conn.notifyReturnChan = make(chan Return) - go conn.startNotifyReturnHandler() - return conn.notifyReturnChan -} - -// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option -// These notifications are shared across an entire connection, so if you're creating multiple -// publishers on the same connection keep that in mind -func (conn *Conn) NotifyPublish() <-chan Confirmation { - conn.notifyPublishChan = make(chan Confirmation) - go conn.startNotifyPublishHandler() - return conn.notifyPublishChan +// Close closes the connection, it's not safe for re-use. +// You should also close any consumers and publishers before +// closing the connection +func (conn *Conn) Close() error { + conn.closeConnectionToManagerCh <- struct{}{} + return conn.connectionManager.Close() } diff --git a/consume.go b/consume.go index 44df665..00852db 100644 --- a/consume.go +++ b/consume.go @@ -3,10 +3,10 @@ package rabbitmq import ( "errors" "fmt" + "sync" amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // Action is an action that occurs after processed this delivery @@ -30,12 +30,9 @@ type Consumer struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions -} -// ConsumerOptions are used to describe a consumer's configuration. -// Logger specifies a custom Logger interface implementation. -type ConsumerOptions struct { - Logger logger.Logger + isClosedMux *sync.RWMutex + isClosed bool } // Delivery captures the fields for a previously delivered message resident in @@ -46,10 +43,16 @@ type Delivery struct { } // NewConsumer returns a new Consumer connected to the given rabbitmq server -func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer, error) { - options := &ConsumerOptions{ - Logger: &stdDebugLogger{}, - } +// it also starts consuming on the given connection with automatic reconnection handling +// Do do reuse the returned consumer for anything other than to close it +func NewConsumer( + conn *Conn, + handler Handler, + queue string, + optionFuncs ...func(*ConsumerOptions), +) (*Consumer, error) { + defaultOptions := getDefaultConsumerOptions(queue) + options := &defaultOptions for _, optionFunc := range optionFuncs { optionFunc(options) } @@ -64,37 +67,8 @@ func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, options: *options, - } - - return consumer, nil -} - -// WithConsumerOptionsLogging uses a default logger that writes to std out -func WithConsumerOptionsLogging(options *ConsumerOptions) { - options.Logger = &stdDebugLogger{} -} - -// WithConsumerOptionsLogger sets logging to a custom interface. -// Use WithConsumerOptionsLogging to just log to stdout. -func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Logger = log - } -} - -// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". -// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s). -// The provided handler is called once for each message. If the provided queue doesn't exist, it -// will be created on the cluster -func (consumer *Consumer) StartConsuming( - handler Handler, - queue string, - optionFuncs ...func(*ConsumeOptions), -) error { - defaultOptions := getDefaultConsumeOptions(queue) - options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) + isClosedMux: &sync.RWMutex{}, + isClosed: false, } err := consumer.startGoroutines( @@ -102,7 +76,7 @@ func (consumer *Consumer) StartConsuming( *options, ) if err != nil { - return err + return nil, err } go func() { @@ -117,15 +91,22 @@ func (consumer *Consumer) StartConsuming( } } }() - return nil + + return consumer, nil } // Close cleans up resources and closes the consumer. // It does not close the connection manager, just the subscription -// to the connection manager +// to the connection manager and the consuming goroutines. +// Only call once. func (consumer *Consumer) Close() { + consumer.isClosedMux.Lock() + defer consumer.isClosedMux.Unlock() + consumer.isClosed = true consumer.options.Logger.Infof("closing consumer...") - consumer.closeConnectionToManagerCh <- struct{}{} + go func() { + consumer.closeConnectionToManagerCh <- struct{}{} + }() } // startGoroutines declares the queue if it doesn't exist, @@ -133,7 +114,7 @@ func (consumer *Consumer) Close() { // that will consume from the queue func (consumer *Consumer) startGoroutines( handler Handler, - options ConsumeOptions, + options ConsumerOptions, ) error { err := declareExchange(consumer.connManager, options.ExchangeOptions) @@ -169,12 +150,23 @@ func (consumer *Consumer) startGoroutines( return nil } -func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) { +func (consumer *Consumer) getIsClosed() bool { + consumer.isClosedMux.RLock() + defer consumer.isClosedMux.RUnlock() + return consumer.isClosed +} + +func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) { for msg := range msgs { + if consumer.getIsClosed() { + break + } + if consumeOptions.RabbitConsumerOptions.AutoAck { handler(Delivery{msg}) continue } + switch handler(Delivery{msg}) { case Ack: err := msg.Ack(false) diff --git a/consume_options.go b/consume_options.go deleted file mode 100644 index bc4fb85..0000000 --- a/consume_options.go +++ /dev/null @@ -1,248 +0,0 @@ -package rabbitmq - -import ( - amqp "github.com/rabbitmq/amqp091-go" -) - -// getDefaultConsumeOptions describes the options that will be used when a value isn't provided -func getDefaultConsumeOptions(queueName string) ConsumeOptions { - return ConsumeOptions{ - RabbitConsumerOptions: RabbitConsumerOptions{ - Name: "", - AutoAck: false, - Exclusive: false, - NoWait: false, - NoLocal: false, - Args: Table{}, - }, - QueueOptions: QueueOptions{ - Name: queueName, - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: true, - }, - ExchangeOptions: ExchangeOptions{ - Name: "", - Kind: amqp.ExchangeDirect, - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - Passive: false, - Args: Table{}, - Declare: true, - }, - Bindings: []Binding{}, - Concurrency: 1, - } -} - -func getDefaultBindingOptions() BindingOptions { - return BindingOptions{ - NoWait: false, - Args: Table{}, - Declare: true, - } -} - -// ConsumeOptions are used to describe how a new consumer will be created. -// If QueueOptions is not nil, the options will be used to declare a queue -// If ExchangeOptions is not nil, it will be used to declare an exchange -// If there are Bindings, the queue will be bound to them -type ConsumeOptions struct { - RabbitConsumerOptions RabbitConsumerOptions - QueueOptions QueueOptions - ExchangeOptions ExchangeOptions - Bindings []Binding - Concurrency int -} - -// RabbitConsumerOptions are used to configure the consumer -// on the rabbit server -type RabbitConsumerOptions struct { - Name string - AutoAck bool - Exclusive bool - NoWait bool - NoLocal bool - Args Table -} - -// QueueOptions are used to configure a queue. -// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect -// to a non-existent queue will cause RabbitMQ to throw an exception. -type QueueOptions struct { - Name string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Passive bool // if false, a missing queue will be created on the server - Args Table - Declare bool -} - -// Binding describes the bhinding of a queue to a routing key on an exchange -type Binding struct { - RoutingKey string - BindingOptions -} - -// BindingOptions describes the options a binding can have -type BindingOptions struct { - NoWait bool - Args Table - Declare bool -} - -// WithConsumeOptionsQueueDurable ensures the queue is a durable queue -func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { - options.QueueOptions.Durable = true -} - -// WithConsumeOptionsQueueAutoDelete ensures the queue is an auto-delete queue -func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { - options.QueueOptions.AutoDelete = true -} - -// WithConsumeOptionsQueueExclusive ensures the queue is an exclusive queue -func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { - options.QueueOptions.Exclusive = true -} - -// WithConsumeOptionsQueueNoWait ensures the queue is a no-wait queue -func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { - options.QueueOptions.NoWait = true -} - -// WithConsumeOptionsQueuePassive ensures the queue is a passive queue -func WithConsumeOptionsQueuePassive(options *ConsumeOptions) { - options.QueueOptions.Passive = true -} - -// WithConsumeOptionsQueueNoDeclare will turn off the declaration of the queue's -// existance upon startup -func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) { - options.QueueOptions.Declare = false -} - -// WithConsumeOptionsQueueArgs adds optional args to the queue -func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.QueueOptions.Args = args - } -} - -// WithConsumeOptionsExchangeName sets the exchange name -func WithConsumeOptionsExchangeName(name string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.ExchangeOptions.Name = name - } -} - -// WithConsumeOptionsExchangeKind ensures the queue is a durable queue -func WithConsumeOptionsExchangeKind(kind string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.ExchangeOptions.Kind = kind - } -} - -// WithConsumeOptionsExchangeDurable ensures the exchange is a durable exchange -func WithConsumeOptionsExchangeDurable(options *ConsumeOptions) { - options.ExchangeOptions.Durable = true -} - -// WithConsumeOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange -func WithConsumeOptionsExchangeAutoDelete(options *ConsumeOptions) { - options.ExchangeOptions.AutoDelete = true -} - -// WithConsumeOptionsExchangeInternal ensures the exchange is an internal exchange -func WithConsumeOptionsExchangeInternal(options *ConsumeOptions) { - options.ExchangeOptions.Internal = true -} - -// WithConsumeOptionsExchangeNoWait ensures the exchange is a no-wait exchange -func WithConsumeOptionsExchangeNoWait(options *ConsumeOptions) { - options.ExchangeOptions.NoWait = true -} - -// WithConsumeOptionsExchangeNoDeclare stops this library from declaring the exchanges existance -func WithConsumeOptionsExchangeNoDeclare(options *ConsumeOptions) { - options.ExchangeOptions.Declare = false -} - -// WithConsumeOptionsExchangePassive ensures the exchange is a passive exchange -func WithConsumeOptionsExchangePassive(options *ConsumeOptions) { - options.ExchangeOptions.Passive = true -} - -// WithConsumeOptionsExchangeArgs adds optional args to the exchange -func WithConsumeOptionsExchangeArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.ExchangeOptions.Args = args - } -} - -// WithConsumeOptionsDefaultBinding binds the queue to a routing key with the default binding options -func WithConsumeOptionsDefaultBinding(routingKey string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.Bindings = append(options.Bindings, Binding{ - RoutingKey: routingKey, - BindingOptions: getDefaultBindingOptions(), - }) - } -} - -// WithConsumeOptionsBinding adds a new binding to the queue which allows you to set the binding options -// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to -// the zero value. If you want to declare your bindings for example, be sure to set Declare=true -func WithConsumeOptionsBinding(binding Binding) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.Bindings = append(options.Bindings, binding) - } -} - -// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that -// many goroutines will be spawned to run the provided handler on messages -func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.Concurrency = concurrency - } -} - -// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer -// if unset a random name will be given -func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.RabbitConsumerOptions.Name = consumerName - } -} - -// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer -// if unset the default will be used (false) -func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.RabbitConsumerOptions.AutoAck = autoAck - } -} - -// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means -// the server will ensure that this is the sole consumer -// from this queue. When exclusive is false, the server will fairly distribute -// deliveries across multiple consumers. -func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { - options.RabbitConsumerOptions.Exclusive = true -} - -// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means -// it does not wait for the server to confirm the request and -// immediately begin deliveries. If it is not possible to consume, a channel -// exception will be raised and the channel will be closed. -func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { - options.RabbitConsumerOptions.NoWait = true -} diff --git a/consumer_options.go b/consumer_options.go new file mode 100644 index 0000000..71dabbb --- /dev/null +++ b/consumer_options.go @@ -0,0 +1,264 @@ +package rabbitmq + +import ( + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/logger" +) + +// getDefaultConsumerOptions describes the options that will be used when a value isn't provided +func getDefaultConsumerOptions(queueName string) ConsumerOptions { + return ConsumerOptions{ + RabbitConsumerOptions: RabbitConsumerOptions{ + Name: "", + AutoAck: false, + Exclusive: false, + NoWait: false, + NoLocal: false, + Args: Table{}, + }, + QueueOptions: QueueOptions{ + Name: queueName, + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, + Bindings: []Binding{}, + Concurrency: 1, + Logger: stdDebugLogger{}, + } +} + +func getDefaultBindingOptions() BindingOptions { + return BindingOptions{ + NoWait: false, + Args: Table{}, + Declare: true, + } +} + +// ConsumerOptions are used to describe how a new consumer will be created. +// If QueueOptions is not nil, the options will be used to declare a queue +// If ExchangeOptions is not nil, it will be used to declare an exchange +// If there are Bindings, the queue will be bound to them +type ConsumerOptions struct { + RabbitConsumerOptions RabbitConsumerOptions + QueueOptions QueueOptions + ExchangeOptions ExchangeOptions + Bindings []Binding + Concurrency int + Logger logger.Logger +} + +// RabbitConsumerOptions are used to configure the consumer +// on the rabbit server +type RabbitConsumerOptions struct { + Name string + AutoAck bool + Exclusive bool + NoWait bool + NoLocal bool + Args Table +} + +// QueueOptions are used to configure a queue. +// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect +// to a non-existent queue will cause RabbitMQ to throw an exception. +type QueueOptions struct { + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table + Declare bool +} + +// Binding describes the bhinding of a queue to a routing key on an exchange +type Binding struct { + RoutingKey string + BindingOptions +} + +// BindingOptions describes the options a binding can have +type BindingOptions struct { + NoWait bool + Args Table + Declare bool +} + +// WithConsumerOptionsQueueDurable ensures the queue is a durable queue +func WithConsumerOptionsQueueDurable(options *ConsumerOptions) { + options.QueueOptions.Durable = true +} + +// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue +func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) { + options.QueueOptions.AutoDelete = true +} + +// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue +func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) { + options.QueueOptions.Exclusive = true +} + +// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue +func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) { + options.QueueOptions.NoWait = true +} + +// WithConsumerOptionsQueuePassive ensures the queue is a passive queue +func WithConsumerOptionsQueuePassive(options *ConsumerOptions) { + options.QueueOptions.Passive = true +} + +// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's +// existance upon startup +func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) { + options.QueueOptions.Declare = false +} + +// WithConsumerOptionsQueueArgs adds optional args to the queue +func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.QueueOptions.Args = args + } +} + +// WithConsumerOptionsExchangeName sets the exchange name +func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Name = name + } +} + +// WithConsumerOptionsExchangeKind ensures the queue is a durable queue +func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Kind = kind + } +} + +// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange +func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) { + options.ExchangeOptions.Durable = true +} + +// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange +func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) { + options.ExchangeOptions.AutoDelete = true +} + +// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange +func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) { + options.ExchangeOptions.Internal = true +} + +// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange +func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) { + options.ExchangeOptions.NoWait = true +} + +// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance +func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) { + options.ExchangeOptions.Declare = true +} + +// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange +func WithConsumerOptionsExchangePassive(options *ConsumerOptions) { + options.ExchangeOptions.Passive = true +} + +// WithConsumerOptionsExchangeArgs adds optional args to the exchange +func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Args = args + } +} + +// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options +func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, Binding{ + RoutingKey: routingKey, + BindingOptions: getDefaultBindingOptions(), + }) + } +} + +// WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options +// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to +// the zero value. If you want to declare your bindings for example, be sure to set Declare=true +func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, binding) + } +} + +// WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Concurrency = concurrency + } +} + +// WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer +// if unset a random name will be given +func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.Name = consumerName + } +} + +// WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer +// if unset the default will be used (false) +func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.AutoAck = autoAck + } +} + +// WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means +// the server will ensure that this is the sole consumer +// from this queue. When exclusive is false, the server will fairly distribute +// deliveries across multiple consumers. +func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) { + options.RabbitConsumerOptions.Exclusive = true +} + +// WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means +// it does not wait for the server to confirm the request and +// immediately begin deliveries. If it is not possible to consume, a channel +// exception will be raised and the channel will be closed. +func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) { + options.RabbitConsumerOptions.NoWait = true +} + +// WithConsumerOptionsLogging uses a default logger that writes to std out +func WithConsumerOptionsLogging(options *ConsumerOptions) { + options.Logger = &stdDebugLogger{} +} + +// WithConsumerOptionsLogger sets logging to a custom interface. +// Use WithConsumerOptionsLogging to just log to stdout. +func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Logger = log + } +} diff --git a/declare.go b/declare.go index 2eeda1a..9462e14 100644 --- a/declare.go +++ b/declare.go @@ -70,7 +70,7 @@ func declareExchange(connManager *connectionmanager.ConnectionManager, options E return nil } -func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumeOptions) error { +func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumerOptions) error { for _, binding := range options.Bindings { if !binding.Declare { continue diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 8705cab..7c68733 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -10,8 +10,6 @@ import ( rabbitmq "github.com/wagslane/go-rabbitmq" ) -var consumerName = "example" - func main() { conn, err := rabbitmq.NewConn( "amqp://guest:guest@localhost", @@ -20,56 +18,24 @@ func main() { if err != nil { log.Fatal(err) } + defer conn.Close() consumer, err := rabbitmq.NewConsumer( conn, - rabbitmq.WithConsumerOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer consumer.Close() - - err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue return rabbitmq.Ack }, "my_queue", - rabbitmq.WithConsumeOptionsConcurrency(2), - rabbitmq.WithConsumeOptionsConsumerName(consumerName), - rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"), - rabbitmq.WithConsumeOptionsExchangeName("events"), - ) - if err != nil { - log.Fatal(err) - } - - consumer2, err := rabbitmq.NewConsumer( - conn, - rabbitmq.WithConsumerOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer consumer2.Close() - - err = consumer2.StartConsuming( - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed 2: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, - "my_queue_2", - rabbitmq.WithConsumeOptionsConcurrency(2), - rabbitmq.WithConsumeOptionsConsumerName("consumer3"), - rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"), - rabbitmq.WithConsumeOptionsExchangeName("events"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + rabbitmq.WithConsumerOptionsExchangeDeclare, ) if err != nil { log.Fatal(err) } + defer consumer.Close() // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) diff --git a/examples/logger/main.go b/examples/logger/main.go index 52686a0..bb2a16f 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -39,6 +39,11 @@ func main() { if err != nil { log.Fatal(err) } + defer conn.Close() + + conn.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) publisher, err := rabbitmq.NewPublisher( conn, @@ -58,11 +63,4 @@ func main() { if err != nil { log.Fatal(err) } - - returns := conn.NotifyReturn() - go func() { - for r := range returns { - log.Printf("message returned from server: %s", string(r.Body)) - } - }() } diff --git a/examples/multiconsumer/.gitignore b/examples/multiconsumer/.gitignore new file mode 100644 index 0000000..8d948f8 --- /dev/null +++ b/examples/multiconsumer/.gitignore @@ -0,0 +1 @@ +multiconsumer diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go new file mode 100644 index 0000000..571af8b --- /dev/null +++ b/examples/multiconsumer/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + consumer, err := rabbitmq.NewConsumer( + conn, + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumerOptionsConcurrency(2), + rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key_2"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + ) + if err != nil { + log.Fatal(err) + } + defer consumer.Close() + + consumer2, err := rabbitmq.NewConsumer( + conn, + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed 2: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumerOptionsConcurrency(2), + rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), + rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), + rabbitmq.WithConsumerOptionsExchangeName("events"), + ) + if err != nil { + log.Fatal(err) + } + defer consumer2.Close() + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") +} diff --git a/examples/multipublisher/.gitignore b/examples/multipublisher/.gitignore new file mode 100644 index 0000000..c4936ba --- /dev/null +++ b/examples/multipublisher/.gitignore @@ -0,0 +1 @@ +multipublisher diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go new file mode 100644 index 0000000..4a9b5fb --- /dev/null +++ b/examples/multipublisher/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + conn.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + conn.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) + + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer publisher.Close() + + publisher2, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + defer publisher2.Close() + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + err = publisher.Publish( + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + err = publisher2.Publish( + []byte("hello, world 2"), + []string{"my_routing_key_2"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + } + case <-done: + fmt.Println("stopping publisher") + return + } + } +} diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 98baebd..e0c9d40 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -19,38 +19,27 @@ func main() { if err != nil { log.Fatal(err) } + defer conn.Close() + + conn.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + conn.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) + publisher, err := rabbitmq.NewPublisher( conn, rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, ) if err != nil { log.Fatal(err) } defer publisher.Close() - publisher2, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer publisher2.Close() - - returns := conn.NotifyReturn() - go func() { - for r := range returns { - log.Printf("message returned from server: %s", string(r.Body)) - } - }() - - confirmations := conn.NotifyPublish() - go func() { - for c := range confirmations { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - } - }() - // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) @@ -81,17 +70,6 @@ func main() { if err != nil { log.Println(err) } - err = publisher2.Publish( - []byte("hello, world 2"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Println(err) - } case <-done: fmt.Println("stopping publisher") return diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 93e3f07..4489557 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -135,8 +135,8 @@ func (connManager *ConnectionManager) reconnect() error { return nil } -// close safely closes the current channel and connection -func (connManager *ConnectionManager) close() error { +// Close safely closes the current channel and connection +func (connManager *ConnectionManager) Close() error { connManager.logger.Infof("closing connection manager...") connManager.channelMux.Lock() defer connManager.channelMux.Unlock() diff --git a/publish.go b/publish.go index 680e403..b52b7f9 100644 --- a/publish.go +++ b/publish.go @@ -53,35 +53,14 @@ type Publisher struct { options PublisherOptions } -// PublisherOptions are used to describe a publisher's configuration. -// Logger is a custom logging interface. -type PublisherOptions struct { - Logger Logger -} - -// WithPublisherOptionsLogging sets logging to true on the consumer options -// and sets the -func WithPublisherOptionsLogging(options *PublisherOptions) { - options.Logger = &stdDebugLogger{} -} - -// WithPublisherOptionsLogger sets logging to a custom interface. -// Use WithPublisherOptionsLogging to just log to stdout. -func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { - return func(options *PublisherOptions) { - options.Logger = log - } -} - // NewPublisher returns a new publisher with an open channel to the cluster. // If you plan to enforce mandatory or immediate publishing, those failures will be reported // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { - options := &PublisherOptions{ - Logger: &stdDebugLogger{}, - } + defaultOptions := getDefaultPublisherOptions() + options := &defaultOptions for _, optionFunc := range optionFuncs { optionFunc(options) } @@ -101,19 +80,34 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe options: *options, } - go publisher.startNotifyFlowHandler() - go publisher.startNotifyBlockedHandler() + err := publisher.startup() + if err != nil { + return nil, err + } go publisher.handleRestarts() return publisher, nil } +func (publisher *Publisher) startup() error { + err := declareExchange(publisher.connManager, publisher.options.ExchangeOptions) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + go publisher.startNotifyFlowHandler() + go publisher.startNotifyBlockedHandler() + return nil +} + func (publisher *Publisher) handleRestarts() { for err := range publisher.reconnectErrCh { publisher.options.Logger.Infof("successful publisher recovery from: %v", err) - go publisher.startNotifyFlowHandler() - go publisher.startNotifyBlockedHandler() + err := publisher.startup() + if err != nil { + publisher.options.Logger.Infof("failed to startup publisher: %v", err) + continue + } } } @@ -177,7 +171,10 @@ func (publisher *Publisher) Publish( // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use +// Only call Close() once func (publisher *Publisher) Close() { publisher.options.Logger.Infof("closing publisher...") - publisher.closeConnectionToManagerCh <- struct{}{} + go func() { + publisher.closeConnectionToManagerCh <- struct{}{} + }() } diff --git a/publisher_options.go b/publisher_options.go new file mode 100644 index 0000000..0e7a946 --- /dev/null +++ b/publisher_options.go @@ -0,0 +1,93 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +// PublisherOptions are used to describe a publisher's configuration. +// Logger is a custom logging interface. +type PublisherOptions struct { + ExchangeOptions ExchangeOptions + Logger Logger +} + +// getDefaultPublisherOptions describes the options that will be used when a value isn't provided +func getDefaultPublisherOptions() PublisherOptions { + return PublisherOptions{ + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, + Logger: stdDebugLogger{}, + } +} + +// WithPublisherOptionsLogging sets logging to true on the publisher options +// and sets the +func WithPublisherOptionsLogging(options *PublisherOptions) { + options.Logger = &stdDebugLogger{} +} + +// WithPublisherOptionsLogger sets logging to a custom interface. +// Use WithPublisherOptionsLogging to just log to stdout. +func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Logger = log + } +} + +// WithPublisherOptionsExchangeName sets the exchange name +func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Name = name + } +} + +// WithPublisherOptionsExchangeKind ensures the queue is a durable queue +func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Kind = kind + } +} + +// WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange +func WithPublisherOptionsExchangeDurable(options *PublisherOptions) { + options.ExchangeOptions.Durable = true +} + +// WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange +func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) { + options.ExchangeOptions.AutoDelete = true +} + +// WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange +func WithPublisherOptionsExchangeInternal(options *PublisherOptions) { + options.ExchangeOptions.Internal = true +} + +// WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange +func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) { + options.ExchangeOptions.NoWait = true +} + +// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance +func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) { + options.ExchangeOptions.Declare = true +} + +// WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange +func WithPublisherOptionsExchangePassive(options *PublisherOptions) { + options.ExchangeOptions.Passive = true +} + +// WithPublisherOptionsExchangeArgs adds optional args to the exchange +func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Args = args + } +}