Browse Source

connection mananger

pull/95/head
wagslane 3 years ago
parent
commit
0df88ac7e9
20 changed files with 1081 additions and 798 deletions
  1. +0
    -134
      channel.go
  2. +0
    -9
      config.go
  3. +110
    -0
      connection.go
  4. +67
    -0
      connection_options.go
  5. +58
    -62
      consume.go
  6. +196
    -54
      consume_options.go
  7. +72
    -355
      declare.go
  8. +38
    -13
      examples/consumer/main.go
  9. +0
    -1
      examples/consumer_with_declare/.gitignore
  10. +0
    -74
      examples/consumer_with_declare/main.go
  11. +11
    -3
      examples/logger/main.go
  12. +32
    -10
      examples/publisher/main.go
  13. +16
    -0
      exchange_options.go
  14. +160
    -0
      internal/connectionmanager/connection_manager.go
  15. +68
    -0
      internal/connectionmanager/connection_manager_dispatch.go
  16. +210
    -0
      internal/connectionmanager/safe_wraps.go
  17. +12
    -0
      internal/logger/logger.go
  18. +10
    -10
      logger.go
  19. +19
    -71
      publish.go
  20. +2
    -2
      publish_flow_block.go

+ 0
- 134
channel.go View File

@ -1,134 +0,0 @@
package rabbitmq
import (
"errors"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type channelManager struct {
logger Logger
url string
channel *amqp.Channel
connection *amqp.Connection
amqpConfig Config
channelMux *sync.RWMutex
notifyCancelOrClose chan error
reconnectInterval time.Duration
reconnectionCount uint
}
func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
}
chManager := channelManager{
logger: log,
url: url,
connection: conn,
channel: ch,
channelMux: &sync.RWMutex{},
amqpConfig: conf,
notifyCancelOrClose: make(chan error),
reconnectInterval: reconnectInterval,
}
go chManager.startNotifyCancelOrClosed()
return &chManager, nil
}
func getNewChannel(url string, conf Config) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.DialConfig(url, amqp.Config(conf))
if err != nil {
return nil, nil, err
}
ch, err := amqpConn.Channel()
if err != nil {
return nil, nil, err
}
return amqpConn, ch, nil
}
// startNotifyCancelOrClosed listens on the channel's cancelled and closed
// notifiers. When it detects a problem, it attempts to reconnect.
// Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// channel
func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1))
select {
case err := <-notifyCloseChan:
if err != nil {
chManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err)
chManager.reconnectLoop()
chManager.logger.Warnf("successfully reconnected to amqp server")
chManager.notifyCancelOrClose <- err
}
if err == nil {
chManager.logger.Infof("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
chManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.reconnectLoop()
chManager.logger.Warnf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
}
// reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() {
for {
chManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect()
if err != nil {
chManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else {
chManager.reconnectionCount++
go chManager.startNotifyCancelOrClosed()
return
}
}
}
// reconnect safely closes the current channel and obtains a new one
func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(chManager.url, chManager.amqpConfig)
if err != nil {
return err
}
if err = chManager.channel.Close(); err != nil {
chManager.logger.Warnf("error closing channel while reconnecting: %v", err)
}
if err = chManager.connection.Close(); err != nil {
chManager.logger.Warnf("error closing connection while reconnecting: %v", err)
}
chManager.connection = newConn
chManager.channel = newChannel
return nil
}
// close safely closes the current channel and connection
func (chManager *channelManager) close() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
err := chManager.channel.Close()
if err != nil {
return err
}
err = chManager.connection.Close()
if err != nil {
return err
}
return nil
}

+ 0
- 9
config.go View File

@ -1,9 +0,0 @@
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// Config wraps amqp.Config
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
type Config amqp.Config

+ 110
- 0
connection.go View File

@ -0,0 +1,110 @@
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
// Conn manages the connection to a rabbit cluster
// it is intended to be shared across publishers and consumers
type Conn struct {
connectionManager *connectionmanager.ConnectionManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
notifyReturnChan chan Return
notifyPublishChan chan Confirmation
options ConnectionOptions
}
// Config wraps amqp.Config
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
type Config amqp.Config
// NewConn creates a new connection manager
func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) {
defaultOptions := getDefaultConnectionOptions()
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
}
err = manager.QosSafe(
options.QOSPrefetch,
0,
options.QOSGlobal,
)
if err != nil {
return nil, err
}
reconnectErrCh, closeCh := manager.NotifyReconnect()
conn := &Conn{
connectionManager: manager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
notifyReturnChan: nil,
notifyPublishChan: nil,
options: *options,
}
go conn.handleRestarts()
return conn, nil
}
func (conn *Conn) handleRestarts() {
for err := range conn.reconnectErrCh {
conn.options.Logger.Infof("successful connection recovery from: %v", err)
go conn.startNotifyReturnHandler()
go conn.startNotifyPublishHandler()
}
}
func (conn *Conn) startNotifyReturnHandler() {
if conn.notifyReturnChan == nil {
return
}
returnAMQPCh := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
conn.notifyReturnChan <- Return{ret}
}
}
func (conn *Conn) startNotifyPublishHandler() {
if conn.notifyPublishChan == nil {
return
}
conn.connectionManager.ConfirmSafe(false)
publishAMQPCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
conn.notifyPublishChan <- Confirmation{
Confirmation: conf,
ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()),
}
}
}
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyReturn() <-chan Return {
conn.notifyReturnChan = make(chan Return)
go conn.startNotifyReturnHandler()
return conn.notifyReturnChan
}
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
// These notifications are shared across an entire connection, so if you're creating multiple
// publishers on the same connection keep that in mind
func (conn *Conn) NotifyPublish() <-chan Confirmation {
conn.notifyPublishChan = make(chan Confirmation)
go conn.startNotifyPublishHandler()
return conn.notifyPublishChan
}

