Browse Source

finish options updates for publisher

pull/95/head
wagslane 3 years ago
parent
commit
eea274da92
16 changed files with 685 additions and 443 deletions
  1. +2
    -2
      README.md
  2. +59
    -35
      connection.go
  3. +39
    -47
      consume.go
  4. +0
    -248
      consume_options.go
  5. +264
    -0
      consumer_options.go
  6. +1
    -1
      declare.go
  7. +5
    -39
      examples/consumer/main.go
  8. +5
    -7
      examples/logger/main.go
  9. +1
    -0
      examples/multiconsumer/.gitignore
  10. +76
    -0
      examples/multiconsumer/main.go
  11. +1
    -0
      examples/multipublisher/.gitignore
  12. +100
    -0
      examples/multipublisher/main.go
  13. +12
    -34
      examples/publisher/main.go
  14. +2
    -2
      internal/connectionmanager/connection_manager.go
  15. +25
    -28
      publish.go
  16. +93
    -0
      publisher_options.go

+ 2
- 2
README.md View File

@ -50,9 +50,9 @@ err = consumer.StartConsuming(
}, },
"my_queue", "my_queue",
// spawns 10 goroutines to handle incoming messages // spawns 10 goroutines to handle incoming messages
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumerOptionsConcurrency(10),
// assigns a name to this consumer on the cluster // assigns a name to this consumer on the cluster
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
rabbitmq.WithConsumerOptionsConsumerName(consumerName),
rabbitmq.WithConsumeDeclareOptions( rabbitmq.WithConsumeDeclareOptions(
// creates a durable queue named "my_queue" // creates a durable queue named "my_queue"
// if it doesn't exist yet // if it doesn't exist yet


+ 59
- 35
connection.go View File

@ -1,6 +1,8 @@
package rabbitmq package rabbitmq
import ( import (
"sync"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager" "github.com/wagslane/go-rabbitmq/internal/connectionmanager"
) )
@ -11,9 +13,12 @@ type Conn struct {
connectionManager *connectionmanager.ConnectionManager connectionManager *connectionmanager.ConnectionManager
reconnectErrCh <-chan error reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{} closeConnectionToManagerCh chan<- struct{}
notifyReturnChan chan Return
notifyPublishChan chan Confirmation
options ConnectionOptions
handlerMux *sync.Mutex
notifyReturnHandler func(r Return)
notifyPublishHandler func(p Confirmation)
options ConnectionOptions
} }
// Config wraps amqp.Config // Config wraps amqp.Config
@ -49,8 +54,9 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
connectionManager: manager, connectionManager: manager,
reconnectErrCh: reconnectErrCh, reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh, closeConnectionToManagerCh: closeCh,
notifyReturnChan: nil,
notifyPublishChan: nil,
handlerMux: &sync.Mutex{},
notifyReturnHandler: nil,
notifyPublishHandler: nil,
options: *options, options: *options,
} }
@ -61,50 +67,68 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
func (conn *Conn) handleRestarts() { func (conn *Conn) handleRestarts() {
for err := range conn.reconnectErrCh { for err := range conn.reconnectErrCh {
conn.options.Logger.Infof("successful connection recovery from: %v", err) conn.options.Logger.Infof("successful connection recovery from: %v", err)
go conn.startNotifyReturnHandler()
go conn.startNotifyPublishHandler()
go conn.startReturnHandler()
go conn.startPublishHandler()
} }
} }
func (conn *Conn) startNotifyReturnHandler() {
if conn.notifyReturnChan == nil {
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyReturn(handler func(r Return)) {
conn.handlerMux.Lock()
conn.notifyReturnHandler = handler
conn.handlerMux.Unlock()
go conn.startReturnHandler()
}
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyPublish(handler func(p Confirmation)) {
conn.handlerMux.Lock()
conn.notifyPublishHandler = handler
conn.handlerMux.Unlock()
go conn.startPublishHandler()
}
func (conn *Conn) startReturnHandler() {
conn.handlerMux.Lock()
if conn.notifyReturnHandler == nil {
return return
} }
returnAMQPCh := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
conn.notifyReturnChan <- Return{ret}
conn.handlerMux.Unlock()
returns := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1))
for ret := range returns {
go conn.notifyReturnHandler(Return{ret})
} }
} }
func (conn *Conn) startNotifyPublishHandler() {
if conn.notifyPublishChan == nil {
func (conn *Conn) startPublishHandler() {
conn.handlerMux.Lock()
if conn.notifyPublishHandler == nil {
return return
} }
conn.handlerMux.Unlock()
conn.connectionManager.ConfirmSafe(false) conn.connectionManager.ConfirmSafe(false)
publishAMQPCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
conn.notifyPublishChan <- Confirmation{
confirmationCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1))
for conf := range confirmationCh {
go conn.notifyPublishHandler(Confirmation{
Confirmation: conf, Confirmation: conf,
ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()), ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()),
}
})
} }
} }
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyReturn() <-chan Return {
conn.notifyReturnChan = make(chan Return)
go conn.startNotifyReturnHandler()
return conn.notifyReturnChan
}
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyPublish() <-chan Confirmation {
conn.notifyPublishChan = make(chan Confirmation)
go conn.startNotifyPublishHandler()
return conn.notifyPublishChan
// Close closes the connection, it's not safe for re-use.
// You should also close any consumers and publishers before
// closing the connection
func (conn *Conn) Close() error {
conn.closeConnectionToManagerCh <- struct{}{}
return conn.connectionManager.Close()
} }

