From 8929a4e9dca7bfdc5909d9a887bfa9e524b222cf Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Mon, 17 Jun 2024 12:11:54 +0200 Subject: [PATCH 1/5] feat(consumer): graceful shutdown --- consume.go | 70 +++++++++++++++++++++++++++++++++++++++++++++ consumer_options.go | 8 ++++++ 2 files changed, 78 insertions(+) diff --git a/consume.go b/consume.go index 4517c5d..bc83ab9 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "context" "errors" "fmt" "sync" @@ -32,6 +33,7 @@ type Consumer struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions + handlerMux *sync.RWMutex isClosedMux *sync.RWMutex isClosed bool @@ -89,6 +91,14 @@ func (consumer *Consumer) Run(handler Handler) error { return err } + handler = func(d Delivery) (action Action) { + if !consumer.handlerMux.TryRLock() { + return NackRequeue + } + defer consumer.handlerMux.RUnlock() + return handler(d) + } + for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( @@ -104,10 +114,50 @@ func (consumer *Consumer) Run(handler Handler) error { } // Close cleans up resources and closes the consumer. +// It waits for all handlers to finish before returning by default +// (use WithConsumerOptionsForceShutdown option to disable this behavior). // It does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. func (consumer *Consumer) Close() { + if consumer.options.CloseGracefully { + err := consumer.waitForHandlers(context.Background()) + if err != nil { + consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) + } + } + + consumer.isClosedMux.Lock() + defer consumer.isClosedMux.Unlock() + consumer.isClosed = true + // close the channel so that rabbitmq server knows that the + // consumer has been stopped. + err := consumer.chanManager.Close() + if err != nil { + consumer.options.Logger.Warnf("error while closing the channel: %v", err) + } + + consumer.options.Logger.Infof("closing consumer...") + go func() { + consumer.closeConnectionToManagerCh <- struct{}{} + }() +} + +// CloseWithContext cleans up resources and closes the consumer. +// It waits for all handlers to finish before returning +// (use WithConsumerOptionsForceShutdown option to disable this behavior). +// Use the context to cancel the handlers completion. +// CloseWithContext does not close the connection manager, just the subscription +// to the connection manager and the consuming goroutines. +// Only call once. +func (consumer *Consumer) CloseWithContext(ctx context.Context) { + if consumer.options.CloseGracefully { + err := consumer.waitForHandlers(context.Background()) + if err != nil { + consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) + } + } + consumer.isClosedMux.Lock() defer consumer.isClosedMux.Unlock() consumer.isClosed = true @@ -211,3 +261,23 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti } consumer.options.Logger.Infof("rabbit consumer goroutine closed") } + +func (consumer *Consumer) waitForHandlers(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } else if ctx.Err() != nil { + return ctx.Err() + } + c := make(chan struct{}) + go func() { + consumer.handlerMux.Lock() + defer consumer.handlerMux.Unlock() + close(c) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-c: + return nil + } +} diff --git a/consumer_options.go b/consumer_options.go index 7de85cb..fff0ed7 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -28,6 +28,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { }, ExchangeOptions: []ExchangeOptions{}, Concurrency: 1, + CloseGracefully: true, Logger: stdDebugLogger{}, QOSPrefetch: 10, QOSGlobal: false, @@ -64,6 +65,7 @@ func getDefaultBindingOptions() BindingOptions { type ConsumerOptions struct { RabbitConsumerOptions RabbitConsumerOptions QueueOptions QueueOptions + CloseGracefully bool ExchangeOptions []ExchangeOptions Concurrency int Logger logger.Logger @@ -311,6 +313,12 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { options.QOSGlobal = true } +// WithConsumerOptionsForceShutdown tells the consumer to not wait for +// the handlers to complete in consumer.Close +func WithConsumerOptionsForceShutdown(options *ConsumerOptions) { + options.CloseGracefully = false +} + // WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means // multiple nodes in the cluster will have the messages distributed amongst them // for higher reliability From 2b8632fd72145b0139856465a6ed752d94ae73de Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Mon, 17 Jun 2024 12:16:06 +0200 Subject: [PATCH 2/5] fix(consumer): use ctx in CloseWithContext --- consume.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consume.go b/consume.go index bc83ab9..f6815c9 100644 --- a/consume.go +++ b/consume.go @@ -152,7 +152,7 @@ func (consumer *Consumer) Close() { // Only call once. func (consumer *Consumer) CloseWithContext(ctx context.Context) { if consumer.options.CloseGracefully { - err := consumer.waitForHandlers(context.Background()) + err := consumer.waitForHandlers(ctx) if err != nil { consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) } From 15bc96b68af0f4c0472b56c09e595cefe2e09e1c Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Mon, 17 Jun 2024 12:39:08 +0200 Subject: [PATCH 3/5] feat(consumer): freeupResources --- consume.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/consume.go b/consume.go index f6815c9..2e36e2e 100644 --- a/consume.go +++ b/consume.go @@ -116,6 +116,7 @@ func (consumer *Consumer) Run(handler Handler) error { // Close cleans up resources and closes the consumer. // It waits for all handlers to finish before returning by default // (use WithConsumerOptionsForceShutdown option to disable this behavior). +// Use CloseWithContext to specify a context to cancel the handlers completion. // It does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. @@ -127,6 +128,11 @@ func (consumer *Consumer) Close() { } } + consumer.cleanupResources() + +} + +func (consumer *Consumer) cleanupResources() { consumer.isClosedMux.Lock() defer consumer.isClosedMux.Unlock() consumer.isClosed = true @@ -158,20 +164,7 @@ func (consumer *Consumer) CloseWithContext(ctx context.Context) { } } - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() - consumer.isClosed = true - // close the channel so that rabbitmq server knows that the - // consumer has been stopped. - err := consumer.chanManager.Close() - if err != nil { - consumer.options.Logger.Warnf("error while closing the channel: %v", err) - } - - consumer.options.Logger.Infof("closing consumer...") - go func() { - consumer.closeConnectionToManagerCh <- struct{}{} - }() + consumer.cleanupResources() } // startGoroutines declares the queue if it doesn't exist, From 3b40794c2c5dd7ab722c730bbc8b9dff0644f1d0 Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Tue, 18 Jun 2024 08:43:10 +0200 Subject: [PATCH 4/5] feat(consumer): log graceful shutdown --- consume.go | 1 + 1 file changed, 1 insertion(+) diff --git a/consume.go b/consume.go index 2e36e2e..d4eeba1 100644 --- a/consume.go +++ b/consume.go @@ -122,6 +122,7 @@ func (consumer *Consumer) Run(handler Handler) error { // Only call once. func (consumer *Consumer) Close() { if consumer.options.CloseGracefully { + consumer.options.Logger.Infof("waiting for handlers to finish...") err := consumer.waitForHandlers(context.Background()) if err != nil { consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) From 0e96881ecd6baa5115fb60d032345c59fbb9d756 Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Tue, 18 Jun 2024 09:27:58 +0200 Subject: [PATCH 5/5] feat(consumer): naming --- consume.go | 16 ++++++++-------- consumer_options.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/consume.go b/consume.go index d4eeba1..ad80012 100644 --- a/consume.go +++ b/consume.go @@ -114,16 +114,16 @@ func (consumer *Consumer) Run(handler Handler) error { } // Close cleans up resources and closes the consumer. -// It waits for all handlers to finish before returning by default +// It waits for handler to finish before returning by default // (use WithConsumerOptionsForceShutdown option to disable this behavior). -// Use CloseWithContext to specify a context to cancel the handlers completion. +// Use CloseWithContext to specify a context to cancel the handler completion. // It does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. func (consumer *Consumer) Close() { if consumer.options.CloseGracefully { - consumer.options.Logger.Infof("waiting for handlers to finish...") - err := consumer.waitForHandlers(context.Background()) + consumer.options.Logger.Infof("waiting for handler to finish...") + err := consumer.waitForHandlerCompletion(context.Background()) if err != nil { consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) } @@ -151,15 +151,15 @@ func (consumer *Consumer) cleanupResources() { } // CloseWithContext cleans up resources and closes the consumer. -// It waits for all handlers to finish before returning +// It waits for handler to finish before returning // (use WithConsumerOptionsForceShutdown option to disable this behavior). -// Use the context to cancel the handlers completion. +// Use the context to cancel the handler completion. // CloseWithContext does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. func (consumer *Consumer) CloseWithContext(ctx context.Context) { if consumer.options.CloseGracefully { - err := consumer.waitForHandlers(ctx) + err := consumer.waitForHandlerCompletion(ctx) if err != nil { consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) } @@ -256,7 +256,7 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti consumer.options.Logger.Infof("rabbit consumer goroutine closed") } -func (consumer *Consumer) waitForHandlers(ctx context.Context) error { +func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { if ctx == nil { ctx = context.Background() } else if ctx.Err() != nil { diff --git a/consumer_options.go b/consumer_options.go index fff0ed7..aa87bdd 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -314,7 +314,7 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { } // WithConsumerOptionsForceShutdown tells the consumer to not wait for -// the handlers to complete in consumer.Close +// the handler to complete in consumer.Close func WithConsumerOptionsForceShutdown(options *ConsumerOptions) { options.CloseGracefully = false }