|
|
|
@ -1,6 +1,7 @@ |
|
|
|
package rabbitmq |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"sync" |
|
|
|
@ -32,6 +33,7 @@ type Consumer struct { |
|
|
|
reconnectErrCh <-chan error |
|
|
|
closeConnectionToManagerCh chan<- struct{} |
|
|
|
options ConsumerOptions |
|
|
|
handlerMux *sync.RWMutex |
|
|
|
|
|
|
|
isClosedMux *sync.RWMutex |
|
|
|
isClosed bool |
|
|
|
@ -89,6 +91,14 @@ func (consumer *Consumer) Run(handler Handler) error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
handler = func(d Delivery) (action Action) { |
|
|
|
if !consumer.handlerMux.TryRLock() { |
|
|
|
return NackRequeue |
|
|
|
} |
|
|
|
defer consumer.handlerMux.RUnlock() |
|
|
|
return handler(d) |
|
|
|
} |
|
|
|
|
|
|
|
for err := range consumer.reconnectErrCh { |
|
|
|
consumer.options.Logger.Infof("successful consumer recovery from: %v", err) |
|
|
|
err = consumer.startGoroutines( |
|
|
|
@ -104,10 +114,26 @@ func (consumer *Consumer) Run(handler Handler) error { |
|
|
|
} |
|
|
|
|
|
|
|
// Close cleans up resources and closes the consumer.
|
|
|
|
// It waits for handler to finish before returning by default
|
|
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
|
|
// Use CloseWithContext to specify a context to cancel the handler completion.
|
|
|
|
// It does not close the connection manager, just the subscription
|
|
|
|
// to the connection manager and the consuming goroutines.
|
|
|
|
// Only call once.
|
|
|
|
func (consumer *Consumer) Close() { |
|
|
|
if consumer.options.CloseGracefully { |
|
|
|
consumer.options.Logger.Infof("waiting for handler to finish...") |
|
|
|
err := consumer.waitForHandlerCompletion(context.Background()) |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
consumer.cleanupResources() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (consumer *Consumer) cleanupResources() { |
|
|
|
consumer.isClosedMux.Lock() |
|
|
|
defer consumer.isClosedMux.Unlock() |
|
|
|
consumer.isClosed = true |
|
|
|
@ -124,6 +150,24 @@ func (consumer *Consumer) Close() { |
|
|
|
}() |
|
|
|
} |
|
|
|
|
|
|
|
// CloseWithContext cleans up resources and closes the consumer.
|
|
|
|
// It waits for handler to finish before returning
|
|
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
|
|
// Use the context to cancel the handler completion.
|
|
|
|
// CloseWithContext does not close the connection manager, just the subscription
|
|
|
|
// to the connection manager and the consuming goroutines.
|
|
|
|
// Only call once.
|
|
|
|
func (consumer *Consumer) CloseWithContext(ctx context.Context) { |
|
|
|
if consumer.options.CloseGracefully { |
|
|
|
err := consumer.waitForHandlerCompletion(ctx) |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
consumer.cleanupResources() |
|
|
|
} |
|
|
|
|
|
|
|
// startGoroutines declares the queue if it doesn't exist,
|
|
|
|
// binds the queue to the routing key(s), and starts the goroutines
|
|
|
|
// that will consume from the queue
|
|
|
|
@ -213,3 +257,23 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti |
|
|
|
} |
|
|
|
consumer.options.Logger.Infof("rabbit consumer goroutine closed") |
|
|
|
} |
|
|
|
|
|
|
|
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { |
|
|
|
if ctx == nil { |
|
|
|
ctx = context.Background() |
|
|
|
} else if ctx.Err() != nil { |
|
|
|
return ctx.Err() |
|
|
|
} |
|
|
|
c := make(chan struct{}) |
|
|
|
go func() { |
|
|
|
consumer.handlerMux.Lock() |
|
|
|
defer consumer.handlerMux.Unlock() |
|
|
|
close(c) |
|
|
|
}() |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
return ctx.Err() |
|
|
|
case <-c: |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |