diff --git a/connection.go b/connection.go index ceadc52..97b8bb4 100644 --- a/connection.go +++ b/connection.go @@ -1,8 +1,6 @@ package rabbitmq import ( - "sync" - amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) @@ -14,10 +12,6 @@ type Conn struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} - handlerMux *sync.Mutex - notifyReturnHandler func(r Return) - notifyPublishHandler func(p Confirmation) - options ConnectionOptions } @@ -40,23 +34,11 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) return nil, err } - err = manager.QosSafe( - options.QOSPrefetch, - 0, - options.QOSGlobal, - ) - if err != nil { - return nil, err - } - reconnectErrCh, closeCh := manager.NotifyReconnect() conn := &Conn{ connectionManager: manager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, - handlerMux: &sync.Mutex{}, - notifyReturnHandler: nil, - notifyPublishHandler: nil, options: *options, } @@ -67,61 +49,6 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) func (conn *Conn) handleRestarts() { for err := range conn.reconnectErrCh { conn.options.Logger.Infof("successful connection recovery from: %v", err) - go conn.startReturnHandler() - go conn.startPublishHandler() - } -} - -// NotifyReturn registers a listener for basic.return methods. -// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. -// These notifications are shared across an entire connection, so if you're creating multiple -// publishers on the same connection keep that in mind -func (conn *Conn) NotifyReturn(handler func(r Return)) { - conn.handlerMux.Lock() - conn.notifyReturnHandler = handler - conn.handlerMux.Unlock() - - go conn.startReturnHandler() -} - -// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option -// These notifications are shared across an entire connection, so if you're creating multiple -// publishers on the same connection keep that in mind -func (conn *Conn) NotifyPublish(handler func(p Confirmation)) { - conn.handlerMux.Lock() - conn.notifyPublishHandler = handler - conn.handlerMux.Unlock() - - go conn.startPublishHandler() -} - -func (conn *Conn) startReturnHandler() { - conn.handlerMux.Lock() - if conn.notifyReturnHandler == nil { - return - } - conn.handlerMux.Unlock() - - returns := conn.connectionManager.NotifyReturnSafe(make(chan amqp.Return, 1)) - for ret := range returns { - go conn.notifyReturnHandler(Return{ret}) - } -} - -func (conn *Conn) startPublishHandler() { - conn.handlerMux.Lock() - if conn.notifyPublishHandler == nil { - return - } - conn.handlerMux.Unlock() - - conn.connectionManager.ConfirmSafe(false) - confirmationCh := conn.connectionManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) - for conf := range confirmationCh { - go conn.notifyPublishHandler(Confirmation{ - Confirmation: conf, - ReconnectionCount: int(conn.connectionManager.GetReconnectionCount()), - }) } } diff --git a/connection_options.go b/connection_options.go index b56194f..aa47a55 100644 --- a/connection_options.go +++ b/connection_options.go @@ -4,8 +4,6 @@ import "time" // ConnectionOptions are used to describe how a new consumer will be created. type ConnectionOptions struct { - QOSPrefetch int - QOSGlobal bool ReconnectInterval time.Duration Logger Logger Config Config @@ -14,30 +12,12 @@ type ConnectionOptions struct { // getDefaultConnectionOptions describes the options that will be used when a value isn't provided func getDefaultConnectionOptions() ConnectionOptions { return ConnectionOptions{ - QOSPrefetch: 0, - QOSGlobal: false, ReconnectInterval: time.Second * 5, Logger: stdDebugLogger{}, Config: Config{}, } } -// WithConnectionOptionsQOSPrefetch returns a function that sets the prefetch count, which means that -// many messages will be fetched from the server in advance to help with throughput. -// This doesn't affect the handler, messages are still processed one at a time. -func WithConnectionOptionsQOSPrefetch(prefetchCount int) func(*ConnectionOptions) { - return func(options *ConnectionOptions) { - options.QOSPrefetch = prefetchCount - } -} - -// WithConnectionOptionsQOSGlobal sets the qos on the channel to global, which means -// these QOS settings apply to ALL existing and future -// consumers on all channels on the same connection -func WithConnectionOptionsQOSGlobal(options *ConnectionOptions) { - options.QOSGlobal = true -} - // WithConnectionOptionsReconnectInterval sets the reconnection interval func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions) { return func(options *ConnectionOptions) { diff --git a/consume.go b/consume.go index 00852db..30c9a64 100644 --- a/consume.go +++ b/consume.go @@ -6,7 +6,7 @@ import ( "sync" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) // Action is an action that occurs after processed this delivery @@ -26,7 +26,7 @@ const ( // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { - connManager *connectionmanager.ConnectionManager + chanManager *channelmanager.ChannelManager reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions @@ -60,10 +60,15 @@ func NewConsumer( if conn.connectionManager == nil { return nil, errors.New("connection manager can't be nil") } - reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect() + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + reconnectErrCh, closeCh := chanManager.NotifyReconnect() consumer := &Consumer{ - connManager: conn.connectionManager, + chanManager: chanManager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, options: *options, @@ -71,7 +76,7 @@ func NewConsumer( isClosed: false, } - err := consumer.startGoroutines( + err = consumer.startGoroutines( handler, *options, ) @@ -116,21 +121,28 @@ func (consumer *Consumer) startGoroutines( handler Handler, options ConsumerOptions, ) error { - - err := declareExchange(consumer.connManager, options.ExchangeOptions) + err := consumer.chanManager.QosSafe( + options.QOSPrefetch, + 0, + options.QOSGlobal, + ) + if err != nil { + return fmt.Errorf("declare qos failed: %w", err) + } + err = declareExchange(consumer.chanManager, options.ExchangeOptions) if err != nil { return fmt.Errorf("declare exchange failed: %w", err) } - err = declareQueue(consumer.connManager, options.QueueOptions) + err = declareQueue(consumer.chanManager, options.QueueOptions) if err != nil { return fmt.Errorf("declare queue failed: %w", err) } - err = declareBindings(consumer.connManager, options) + err = declareBindings(consumer.chanManager, options) if err != nil { return fmt.Errorf("declare bindings failed: %w", err) } - msgs, err := consumer.connManager.ConsumeSafe( + msgs, err := consumer.chanManager.ConsumeSafe( options.QueueOptions.Name, options.RabbitConsumerOptions.Name, options.RabbitConsumerOptions.AutoAck, diff --git a/consumer_options.go b/consumer_options.go index 71dabbb..42d4b96 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -40,6 +40,8 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { Bindings: []Binding{}, Concurrency: 1, Logger: stdDebugLogger{}, + QOSPrefetch: 0, + QOSGlobal: false, } } @@ -62,6 +64,8 @@ type ConsumerOptions struct { Bindings []Binding Concurrency int Logger logger.Logger + QOSPrefetch int + QOSGlobal bool } // RabbitConsumerOptions are used to configure the consumer @@ -262,3 +266,19 @@ func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) options.Logger = log } } + +// WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { + options.QOSGlobal = true +} diff --git a/declare.go b/declare.go index 9462e14..86abe85 100644 --- a/declare.go +++ b/declare.go @@ -1,15 +1,15 @@ package rabbitmq import ( - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) -func declareQueue(connManager *connectionmanager.ConnectionManager, options QueueOptions) error { +func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { if !options.Declare { return nil } if options.Passive { - _, err := connManager.QueueDeclarePassiveSafe( + _, err := chanManager.QueueDeclarePassiveSafe( options.Name, options.Durable, options.AutoDelete, @@ -22,7 +22,7 @@ func declareQueue(connManager *connectionmanager.ConnectionManager, options Queu } return nil } - _, err := connManager.QueueDeclareSafe( + _, err := chanManager.QueueDeclareSafe( options.Name, options.Durable, options.AutoDelete, @@ -36,12 +36,12 @@ func declareQueue(connManager *connectionmanager.ConnectionManager, options Queu return nil } -func declareExchange(connManager *connectionmanager.ConnectionManager, options ExchangeOptions) error { +func declareExchange(chanManager *channelmanager.ChannelManager, options ExchangeOptions) error { if !options.Declare { return nil } if options.Passive { - err := connManager.ExchangeDeclarePassiveSafe( + err := chanManager.ExchangeDeclarePassiveSafe( options.Name, options.Kind, options.Durable, @@ -55,7 +55,7 @@ func declareExchange(connManager *connectionmanager.ConnectionManager, options E } return nil } - err := connManager.ExchangeDeclareSafe( + err := chanManager.ExchangeDeclareSafe( options.Name, options.Kind, options.Durable, @@ -70,12 +70,12 @@ func declareExchange(connManager *connectionmanager.ConnectionManager, options E return nil } -func declareBindings(connManager *connectionmanager.ConnectionManager, options ConsumerOptions) error { +func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error { for _, binding := range options.Bindings { if !binding.Declare { continue } - err := connManager.QueueBindSafe( + err := chanManager.QueueBindSafe( options.QueueOptions.Name, binding.RoutingKey, options.ExchangeOptions.Name, diff --git a/examples/logger/main.go b/examples/logger/main.go index bb2a16f..8461ddb 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -41,10 +41,6 @@ func main() { } defer conn.Close() - conn.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) - publisher, err := rabbitmq.NewPublisher( conn, rabbitmq.WithPublisherOptionsLogger(mylogger), @@ -63,4 +59,8 @@ func main() { if err != nil { log.Fatal(err) } + + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) } diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 4a9b5fb..3b9de0d 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -21,14 +21,6 @@ func main() { } defer conn.Close() - conn.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) - - conn.NotifyPublish(func(c rabbitmq.Confirmation) { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - }) - publisher, err := rabbitmq.NewPublisher( conn, rabbitmq.WithPublisherOptionsLogging, @@ -40,6 +32,14 @@ func main() { } defer publisher.Close() + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) + publisher2, err := rabbitmq.NewPublisher( conn, rabbitmq.WithPublisherOptionsLogging, @@ -51,6 +51,14 @@ func main() { } defer publisher2.Close() + publisher2.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + publisher2.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) + // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index e0c9d40..988d781 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -21,14 +21,6 @@ func main() { } defer conn.Close() - conn.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) - - conn.NotifyPublish(func(c rabbitmq.Confirmation) { - log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) - }) - publisher, err := rabbitmq.NewPublisher( conn, rabbitmq.WithPublisherOptionsLogging, @@ -40,6 +32,14 @@ func main() { } defer publisher.Close() + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + }) + // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go new file mode 100644 index 0000000..67faf0a --- /dev/null +++ b/internal/channelmanager/channel_manager.go @@ -0,0 +1,149 @@ +package channelmanager + +import ( + "errors" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "github.com/wagslane/go-rabbitmq/internal/dispatcher" + "github.com/wagslane/go-rabbitmq/internal/logger" +) + +// ChannelManager - +type ChannelManager struct { + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMux *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher +} + +// NewChannelManager creates a new connection manager +func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { + ch, err := getNewChannel(connManager) + if err != nil { + return nil, err + } + + chanManager := ChannelManager{ + logger: log, + connManager: connManager, + channel: ch, + channelMux: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go chanManager.startNotifyCancelOrClosed() + return &chanManager, nil +} + +func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Channel, error) { + conn := connManager.CheckoutConnection() + defer connManager.CheckinConnection() + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + return ch, nil +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (chanManager *ChannelManager) startNotifyCancelOrClosed() { + notifyCloseChan := chanManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chanManager.channel.NotifyCancel(make(chan string, 1)) + + select { + case err := <-notifyCloseChan: + if err != nil { + chanManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) + chanManager.reconnectLoop() + chanManager.logger.Warnf("successfully reconnected to amqp server") + chanManager.dispatcher.Dispatch(err) + } + if err == nil { + chanManager.logger.Infof("amqp channel closed gracefully") + } + case err := <-notifyCancelChan: + chanManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) + chanManager.reconnectLoop() + chanManager.logger.Warnf("successfully reconnected to amqp server after cancel") + chanManager.dispatcher.Dispatch(errors.New(err)) + } +} + +// GetReconnectionCount - +func (chanManager *ChannelManager) GetReconnectionCount() uint { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + return chanManager.reconnectionCount +} + +func (chanManager *ChannelManager) incrementReconnectionCount() { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (chanManager *ChannelManager) reconnectLoop() { + for { + chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval) + time.Sleep(chanManager.reconnectInterval) + err := chanManager.reconnect() + if err != nil { + chanManager.logger.Errorf("error reconnecting to amqp server: %v", err) + } else { + chanManager.incrementReconnectionCount() + go chanManager.startNotifyCancelOrClosed() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (chanManager *ChannelManager) reconnect() error { + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + newChannel, err := getNewChannel(chanManager.connManager) + if err != nil { + return err + } + + if err = chanManager.channel.Close(); err != nil { + chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) + } + + chanManager.channel = newChannel + return nil +} + +// Close safely closes the current channel and connection +func (chanManager *ChannelManager) Close() error { + chanManager.logger.Infof("closing channel manager...") + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + + err := chanManager.channel.Close() + if err != nil { + return err + } + + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnect to the server +func (chanManager *ChannelManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return chanManager.dispatcher.AddSubscriber() +} diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go new file mode 100644 index 0000000..67c2101 --- /dev/null +++ b/internal/channelmanager/safe_wraps.go @@ -0,0 +1,198 @@ +package channelmanager + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConsumeSafe safely wraps the (*amqp.Channel).Consume method +func (chanManager *ChannelManager) ConsumeSafe( + queue, + consumer string, + autoAck, + exclusive, + noLocal, + noWait bool, + args amqp.Table, +) (<-chan amqp.Delivery, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Consume( + queue, + consumer, + autoAck, + exclusive, + noLocal, + noWait, + args, + ) +} + +// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method +func (chanManager *ChannelManager) QueueDeclarePassiveSafe( + name string, + durable bool, + autoDelete bool, + exclusive bool, + noWait bool, + args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclarePassive( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method +func (chanManager *ChannelManager) QueueDeclareSafe( + name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclare( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method +func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclarePassive( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method +func (chanManager *ChannelManager) ExchangeDeclareSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclare( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method +func (chanManager *ChannelManager) QueueBindSafe( + name string, key string, exchange string, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueBind( + name, + key, + exchange, + noWait, + args, + ) +} + +// QosSafe safely wraps the (*amqp.Channel).Qos method +func (chanManager *ChannelManager) QosSafe( + prefetchCount int, prefetchSize int, global bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Qos( + prefetchCount, + prefetchSize, + global, + ) +} + +// PublishSafe safely wraps the (*amqp.Channel).Publish method +func (chanManager *ChannelManager) PublishSafe( + exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Publish( + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method +func (chanManager *ChannelManager) NotifyReturnSafe( + c chan amqp.Return, +) chan amqp.Return { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyReturn( + c, + ) +} + +// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method +func (chanManager *ChannelManager) ConfirmSafe( + noWait bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Confirm( + noWait, + ) +} + +// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method +func (chanManager *ChannelManager) NotifyPublishSafe( + confirm chan amqp.Confirmation, +) chan amqp.Confirmation { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyPublish( + confirm, + ) +} + +// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method +func (chanManager *ChannelManager) NotifyFlowSafe( + c chan bool, +) chan bool { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyFlow( + c, + ) +} diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 4489557..fce1f2b 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -1,11 +1,11 @@ package connectionmanager import ( - "errors" "sync" "time" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/dispatcher" "github.com/wagslane/go-rabbitmq/internal/logger" ) @@ -13,74 +13,82 @@ import ( type ConnectionManager struct { logger logger.Logger url string - channel *amqp.Channel connection *amqp.Connection amqpConfig amqp.Config - channelMux *sync.RWMutex - reconnectInterval time.Duration + connectionMux *sync.RWMutex + ReconnectInterval time.Duration reconnectionCount uint reconnectionCountMux *sync.Mutex - dispatcher *dispatcher + dispatcher *dispatcher.Dispatcher } // NewConnectionManager creates a new connection manager func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { - conn, ch, err := getNewChannel(url, conf) + conn, err := amqp.DialConfig(url, amqp.Config(conf)) if err != nil { return nil, err } - connManager := ConnectionManager{ logger: log, url: url, connection: conn, - channel: ch, - channelMux: &sync.RWMutex{}, amqpConfig: conf, - reconnectInterval: reconnectInterval, + connectionMux: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, reconnectionCount: 0, reconnectionCountMux: &sync.Mutex{}, - dispatcher: newDispatcher(), + dispatcher: dispatcher.NewDispatcher(), } - go connManager.startNotifyCancelOrClosed() + go connManager.startNotifyClose() return &connManager, nil } -func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) { - amqpConn, err := amqp.DialConfig(url, amqp.Config(conf)) - if err != nil { - return nil, nil, err - } - ch, err := amqpConn.Channel() +// Close safely closes the current channel and connection +func (connManager *ConnectionManager) Close() error { + connManager.logger.Infof("closing connection manager...") + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + + err := connManager.connection.Close() if err != nil { - return nil, nil, err + return err } - return amqpConn, ch, nil + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnected to the server +func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return connManager.dispatcher.AddSubscriber() +} + +// CheckoutConnection - +func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { + connManager.connectionMux.RLock() + return connManager.connection +} + +// CheckinConnection - +func (connManager *ConnectionManager) CheckinConnection() { + connManager.connectionMux.RUnlock() } // startNotifyCancelOrClosed listens on the channel's cancelled and closed // notifiers. When it detects a problem, it attempts to reconnect. // Once reconnected, it sends an error back on the manager's notifyCancelOrClose // channel -func (connManager *ConnectionManager) startNotifyCancelOrClosed() { - notifyCloseChan := connManager.channel.NotifyClose(make(chan *amqp.Error, 1)) - notifyCancelChan := connManager.channel.NotifyCancel(make(chan string, 1)) - select { - case err := <-notifyCloseChan: - if err != nil { - connManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) - connManager.reconnectLoop() - connManager.logger.Warnf("successfully reconnected to amqp server") - connManager.dispatcher.dispatch(err) - } - if err == nil { - connManager.logger.Infof("amqp channel closed gracefully") - } - case err := <-notifyCancelChan: - connManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) +func (connManager *ConnectionManager) startNotifyClose() { + notifyCloseChan := connManager.connection.NotifyClose(make(chan *amqp.Error, 1)) + + err := <-notifyCloseChan + if err != nil { + connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err) connManager.reconnectLoop() - connManager.logger.Warnf("successfully reconnected to amqp server after cancel") - connManager.dispatcher.dispatch(errors.New(err)) + connManager.logger.Warnf("successfully reconnected to amqp server") + connManager.dispatcher.Dispatch(err) + } + if err == nil { + connManager.logger.Infof("amqp connection closed gracefully") } } @@ -100,14 +108,14 @@ func (connManager *ConnectionManager) incrementReconnectionCount() { // reconnectLoop continuously attempts to reconnect func (connManager *ConnectionManager) reconnectLoop() { for { - connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.reconnectInterval) - time.Sleep(connManager.reconnectInterval) + connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval) + time.Sleep(connManager.ReconnectInterval) err := connManager.reconnect() if err != nil { connManager.logger.Errorf("error reconnecting to amqp server: %v", err) } else { connManager.incrementReconnectionCount() - go connManager.startNotifyCancelOrClosed() + go connManager.startNotifyClose() return } } @@ -115,46 +123,17 @@ func (connManager *ConnectionManager) reconnectLoop() { // reconnect safely closes the current channel and obtains a new one func (connManager *ConnectionManager) reconnect() error { - connManager.channelMux.Lock() - defer connManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(connManager.url, connManager.amqpConfig) + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) if err != nil { return err } - if err = connManager.channel.Close(); err != nil { - connManager.logger.Warnf("error closing channel while reconnecting: %v", err) - } - if err = connManager.connection.Close(); err != nil { connManager.logger.Warnf("error closing connection while reconnecting: %v", err) } connManager.connection = newConn - connManager.channel = newChannel return nil } - -// Close safely closes the current channel and connection -func (connManager *ConnectionManager) Close() error { - connManager.logger.Infof("closing connection manager...") - connManager.channelMux.Lock() - defer connManager.channelMux.Unlock() - - err := connManager.channel.Close() - if err != nil { - return err - } - - err = connManager.connection.Close() - if err != nil { - return err - } - return nil -} - -// NotifyReconnect adds a new subscriber that will receive error messages whenever -// the connection manager has successfully reconnect to the server -func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { - return connManager.dispatcher.addSubscriber() -} diff --git a/internal/connectionmanager/safe_wraps.go b/internal/connectionmanager/safe_wraps.go index fc18ed6..b6702af 100644 --- a/internal/connectionmanager/safe_wraps.go +++ b/internal/connectionmanager/safe_wraps.go @@ -4,205 +4,12 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -// ConsumeSafe safely wraps the (*amqp.Channel).Consume method -func (connManager *ConnectionManager) ConsumeSafe( - queue, - consumer string, - autoAck, - exclusive, - noLocal, - noWait bool, - args amqp.Table, -) (<-chan amqp.Delivery, error) { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.Consume( - queue, - consumer, - autoAck, - exclusive, - noLocal, - noWait, - args, - ) -} - -// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method -func (connManager *ConnectionManager) QueueDeclarePassiveSafe( - name string, - durable bool, - autoDelete bool, - exclusive bool, - noWait bool, - args amqp.Table, -) (amqp.Queue, error) { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.QueueDeclarePassive( - name, - durable, - autoDelete, - exclusive, - noWait, - args, - ) -} - -// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method -func (connManager *ConnectionManager) QueueDeclareSafe( - name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, -) (amqp.Queue, error) { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.QueueDeclare( - name, - durable, - autoDelete, - exclusive, - noWait, - args, - ) -} - -// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method -func (connManager *ConnectionManager) ExchangeDeclarePassiveSafe( - name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.ExchangeDeclarePassive( - name, - kind, - durable, - autoDelete, - internal, - noWait, - args, - ) -} - -// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method -func (connManager *ConnectionManager) ExchangeDeclareSafe( - name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.ExchangeDeclare( - name, - kind, - durable, - autoDelete, - internal, - noWait, - args, - ) -} - -// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method -func (connManager *ConnectionManager) QueueBindSafe( - name string, key string, exchange string, noWait bool, args amqp.Table, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.QueueBind( - name, - key, - exchange, - noWait, - args, - ) -} - -// QosSafe safely wraps the (*amqp.Channel).Qos method -func (connManager *ConnectionManager) QosSafe( - prefetchCount int, prefetchSize int, global bool, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.Qos( - prefetchCount, - prefetchSize, - global, - ) -} - -// PublishSafe safely wraps the (*amqp.Channel).Publish method -func (connManager *ConnectionManager) PublishSafe( - exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.Publish( - exchange, - key, - mandatory, - immediate, - msg, - ) -} - -// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method -func (connManager *ConnectionManager) NotifyReturnSafe( - c chan amqp.Return, -) chan amqp.Return { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.NotifyReturn( - c, - ) -} - -// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method -func (connManager *ConnectionManager) ConfirmSafe( - noWait bool, -) error { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.Confirm( - noWait, - ) -} - -// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method -func (connManager *ConnectionManager) NotifyPublishSafe( - confirm chan amqp.Confirmation, -) chan amqp.Confirmation { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.NotifyPublish( - confirm, - ) -} - -// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method -func (connManager *ConnectionManager) NotifyFlowSafe( - c chan bool, -) chan bool { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() - - return connManager.channel.NotifyFlow( - c, - ) -} - // NotifyBlockedSafe safely wraps the (*amqp.Connection).NotifyBlocked method func (connManager *ConnectionManager) NotifyBlockedSafe( receiver chan amqp.Blocking, ) chan amqp.Blocking { - connManager.channelMux.RLock() - defer connManager.channelMux.RUnlock() + connManager.connectionMux.RLock() + defer connManager.connectionMux.RUnlock() return connManager.connection.NotifyBlocked( receiver, diff --git a/internal/connectionmanager/connection_manager_dispatch.go b/internal/dispatcher/dispatcher.go similarity index 81% rename from internal/connectionmanager/connection_manager_dispatch.go rename to internal/dispatcher/dispatcher.go index 5ab0133..24ae53f 100644 --- a/internal/connectionmanager/connection_manager_dispatch.go +++ b/internal/dispatcher/dispatcher.go @@ -1,4 +1,4 @@ -package connectionmanager +package dispatcher import ( "log" @@ -8,7 +8,8 @@ import ( "time" ) -type dispatcher struct { +// Dispatcher - +type Dispatcher struct { subscribers map[int]dispatchSubscriber subscribersMux *sync.Mutex } @@ -18,14 +19,16 @@ type dispatchSubscriber struct { closeCh <-chan struct{} } -func newDispatcher() *dispatcher { - return &dispatcher{ +// NewDispatcher - +func NewDispatcher() *Dispatcher { + return &Dispatcher{ subscribers: make(map[int]dispatchSubscriber), subscribersMux: &sync.Mutex{}, } } -func (d *dispatcher) dispatch(err error) error { +// Dispatch - +func (d *Dispatcher) Dispatch(err error) error { d.subscribersMux.Lock() defer d.subscribersMux.Unlock() for _, subscriber := range d.subscribers { @@ -38,7 +41,8 @@ func (d *dispatcher) dispatch(err error) error { return nil } -func (d *dispatcher) addSubscriber() (<-chan error, chan<- struct{}) { +// AddSubscriber - +func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}) { const maxRand = math.MaxInt64 const minRand = 0 id := rand.Intn(maxRand-minRand) + minRand diff --git a/publish.go b/publish.go index b52b7f9..b361f84 100644 --- a/publish.go +++ b/publish.go @@ -6,6 +6,7 @@ import ( "sync" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) @@ -40,6 +41,7 @@ type Confirmation struct { // Publisher allows you to publish messages safely across an open connection type Publisher struct { + chanManager *channelmanager.ChannelManager connManager *connectionmanager.ConnectionManager reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} @@ -50,6 +52,10 @@ type Publisher struct { disablePublishDueToBlocked bool disablePublishDueToBlockedMux *sync.RWMutex + handlerMux *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) + options PublisherOptions } @@ -68,8 +74,15 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe if conn.connectionManager == nil { return nil, errors.New("connection manager can't be nil") } - reconnectErrCh, closeCh := conn.connectionManager.NotifyReconnect() + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := chanManager.NotifyReconnect() publisher := &Publisher{ + chanManager: chanManager, connManager: conn.connectionManager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, @@ -77,10 +90,13 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe disablePublishDueToFlowMux: &sync.RWMutex{}, disablePublishDueToBlocked: false, disablePublishDueToBlockedMux: &sync.RWMutex{}, + handlerMux: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, options: *options, } - err := publisher.startup() + err = publisher.startup() if err != nil { return nil, err } @@ -91,7 +107,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe } func (publisher *Publisher) startup() error { - err := declareExchange(publisher.connManager, publisher.options.ExchangeOptions) + err := declareExchange(publisher.chanManager, publisher.options.ExchangeOptions) if err != nil { return fmt.Errorf("declare exchange failed: %w", err) } @@ -108,6 +124,8 @@ func (publisher *Publisher) handleRestarts() { publisher.options.Logger.Infof("failed to startup publisher: %v", err) continue } + go publisher.startReturnHandler() + go publisher.startPublishHandler() } } @@ -155,7 +173,7 @@ func (publisher *Publisher) Publish( message.AppId = options.AppID // Actual publish. - err := publisher.connManager.PublishSafe( + err := publisher.chanManager.PublishSafe( options.Exchange, routingKey, options.Mandatory, @@ -178,3 +196,64 @@ func (publisher *Publisher) Close() { publisher.closeConnectionToManagerCh <- struct{}{} }() } + +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (publisher *Publisher) NotifyReturn(handler func(r Return)) { + publisher.handlerMux.Lock() + start := publisher.notifyReturnHandler == nil + publisher.notifyReturnHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startReturnHandler() + } +} + +// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { + publisher.handlerMux.Lock() + start := publisher.notifyPublishHandler == nil + publisher.notifyPublishHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startPublishHandler() + } +} + +func (publisher *Publisher) startReturnHandler() { + publisher.handlerMux.Lock() + if publisher.notifyReturnHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returns { + go publisher.notifyReturnHandler(Return{ret}) + } +} + +func (publisher *Publisher) startPublishHandler() { + publisher.handlerMux.Lock() + if publisher.notifyPublishHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + publisher.chanManager.ConfirmSafe(false) + confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range confirmationCh { + go publisher.notifyPublishHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }) + } +} diff --git a/publish_flow_block.go b/publish_flow_block.go index 0bb82c1..5033037 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -5,7 +5,7 @@ import ( ) func (publisher *Publisher) startNotifyFlowHandler() { - notifyFlowChan := publisher.connManager.NotifyFlowSafe(make(chan bool)) + notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool)) publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlowMux.Unlock()