From fa1267bc6e136fd303ea4539ccbaa8235d27b4fe Mon Sep 17 00:00:00 2001 From: Tommy van Leeuwen Date: Wed, 7 Apr 2021 21:01:44 +0200 Subject: [PATCH 1/2] Replace logger with a Logger interface to allow for custom loggers. --- channel.go | 14 +++++++------- consume.go | 33 +++++++++++++++++++++++++------ examples/logger/main.go | 43 +++++++++++++++++++++++++++++++++++++++++ logger.go | 24 ++++++++++++----------- publish.go | 24 ++++++++++++++++++----- 5 files changed, 109 insertions(+), 29 deletions(-) create mode 100644 examples/logger/main.go diff --git a/channel.go b/channel.go index 2057598..84bc086 100644 --- a/channel.go +++ b/channel.go @@ -9,21 +9,21 @@ import ( ) type channelManager struct { - logger logger + logger Logger url string channel *amqp.Channel channelMux *sync.RWMutex notifyCancelOrClose chan error } -func newChannelManager(url string, logging bool) (*channelManager, error) { +func newChannelManager(url string, log Logger) (*channelManager, error) { ch, err := getNewChannel(url) if err != nil { return nil, err } chManager := channelManager{ - logger: logger{logging: logging}, + logger: log, url: url, channel: ch, channelMux: &sync.RWMutex{}, @@ -56,14 +56,14 @@ func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) select { case err := <-notifyCloseChan: - chManager.logger.Println("attempting to reconnect to amqp server after close") + chManager.logger.Printf("attempting to reconnect to amqp server after close") chManager.reconnectWithBackoff() - chManager.logger.Println("successfully reconnected to amqp server after close") + chManager.logger.Printf("successfully reconnected to amqp server after close") chManager.notifyCancelOrClose <- err case err := <-notifyCancelChan: - chManager.logger.Println("attempting to reconnect to amqp server after cancel") + chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.reconnectWithBackoff() - chManager.logger.Println("successfully reconnected to amqp server after cancel") + chManager.logger.Printf("successfully reconnected to amqp server after cancel") chManager.notifyCancelOrClose <- errors.New(err) } close(notifyCancelChan) diff --git a/consume.go b/consume.go index 7e9e80d..8f5c637 100644 --- a/consume.go +++ b/consume.go @@ -9,13 +9,15 @@ import ( // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { chManager *channelManager - logger logger + logger Logger } // ConsumerOptions are used to describe a consumer's configuration. // Logging set to true will enable the consumer to print to stdout +// Logger specifies a custom Logger interface implementation overruling Logging. type ConsumerOptions struct { Logging bool + Logger Logger } // Delivery captures the fields for a previously delivered message resident in @@ -31,14 +33,17 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e for _, optionFunc := range optionFuncs { optionFunc(options) } + if options.Logger == nil { + options.Logger = &nolog{} // default no logging + } - chManager, err := newChannelManager(url, options.Logging) + chManager, err := newChannelManager(url, options.Logger) if err != nil { return Consumer{}, err } consumer := Consumer{ chManager: chManager, - logger: logger{logging: options.Logging}, + logger: options.Logger, } return consumer, nil } @@ -46,6 +51,16 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e // WithConsumerOptionsLogging sets logging to true on the consumer options func WithConsumerOptionsLogging(options *ConsumerOptions) { options.Logging = true + options.Logger = &stdlog{} +} + +// WithConsumerOptionsLogger sets logging to a custom interface. +// Use WithConsumerOptionsLogging to just log to stdout. +func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Logging = true + options.Logger = log + } } // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided @@ -330,12 +345,18 @@ func (consumer Consumer) startGoroutines( continue } if handler(Delivery{msg}) { - msg.Ack(false) + err := msg.Ack(false) + if err != nil { + consumer.logger.Printf("can't ack message: %v", err) + } } else { - msg.Nack(false, true) + err := msg.Nack(false, true) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } } } - consumer.logger.Println("rabbit consumer goroutine closed") + consumer.logger.Printf("rabbit consumer goroutine closed") }() } consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) diff --git a/examples/logger/main.go b/examples/logger/main.go new file mode 100644 index 0000000..06ccd47 --- /dev/null +++ b/examples/logger/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "log" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +// CustomLog is used in WithPublisherOptionsLogger to create a custom logger. +type CustomLog struct{} + +func (c *CustomLog) Printf(fmt string, args ...interface{}) { + log.Printf("mylogger: "+fmt, args...) +} + +func main() { + mylogger := &CustomLog{} + + publisher, returns, err := rabbitmq.NewPublisher( + "amqp://guest:guest@localhost", + rabbitmq.WithPublisherOptionsLogger(mylogger), + ) + if err != nil { + log.Fatal(err) + } + err = publisher.Publish( + []byte("hello, world"), + []string{"routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Fatal(err) + } + + go func() { + for r := range returns { + log.Printf("message returned from server: %s", string(r.Body)) + } + }() +} diff --git a/logger.go b/logger.go index a47341d..621a0ad 100644 --- a/logger.go +++ b/logger.go @@ -5,20 +5,22 @@ import ( "log" ) -type logger struct { - logging bool +// Logger is the interface to send logs to. It can be set using +// WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). +type Logger interface { + Printf(string, ...interface{}) } const loggingPrefix = "gorabbit" -func (l logger) Printf(format string, v ...interface{}) { - if l.logging { - log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) - } -} +// stdlog logs to stdout using go's default logger. +type stdlog struct{} -func (l logger) Println(v ...interface{}) { - if l.logging { - log.Println(loggingPrefix, fmt.Sprintf("%v", v...)) - } +func (l stdlog) Printf(format string, v ...interface{}) { + log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) } + +// nolog does not log at all, this is the default. +type nolog struct{} + +func (l nolog) Printf(format string, v ...interface{}) {} diff --git a/publish.go b/publish.go index f390272..29d4144 100644 --- a/publish.go +++ b/publish.go @@ -85,18 +85,29 @@ type Publisher struct { disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex - logger logger + logger Logger } // PublisherOptions are used to describe a publisher's configuration. // Logging set to true will enable the consumer to print to stdout type PublisherOptions struct { Logging bool + Logger Logger } // WithPublisherOptionsLogging sets logging to true on the consumer options func WithPublisherOptionsLogging(options *PublisherOptions) { options.Logging = true + options.Logger = &stdlog{} +} + +// WithPublisherOptionLogger sets logging to a custom interface. +// Use WithPublisherOptionsLogging to just log to stdout. +func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Logging = true + options.Logger = log + } } // NewPublisher returns a new publisher with an open channel to the cluster. @@ -109,8 +120,11 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher for _, optionFunc := range optionFuncs { optionFunc(options) } + if options.Logger == nil { + options.Logger = &nolog{} // default no logging + } - chManager, err := newChannelManager(url, options.Logging) + chManager, err := newChannelManager(url, options.Logger) if err != nil { return Publisher{}, nil, err } @@ -120,7 +134,7 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher notifyFlowChan: make(chan bool), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, - logger: logger{logging: options.Logging}, + logger: options.Logger, } returnAMQPChan := make(chan amqp.Return) @@ -182,13 +196,13 @@ func (publisher *Publisher) Publish( func (publisher *Publisher) startNotifyFlowHandler() { for ok := range publisher.notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() - publisher.logger.Println("pausing publishing due to flow request from server") + publisher.logger.Printf("pausing publishing due to flow request from server") if ok { publisher.disablePublishDueToFlow = false } else { publisher.disablePublishDueToFlow = true } publisher.disablePublishDueToFlowMux.Unlock() - publisher.logger.Println("resuming publishing due to flow request from server") + publisher.logger.Printf("resuming publishing due to flow request from server") } } From 4e2687264a74f7aac571be3bdb585c2ff4d4952f Mon Sep 17 00:00:00 2001 From: Tommy van Leeuwen Date: Wed, 7 Apr 2021 21:23:10 +0200 Subject: [PATCH 2/2] Fix some linter warnings. --- examples/logger/main.go | 1 + publish.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/logger/main.go b/examples/logger/main.go index 06ccd47..e2cab77 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -9,6 +9,7 @@ import ( // CustomLog is used in WithPublisherOptionsLogger to create a custom logger. type CustomLog struct{} +// Printf is the only method needed in the Logger interface to function properly. func (c *CustomLog) Printf(fmt string, args ...interface{}) { log.Printf("mylogger: "+fmt, args...) } diff --git a/publish.go b/publish.go index 29d4144..54c1e4c 100644 --- a/publish.go +++ b/publish.go @@ -101,7 +101,7 @@ func WithPublisherOptionsLogging(options *PublisherOptions) { options.Logger = &stdlog{} } -// WithPublisherOptionLogger sets logging to a custom interface. +// WithPublisherOptionsLogger sets logging to a custom interface. // Use WithPublisherOptionsLogging to just log to stdout. func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { return func(options *PublisherOptions) {