|
|
|
@ -1,11 +1,13 @@ |
|
|
|
package rabbitmq |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"sync" |
|
|
|
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go" |
|
|
|
|
|
|
|
"github.com/wagslane/go-rabbitmq/internal/channelmanager" |
|
|
|
) |
|
|
|
|
|
|
|
@ -33,6 +35,10 @@ type Consumer struct { |
|
|
|
closeConnectionToManagerCh chan<- struct{} |
|
|
|
options ConsumerOptions |
|
|
|
|
|
|
|
// the handlerMux wraps handler execution in a read-lock and allows the write-lock to be used to wait for
|
|
|
|
// all handlers to complete their current messages.
|
|
|
|
handlerMux *sync.RWMutex |
|
|
|
|
|
|
|
isClosedMux *sync.RWMutex |
|
|
|
isClosed bool |
|
|
|
} |
|
|
|
@ -78,6 +84,14 @@ func NewConsumer( |
|
|
|
isClosed: false, |
|
|
|
} |
|
|
|
|
|
|
|
handler = func(d Delivery) (action Action) { |
|
|
|
if !consumer.handlerMux.TryRLock() { |
|
|
|
return NackRequeue |
|
|
|
} |
|
|
|
defer consumer.handlerMux.RUnlock() |
|
|
|
return handler(d) |
|
|
|
} |
|
|
|
|
|
|
|
err = consumer.startGoroutines( |
|
|
|
handler, |
|
|
|
*options, |
|
|
|
@ -125,6 +139,30 @@ func (consumer *Consumer) Close() { |
|
|
|
}() |
|
|
|
} |
|
|
|
|
|
|
|
// WaitForHandlers will wait for all handler goroutines to finish processing their current messages and may issue
|
|
|
|
// NackRequeue for any remaining messages if auto-ack is not enabled. So this should only be called after Close has
|
|
|
|
// called or may be used as a circuit breaker to interrupt processing.
|
|
|
|
// If the context is cancelled before the handlers finish, the error from the context will be returned.
|
|
|
|
func (consumer *Consumer) WaitForHandlers(ctx context.Context) error { |
|
|
|
if ctx == nil { |
|
|
|
ctx = context.Background() |
|
|
|
} else if ctx.Err() != nil { |
|
|
|
return ctx.Err() |
|
|
|
} |
|
|
|
c := make(chan struct{}) |
|
|
|
go func() { |
|
|
|
consumer.handlerMux.Lock() |
|
|
|
defer consumer.handlerMux.Unlock() |
|
|
|
close(c) |
|
|
|
}() |
|
|
|
select { |
|
|
|
case <-ctx.Done(): |
|
|
|
return ctx.Err() |
|
|
|
case <-c: |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// startGoroutines declares the queue if it doesn't exist,
|
|
|
|
// binds the queue to the routing key(s), and starts the goroutines
|
|
|
|
// that will consume from the queue
|
|
|
|
|