+ 39
- 47
consume.go View File

@ -3,10 +3,10 @@ package rabbitmq
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager" "github.com/wagslane/go-rabbitmq/internal/connectionmanager"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// Action is an action that occurs after processed this delivery // Action is an action that occurs after processed this delivery
@ -30,12 +30,9 @@ type Consumer struct {
reconnectErrCh <-chan error reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{} closeConnectionToManagerCh chan<- struct{}
options ConsumerOptions options ConsumerOptions
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logger specifies a custom Logger interface implementation.
type ConsumerOptions struct {
Logger logger.Logger
isClosedMux *sync.RWMutex
isClosed bool
} }
// Delivery captures the fields for a previously delivered message resident in // Delivery captures the fields for a previously delivered message resident in
@ -46,10 +43,16 @@ type Delivery struct {
} }
// NewConsumer returns a new Consumer connected to the given rabbitmq server // NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer, error) {
options := &ConsumerOptions{
Logger: &stdDebugLogger{},
}
// it also starts consuming on the given connection with automatic reconnection handling
// Do do reuse the returned consumer for anything other than to close it
func NewConsumer(
conn *Conn,
handler Handler,
queue string,
optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
defaultOptions := getDefaultConsumerOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
optionFunc(options) optionFunc(options)
} }
@ -64,37 +67,8 @@ func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer,
reconnectErrCh: reconnectErrCh, reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh, closeConnectionToManagerCh: closeCh,
options: *options, options: *options,
}
return consumer, nil
}
// WithConsumerOptionsLogging uses a default logger that writes to std out
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logger = &stdDebugLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logger = log
}
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer *Consumer) StartConsuming(
handler Handler,
queue string,
optionFuncs ...func(*ConsumeOptions),
) error {
defaultOptions := getDefaultConsumeOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
isClosedMux: &sync.RWMutex{},
isClosed: false,
} }
err := consumer.startGoroutines( err := consumer.startGoroutines(
@ -102,7 +76,7 @@ func (consumer *Consumer) StartConsuming(
*options, *options,
) )
if err != nil { if err != nil {
return err
return nil, err
} }
go func() { go func() {
@ -117,15 +91,22 @@ func (consumer *Consumer) StartConsuming(
} }
} }
}() }()
return nil
return consumer, nil
} }
// Close cleans up resources and closes the consumer. // Close cleans up resources and closes the consumer.
// It does not close the connection manager, just the subscription // It does not close the connection manager, just the subscription
// to the connection manager
// to the connection manager and the consuming goroutines.
// Only call once.
func (consumer *Consumer) Close() { func (consumer *Consumer) Close() {
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosed = true
consumer.options.Logger.Infof("closing consumer...") consumer.options.Logger.Infof("closing consumer...")
consumer.closeConnectionToManagerCh <- struct{}{}
go func() {
consumer.closeConnectionToManagerCh <- struct{}{}
}()
} }
// startGoroutines declares the queue if it doesn't exist, // startGoroutines declares the queue if it doesn't exist,
@ -133,7 +114,7 @@ func (consumer *Consumer) Close() {
// that will consume from the queue // that will consume from the queue
func (consumer *Consumer) startGoroutines( func (consumer *Consumer) startGoroutines(
handler Handler, handler Handler,
options ConsumeOptions,
options ConsumerOptions,
) error { ) error {
err := declareExchange(consumer.connManager, options.ExchangeOptions) err := declareExchange(consumer.connManager, options.ExchangeOptions)
@ -169,12 +150,23 @@ func (consumer *Consumer) startGoroutines(
return nil return nil
} }
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) {
func (consumer *Consumer) getIsClosed() bool {
consumer.isClosedMux.RLock()
defer consumer.isClosedMux.RUnlock()
return consumer.isClosed
}
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler Handler) {
for msg := range msgs { for msg := range msgs {
if consumer.getIsClosed() {
break
}
if consumeOptions.RabbitConsumerOptions.AutoAck { if consumeOptions.RabbitConsumerOptions.AutoAck {
handler(Delivery{msg}) handler(Delivery{msg})
continue continue
} }
switch handler(Delivery{msg}) { switch handler(Delivery{msg}) {
case Ack: case Ack:
err := msg.Ack(false) err := msg.Ack(false)


+ 0
- 248
consume_options.go View File

@ -1,248 +0,0 @@
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions(queueName string) ConsumeOptions {
return ConsumeOptions{
RabbitConsumerOptions: RabbitConsumerOptions{
Name: "",
AutoAck: false,
Exclusive: false,
NoWait: false,
NoLocal: false,
Args: Table{},
},
QueueOptions: QueueOptions{
Name: queueName,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
Bindings: []Binding{},
Concurrency: 1,
}
}
func getDefaultBindingOptions() BindingOptions {
return BindingOptions{
NoWait: false,
Args: Table{},
Declare: true,
}
}
// ConsumeOptions are used to describe how a new consumer will be created.
// If QueueOptions is not nil, the options will be used to declare a queue
// If ExchangeOptions is not nil, it will be used to declare an exchange
// If there are Bindings, the queue will be bound to them
type ConsumeOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
Bindings []Binding
Concurrency int
}
// RabbitConsumerOptions are used to configure the consumer
// on the rabbit server
type RabbitConsumerOptions struct {
Name string
AutoAck bool
Exclusive bool
NoWait bool
NoLocal bool
Args Table
}
// QueueOptions are used to configure a queue.
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
// to a non-existent queue will cause RabbitMQ to throw an exception.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
Declare bool
}
// Binding describes the bhinding of a queue to a routing key on an exchange
type Binding struct {
RoutingKey string
BindingOptions
}
// BindingOptions describes the options a binding can have
type BindingOptions struct {
NoWait bool
Args Table
Declare bool
}
// WithConsumeOptionsQueueDurable ensures the queue is a durable queue
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueOptions.Durable = true
}
// WithConsumeOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueOptions.AutoDelete = true
}
// WithConsumeOptionsQueueExclusive ensures the queue is an exclusive queue
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueOptions.Exclusive = true
}
// WithConsumeOptionsQueueNoWait ensures the queue is a no-wait queue
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueOptions.NoWait = true
}
// WithConsumeOptionsQueuePassive ensures the queue is a passive queue
func WithConsumeOptionsQueuePassive(options *ConsumeOptions) {
options.QueueOptions.Passive = true
}
// WithConsumeOptionsQueueNoDeclare will turn off the declaration of the queue's
// existance upon startup
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) {
options.QueueOptions.Declare = false
}
// WithConsumeOptionsQueueArgs adds optional args to the queue
func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QueueOptions.Args = args
}
}
// WithConsumeOptionsExchangeName sets the exchange name
func WithConsumeOptionsExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ExchangeOptions.Name = name
}
}
// WithConsumeOptionsExchangeKind ensures the queue is a durable queue
func WithConsumeOptionsExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ExchangeOptions.Kind = kind
}
}
// WithConsumeOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumeOptionsExchangeDurable(options *ConsumeOptions) {
options.ExchangeOptions.Durable = true
}
// WithConsumeOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumeOptionsExchangeAutoDelete(options *ConsumeOptions) {
options.ExchangeOptions.AutoDelete = true
}
// WithConsumeOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumeOptionsExchangeInternal(options *ConsumeOptions) {
options.ExchangeOptions.Internal = true
}
// WithConsumeOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumeOptionsExchangeNoWait(options *ConsumeOptions) {
options.ExchangeOptions.NoWait = true
}
// WithConsumeOptionsExchangeNoDeclare stops this library from declaring the exchanges existance
func WithConsumeOptionsExchangeNoDeclare(options *ConsumeOptions) {
options.ExchangeOptions.Declare = false
}
// WithConsumeOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumeOptionsExchangePassive(options *ConsumeOptions) {
options.ExchangeOptions.Passive = true
}
// WithConsumeOptionsExchangeArgs adds optional args to the exchange
func WithConsumeOptionsExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ExchangeOptions.Args = args
}
}
// WithConsumeOptionsDefaultBinding binds the queue to a routing key with the default binding options
func WithConsumeOptionsDefaultBinding(routingKey string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Bindings = append(options.Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
}
}
// WithConsumeOptionsBinding adds a new binding to the queue which allows you to set the binding options
// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumeOptionsBinding(binding Binding) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Bindings = append(options.Bindings, binding)
}
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.RabbitConsumerOptions.Name = consumerName
}
}
// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
// if unset the default will be used (false)
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.RabbitConsumerOptions.AutoAck = autoAck
}
}
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.RabbitConsumerOptions.Exclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.RabbitConsumerOptions.NoWait = true
}

