Browse Source

Replace logger with a Logger interface to allow for custom loggers.

pull/4/head
Tommy van Leeuwen 5 years ago
parent
commit
fa1267bc6e
5 changed files with 109 additions and 29 deletions
  1. +7
    -7
      channel.go
  2. +27
    -6
      consume.go
  3. +43
    -0
      examples/logger/main.go
  4. +13
    -11
      logger.go
  5. +19
    -5
      publish.go

+ 7
- 7
channel.go View File

@ -9,21 +9,21 @@ import (
)
type channelManager struct {
logger logger
logger Logger
url string
channel *amqp.Channel
channelMux *sync.RWMutex
notifyCancelOrClose chan error
}
func newChannelManager(url string, logging bool) (*channelManager, error) {
func newChannelManager(url string, log Logger) (*channelManager, error) {
ch, err := getNewChannel(url)
if err != nil {
return nil, err
}
chManager := channelManager{
logger: logger{logging: logging},
logger: log,
url: url,
channel: ch,
channelMux: &sync.RWMutex{},
@ -56,14 +56,14 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
select {
case err := <-notifyCloseChan:
chManager.logger.Println("attempting to reconnect to amqp server after close")
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Println("successfully reconnected to amqp server after close")
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
case err := <-notifyCancelChan:
chManager.logger.Println("attempting to reconnect to amqp server after cancel")
chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff()
chManager.logger.Println("successfully reconnected to amqp server after cancel")
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
close(notifyCancelChan)


+ 27
- 6
consume.go View File

@ -9,13 +9,15 @@ import (
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chManager *channelManager
logger logger
logger Logger
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct {
Logging bool
Logger Logger
}
// Delivery captures the fields for a previously delivered message resident in
@ -31,14 +33,17 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &nolog{} // default no logging
}
chManager, err := newChannelManager(url, options.Logging)
chManager, err := newChannelManager(url, options.Logger)
if err != nil {
return Consumer{}, err
}
consumer := Consumer{
chManager: chManager,
logger: logger{logging: options.Logging},
logger: options.Logger,
}
return consumer, nil
}
@ -46,6 +51,16 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
// WithConsumerOptionsLogging sets logging to true on the consumer options
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
options.Logger = &stdlog{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logging = true
options.Logger = log
}
}
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
@ -330,12 +345,18 @@ func (consumer Consumer) startGoroutines(
continue
}
if handler(Delivery{msg}) {
msg.Ack(false)
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
}
} else {
msg.Nack(false, true)
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
}
}
}
consumer.logger.Println("rabbit consumer goroutine closed")
consumer.logger.Printf("rabbit consumer goroutine closed")
}()
}
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)


+ 43
- 0
examples/logger/main.go View File

@ -0,0 +1,43 @@
package main
import (
"log"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
// CustomLog is used in WithPublisherOptionsLogger to create a custom logger.
type CustomLog struct{}
func (c *CustomLog) Printf(fmt string, args ...interface{}) {
log.Printf("mylogger: "+fmt, args...)
}
func main() {
mylogger := &CustomLog{}
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost",
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {
log.Fatal(err)
}
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
}
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
}

+ 13
- 11
logger.go View File

@ -5,20 +5,22 @@ import (
"log"
)
type logger struct {
logging bool
// Logger is the interface to send logs to. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Printf(string, ...interface{})
}
const loggingPrefix = "gorabbit"
func (l logger) Printf(format string, v ...interface{}) {
if l.logging {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
}
// stdlog logs to stdout using go's default logger.
type stdlog struct{}
func (l logger) Println(v ...interface{}) {
if l.logging {
log.Println(loggingPrefix, fmt.Sprintf("%v", v...))
}
func (l stdlog) Printf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
// nolog does not log at all, this is the default.
type nolog struct{}
func (l nolog) Printf(format string, v ...interface{}) {}

+ 19
- 5
publish.go View File

@ -85,18 +85,29 @@ type Publisher struct {
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
logger logger
logger Logger
}
// PublisherOptions are used to describe a publisher's configuration.
// Logging set to true will enable the consumer to print to stdout
type PublisherOptions struct {
Logging bool
Logger Logger
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logging = true
options.Logger = &stdlog{}
}
// WithPublisherOptionLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logging = true
options.Logger = log
}
}
// NewPublisher returns a new publisher with an open channel to the cluster.
@ -109,8 +120,11 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &nolog{} // default no logging
}
chManager, err := newChannelManager(url, options.Logging)
chManager, err := newChannelManager(url, options.Logger)
if err != nil {
return Publisher{}, nil, err
}
@ -120,7 +134,7 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher
notifyFlowChan: make(chan bool),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: logger{logging: options.Logging},
logger: options.Logger,
}
returnAMQPChan := make(chan amqp.Return)
@ -182,13 +196,13 @@ func (publisher *Publisher) Publish(
func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range publisher.notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
publisher.logger.Println("pausing publishing due to flow request from server")
publisher.logger.Printf("pausing publishing due to flow request from server")
if ok {
publisher.disablePublishDueToFlow = false
} else {
publisher.disablePublishDueToFlow = true
}
publisher.disablePublishDueToFlowMux.Unlock()
publisher.logger.Println("resuming publishing due to flow request from server")
publisher.logger.Printf("resuming publishing due to flow request from server")
}
}

Loading…
Cancel
Save