From 9d2ba192df1c0262996f723b00a29e06a38d2e95 Mon Sep 17 00:00:00 2001 From: wagslane Date: Fri, 20 May 2022 09:53:24 -0600 Subject: [PATCH] logger --- channel.go | 14 +++++++------- consume.go | 29 ++++++++++++----------------- examples/logger/main.go | 27 +++++++++++++++++++++------ logger.go | 31 +++++++++++++++++++++++++++---- publish.go | 15 ++++++--------- publish_flow_block.go | 8 ++++---- 6 files changed, 77 insertions(+), 47 deletions(-) diff --git a/channel.go b/channel.go index c2dba8d..fb471cd 100644 --- a/channel.go +++ b/channel.go @@ -62,18 +62,18 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { select { case err := <-notifyCloseChan: if err != nil { - chManager.logger.Printf("attempting to reconnect to amqp server after close with error: %v", err) + chManager.logger.ErrorF("attempting to reconnect to amqp server after close with error: %v", err) chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server") + chManager.logger.WarnF("successfully reconnected to amqp server") chManager.notifyCancelOrClose <- err } if err == nil { - chManager.logger.Printf("amqp channel closed gracefully") + chManager.logger.InfoF("amqp channel closed gracefully") } case err := <-notifyCancelChan: - chManager.logger.Printf("attempting to reconnect to amqp server after cancel with error: %s", err) + chManager.logger.ErrorF("attempting to reconnect to amqp server after cancel with error: %s", err) chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server after cancel") + chManager.logger.WarnF("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } } @@ -81,11 +81,11 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { // reconnectLoop continuously attempts to reconnect func (chManager *channelManager) reconnectLoop() { for { - chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) + chManager.logger.InfoF("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval) time.Sleep(chManager.reconnectInterval) err := chManager.reconnect() if err != nil { - chManager.logger.Printf("error reconnecting to amqp server: %v", err) + chManager.logger.ErrorF("error reconnecting to amqp server: %v", err) } else { chManager.reconnectionCount++ go chManager.startNotifyCancelOrClosed() diff --git a/consume.go b/consume.go index 276f75b..5346861 100644 --- a/consume.go +++ b/consume.go @@ -29,10 +29,8 @@ type Consumer struct { } // ConsumerOptions are used to describe a consumer's configuration. -// Logging set to true will enable the consumer to print to stdout -// Logger specifies a custom Logger interface implementation overruling Logging. +// Logger specifies a custom Logger interface implementation. type ConsumerOptions struct { - Logging bool Logger Logger ReconnectInterval time.Duration } @@ -47,8 +45,7 @@ type Delivery struct { // NewConsumer returns a new Consumer connected to the given rabbitmq server func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { options := &ConsumerOptions{ - Logging: true, - Logger: &stdLogger{}, + Logger: &stdDebugLogger{}, ReconnectInterval: time.Second * 5, } for _, optionFunc := range optionFuncs { @@ -74,17 +71,15 @@ func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func( } } -// WithConsumerOptionsLogging sets a logger to log to stdout +// WithConsumerOptionsLogging uses a default logger that writes to std out func WithConsumerOptionsLogging(options *ConsumerOptions) { - options.Logging = true - options.Logger = &stdLogger{} + options.Logger = &stdDebugLogger{} } // WithConsumerOptionsLogger sets logging to a custom interface. // Use WithConsumerOptionsLogging to just log to stdout. func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { return func(options *ConsumerOptions) { - options.Logging = true options.Logger = log } } @@ -117,7 +112,7 @@ func (consumer Consumer) StartConsuming( go func() { for err := range consumer.chManager.notifyCancelOrClose { - consumer.logger.Printf("successful recovery from: %v", err) + consumer.logger.InfoF("successful recovery from: %v", err) err = consumer.startGoroutines( handler, queue, @@ -125,7 +120,7 @@ func (consumer Consumer) StartConsuming( *options, ) if err != nil { - consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err) + consumer.logger.ErrorF("error restarting consumer goroutines after cancel or close: %v", err) } } }() @@ -135,7 +130,7 @@ func (consumer Consumer) StartConsuming( // Close cleans up resources and closes the consumer. // The consumer is not safe for reuse func (consumer Consumer) Close() error { - consumer.chManager.logger.Printf("closing consumer...") + consumer.chManager.logger.InfoF("closing consumer...") return consumer.chManager.close() } @@ -223,7 +218,7 @@ func (consumer Consumer) startGoroutines( for i := 0; i < consumeOptions.Concurrency; i++ { go handlerGoroutine(consumer, msgs, consumeOptions, handler) } - consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) + consumer.logger.InfoF("Processing messages on %v goroutines", consumeOptions.Concurrency) return nil } @@ -237,19 +232,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio case Ack: err := msg.Ack(false) if err != nil { - consumer.logger.Printf("can't ack message: %v", err) + consumer.logger.ErrorF("can't ack message: %v", err) } case NackDiscard: err := msg.Nack(false, false) if err != nil { - consumer.logger.Printf("can't nack message: %v", err) + consumer.logger.ErrorF("can't nack message: %v", err) } case NackRequeue: err := msg.Nack(false, true) if err != nil { - consumer.logger.Printf("can't nack message: %v", err) + consumer.logger.ErrorF("can't nack message: %v", err) } } } - consumer.logger.Printf("rabbit consumer goroutine closed") + consumer.logger.InfoF("rabbit consumer goroutine closed") } diff --git a/examples/logger/main.go b/examples/logger/main.go index e082792..e7fca1a 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -6,16 +6,31 @@ import ( rabbitmq "github.com/wagslane/go-rabbitmq" ) -// customLogger is used in WithPublisherOptionsLogger to create a custom logger. -type customLogger struct{} +// errorLogger is used in WithPublisherOptionsLogger to create a custom logger +// that only logs ERROR and FATAL log levels +type errorLogger struct{} -// Printf is the only method needed in the Logger interface to function properly. -func (c *customLogger) Printf(fmt string, args ...interface{}) { - log.Printf("mylogger: "+fmt, args...) +func (l errorLogger) FatalF(format string, v ...interface{}) { + log.Printf("mylogger: "+format, v...) } +func (l errorLogger) ErrorF(format string, v ...interface{}) { + log.Printf("mylogger: "+format, v...) +} + +func (l errorLogger) WarnF(format string, v ...interface{}) { +} + +func (l errorLogger) InfoF(format string, v ...interface{}) { +} + +func (l errorLogger) DebugF(format string, v ...interface{}) { +} + +func (l errorLogger) TraceF(format string, v ...interface{}) {} + func main() { - mylogger := &customLogger{} + mylogger := &errorLogger{} publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", rabbitmq.Config{}, diff --git a/logger.go b/logger.go index 920afb6..3e4fea1 100644 --- a/logger.go +++ b/logger.go @@ -8,14 +8,37 @@ import ( // Logger is the interface to send logs to. It can be set using // WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). type Logger interface { - Printf(string, ...interface{}) + FatalF(string, ...interface{}) + ErrorF(string, ...interface{}) + WarnF(string, ...interface{}) + InfoF(string, ...interface{}) + DebugF(string, ...interface{}) + TraceF(string, ...interface{}) } const loggingPrefix = "gorabbit" -// stdLogger logs to stdout using go's default logger. -type stdLogger struct{} +// stdDebugLogger logs to stdout up to the `DebugF` level +type stdDebugLogger struct{} -func (l stdLogger) Printf(format string, v ...interface{}) { +func (l stdDebugLogger) FatalF(format string, v ...interface{}) { log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) } + +func (l stdDebugLogger) ErrorF(format string, v ...interface{}) { + log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) +} + +func (l stdDebugLogger) WarnF(format string, v ...interface{}) { + log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) +} + +func (l stdDebugLogger) InfoF(format string, v ...interface{}) { + log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) +} + +func (l stdDebugLogger) DebugF(format string, v ...interface{}) { + log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) +} + +func (l stdDebugLogger) TraceF(format string, v ...interface{}) {} diff --git a/publish.go b/publish.go index 4e3046f..7113a4f 100644 --- a/publish.go +++ b/publish.go @@ -54,9 +54,8 @@ type Publisher struct { } // PublisherOptions are used to describe a publisher's configuration. -// Logging set to true will enable the consumer to print to stdout +// Logger is a custom logging interface. type PublisherOptions struct { - Logging bool Logger Logger ReconnectInterval time.Duration } @@ -70,16 +69,15 @@ func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func } // WithPublisherOptionsLogging sets logging to true on the consumer options +// and sets the func WithPublisherOptionsLogging(options *PublisherOptions) { - options.Logging = true - options.Logger = &stdLogger{} + options.Logger = &stdDebugLogger{} } // WithPublisherOptionsLogger sets logging to a custom interface. // Use WithPublisherOptionsLogging to just log to stdout. func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { return func(options *PublisherOptions) { - options.Logging = true options.Logger = log } } @@ -91,8 +89,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // will fail with an error when the server is requesting a slowdown func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { options := &PublisherOptions{ - Logging: true, - Logger: &stdLogger{}, + Logger: &stdDebugLogger{}, ReconnectInterval: time.Second * 5, } for _, optionFunc := range optionFuncs { @@ -124,7 +121,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio func (publisher *Publisher) handleRestarts() { for err := range publisher.chManager.notifyCancelOrClose { - publisher.options.Logger.Printf("successful publisher recovery from: %v", err) + publisher.options.Logger.InfoF("successful publisher recovery from: %v", err) go publisher.startNotifyFlowHandler() if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler() @@ -211,7 +208,7 @@ func (publisher *Publisher) Publish( // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use func (publisher Publisher) Close() error { - publisher.chManager.logger.Printf("closing publisher...") + publisher.chManager.logger.InfoF("closing publisher...") return publisher.chManager.close() } diff --git a/publish_flow_block.go b/publish_flow_block.go index 9d70fe6..549061b 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -13,11 +13,11 @@ func (publisher *Publisher) startNotifyFlowHandler() { for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { - publisher.options.Logger.Printf("pausing publishing due to flow request from server") + publisher.options.Logger.WarnF("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true } else { publisher.disablePublishDueToFlow = false - publisher.options.Logger.Printf("resuming publishing due to flow request from server") + publisher.options.Logger.WarnF("resuming publishing due to flow request from server") } publisher.disablePublishDueToFlowMux.Unlock() } @@ -32,11 +32,11 @@ func (publisher *Publisher) startNotifyBlockedHandler() { for b := range blockings { publisher.disablePublishDueToBlockedMux.Lock() if b.Active { - publisher.options.Logger.Printf("pausing publishing due to TCP blocking from server") + publisher.options.Logger.WarnF("pausing publishing due to TCP blocking from server") publisher.disablePublishDueToBlocked = true } else { publisher.disablePublishDueToBlocked = false - publisher.options.Logger.Printf("resuming publishing due to TCP blocking from server") + publisher.options.Logger.WarnF("resuming publishing due to TCP blocking from server") } publisher.disablePublishDueToBlockedMux.Unlock() }