From 15bc96b68af0f4c0472b56c09e595cefe2e09e1c Mon Sep 17 00:00:00 2001 From: Thibault Leroy Date: Mon, 17 Jun 2024 12:39:08 +0200 Subject: [PATCH] feat(consumer): freeupResources --- consume.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/consume.go b/consume.go index f6815c9..2e36e2e 100644 --- a/consume.go +++ b/consume.go @@ -116,6 +116,7 @@ func (consumer *Consumer) Run(handler Handler) error { // Close cleans up resources and closes the consumer. // It waits for all handlers to finish before returning by default // (use WithConsumerOptionsForceShutdown option to disable this behavior). +// Use CloseWithContext to specify a context to cancel the handlers completion. // It does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. @@ -127,6 +128,11 @@ func (consumer *Consumer) Close() { } } + consumer.cleanupResources() + +} + +func (consumer *Consumer) cleanupResources() { consumer.isClosedMux.Lock() defer consumer.isClosedMux.Unlock() consumer.isClosed = true @@ -158,20 +164,7 @@ func (consumer *Consumer) CloseWithContext(ctx context.Context) { } } - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() - consumer.isClosed = true - // close the channel so that rabbitmq server knows that the - // consumer has been stopped. - err := consumer.chanManager.Close() - if err != nil { - consumer.options.Logger.Warnf("error while closing the channel: %v", err) - } - - consumer.options.Logger.Infof("closing consumer...") - go func() { - consumer.closeConnectionToManagerCh <- struct{}{} - }() + consumer.cleanupResources() } // startGoroutines declares the queue if it doesn't exist,