From 87ab733748dc89db310880c092d8297185dd90e9 Mon Sep 17 00:00:00 2001 From: Anatoly Ibragimov Date: Wed, 4 May 2022 11:46:29 +0300 Subject: [PATCH] Refactored `Logger` interface, Printf usages replaced by Debug, Info, Warning, Error methods --- channel.go | 17 ++++++++++------- consume.go | 19 ++++++++++--------- logger.go | 21 ++++++++++++++++++--- publish.go | 11 ++++++----- 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/channel.go b/channel.go index c2dba8d..30528c6 100644 --- a/channel.go +++ b/channel.go @@ -2,6 +2,7 @@ package rabbitmq import ( "errors" + "fmt" "sync" "time" @@ -62,18 +63,20 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { select { case err := <-notifyCloseChan: if err != nil { - chManager.logger.Printf("attempting to reconnect to amqp server after close with error: %v", err) + chManager.logger.Error(fmt.Sprintf("connection to amqp server closed with error: %v", err)) + chManager.logger.Info("attempting to reconnect to amqp server after close") chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server") + chManager.logger.Info("successfully reconnected to amqp server after close") chManager.notifyCancelOrClose <- err } if err == nil { - chManager.logger.Printf("amqp channel closed gracefully") + chManager.logger.Info("amqp channel closed gracefully") } case err := <-notifyCancelChan: - chManager.logger.Printf("attempting to reconnect to amqp server after cancel with error: %s", err) + chManager.logger.Error(fmt.Sprintf("connection to amqp server cancelled with error: %v", err)) + chManager.logger.Info("attempting to reconnect to amqp server after cancel") chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server after cancel") + chManager.logger.Info("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } } @@ -81,11 +84,11 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { // reconnectLoop continuously attempts to reconnect func (chManager *channelManager) reconnectLoop() { for { - chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) + chManager.logger.Debug(fmt.Sprintf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)) time.Sleep(chManager.reconnectInterval) err := chManager.reconnect() if err != nil { - chManager.logger.Printf("error reconnecting to amqp server: %v", err) + chManager.logger.Error(fmt.Sprintf("error reconnecting to amqp server: %v", err)) } else { chManager.reconnectionCount++ go chManager.startNotifyCancelOrClosed() diff --git a/consume.go b/consume.go index 276f75b..e27e816 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "errors" "fmt" "time" @@ -117,7 +118,7 @@ func (consumer Consumer) StartConsuming( go func() { for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("successful recovery from: %v", err) + consumer.logger.Error(fmt.Sprintf("successful recovery from: %v", err)) err = consumer.startGoroutines( handler, queue, @@ -125,7 +126,7 @@ func (consumer Consumer) StartConsuming( *options, ) if err != nil { - consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err) + consumer.logger.Error(fmt.Sprintf("error restarting consumer goroutines after cancel or close: %v", err)) } } }() @@ -135,7 +136,7 @@ func (consumer Consumer) StartConsuming( // Close cleans up resources and closes the consumer. // The consumer is not safe for reuse func (consumer Consumer) Close() error { - consumer.chManager.logger.Printf("closing consumer...") + consumer.chManager.logger.Info("closing consumer...") return consumer.chManager.close() } @@ -168,7 +169,7 @@ func (consumer Consumer) startGoroutines( if consumeOptions.BindingExchange != nil { exchange := consumeOptions.BindingExchange if exchange.Name == "" { - return fmt.Errorf("binding to exchange but name not specified") + return errors.New("binding to exchange but name not specified") } if exchange.Declare { err := consumer.chManager.channel.ExchangeDeclare( @@ -223,7 +224,7 @@ func (consumer Consumer) startGoroutines( for i := 0; i < consumeOptions.Concurrency; i++ { go handlerGoroutine(consumer, msgs, consumeOptions, handler) } - consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) + consumer.logger.Info(fmt.Sprintf("Processing messages on %v goroutines", consumeOptions.Concurrency)) return nil } @@ -237,19 +238,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio case Ack: err := msg.Ack(false) if err != nil { - consumer.logger.Printf("can't ack message: %v", err) + consumer.logger.Error(fmt.Sprintf("can't ack message: %v", err)) } case NackDiscard: err := msg.Nack(false, false) if err != nil { - consumer.logger.Printf("can't nack message: %v", err) + consumer.logger.Error(fmt.Sprintf("can't nack message: %v", err)) } case NackRequeue: err := msg.Nack(false, true) if err != nil { - consumer.logger.Printf("can't nack message: %v", err) + consumer.logger.Error(fmt.Sprintf("can't nack message: %v", err)) } } } - consumer.logger.Printf("rabbit consumer goroutine closed") + consumer.logger.Debug("rabbit consumer goroutine closed") } diff --git a/logger.go b/logger.go index 920afb6..f9d4c13 100644 --- a/logger.go +++ b/logger.go @@ -8,7 +8,10 @@ import ( // Logger is the interface to send logs to. It can be set using // WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). type Logger interface { - Printf(string, ...interface{}) + Debug(string) + Info(string) + Warning(string) + Error(string) } const loggingPrefix = "gorabbit" @@ -16,6 +19,18 @@ const loggingPrefix = "gorabbit" // stdLogger logs to stdout using go's default logger. type stdLogger struct{} -func (l stdLogger) Printf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) +func (l stdLogger) Debug(s string) { + log.Println(fmt.Sprintf("[Debug] %s: %s", loggingPrefix, s)) +} + +func (l stdLogger) Info(s string) { + log.Println(fmt.Sprintf("[Info] %s: %s", loggingPrefix, s)) +} + +func (l stdLogger) Warning(s string) { + log.Println(fmt.Sprintf("[Warning] %s: %s", loggingPrefix, s)) +} + +func (l stdLogger) Error(s string) { + log.Println(fmt.Sprintf("[Error] %s: %s", loggingPrefix, s)) } diff --git a/publish.go b/publish.go index 13db87a..b325e11 100644 --- a/publish.go +++ b/publish.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "errors" "fmt" "sync" "time" @@ -119,7 +120,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio func (publisher *Publisher) handleRestarts() { for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Printf("successful publisher recovery from: %v", err) + publisher.options.Logger.Error(fmt.Sprintf("successful publisher recovery from: %v", err)) go publisher.startNotifyFlowHandler() if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler() @@ -153,7 +154,7 @@ func (publisher *Publisher) Publish( ) error { publisher.disablePublishDueToFlowMux.RLock() if publisher.disablePublishDueToFlow { - return fmt.Errorf("publishing blocked due to high flow on the server") + return errors.New("publishing blocked due to high flow on the server") } publisher.disablePublishDueToFlowMux.RUnlock() @@ -200,7 +201,7 @@ func (publisher *Publisher) Publish( // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use func (publisher Publisher) Close() error { - publisher.chManager.logger.Printf("closing publisher...") + publisher.chManager.logger.Info("closing publisher...") return publisher.chManager.close() } @@ -215,11 +216,11 @@ func (publisher *Publisher) startNotifyFlowHandler() { for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { - publisher.options.Logger.Printf("pausing publishing due to flow request from server") + publisher.options.Logger.Info("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true } else { publisher.disablePublishDueToFlow = false - publisher.options.Logger.Printf("resuming publishing due to flow request from server") + publisher.options.Logger.Info("resuming publishing due to flow request from server") } publisher.disablePublishDueToFlowMux.Unlock() }