+ 264
- 0
consumer_options.go View File

@ -0,0 +1,264 @@
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// getDefaultConsumerOptions describes the options that will be used when a value isn't provided
func getDefaultConsumerOptions(queueName string) ConsumerOptions {
return ConsumerOptions{
RabbitConsumerOptions: RabbitConsumerOptions{
Name: "",
AutoAck: false,
Exclusive: false,
NoWait: false,
NoLocal: false,
Args: Table{},
},
QueueOptions: QueueOptions{
Name: queueName,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
},
Bindings: []Binding{},
Concurrency: 1,
Logger: stdDebugLogger{},
}
}
func getDefaultBindingOptions() BindingOptions {
return BindingOptions{
NoWait: false,
Args: Table{},
Declare: true,
}
}
// ConsumerOptions are used to describe how a new consumer will be created.
// If QueueOptions is not nil, the options will be used to declare a queue
// If ExchangeOptions is not nil, it will be used to declare an exchange
// If there are Bindings, the queue will be bound to them
type ConsumerOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
Bindings []Binding
Concurrency int
Logger logger.Logger
}
// RabbitConsumerOptions are used to configure the consumer
// on the rabbit server
type RabbitConsumerOptions struct {
Name string
AutoAck bool
Exclusive bool
NoWait bool
NoLocal bool
Args Table
}
// QueueOptions are used to configure a queue.
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
// to a non-existent queue will cause RabbitMQ to throw an exception.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
Declare bool
}
// Binding describes the bhinding of a queue to a routing key on an exchange
type Binding struct {
RoutingKey string
BindingOptions
}
// BindingOptions describes the options a binding can have
type BindingOptions struct {
NoWait bool
Args Table
Declare bool
}
// WithConsumerOptionsQueueDurable ensures the queue is a durable queue
func WithConsumerOptionsQueueDurable(options *ConsumerOptions) {
options.QueueOptions.Durable = true
}
// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) {
options.QueueOptions.AutoDelete = true
}
// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) {
options.QueueOptions.Exclusive = true
}
// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) {
options.QueueOptions.NoWait = true
}
// WithConsumerOptionsQueuePassive ensures the queue is a passive queue
func WithConsumerOptionsQueuePassive(options *ConsumerOptions) {
options.QueueOptions.Passive = true
}
// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's
// existance upon startup
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) {
options.QueueOptions.Declare = false
}
// WithConsumerOptionsQueueArgs adds optional args to the queue
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.QueueOptions.Args = args
}
}
// WithConsumerOptionsExchangeName sets the exchange name
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Name = name
}
}
// WithConsumerOptionsExchangeKind ensures the queue is a durable queue
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Kind = kind
}
}
// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) {
options.ExchangeOptions.Durable = true
}
// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) {
options.ExchangeOptions.AutoDelete = true
}
// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) {
options.ExchangeOptions.Internal = true
}
// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) {
options.ExchangeOptions.NoWait = true
}
// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) {
options.ExchangeOptions.Declare = true
}
// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumerOptionsExchangePassive(options *ConsumerOptions) {
options.ExchangeOptions.Passive = true
}
// WithConsumerOptionsExchangeArgs adds optional args to the exchange
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ExchangeOptions.Args = args
}
}
// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
}
}
// WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options
// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Bindings = append(options.Bindings, binding)
}
}
// WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Concurrency = concurrency
}
}
// WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.RabbitConsumerOptions.Name = consumerName
}
}
// WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
// if unset the default will be used (false)
func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) {
return func(options *ConsumerOptions) {
options.RabbitConsumerOptions.AutoAck = autoAck
}
}
// WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) {
options.RabbitConsumerOptions.Exclusive = true
}
// WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) {
options.RabbitConsumerOptions.NoWait = true
}
// WithConsumerOptionsLogging uses a default logger that writes to std out
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logger = &stdDebugLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logger = log
}
}