+ 67
- 0
connection_options.go View File

@ -0,0 +1,67 @@
package rabbitmq
import "time"
// ConnectionOptions are used to describe how a new consumer will be created.
type ConnectionOptions struct {
QOSPrefetch int
QOSGlobal bool
ReconnectInterval time.Duration
Logger Logger
Config Config
}
// getDefaultConnectionOptions describes the options that will be used when a value isn't provided
func getDefaultConnectionOptions() ConnectionOptions {
return ConnectionOptions{
QOSPrefetch: 0,
QOSGlobal: false,
ReconnectInterval: time.Second * 5,
Logger: stdDebugLogger{},
Config: Config{},
}
}
// WithConnectionOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConnectionOptionsQOSPrefetch(prefetchCount int) func(*ConnectionOptions) {
return func(options *ConnectionOptions) {
options.QOSPrefetch = prefetchCount
}
}
// WithConnectionOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConnectionOptionsQOSGlobal(options *ConnectionOptions) {
options.QOSGlobal = true
}
// WithConnectionOptionsReconnectInterval sets the reconnection interval
func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions) {
return func(options *ConnectionOptions) {
options.ReconnectInterval = interval
}
}
// WithConnectionOptionsLogging sets logging to true on the consumer options
// and sets the
func WithConnectionOptionsLogging(options *ConnectionOptions) {
options.Logger = stdDebugLogger{}
}
// WithConnectionOptionsLogger sets logging to true on the consumer options
// and sets the
func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions) {
return func(options *ConnectionOptions) {
options.Logger = log
}
}
// WithConnectionOptionsConfig sets the Config used in the connection
func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) {
return func(options *ConnectionOptions) {
options.Config = cfg
}
}

+ 58
- 62
consume.go View File

@ -1,10 +1,12 @@
package rabbitmq
import (
"errors"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// Action is an action that occurs after processed this delivery
@ -24,15 +26,16 @@ const (
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chManager *channelManager
logger Logger
connManager *connectionmanager.ConnectionManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
options ConsumerOptions
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logger specifies a custom Logger interface implementation.
type ConsumerOptions struct {
Logger Logger
ReconnectInterval time.Duration
Logger logger.Logger
}
// Delivery captures the fields for a previously delivered message resident in
@ -43,32 +46,27 @@ type Delivery struct {
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
func NewConsumer(conn *Conn, optionFuncs ...func(*ConsumerOptions)) (*Consumer, error) {
options := &ConsumerOptions{
Logger: &stdDebugLogger{},
ReconnectInterval: time.Second * 5,
Logger: &stdDebugLogger{},
}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval)
if err != nil {
return Consumer{}, err
}
consumer := Consumer{
chManager: chManager,
logger: options.Logger,
if conn.connectionManager == nil {
return nil, errors.New("connection manager can't be nil")
}
return consumer, nil
}
reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect()
// WithConsumerOptionsReconnectInterval sets the interval at which the consumer will
// attempt to reconnect to the rabbit server
func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.ReconnectInterval = reconnectInterval
consumer := &Consumer{
connManager: conn.connectionManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
}
return consumer, nil
}
// WithConsumerOptionsLogging uses a default logger that writes to std out
@ -78,7 +76,7 @@ func WithConsumerOptionsLogging(options *ConsumerOptions) {
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logger = log
}
@ -88,7 +86,7 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsuming(
func (consumer *Consumer) StartConsuming(
handler Handler,
queue string,
optionFuncs ...func(*ConsumeOptions),
@ -108,14 +106,14 @@ func (consumer Consumer) StartConsuming(
}
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Infof("successful recovery from: %v", err)
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,
)
if err != nil {
consumer.logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.options.Logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err)
}
}
}()
@ -123,59 +121,57 @@ func (consumer Consumer) StartConsuming(
}
// Close cleans up resources and closes the consumer.
// The consumer is not safe for reuse
func (consumer Consumer) Close() error {
consumer.chManager.logger.Infof("closing consumer...")
return consumer.chManager.close()
// It does not close the connection manager, just the subscription
// to the connection manager
func (consumer *Consumer) Close() {
consumer.options.Logger.Infof("closing consumer...")
consumer.closeConnectionToManagerCh <- struct{}{}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
func (consumer Consumer) startGoroutines(
func (consumer *Consumer) startGoroutines(
handler Handler,
consumeOptions ConsumeOptions,
options ConsumeOptions,
) error {
err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions)
err := declareExchange(consumer.connManager, options.ExchangeOptions)
if err != nil {
return fmt.Errorf("declare failed: %w", err)
return fmt.Errorf("declare exchange failed: %w", err)
}
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
)
err = declareQueue(consumer.connManager, options.QueueOptions)
if err != nil {
return err
return fmt.Errorf("declare queue failed: %w", err)
}
err = declareBindings(consumer.connManager, options)
if err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
}
msgs, err := consumer.chManager.channel.Consume(
consumeOptions.QueueName,
consumeOptions.ConsumerName,
consumeOptions.ConsumerAutoAck,
consumeOptions.ConsumerExclusive,
consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ
consumeOptions.ConsumerNoWait,
tableToAMQPTable(consumeOptions.ConsumerArgs),
msgs, err := consumer.connManager.ConsumeSafe(
options.QueueOptions.Name,
options.RabbitConsumerOptions.Name,
options.RabbitConsumerOptions.AutoAck,
options.RabbitConsumerOptions.Exclusive,
false, // no-local is not supported by RabbitMQ
options.RabbitConsumerOptions.NoWait,
tableToAMQPTable(options.RabbitConsumerOptions.Args),
)
if err != nil {
return err
}
for i := 0; i < consumeOptions.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, consumeOptions, handler)
for i := 0; i < options.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, options, handler)
}
consumer.logger.Infof("Processing messages on %v goroutines", consumeOptions.Concurrency)
consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)
return nil
}
func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) {
func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) {
for msg := range msgs {
if consumeOptions.ConsumerAutoAck {
if consumeOptions.RabbitConsumerOptions.AutoAck {
handler(Delivery{msg})
continue
}
@ -183,19 +179,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio
case Ack:
err := msg.Ack(false)
if err != nil {
consumer.logger.Errorf("can't ack message: %v", err)
consumer.options.Logger.Errorf("can't ack message: %v", err)
}
case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.logger.Errorf("can't nack message: %v", err)
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
case NackRequeue:
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Errorf("can't nack message: %v", err)
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
}
}
consumer.logger.Infof("rabbit consumer goroutine closed")
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}

