Browse Source

feat(consumer): graceful shutdown

pull/166/head
Thibault Leroy 2 years ago
parent
commit
8929a4e9dc
2 changed files with 78 additions and 0 deletions
  1. +70
    -0
      consume.go
  2. +8
    -0
      consumer_options.go

+ 70
- 0
consume.go View File

@ -1,6 +1,7 @@
package rabbitmq
import (
"context"
"errors"
"fmt"
"sync"
@ -32,6 +33,7 @@ type Consumer struct {
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
options ConsumerOptions
handlerMux *sync.RWMutex
isClosedMux *sync.RWMutex
isClosed bool
@ -89,6 +91,14 @@ func (consumer *Consumer) Run(handler Handler) error {
return err
}
handler = func(d Delivery) (action Action) {
if !consumer.handlerMux.TryRLock() {
return NackRequeue
}
defer consumer.handlerMux.RUnlock()
return handler(d)
}
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
@ -104,10 +114,50 @@ func (consumer *Consumer) Run(handler Handler) error {
}
// Close cleans up resources and closes the consumer.
// It waits for all handlers to finish before returning by default
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
// It does not close the connection manager, just the subscription
// to the connection manager and the consuming goroutines.
// Only call once.
func (consumer *Consumer) Close() {
if consumer.options.CloseGracefully {
err := consumer.waitForHandlers(context.Background())
if err != nil {
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
}
}
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosed = true
// close the channel so that rabbitmq server knows that the
// consumer has been stopped.
err := consumer.chanManager.Close()
if err != nil {
consumer.options.Logger.Warnf("error while closing the channel: %v", err)
}
consumer.options.Logger.Infof("closing consumer...")
go func() {
consumer.closeConnectionToManagerCh <- struct{}{}
}()
}
// CloseWithContext cleans up resources and closes the consumer.
// It waits for all handlers to finish before returning
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
// Use the context to cancel the handlers completion.
// CloseWithContext does not close the connection manager, just the subscription
// to the connection manager and the consuming goroutines.
// Only call once.
func (consumer *Consumer) CloseWithContext(ctx context.Context) {
if consumer.options.CloseGracefully {
err := consumer.waitForHandlers(context.Background())
if err != nil {
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
}
}
consumer.isClosedMux.Lock()
defer consumer.isClosedMux.Unlock()
consumer.isClosed = true
@ -211,3 +261,23 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
}
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
}
func (consumer *Consumer) waitForHandlers(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
} else if ctx.Err() != nil {
return ctx.Err()
}
c := make(chan struct{})
go func() {
consumer.handlerMux.Lock()
defer consumer.handlerMux.Unlock()
close(c)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-c:
return nil
}
}

+ 8
- 0
consumer_options.go View File

@ -28,6 +28,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
},
ExchangeOptions: []ExchangeOptions{},
Concurrency: 1,
CloseGracefully: true,
Logger: stdDebugLogger{},
QOSPrefetch: 10,
QOSGlobal: false,
@ -64,6 +65,7 @@ func getDefaultBindingOptions() BindingOptions {
type ConsumerOptions struct {
RabbitConsumerOptions RabbitConsumerOptions
QueueOptions QueueOptions
CloseGracefully bool
ExchangeOptions []ExchangeOptions
Concurrency int
Logger logger.Logger
@ -311,6 +313,12 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
options.QOSGlobal = true
}
// WithConsumerOptionsForceShutdown tells the consumer to not wait for
// the handlers to complete in consumer.Close
func WithConsumerOptionsForceShutdown(options *ConsumerOptions) {
options.CloseGracefully = false
}
// WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means
// multiple nodes in the cluster will have the messages distributed amongst them
// for higher reliability


Loading…
Cancel
Save