+ 1
- 1
declare.go View File

@ -70,7 +70,7 @@ func declareExchange(connManager *connectionmanager.ConnectionManager, options E
return nil return nil
} }
func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumeOptions) error {
func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumerOptions) error {
for _, binding := range options.Bindings { for _, binding := range options.Bindings {
if !binding.Declare { if !binding.Declare {
continue continue


+ 5
- 39
examples/consumer/main.go View File

@ -10,8 +10,6 @@ import (
rabbitmq "github.com/wagslane/go-rabbitmq" rabbitmq "github.com/wagslane/go-rabbitmq"
) )
var consumerName = "example"
func main() { func main() {
conn, err := rabbitmq.NewConn( conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost", "amqp://guest:guest@localhost",
@ -20,56 +18,24 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer conn.Close()
consumer, err := rabbitmq.NewConsumer( consumer, err := rabbitmq.NewConsumer(
conn, conn,
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action { func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body)) log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack return rabbitmq.Ack
}, },
"my_queue", "my_queue",
rabbitmq.WithConsumeOptionsConcurrency(2),
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"),
rabbitmq.WithConsumeOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
}
consumer2, err := rabbitmq.NewConsumer(
conn,
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()
err = consumer2.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed 2: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue_2",
rabbitmq.WithConsumeOptionsConcurrency(2),
rabbitmq.WithConsumeOptionsConsumerName("consumer3"),
rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"),
rabbitmq.WithConsumeOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsExchangeDeclare,
) )
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer consumer.Close()
// block main thread - wait for shutdown signal // block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)


+ 5
- 7
examples/logger/main.go View File

@ -39,6 +39,11 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer conn.Close()
conn.NotifyReturn(func(r rabbitmq.Return) {
log.Printf("message returned from server: %s", string(r.Body))
})
publisher, err := rabbitmq.NewPublisher( publisher, err := rabbitmq.NewPublisher(
conn, conn,
@ -58,11 +63,4 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
returns := conn.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
} }

+ 1
- 0
examples/multiconsumer/.gitignore View File

@ -0,0 +1 @@
multiconsumer

+ 76
- 0
examples/multiconsumer/main.go View File

@ -0,0 +1,76 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
consumer, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsConsumerName("consumer_1"),
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key_2"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumer2, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed 2: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsConsumerName("consumer_2"),
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
}

+ 1
- 0
examples/multipublisher/.gitignore View File

@ -0,0 +1 @@
multipublisher

+ 100
- 0
examples/multipublisher/main.go View File

@ -0,0 +1,100 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
conn.NotifyReturn(func(r rabbitmq.Return) {
log.Printf("message returned from server: %s", string(r.Body))
})
conn.NotifyPublish(func(c rabbitmq.Confirmation) {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
})
publisher, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
publisher2, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
log.Fatal(err)
}
defer publisher2.Close()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
err = publisher.Publish(
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
}
err = publisher2.Publish(
[]byte("hello, world 2"),
[]string{"my_routing_key_2"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
}
case <-done:
fmt.Println("stopping publisher")
return
}
}
}