+ 196
- 54
consume_options.go View File

@ -1,83 +1,225 @@
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions(queue string) ConsumeOptions {
func getDefaultConsumeOptions(queueName string) ConsumeOptions {
return ConsumeOptions{
QueueName: queue,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
ConsumerName: "",
ConsumerAutoAck: false,
ConsumerExclusive: false,
ConsumerNoWait: false,
ConsumerNoLocal: false,
ConsumerArgs: nil,
RabbitConsumerOptions: RabbitConsumerOptions{
Name: "",
AutoAck: false,
Exclusive: false,
NoWait: false,
NoLocal: false,
Args: Table{},
},
QueueOptions: QueueOptions{
Name: queueName,
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
ExchangeOptions: ExchangeOptions{
Name: "",
Kind: amqp.ExchangeDirect,
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Passive: false,
Args: Table{},
Declare: true,
},
Bindings: []Binding{},
Concurrency: 1,
}
}
func getDefaultBindingOptions() BindingOptions {
return BindingOptions{
NoWait: false,
Args: Table{},
Declare: true,
}
}
// ConsumeOptions are used to describe how a new consumer will be created.
// If QueueOptions is not nil, the options will be used to declare a queue
// If ExchangeOptions is not nil, it will be used to declare an exchange
// If there are Bindings, the queue will be bound to them
type ConsumeOptions struct {
DeclareOptions
QueueName string
Concurrency int
QOSPrefetch int
QOSGlobal bool
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings
// before the consumer process starts.
func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
ExchangeOptions ExchangeOptions
Bindings []Binding
Concurrency int
}
// RabbitConsumerOptions are used to configure the consumer
// on the rabbit server
type RabbitConsumerOptions struct {
Name string
AutoAck bool
Exclusive bool
NoWait bool
NoLocal bool
Args Table
}
// QueueOptions are used to configure a queue.
// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect
// to a non-existent queue will cause RabbitMQ to throw an exception.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
Declare bool
}
// Binding describes the bhinding of a queue to a routing key on an exchange
type Binding struct {
RoutingKey string
BindingOptions
}
// BindingOptions describes the options a binding can have
type BindingOptions struct {
NoWait bool
Args Table
Declare bool
}
// WithConsumeOptionsQueueDurable ensures the queue is a durable queue
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueOptions.Durable = true
}
// WithConsumeOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueOptions.AutoDelete = true
}
// WithConsumeOptionsQueueExclusive ensures the queue is an exclusive queue
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueOptions.Exclusive = true
}
// WithConsumeOptionsQueueNoWait ensures the queue is a no-wait queue
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueOptions.NoWait = true
}
// WithConsumeOptionsQueuePassive ensures the queue is a passive queue
func WithConsumeOptionsQueuePassive(options *ConsumeOptions) {
options.QueueOptions.Passive = true
}
// WithConsumeOptionsQueueNoDeclare will turn off the declaration of the queue's
// existance upon startup
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) {
options.QueueOptions.Declare = false
}
// WithConsumeOptionsQueueArgs adds optional args to the queue
func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QueueOptions.Args = args
}
}
// WithConsumeOptionsExchangeName sets the exchange name
func WithConsumeOptionsExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ExchangeOptions.Name = name
}
}
// WithConsumeOptionsExchangeKind ensures the queue is a durable queue
func WithConsumeOptionsExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
for _, declareOption := range declareOptionsFuncs {
// If a queue was set to declare, ensure that the queue name is set.
if options.Queue != nil {
if options.Queue.Name == "" {
options.Queue.Name = options.QueueName
}
}
options.ExchangeOptions.Kind = kind
}
}
declareOption(&options.DeclareOptions)
}
// WithConsumeOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumeOptionsExchangeDurable(options *ConsumeOptions) {
options.ExchangeOptions.Durable = true
}
// WithConsumeOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumeOptionsExchangeAutoDelete(options *ConsumeOptions) {
options.ExchangeOptions.AutoDelete = true
}
// WithConsumeOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumeOptionsExchangeInternal(options *ConsumeOptions) {
options.ExchangeOptions.Internal = true
}
// WithConsumeOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumeOptionsExchangeNoWait(options *ConsumeOptions) {
options.ExchangeOptions.NoWait = true
}
// WithConsumeOptionsExchangeNoDeclare stops this library from declaring the exchanges existance
func WithConsumeOptionsExchangeNoDeclare(options *ConsumeOptions) {
options.ExchangeOptions.Declare = false
}
// WithConsumeOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumeOptionsExchangePassive(options *ConsumeOptions) {
options.ExchangeOptions.Passive = true
}
// WithConsumeOptionsExchangeArgs adds optional args to the exchange
func WithConsumeOptionsExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ExchangeOptions.Args = args
}
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
// WithConsumeOptionsDefaultBinding binds the queue to a routing key with the default binding options
func WithConsumeOptionsDefaultBinding(routingKey string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
options.Bindings = append(options.Bindings, Binding{
RoutingKey: routingKey,
BindingOptions: getDefaultBindingOptions(),
})
}
}
// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
// WithConsumeOptionsBinding adds a new binding to the queue which allows you to set the binding options
// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to
// the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumeOptionsBinding(binding Binding) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QOSPrefetch = prefetchCount
options.Bindings = append(options.Bindings, binding)
}
}
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.QOSGlobal = true
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerName = consumerName
options.RabbitConsumerOptions.Name = consumerName
}
}
@ -85,7 +227,7 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
// if unset the default will be used (false)
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerAutoAck = autoAck
options.RabbitConsumerOptions.AutoAck = autoAck
}
}
@ -94,7 +236,7 @@ func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.ConsumerExclusive = true
options.RabbitConsumerOptions.Exclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
@ -102,5 +244,5 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
options.RabbitConsumerOptions.NoWait = true
}

