From 56e0b50b2b204c5d88c00e00d975a0c63f6b5418 Mon Sep 17 00:00:00 2001 From: WiRight Date: Wed, 14 Jun 2023 15:01:48 +0300 Subject: [PATCH] try fix goroutines leak --- consume.go | 36 ++++++++++++------- go.mod | 5 ++- go.sum | 2 ++ internal/channelmanager/channel_manager.go | 20 +++++++---- .../connectionmanager/connection_manager.go | 23 +++++++----- internal/dispatcher/dispatcher.go | 8 +++-- 6 files changed, 64 insertions(+), 30 deletions(-) diff --git a/consume.go b/consume.go index decfb61..e46a914 100644 --- a/consume.go +++ b/consume.go @@ -31,6 +31,7 @@ type Consumer struct { chanManager *channelmanager.ChannelManager reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} + notifyClosedChan <-chan error options ConsumerOptions isClosedMux *sync.RWMutex @@ -67,12 +68,13 @@ func NewConsumer( if err != nil { return nil, err } - reconnectErrCh, closeCh, _ := chanManager.NotifyReconnect() + reconnectErrCh, closeCh, notifyClosedChan := chanManager.NotifyReconnect() consumer := &Consumer{ chanManager: chanManager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, + notifyClosedChan: notifyClosedChan, options: *options, isClosedMux: &sync.RWMutex{}, isClosed: false, @@ -82,6 +84,7 @@ func NewConsumer( handler, *options, ) + if err != nil { return nil, err } @@ -89,13 +92,16 @@ func NewConsumer( go func() { for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) + err = consumer.startGoroutines( handler, *options, ) + if err != nil { consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err) consumer.options.Logger.Fatalf("consumer closing, unable to recover") + return } } @@ -111,18 +117,18 @@ func NewConsumer( func (consumer *Consumer) Close() { consumer.isClosedMux.Lock() defer consumer.isClosedMux.Unlock() + consumer.isClosed = true + // close the channel so that rabbitmq server knows that the // consumer has been stopped. - err := consumer.chanManager.Close() - if err != nil { + if err := consumer.chanManager.Close(); err != nil { consumer.options.Logger.Warnf("error while closing the channel: %v", err) } consumer.options.Logger.Infof("closing consumer...") - go func() { - consumer.closeConnectionToManagerCh <- struct{}{} - }() + + close(consumer.closeConnectionToManagerCh) } // startGoroutines declares the queue if it doesn't exist, @@ -137,19 +143,20 @@ func (consumer *Consumer) startGoroutines( 0, options.QOSGlobal, ) + if err != nil { return fmt.Errorf("declare qos failed: %w", err) } - err = declareExchange(consumer.chanManager, options.ExchangeOptions) - if err != nil { + + if err = declareExchange(consumer.chanManager, options.ExchangeOptions); err != nil { return fmt.Errorf("declare exchange failed: %w", err) } - err = declareQueue(consumer.chanManager, options.QueueOptions) - if err != nil { + + if err = declareQueue(consumer.chanManager, options.QueueOptions); err != nil { return fmt.Errorf("declare queue failed: %w", err) } - err = declareBindings(consumer.chanManager, options) - if err != nil { + + if err = declareBindings(consumer.chanManager, options); err != nil { return fmt.Errorf("declare bindings failed: %w", err) } @@ -162,6 +169,7 @@ func (consumer *Consumer) startGoroutines( options.RabbitConsumerOptions.NoWait, tableToAMQPTable(options.RabbitConsumerOptions.Args), ) + if err != nil { return err } @@ -169,13 +177,16 @@ func (consumer *Consumer) startGoroutines( for i := 0; i < options.Concurrency; i++ { go handlerGoroutine(consumer, msgs, options, handler) } + consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency) + return nil } func (consumer *Consumer) getIsClosed() bool { consumer.isClosedMux.RLock() defer consumer.isClosedMux.RUnlock() + return consumer.isClosed } @@ -208,5 +219,6 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti } } } + consumer.options.Logger.Infof("rabbit consumer goroutine closed") } diff --git a/go.mod b/go.mod index 3aba60f..bb9de09 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/DizoftTeam/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.8.0 +require ( + github.com/rabbitmq/amqp091-go v1.8.0 + github.com/wagslane/go-rabbitmq v0.12.3 +) diff --git a/go.sum b/go.sum index 787640a..98d70f1 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/wagslane/go-rabbitmq v0.12.3 h1:nHoW6SgwaGNTjNyHGhcZwdJGru2228RZTwucxqmgA9M= +github.com/wagslane/go-rabbitmq v0.12.3/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 457750b..a5c3c48 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -73,6 +73,7 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { chanManager.reconnectLoop() chanManager.logger.Warnf("successfully reconnected to amqp server") chanManager.dispatcher.Dispatch(err) + chanManager.dispatcher.DispatchLooseConnection(err) } if err == nil { @@ -83,6 +84,7 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { chanManager.reconnectLoop() chanManager.logger.Warnf("successfully reconnected to amqp server after cancel") chanManager.dispatcher.Dispatch(errors.New(err)) + chanManager.dispatcher.DispatchLooseConnection(errors.New(err)) } } @@ -90,12 +92,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { func (chanManager *ChannelManager) GetReconnectionCount() uint { chanManager.reconnectionCountMux.Lock() defer chanManager.reconnectionCountMux.Unlock() + return chanManager.reconnectionCount } func (chanManager *ChannelManager) incrementReconnectionCount() { chanManager.reconnectionCountMux.Lock() defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCount++ } @@ -103,13 +107,18 @@ func (chanManager *ChannelManager) incrementReconnectionCount() { func (chanManager *ChannelManager) reconnectLoop() { for { chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval) + time.Sleep(chanManager.reconnectInterval) + err := chanManager.reconnect() + if err != nil { chanManager.logger.Errorf("error reconnecting to amqp server: %v", err) } else { chanManager.incrementReconnectionCount() + go chanManager.startNotifyCancelOrClosed() + return } } @@ -119,7 +128,9 @@ func (chanManager *ChannelManager) reconnectLoop() { func (chanManager *ChannelManager) reconnect() error { chanManager.channelMux.Lock() defer chanManager.channelMux.Unlock() + newChannel, err := getNewChannel(chanManager.connManager) + if err != nil { return err } @@ -129,21 +140,18 @@ func (chanManager *ChannelManager) reconnect() error { } chanManager.channel = newChannel + return nil } // Close safely closes the current channel and connection func (chanManager *ChannelManager) Close() error { chanManager.logger.Infof("closing channel manager...") + chanManager.channelMux.Lock() defer chanManager.channelMux.Unlock() - err := chanManager.channel.Close() - if err != nil { - return err - } - - return nil + return chanManager.channel.Close() } // NotifyReconnect adds a new subscriber that will receive error messages whenever diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 5d107eb..df31de1 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -24,10 +24,11 @@ type ConnectionManager struct { // NewConnectionManager creates a new connection manager func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { - conn, err := amqp.DialConfig(url, amqp.Config(conf)) + conn, err := amqp.DialConfig(url, conf) if err != nil { return nil, err } + connManager := ConnectionManager{ logger: log, url: url, @@ -39,21 +40,20 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon reconnectionCountMux: &sync.Mutex{}, dispatcher: dispatcher.NewDispatcher(), } + go connManager.startNotifyClose() + return &connManager, nil } // Close safely closes the current channel and connection func (connManager *ConnectionManager) Close() error { connManager.logger.Infof("closing connection manager...") + connManager.connectionMux.Lock() defer connManager.connectionMux.Unlock() - err := connManager.connection.Close() - if err != nil { - return err - } - return nil + return connManager.connection.Close() } // NotifyReconnect adds a new subscriber that will receive error messages whenever @@ -84,11 +84,11 @@ func (connManager *ConnectionManager) startNotifyClose() { if err != nil { connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err) - connManager.dispatcher.DispathLooseConnection(err) + connManager.dispatcher.DispatchLooseConnection(err) connManager.reconnectLoop() connManager.logger.Warnf("successfully reconnected to amqp server") connManager.dispatcher.Dispatch(err) - connManager.dispatcher.DispathLooseConnection(nil) + connManager.dispatcher.DispatchLooseConnection(nil) } if err == nil { @@ -100,12 +100,14 @@ func (connManager *ConnectionManager) startNotifyClose() { func (connManager *ConnectionManager) GetReconnectionCount() uint { connManager.reconnectionCountMux.Lock() defer connManager.reconnectionCountMux.Unlock() + return connManager.reconnectionCount } func (connManager *ConnectionManager) incrementReconnectionCount() { connManager.reconnectionCountMux.Lock() defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCount++ } @@ -113,7 +115,9 @@ func (connManager *ConnectionManager) incrementReconnectionCount() { func (connManager *ConnectionManager) reconnectLoop() { for { connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval) + time.Sleep(connManager.ReconnectInterval) + err := connManager.reconnect() if err != nil { @@ -132,7 +136,8 @@ func (connManager *ConnectionManager) reconnectLoop() { func (connManager *ConnectionManager) reconnect() error { connManager.connectionMux.Lock() defer connManager.connectionMux.Unlock() - newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) + + newConn, err := amqp.DialConfig(connManager.url, connManager.amqpConfig) if err != nil { return err diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index c57ee51..39beadf 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -45,8 +45,8 @@ func (d *Dispatcher) Dispatch(err error) error { return nil } -// DispathLooseConnection dispatching that connection to RabbitMQ is loosed -func (d *Dispatcher) DispathLooseConnection(err error) error { +// DispatchLooseConnection dispatching that connection to RabbitMQ is loosed +func (d *Dispatcher) DispatchLooseConnection(err error) error { d.subscribersMux.Lock() defer d.subscribersMux.Unlock() @@ -79,12 +79,16 @@ func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}, <-chan erro go func(id int) { <-closeCh + d.subscribersMux.Lock() defer d.subscribersMux.Unlock() + sub, ok := d.subscribers[id] + if !ok { return } + close(sub.notifyCancelOrCloseChan) delete(d.subscribers, id) }(id)