|
|
|
@ -114,16 +114,16 @@ func (consumer *Consumer) Run(handler Handler) error { |
|
|
|
} |
|
|
|
|
|
|
|
// Close cleans up resources and closes the consumer.
|
|
|
|
// It waits for all handlers to finish before returning by default
|
|
|
|
// It waits for handler to finish before returning by default
|
|
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
|
|
// Use CloseWithContext to specify a context to cancel the handlers completion.
|
|
|
|
// Use CloseWithContext to specify a context to cancel the handler completion.
|
|
|
|
// It does not close the connection manager, just the subscription
|
|
|
|
// to the connection manager and the consuming goroutines.
|
|
|
|
// Only call once.
|
|
|
|
func (consumer *Consumer) Close() { |
|
|
|
if consumer.options.CloseGracefully { |
|
|
|
consumer.options.Logger.Infof("waiting for handlers to finish...") |
|
|
|
err := consumer.waitForHandlers(context.Background()) |
|
|
|
consumer.options.Logger.Infof("waiting for handler to finish...") |
|
|
|
err := consumer.waitForHandlerCompletion(context.Background()) |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) |
|
|
|
} |
|
|
|
@ -151,15 +151,15 @@ func (consumer *Consumer) cleanupResources() { |
|
|
|
} |
|
|
|
|
|
|
|
// CloseWithContext cleans up resources and closes the consumer.
|
|
|
|
// It waits for all handlers to finish before returning
|
|
|
|
// It waits for handler to finish before returning
|
|
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
|
|
// Use the context to cancel the handlers completion.
|
|
|
|
// Use the context to cancel the handler completion.
|
|
|
|
// CloseWithContext does not close the connection manager, just the subscription
|
|
|
|
// to the connection manager and the consuming goroutines.
|
|
|
|
// Only call once.
|
|
|
|
func (consumer *Consumer) CloseWithContext(ctx context.Context) { |
|
|
|
if consumer.options.CloseGracefully { |
|
|
|
err := consumer.waitForHandlers(ctx) |
|
|
|
err := consumer.waitForHandlerCompletion(ctx) |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) |
|
|
|
} |
|
|
|
@ -256,7 +256,7 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti |
|
|
|
consumer.options.Logger.Infof("rabbit consumer goroutine closed") |
|
|
|
} |
|
|
|
|
|
|
|
func (consumer *Consumer) waitForHandlers(ctx context.Context) error { |
|
|
|
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { |
|
|
|
if ctx == nil { |
|
|
|
ctx = context.Background() |
|
|
|
} else if ctx.Err() != nil { |
|
|
|
|