|
|
|
@ -116,6 +116,7 @@ func (consumer *Consumer) Run(handler Handler) error { |
|
|
|
// Close cleans up resources and closes the consumer.
|
|
|
|
// It waits for all handlers to finish before returning by default
|
|
|
|
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
|
|
|
|
// Use CloseWithContext to specify a context to cancel the handlers completion.
|
|
|
|
// It does not close the connection manager, just the subscription
|
|
|
|
// to the connection manager and the consuming goroutines.
|
|
|
|
// Only call once.
|
|
|
|
@ -127,6 +128,11 @@ func (consumer *Consumer) Close() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
consumer.cleanupResources() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (consumer *Consumer) cleanupResources() { |
|
|
|
consumer.isClosedMux.Lock() |
|
|
|
defer consumer.isClosedMux.Unlock() |
|
|
|
consumer.isClosed = true |
|
|
|
@ -158,20 +164,7 @@ func (consumer *Consumer) CloseWithContext(ctx context.Context) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
consumer.isClosedMux.Lock() |
|
|
|
defer consumer.isClosedMux.Unlock() |
|
|
|
consumer.isClosed = true |
|
|
|
// close the channel so that rabbitmq server knows that the
|
|
|
|
// consumer has been stopped.
|
|
|
|
err := consumer.chanManager.Close() |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Warnf("error while closing the channel: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
consumer.options.Logger.Infof("closing consumer...") |
|
|
|
go func() { |
|
|
|
consumer.closeConnectionToManagerCh <- struct{}{} |
|
|
|
}() |
|
|
|
consumer.cleanupResources() |
|
|
|
} |
|
|
|
|
|
|
|
// startGoroutines declares the queue if it doesn't exist,
|
|
|
|
|