Browse Source

Refactored `Logger` interface, Printf usages replaced by Debug, Info, Warning, Error methods

pull/73/head
Anatoly Ibragimov 4 years ago
parent
commit
87ab733748
4 changed files with 44 additions and 24 deletions
  1. +10
    -7
      channel.go
  2. +10
    -9
      consume.go
  3. +18
    -3
      logger.go
  4. +6
    -5
      publish.go

+ 10
- 7
channel.go View File

@ -2,6 +2,7 @@ package rabbitmq
import (
"errors"
"fmt"
"sync"
"time"
@ -62,18 +63,20 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
select {
case err := <-notifyCloseChan:
if err != nil {
chManager.logger.Printf("attempting to reconnect to amqp server after close with error: %v", err)
chManager.logger.Error(fmt.Sprintf("connection to amqp server closed with error: %v", err))
chManager.logger.Info("attempting to reconnect to amqp server after close")
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server")
chManager.logger.Info("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
}
if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
chManager.logger.Info("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.logger.Error(fmt.Sprintf("connection to amqp server cancelled with error: %v", err))
chManager.logger.Info("attempting to reconnect to amqp server after cancel")
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.logger.Info("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
}
@ -81,11 +84,11 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
// reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() {
for {
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
chManager.logger.Debug(fmt.Sprintf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval))
time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect()
if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err)
chManager.logger.Error(fmt.Sprintf("error reconnecting to amqp server: %v", err))
} else {
chManager.reconnectionCount++
go chManager.startNotifyCancelOrClosed()


+ 10
- 9
consume.go View File

@ -1,6 +1,7 @@
package rabbitmq
import (
"errors"
"fmt"
"time"
@ -117,7 +118,7 @@ func (consumer Consumer) StartConsuming(
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("successful recovery from: %v", err)
consumer.logger.Error(fmt.Sprintf("successful recovery from: %v", err))
err = consumer.startGoroutines(
handler,
queue,
@ -125,7 +126,7 @@ func (consumer Consumer) StartConsuming(
*options,
)
if err != nil {
consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.logger.Error(fmt.Sprintf("error restarting consumer goroutines after cancel or close: %v", err))
}
}
}()
@ -135,7 +136,7 @@ func (consumer Consumer) StartConsuming(
// Close cleans up resources and closes the consumer.
// The consumer is not safe for reuse
func (consumer Consumer) Close() error {
consumer.chManager.logger.Printf("closing consumer...")
consumer.chManager.logger.Info("closing consumer...")
return consumer.chManager.close()
}
@ -168,7 +169,7 @@ func (consumer Consumer) startGoroutines(
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
return errors.New("binding to exchange but name not specified")
}
if exchange.Declare {
err := consumer.chManager.channel.ExchangeDeclare(
@ -223,7 +224,7 @@ func (consumer Consumer) startGoroutines(
for i := 0; i < consumeOptions.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, consumeOptions, handler)
}
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
consumer.logger.Info(fmt.Sprintf("Processing messages on %v goroutines", consumeOptions.Concurrency))
return nil
}
@ -237,19 +238,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio
case Ack:
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
consumer.logger.Error(fmt.Sprintf("can't ack message: %v", err))
}
case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
consumer.logger.Error(fmt.Sprintf("can't nack message: %v", err))
}
case NackRequeue:
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
consumer.logger.Error(fmt.Sprintf("can't nack message: %v", err))
}
}
}
consumer.logger.Printf("rabbit consumer goroutine closed")
consumer.logger.Debug("rabbit consumer goroutine closed")
}

+ 18
- 3
logger.go View File

@ -8,7 +8,10 @@ import (
// Logger is the interface to send logs to. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Printf(string, ...interface{})
Debug(string)
Info(string)
Warning(string)
Error(string)
}
const loggingPrefix = "gorabbit"
@ -16,6 +19,18 @@ const loggingPrefix = "gorabbit"
// stdLogger logs to stdout using go's default logger.
type stdLogger struct{}
func (l stdLogger) Printf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
func (l stdLogger) Debug(s string) {
log.Println(fmt.Sprintf("[Debug] %s: %s", loggingPrefix, s))
}
func (l stdLogger) Info(s string) {
log.Println(fmt.Sprintf("[Info] %s: %s", loggingPrefix, s))
}
func (l stdLogger) Warning(s string) {
log.Println(fmt.Sprintf("[Warning] %s: %s", loggingPrefix, s))
}
func (l stdLogger) Error(s string) {
log.Println(fmt.Sprintf("[Error] %s: %s", loggingPrefix, s))
}

+ 6
- 5
publish.go View File

@ -1,6 +1,7 @@
package rabbitmq
import (
"errors"
"fmt"
"sync"
"time"
@ -119,7 +120,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Printf("successful publisher recovery from: %v", err)
publisher.options.Logger.Error(fmt.Sprintf("successful publisher recovery from: %v", err))
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
@ -153,7 +154,7 @@ func (publisher *Publisher) Publish(
) error {
publisher.disablePublishDueToFlowMux.RLock()
if publisher.disablePublishDueToFlow {
return fmt.Errorf("publishing blocked due to high flow on the server")
return errors.New("publishing blocked due to high flow on the server")
}
publisher.disablePublishDueToFlowMux.RUnlock()
@ -200,7 +201,7 @@ func (publisher *Publisher) Publish(
// Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) Close() error {
publisher.chManager.logger.Printf("closing publisher...")
publisher.chManager.logger.Info("closing publisher...")
return publisher.chManager.close()
}
@ -215,11 +216,11 @@ func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.options.Logger.Printf("pausing publishing due to flow request from server")
publisher.options.Logger.Info("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Printf("resuming publishing due to flow request from server")
publisher.options.Logger.Info("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMux.Unlock()
}


Loading…
Cancel
Save