+ 72
- 355
declare.go View File

@ -1,373 +1,90 @@
package rabbitmq
import "fmt"
// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like.
type DeclareOptions struct {
Queue *QueueOptions
Exchange *ExchangeOptions
Bindings []Binding
}
// QueueOptions are used to configure a queue.
// If the Passive flag is set the client will only check if the queue exists on the server
// and that the settings match, no creation attempt will be made.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
}
// ExchangeOptions are used to configure an exchange.
// If the Passive flag is set the client will only check if the exchange exists on the server
// and that the settings match, no creation attempt will be made.
type ExchangeOptions struct {
Name string
Kind string // possible values: empty string for default exchange or direct, topic, fanout
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Passive bool // if false, a missing exchange will be created on the server
Args Table
}
// BindingOption are used to configure a queue bindings.
type BindingOption struct {
NoWait bool
Args Table
}
// Binding describes a queue binding to a specific exchange.
type Binding struct {
BindingOption
QueueName string
ExchangeName string
RoutingKey string
}
// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options.
// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set.
func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) {
if o.Queue == nil || o.Exchange == nil {
return // nothing to set...
}
if o.Queue.Name == "" {
return // nothing to set...
}
for _, routingKey := range routingKeys {
o.Bindings = append(o.Bindings, Binding{
QueueName: o.Queue.Name,
ExchangeName: o.Exchange.Name,
RoutingKey: routingKey,
BindingOption: opt,
})
}
}
// handleDeclare handles the queue, exchange and binding declare process on the server.
// If there are no options set, no actions will be executed.
func handleDeclare(chManager *channelManager, options DeclareOptions) error {
chManager.channelMux.RLock()
defer chManager.channelMux.RUnlock()
// bind queue
if options.Queue != nil {
queue := options.Queue
if queue.Name == "" {
return fmt.Errorf("missing queue name")
}
if queue.Passive {
_, err := chManager.channel.QueueDeclarePassive(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
} else {
_, err := chManager.channel.QueueDeclare(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
}
}
// bind exchange
if options.Exchange != nil {
exchange := options.Exchange
if exchange.Name == "" {
return fmt.Errorf("missing exchange name")
}
if exchange.Passive {
err := chManager.channel.ExchangeDeclarePassive(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
} else {
err := chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
}
}
// handle binding of queues to exchange
for _, binding := range options.Bindings {
err := chManager.channel.QueueBind(
binding.QueueName, // name of the queue
binding.RoutingKey, // bindingKey
binding.ExchangeName, // sourceExchange
binding.NoWait, // noWait
tableToAMQPTable(binding.Args), // arguments
import (
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
func declareQueue(connManager *connectionmanager.ConnectionManager, options QueueOptions) error {
if !options.Declare {
return nil
}
if options.Passive {
_, err := connManager.QueueDeclarePassiveSafe(
options.Name,
options.Durable,
options.AutoDelete,
options.Exclusive,
options.NoWait,
tableToAMQPTable(options.Args),
)
if err != nil {
return err
}
return nil
}
_, err := connManager.QueueDeclareSafe(
options.Name,
options.Durable,
options.AutoDelete,
options.Exclusive,
options.NoWait,
tableToAMQPTable(options.Args),
)
if err != nil {
return err
}
return nil
}
// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options.
// If no exchange options are set yet, new options with default values will be defined.
func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions {
if options.Exchange == nil {
options.Exchange = &ExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
Passive: false,
}
func declareExchange(connManager *connectionmanager.ConnectionManager, options ExchangeOptions) error {
if !options.Declare {
return nil
}
return options.Exchange
}
// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options.
// If no queue options are set yet, new options with default values will be defined.
func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions {
if options.Queue == nil {
options.Queue = &QueueOptions{
Name: "",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: nil,
if options.Passive {
err := connManager.ExchangeDeclarePassiveSafe(
options.Name,
options.Kind,
options.Durable,
options.AutoDelete,
options.Internal,
options.NoWait,
tableToAMQPTable(options.Args),
)
if err != nil {
return err
}
return nil
}
err := connManager.ExchangeDeclareSafe(
options.Name,
options.Kind,
options.Durable,
options.AutoDelete,
options.Internal,
options.NoWait,
tableToAMQPTable(options.Args),
)
if err != nil {
return err
}
return options.Queue
}
// region general-options
// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the queue already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing queue will be created on the server.
func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Queue = settings
}
}
// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the exchange already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing exchange will be created on the server.
func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Exchange = settings
}
}
// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if one of the bindings already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, missing bindings will be created on the server.
func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Bindings = bindings
}
}
// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ
// actions are being executed.
// This function must be called after the queue and exchange declaration settings have been set,
// otherwise this function has no effect.
func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.SetBindings(routingKeys, BindingOption{})
}
}
// endregion general-options
// region single-options
// WithDeclareQueueName returns a function that sets the queue name.
func WithDeclareQueueName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges.
func WithDeclareQueueDurable(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Durable = true
}
// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more consumers on it.
func WithDeclareQueueAutoDelete(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithDeclareQueueExclusive(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Exclusive = true
}
// WithDeclareQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A channel
// exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithDeclareQueueNoWait(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and thus only will be validated.
func WithDeclareQueueNoDeclare(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Passive = true
}
// WithDeclareQueueArgs returns a function that sets the queue arguments.
func WithDeclareQueueArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability.
func WithDeclareQueueQuorum(options *DeclareOptions) {
queue := getQueueOptionsOrSetDefault(options)
if queue.Args == nil {
queue.Args = Table{}
}
queue.Args["x-queue-type"] = "quorum"
}
// WithDeclareExchangeName returns a function that sets the exchange name.
func WithDeclareExchangeName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type.
func WithDeclareExchangeKind(kind string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag.
func WithDeclareExchangeDurable(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Durable = true
}
// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag.
func WithDeclareExchangeAutoDelete(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag.
func WithDeclareExchangeInternal(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Internal = true
}
// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag.
func WithDeclareExchangeNoWait(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments
// that are specific to the server's implementation of the exchange.
func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithDeclareExchangeNoDeclare(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Passive = true
}
// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingNoWait(options *DeclareOptions) {
for i := range options.Bindings {
options.Bindings[i].NoWait = true
}
return nil
}
// WithDeclareBindingArgs sets the arguments of the bindings to args.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
for i := range options.Bindings {
options.Bindings[i].Args = args
func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumeOptions) error {
for _, binding := range options.Bindings {
if !binding.Declare {
continue
}
err := connManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
options.ExchangeOptions.Name,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
}
}
return nil
}
// endregion single-options

+ 38
- 13
examples/consumer/main.go View File

@ -13,19 +13,22 @@ import (
var consumerName = "example"
func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", rabbitmq.Config{},
conn,
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Fatal(err)
}
}()
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
@ -34,13 +37,35 @@ func main() {
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsConcurrency(2),
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
rabbitmq.WithConsumeDeclareOptions(
// creates a the queue if it doesn't exist yet
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"my_routing_key"}),
),
rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"),
rabbitmq.WithConsumeOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
}
consumer2, err := rabbitmq.NewConsumer(
conn,
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()
err = consumer2.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed 2: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue_2",
rabbitmq.WithConsumeOptionsConcurrency(2),
rabbitmq.WithConsumeOptionsConsumerName("consumer3"),
rabbitmq.WithConsumeOptionsDefaultBinding("my_routing_key"),
rabbitmq.WithConsumeOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)


+ 0
- 1
examples/consumer_with_declare/.gitignore View File

@ -1 +0,0 @@
consumer_with_declare

+ 0
- 74
examples/consumer_with_declare/main.go View File

@ -1,74 +0,0 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var consumerName = "example_with_declare"
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Fatal(err)
}
}()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeDeclareOptions(
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings
rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings
{
QueueName: "my_queue",
ExchangeName: "events",
RoutingKey: "a_custom_key",
},
}),
),
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
log.Fatal(err)
}
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
}