+ 12
- 34
examples/publisher/main.go View File

@ -19,38 +19,27 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer conn.Close()
conn.NotifyReturn(func(r rabbitmq.Return) {
log.Printf("message returned from server: %s", string(r.Body))
})
conn.NotifyPublish(func(c rabbitmq.Confirmation) {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
})
publisher, err := rabbitmq.NewPublisher( publisher, err := rabbitmq.NewPublisher(
conn, conn,
rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare,
) )
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer publisher.Close() defer publisher.Close()
publisher2, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer publisher2.Close()
returns := conn.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
confirmations := conn.NotifyPublish()
go func() {
for c := range confirmations {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
}
}()
// block main thread - wait for shutdown signal // block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
done := make(chan bool, 1) done := make(chan bool, 1)
@ -81,17 +70,6 @@ func main() {
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
err = publisher2.Publish(
[]byte("hello, world 2"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
}
case <-done: case <-done:
fmt.Println("stopping publisher") fmt.Println("stopping publisher")
return return


+ 2
- 2
internal/connectionmanager/connection_manager.go View File

@ -135,8 +135,8 @@ func (connManager *ConnectionManager) reconnect() error {
return nil return nil
} }
// close safely closes the current channel and connection
func (connManager *ConnectionManager) close() error {
// Close safely closes the current channel and connection
func (connManager *ConnectionManager) Close() error {
connManager.logger.Infof("closing connection manager...") connManager.logger.Infof("closing connection manager...")
connManager.channelMux.Lock() connManager.channelMux.Lock()
defer connManager.channelMux.Unlock() defer connManager.channelMux.Unlock()


+ 25
- 28
publish.go View File

@ -53,35 +53,14 @@ type Publisher struct {
options PublisherOptions options PublisherOptions
} }
// PublisherOptions are used to describe a publisher's configuration.
// Logger is a custom logging interface.
type PublisherOptions struct {
Logger Logger
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
// and sets the
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logger = &stdDebugLogger{}
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logger = log
}
}
// NewPublisher returns a new publisher with an open channel to the cluster. // NewPublisher returns a new publisher with an open channel to the cluster.
// If you plan to enforce mandatory or immediate publishing, those failures will be reported // If you plan to enforce mandatory or immediate publishing, those failures will be reported
// on the channel of Returns that you should setup a listener on. // on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing // Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown // will fail with an error when the server is requesting a slowdown
func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
options := &PublisherOptions{
Logger: &stdDebugLogger{},
}
defaultOptions := getDefaultPublisherOptions()
options := &defaultOptions
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
optionFunc(options) optionFunc(options)
} }
@ -101,19 +80,34 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
options: *options, options: *options,
} }
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
err := publisher.startup()
if err != nil {
return nil, err
}
go publisher.handleRestarts() go publisher.handleRestarts()
return publisher, nil return publisher, nil
} }
func (publisher *Publisher) startup() error {
err := declareExchange(publisher.connManager, publisher.options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare exchange failed: %w", err)
}
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
return nil
}
func (publisher *Publisher) handleRestarts() { func (publisher *Publisher) handleRestarts() {
for err := range publisher.reconnectErrCh { for err := range publisher.reconnectErrCh {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err) publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
err := publisher.startup()
if err != nil {
publisher.options.Logger.Infof("failed to startup publisher: %v", err)
continue
}
} }
} }
@ -177,7 +171,10 @@ func (publisher *Publisher) Publish(
// Close closes the publisher and releases resources // Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use // The publisher should be discarded as it's not safe for re-use
// Only call Close() once
func (publisher *Publisher) Close() { func (publisher *Publisher) Close() {
publisher.options.Logger.Infof("closing publisher...") publisher.options.Logger.Infof("closing publisher...")
publisher.closeConnectionToManagerCh <- struct{}{}
go func() {
publisher.closeConnectionToManagerCh <- struct{}{}
}()
} }

+ 93
- 0
publisher_options.go View File

@ -0,0 +1,93 @@
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// PublisherOptions are used to describe a publisher's configuration.
// Logger is a custom logging interface.
type PublisherOptions struct {
ExchangeOptions ExchangeOptions
Logger Logger
}
// getDefaultPublisherOptions describes the options that will be used when a value isn't provided
func getDefaultPublisherOptions() PublisherOptions {
return PublisherOptions{
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: false,
},
Logger: stdDebugLogger{},
}
}
// WithPublisherOptionsLogging sets logging to true on the publisher options
// and sets the
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logger = &stdDebugLogger{}
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logger = log
}
}
// WithPublisherOptionsExchangeName sets the exchange name
func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) {
return func(options *PublisherOptions) {
options.ExchangeOptions.Name = name
}
}
// WithPublisherOptionsExchangeKind ensures the queue is a durable queue
func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) {
return func(options *PublisherOptions) {
options.ExchangeOptions.Kind = kind
}
}
// WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange
func WithPublisherOptionsExchangeDurable(options *PublisherOptions) {
options.ExchangeOptions.Durable = true
}
// WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) {
options.ExchangeOptions.AutoDelete = true
}
// WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange
func WithPublisherOptionsExchangeInternal(options *PublisherOptions) {
options.ExchangeOptions.Internal = true
}
// WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) {
options.ExchangeOptions.NoWait = true
}
// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) {
options.ExchangeOptions.Declare = true
}
// WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange
func WithPublisherOptionsExchangePassive(options *PublisherOptions) {
options.ExchangeOptions.Passive = true
}
// WithPublisherOptionsExchangeArgs adds optional args to the exchange
func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) {
return func(options *PublisherOptions) {
options.ExchangeOptions.Args = args
}
}

Loading…
Cancel
Save