|
|
|
@ -54,9 +54,9 @@ func NewConsumer( |
|
|
|
optionFuncs ...func(*ConsumerOptions), |
|
|
|
) (*Consumer, error) { |
|
|
|
defaultOptions := getDefaultConsumerOptions(queue) |
|
|
|
options := &defaultOptions |
|
|
|
options := defaultOptions |
|
|
|
for _, optionFunc := range optionFuncs { |
|
|
|
optionFunc(options) |
|
|
|
optionFunc(&options) |
|
|
|
} |
|
|
|
|
|
|
|
if conn.connectionManager == nil { |
|
|
|
@ -73,14 +73,14 @@ func NewConsumer( |
|
|
|
chanManager: chanManager, |
|
|
|
reconnectErrCh: reconnectErrCh, |
|
|
|
closeConnectionToManagerCh: closeCh, |
|
|
|
options: *options, |
|
|
|
options: options, |
|
|
|
isClosedMux: &sync.RWMutex{}, |
|
|
|
isClosed: false, |
|
|
|
} |
|
|
|
|
|
|
|
err = consumer.startGoroutines( |
|
|
|
handler, |
|
|
|
*options, |
|
|
|
options, |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
@ -91,7 +91,7 @@ func NewConsumer( |
|
|
|
consumer.options.Logger.Infof("successful consumer recovery from: %v", err) |
|
|
|
err = consumer.startGoroutines( |
|
|
|
handler, |
|
|
|
*options, |
|
|
|
options, |
|
|
|
) |
|
|
|
if err != nil { |
|
|
|
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err) |
|
|
|
|