+ 11
- 3
examples/logger/main.go View File

@ -32,8 +32,16 @@ func (l errorLogger) Tracef(format string, v ...interface{}) {}
func main() {
mylogger := &errorLogger{}
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
conn,
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {
@ -41,7 +49,7 @@ func main() {
}
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
@ -51,7 +59,7 @@ func main() {
log.Fatal(err)
}
returns := publisher.NotifyReturn()
returns := conn.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))


+ 32
- 10
examples/publisher/main.go View File

@ -12,28 +12,39 @@ import (
)
func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
conn,
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := publisher.Close()
if err != nil {
log.Fatal(err)
}
}()
defer publisher.Close()
publisher2, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer publisher2.Close()
returns := publisher.NotifyReturn()
returns := conn.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
confirmations := publisher.NotifyPublish()
confirmations := conn.NotifyPublish()
go func() {
for c := range confirmations {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
@ -61,7 +72,18 @@ func main() {
case <-ticker.C:
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
}
err = publisher2.Publish(
[]byte("hello, world 2"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,


+ 16
- 0
exchange_options.go View File

@ -0,0 +1,16 @@
package rabbitmq
// ExchangeOptions are used to configure an exchange.
// If the Passive flag is set the client will only check if the exchange exists on the server
// and that the settings match, no creation attempt will be made.
type ExchangeOptions struct {
Name string
Kind string // possible values: empty string for default exchange or direct, topic, fanout
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Passive bool // if false, a missing exchange will be created on the server
Args Table
Declare bool
}

+ 160
- 0
internal/connectionmanager/connection_manager.go View File

@ -0,0 +1,160 @@
package connectionmanager
import (
"errors"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// ConnectionManager -
type ConnectionManager struct {
logger logger.Logger
url string
channel *amqp.Channel
connection *amqp.Connection
amqpConfig amqp.Config
channelMux *sync.RWMutex
reconnectInterval time.Duration
reconnectionCount uint
reconnectionCountMux *sync.Mutex
dispatcher *dispatcher
}
// NewConnectionManager creates a new connection manager
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
}
connManager := ConnectionManager{
logger: log,
url: url,
connection: conn,
channel: ch,
channelMux: &sync.RWMutex{},
amqpConfig: conf,
reconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMux: &sync.Mutex{},
dispatcher: newDispatcher(),
}
go connManager.startNotifyCancelOrClosed()
return &connManager, nil
}
func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.DialConfig(url, amqp.Config(conf))
if err != nil {
return nil, nil, err
}
ch, err := amqpConn.Channel()
if err != nil {
return nil, nil, err
}
return amqpConn, ch, nil
}
// startNotifyCancelOrClosed listens on the channel's cancelled and closed
// notifiers. When it detects a problem, it attempts to reconnect.
// Once reconnected, it sends an error back on the manager's notifyCancelOrClose
// channel
func (connManager *ConnectionManager) startNotifyCancelOrClosed() {
notifyCloseChan := connManager.channel.NotifyClose(make(chan *amqp.Error, 1))
notifyCancelChan := connManager.channel.NotifyCancel(make(chan string, 1))
select {
case err := <-notifyCloseChan:
if err != nil {
connManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err)
connManager.reconnectLoop()
connManager.logger.Warnf("successfully reconnected to amqp server")
connManager.dispatcher.dispatch(err)
}
if err == nil {
connManager.logger.Infof("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
connManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err)
connManager.reconnectLoop()
connManager.logger.Warnf("successfully reconnected to amqp server after cancel")
connManager.dispatcher.dispatch(errors.New(err))
}
}
// GetReconnectionCount -
func (connManager *ConnectionManager) GetReconnectionCount() uint {
connManager.reconnectionCountMux.Lock()
defer connManager.reconnectionCountMux.Unlock()
return connManager.reconnectionCount
}
func (connManager *ConnectionManager) incrementReconnectionCount() {
connManager.reconnectionCountMux.Lock()
defer connManager.reconnectionCountMux.Unlock()
connManager.reconnectionCount++
}
// reconnectLoop continuously attempts to reconnect
func (connManager *ConnectionManager) reconnectLoop() {
for {
connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.reconnectInterval)
time.Sleep(connManager.reconnectInterval)
err := connManager.reconnect()
if err != nil {
connManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else {
connManager.incrementReconnectionCount()
go connManager.startNotifyCancelOrClosed()
return
}
}
}
// reconnect safely closes the current channel and obtains a new one
func (connManager *ConnectionManager) reconnect() error {
connManager.channelMux.Lock()
defer connManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(connManager.url, connManager.amqpConfig)
if err != nil {
return err
}
if err = connManager.channel.Close(); err != nil {
connManager.logger.Warnf("error closing channel while reconnecting: %v", err)
}
if err = connManager.connection.Close(); err != nil {
connManager.logger.Warnf("error closing connection while reconnecting: %v", err)
}
connManager.connection = newConn
connManager.channel = newChannel
return nil
}
// close safely closes the current channel and connection
func (connManager *ConnectionManager) close() error {
connManager.logger.Infof("closing connection manager...")
connManager.channelMux.Lock()
defer connManager.channelMux.Unlock()
err := connManager.channel.Close()
if err != nil {
return err
}
err = connManager.connection.Close()
if err != nil {
return err
}
return nil
}
// NotifyReconnect adds a new subscriber that will receive error messages whenever
// the connection manager has successfully reconnect to the server
func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) {
return connManager.dispatcher.addSubscriber()
}

+ 68
- 0
internal/connectionmanager/connection_manager_dispatch.go View File

@ -0,0 +1,68 @@
package connectionmanager
import (
"log"
"math"
"math/rand"
"sync"
"time"
)
type dispatcher struct {
subscribers map[int]dispatchSubscriber
subscribersMux *sync.Mutex
}
type dispatchSubscriber struct {
notifyCancelOrCloseChan chan error
closeCh <-chan struct{}
}
func newDispatcher() *dispatcher {
return &dispatcher{
subscribers: make(map[int]dispatchSubscriber),
subscribersMux: &sync.Mutex{},
}
}
func (d *dispatcher) dispatch(err error) error {
d.subscribersMux.Lock()
defer d.subscribersMux.Unlock()
for _, subscriber := range d.subscribers {
select {
case <-time.After(time.Second * 5):
log.Println("Unexpected rabbitmq error: timeout in dispatch")
case subscriber.notifyCancelOrCloseChan <- err:
}
}
return nil
}
func (d *dispatcher) addSubscriber() (<-chan error, chan<- struct{}) {
const maxRand = math.MaxInt64
const minRand = 0
id := rand.Intn(maxRand-minRand) + minRand
closeCh := make(chan struct{})
notifyCancelOrCloseChan := make(chan error)
d.subscribersMux.Lock()
d.subscribers[id] = dispatchSubscriber{
notifyCancelOrCloseChan: notifyCancelOrCloseChan,
closeCh: closeCh,
}
d.subscribersMux.Unlock()
go func(id int) {
<-closeCh
d.subscribersMux.Lock()
defer d.subscribersMux.Unlock()
sub, ok := d.subscribers[id]
if !ok {
return
}
close(sub.notifyCancelOrCloseChan)
delete(d.subscribers, id)
}(id)
return notifyCancelOrCloseChan, closeCh
}

+ 210
- 0
internal/connectionmanager/safe_wraps.go View File

@ -0,0 +1,210 @@
package connectionmanager
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// ConsumeSafe safely wraps the (*amqp.Channel).Consume method
func (connManager *ConnectionManager) ConsumeSafe(
queue,
consumer string,
autoAck,
exclusive,
noLocal,
noWait bool,
args amqp.Table,
) (<-chan amqp.Delivery, error) {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.Consume(
queue,
consumer,
autoAck,
exclusive,
noLocal,
noWait,
args,
)
}
// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method
func (connManager *ConnectionManager) QueueDeclarePassiveSafe(
name string,
durable bool,
autoDelete bool,
exclusive bool,
noWait bool,
args amqp.Table,
) (amqp.Queue, error) {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.QueueDeclarePassive(
name,
durable,
autoDelete,
exclusive,
noWait,
args,
)
}
// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method
func (connManager *ConnectionManager) QueueDeclareSafe(
name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table,
) (amqp.Queue, error) {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.QueueDeclare(
name,
durable,
autoDelete,
exclusive,
noWait,
args,
)
}
// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method
func (connManager *ConnectionManager) ExchangeDeclarePassiveSafe(
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.ExchangeDeclarePassive(
name,
kind,
durable,
autoDelete,
internal,
noWait,
args,
)
}
// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method
func (connManager *ConnectionManager) ExchangeDeclareSafe(
name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.ExchangeDeclare(
name,
kind,
durable,
autoDelete,
internal,
noWait,
args,
)
}
// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method
func (connManager *ConnectionManager) QueueBindSafe(
name string, key string, exchange string, noWait bool, args amqp.Table,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.QueueBind(
name,
key,
exchange,
noWait,
args,
)
}
// QosSafe safely wraps the (*amqp.Channel).Qos method
func (connManager *ConnectionManager) QosSafe(
prefetchCount int, prefetchSize int, global bool,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.Qos(
prefetchCount,
prefetchSize,
global,
)
}
// PublishSafe safely wraps the (*amqp.Channel).Publish method
func (connManager *ConnectionManager) PublishSafe(
exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.Publish(
exchange,
key,
mandatory,
immediate,
msg,
)
}
// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method
func (connManager *ConnectionManager) NotifyReturnSafe(
c chan amqp.Return,
) chan amqp.Return {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.NotifyReturn(
c,
)
}
// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method
func (connManager *ConnectionManager) ConfirmSafe(
noWait bool,
) error {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.Confirm(
noWait,
)
}
// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method
func (connManager *ConnectionManager) NotifyPublishSafe(
confirm chan amqp.Confirmation,
) chan amqp.Confirmation {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.NotifyPublish(
confirm,
)
}
// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method
func (connManager *ConnectionManager) NotifyFlowSafe(
c chan bool,
) chan bool {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.channel.NotifyFlow(
c,
)
}
// NotifyBlockedSafe safely wraps the (*amqp.Connection).NotifyBlocked method
func (connManager *ConnectionManager) NotifyBlockedSafe(
receiver chan amqp.Blocking,
) chan amqp.Blocking {
connManager.channelMux.RLock()
defer connManager.channelMux.RUnlock()
return connManager.connection.NotifyBlocked(
receiver,
)
}

+ 12
- 0
internal/logger/logger.go View File

@ -0,0 +1,12 @@
package logger
// Logger is describes a logging structure. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Fatalf(string, ...interface{})
Errorf(string, ...interface{})
Warnf(string, ...interface{})
Infof(string, ...interface{})
Debugf(string, ...interface{})
Tracef(string, ...interface{})
}

