From e5c898d9c8dc3189175e5e9290fa5dcf3472a1a8 Mon Sep 17 00:00:00 2001 From: Ben Meier Date: Wed, 3 Jan 2024 10:41:39 +0000 Subject: [PATCH] feat: added consumer.WaitForHandlers Signed-off-by: Ben Meier --- consume.go | 38 ++++++++++++++++++++++++++++++++++++++ examples/consumer/main.go | 9 ++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/consume.go b/consume.go index d1f802f..fd57c00 100644 --- a/consume.go +++ b/consume.go @@ -1,11 +1,13 @@ package rabbitmq import ( + "context" "errors" "fmt" "sync" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) @@ -33,6 +35,10 @@ type Consumer struct { closeConnectionToManagerCh chan<- struct{} options ConsumerOptions + // the handlerMux wraps handler execution in a read-lock and allows the write-lock to be used to wait for + // all handlers to complete their current messages. + handlerMux *sync.RWMutex + isClosedMux *sync.RWMutex isClosed bool } @@ -78,6 +84,14 @@ func NewConsumer( isClosed: false, } + handler = func(d Delivery) (action Action) { + if !consumer.handlerMux.TryRLock() { + return NackRequeue + } + defer consumer.handlerMux.RUnlock() + return handler(d) + } + err = consumer.startGoroutines( handler, *options, @@ -125,6 +139,30 @@ func (consumer *Consumer) Close() { }() } +// WaitForHandlers will wait for all handler goroutines to finish processing their current messages and may issue +// NackRequeue for any remaining messages if auto-ack is not enabled. So this should only be called after Close has +// called or may be used as a circuit breaker to interrupt processing. +// If the context is cancelled before the handlers finish, the error from the context will be returned. +func (consumer *Consumer) WaitForHandlers(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } else if ctx.Err() != nil { + return ctx.Err() + } + c := make(chan struct{}) + go func() { + consumer.handlerMux.Lock() + defer consumer.handlerMux.Unlock() + close(c) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-c: + return nil + } +} + // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 7c68733..1bcdc61 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "os" @@ -35,7 +36,13 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer.Close() + defer func() { + consumer.Close() + + // We can optionally wait for any long-running handlers to finish cleanly and avoid stopping the process + // while handlers are still closing files or performing actions that are vulnerable to corruption. + _ = consumer.WaitForHandlers(context.Background()) + }() // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1)