Browse Source

return an error instead of crashing when we cannot retry

pull/145/head
Hugo Wetterberg 2 years ago
parent
commit
d5750cdbc4
Failed to extract signature
1 changed files with 20 additions and 21 deletions
  1. +20
    -21
      consume.go

+ 20
- 21
consume.go View File

@ -44,12 +44,9 @@ type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
// it also starts consuming on the given connection with automatic reconnection handling
// Do not reuse the returned consumer for anything other than to close it
// NewConsumer returns a new Consumer connected to the given rabbitmq server.
func NewConsumer(
conn *Conn,
handler Handler,
queue string,
optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
@ -78,30 +75,32 @@ func NewConsumer(
isClosed: false,
}
err = consumer.startGoroutines(
return consumer, nil
}
// Run starts consuming with automatic reconnection handling. Do not reuse the
// consumer for anything other than to close it.
func (consumer *Consumer) Run(handler Handler) error {
err := consumer.startGoroutines(
handler,
*options,
consumer.options,
)
if err != nil {
return nil, err
return err
}
go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,
)
if err != nil {
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.options.Logger.Fatalf("consumer closing, unable to recover")
return
}
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
consumer.options,
)
if err != nil {
return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err)
}
}()
}
return consumer, nil
return nil
}
// Close cleans up resources and closes the consumer.


Loading…
Cancel
Save