+ 10
- 10
logger.go View File

@ -3,42 +3,42 @@ package rabbitmq
import (
"fmt"
"log"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// Logger is the interface to send logs to. It can be set using
// Logger is describes a logging structure. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Fatalf(string, ...interface{})
Errorf(string, ...interface{})
Warnf(string, ...interface{})
Infof(string, ...interface{})
Debugf(string, ...interface{})
Tracef(string, ...interface{})
}
type Logger logger.Logger
const loggingPrefix = "gorabbit"
// stdDebugLogger logs to stdout up to the `DebugF` level
type stdDebugLogger struct{}
// Fatalf -
func (l stdDebugLogger) Fatalf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
}
// Errorf -
func (l stdDebugLogger) Errorf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
}
// Warnf -
func (l stdDebugLogger) Warnf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
}
// Infof -
func (l stdDebugLogger) Infof(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
}
// Debugf -
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}
// Tracef -
func (l stdDebugLogger) Tracef(format string, v ...interface{}) {}

+ 19
- 71
publish.go View File

@ -1,11 +1,12 @@
package rabbitmq
import (
"errors"
"fmt"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
// DeliveryMode. Transient means higher throughput but messages will not be
@ -39,10 +40,9 @@ type Confirmation struct {
// Publisher allows you to publish messages safely across an open connection
type Publisher struct {
chManager *channelManager
notifyReturnChan chan Return
notifyPublishChan chan Confirmation
connManager *connectionmanager.ConnectionManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
@ -56,16 +56,7 @@ type Publisher struct {
// PublisherOptions are used to describe a publisher's configuration.
// Logger is a custom logging interface.
type PublisherOptions struct {
Logger Logger
ReconnectInterval time.Duration
}
// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will
// attempt to reconnect to the rabbit server
func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.ReconnectInterval = reconnectInterval
}
Logger Logger
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
@ -87,29 +78,27 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
options := &PublisherOptions{
Logger: &stdDebugLogger{},
ReconnectInterval: time.Second * 5,
Logger: &stdDebugLogger{},
}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
chManager, err := newChannelManager(url, config, options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
if conn.connectionManager == nil {
return nil, errors.New("connection manager can't be nil")
}
reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect()
publisher := &Publisher{
chManager: chManager,
connManager: conn.connectionManager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
disablePublishDueToBlocked: false,
disablePublishDueToBlockedMux: &sync.RWMutex{},
options: *options,
notifyReturnChan: nil,
notifyPublishChan: nil,
}
go publisher.startNotifyFlowHandler()
@ -121,34 +110,13 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
}
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
for err := range publisher.reconnectErrCh {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
if publisher.notifyPublishChan != nil {
publisher.startNotifyPublishHandler()
}
}
}
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
func (publisher *Publisher) NotifyReturn() <-chan Return {
publisher.notifyReturnChan = make(chan Return)
go publisher.startNotifyReturnHandler()
return publisher.notifyReturnChan
}
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
func (publisher *Publisher) NotifyPublish() <-chan Confirmation {
publisher.notifyPublishChan = make(chan Confirmation)
publisher.startNotifyPublishHandler()
return publisher.notifyPublishChan
}
// Publish publishes the provided data to the given routing keys over the connection
func (publisher *Publisher) Publish(
data []byte,
@ -193,7 +161,7 @@ func (publisher *Publisher) Publish(
message.AppId = options.AppID
// Actual publish.
err := publisher.chManager.channel.Publish(
err := publisher.connManager.PublishSafe(
options.Exchange,
routingKey,
options.Mandatory,
@ -209,27 +177,7 @@ func (publisher *Publisher) Publish(
// Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) Close() error {
publisher.chManager.logger.Infof("closing publisher...")
return publisher.chManager.close()
}
func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
publisher.notifyReturnChan <- Return{ret}
}
}
func (publisher *Publisher) startNotifyPublishHandler() {
publisher.chManager.channel.Confirm(false)
go func() {
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
publisher.notifyPublishChan <- Confirmation{
Confirmation: conf,
ReconnectionCount: int(publisher.chManager.reconnectionCount),
}
}
}()
func (publisher *Publisher) Close() {
publisher.options.Logger.Infof("closing publisher...")
publisher.closeConnectionToManagerCh <- struct{}{}
}

+ 2
- 2
publish_flow_block.go View File

@ -5,7 +5,7 @@ import (
)
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
notifyFlowChan := publisher.connManager.NotifyFlowSafe(make(chan bool))
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
@ -24,7 +24,7 @@ func (publisher *Publisher) startNotifyFlowHandler() {
}
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.chManager.connection.NotifyBlocked(make(chan amqp.Blocking))
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMux.Lock()
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMux.Unlock()


Loading…
Cancel
Save