Browse Source

Merge pull request #4 from tomarus/main

Replace logger with a Logger interface to allow for custom loggers.
pull/6/head
Lane Wagner 5 years ago
committed by GitHub
parent
commit
0f092bce0c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 29 deletions
  1. +7
    -7
      channel.go
  2. +27
    -6
      consume.go
  3. +44
    -0
      examples/logger/main.go
  4. +13
    -11
      logger.go
  5. +19
    -5
      publish.go

+ 7
- 7
channel.go View File

@ -9,21 +9,21 @@ import (
) )
type channelManager struct { type channelManager struct {
logger logger
logger Logger
url string url string
channel *amqp.Channel channel *amqp.Channel
channelMux *sync.RWMutex channelMux *sync.RWMutex
notifyCancelOrClose chan error notifyCancelOrClose chan error
} }
func newChannelManager(url string, logging bool) (*channelManager, error) {
func newChannelManager(url string, log Logger) (*channelManager, error) {
ch, err := getNewChannel(url) ch, err := getNewChannel(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chManager := channelManager{ chManager := channelManager{
logger: logger{logging: logging},
logger: log,
url: url, url: url,
channel: ch, channel: ch,
channelMux: &sync.RWMutex{}, channelMux: &sync.RWMutex{},
@ -56,14 +56,14 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
select { select {
case err := <-notifyCloseChan: case err := <-notifyCloseChan:
chManager.logger.Println("attempting to reconnect to amqp server after close")
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff() chManager.reconnectWithBackoff()
chManager.logger.Println("successfully reconnected to amqp server after close")
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err chManager.notifyCancelOrClose <- err
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chManager.logger.Println("attempting to reconnect to amqp server after cancel")
chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff() chManager.reconnectWithBackoff()
chManager.logger.Println("successfully reconnected to amqp server after cancel")
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err) chManager.notifyCancelOrClose <- errors.New(err)
} }
close(notifyCancelChan) close(notifyCancelChan)


+ 27
- 6
consume.go View File

@ -9,13 +9,15 @@ import (
// Consumer allows you to create and connect to queues for data consumption. // Consumer allows you to create and connect to queues for data consumption.
type Consumer struct { type Consumer struct {
chManager *channelManager chManager *channelManager
logger logger
logger Logger
} }
// ConsumerOptions are used to describe a consumer's configuration. // ConsumerOptions are used to describe a consumer's configuration.
// Logging set to true will enable the consumer to print to stdout // Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct { type ConsumerOptions struct {
Logging bool Logging bool
Logger Logger
} }
// Delivery captures the fields for a previously delivered message resident in // Delivery captures the fields for a previously delivered message resident in
@ -31,14 +33,17 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
optionFunc(options) optionFunc(options)
} }
if options.Logger == nil {
options.Logger = &nolog{} // default no logging
}
chManager, err := newChannelManager(url, options.Logging)
chManager, err := newChannelManager(url, options.Logger)
if err != nil { if err != nil {
return Consumer{}, err return Consumer{}, err
} }
consumer := Consumer{ consumer := Consumer{
chManager: chManager, chManager: chManager,
logger: logger{logging: options.Logging},
logger: options.Logger,
} }
return consumer, nil return consumer, nil
} }
@ -46,6 +51,16 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
// WithConsumerOptionsLogging sets logging to true on the consumer options // WithConsumerOptionsLogging sets logging to true on the consumer options
func WithConsumerOptionsLogging(options *ConsumerOptions) { func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true options.Logging = true
options.Logger = &stdlog{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logging = true
options.Logger = log
}
} }
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
@ -330,12 +345,18 @@ func (consumer Consumer) startGoroutines(
continue continue
} }
if handler(Delivery{msg}) { if handler(Delivery{msg}) {
msg.Ack(false)
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
}
} else { } else {
msg.Nack(false, true)
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
}
} }
} }
consumer.logger.Println("rabbit consumer goroutine closed")
consumer.logger.Printf("rabbit consumer goroutine closed")
}() }()
} }
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)


+ 44
- 0
examples/logger/main.go View File

@ -0,0 +1,44 @@
package main
import (
"log"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
// CustomLog is used in WithPublisherOptionsLogger to create a custom logger.
type CustomLog struct{}
// Printf is the only method needed in the Logger interface to function properly.
func (c *CustomLog) Printf(fmt string, args ...interface{}) {
log.Printf("mylogger: "+fmt, args...)
}
func main() {
mylogger := &CustomLog{}
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost",
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {
log.Fatal(err)
}
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
}
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
}

+ 13
- 11
logger.go View File

@ -5,20 +5,22 @@ import (
"log" "log"
) )
type logger struct {
logging bool
// Logger is the interface to send logs to. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Printf(string, ...interface{})
} }
const loggingPrefix = "gorabbit" const loggingPrefix = "gorabbit"
func (l logger) Printf(format string, v ...interface{}) {
if l.logging {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
}
// stdlog logs to stdout using go's default logger.
type stdlog struct{}
func (l logger) Println(v ...interface{}) {
if l.logging {
log.Println(loggingPrefix, fmt.Sprintf("%v", v...))
}
func (l stdlog) Printf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
} }
// nolog does not log at all, this is the default.
type nolog struct{}
func (l nolog) Printf(format string, v ...interface{}) {}

+ 19
- 5
publish.go View File

@ -85,18 +85,29 @@ type Publisher struct {
disablePublishDueToFlow bool disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex disablePublishDueToFlowMux *sync.RWMutex
logger logger
logger Logger
} }
// PublisherOptions are used to describe a publisher's configuration. // PublisherOptions are used to describe a publisher's configuration.
// Logging set to true will enable the consumer to print to stdout // Logging set to true will enable the consumer to print to stdout
type PublisherOptions struct { type PublisherOptions struct {
Logging bool Logging bool
Logger Logger
} }
// WithPublisherOptionsLogging sets logging to true on the consumer options // WithPublisherOptionsLogging sets logging to true on the consumer options
func WithPublisherOptionsLogging(options *PublisherOptions) { func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logging = true options.Logging = true
options.Logger = &stdlog{}
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logging = true
options.Logger = log
}
} }
// NewPublisher returns a new publisher with an open channel to the cluster. // NewPublisher returns a new publisher with an open channel to the cluster.
@ -109,8 +120,11 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher
for _, optionFunc := range optionFuncs { for _, optionFunc := range optionFuncs {
optionFunc(options) optionFunc(options)
} }
if options.Logger == nil {
options.Logger = &nolog{} // default no logging
}
chManager, err := newChannelManager(url, options.Logging)
chManager, err := newChannelManager(url, options.Logger)
if err != nil { if err != nil {
return Publisher{}, nil, err return Publisher{}, nil, err
} }
@ -120,7 +134,7 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher
notifyFlowChan: make(chan bool), notifyFlowChan: make(chan bool),
disablePublishDueToFlow: false, disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{}, disablePublishDueToFlowMux: &sync.RWMutex{},
logger: logger{logging: options.Logging},
logger: options.Logger,
} }
returnAMQPChan := make(chan amqp.Return) returnAMQPChan := make(chan amqp.Return)
@ -182,13 +196,13 @@ func (publisher *Publisher) Publish(
func (publisher *Publisher) startNotifyFlowHandler() { func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range publisher.notifyFlowChan { for ok := range publisher.notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlowMux.Lock()
publisher.logger.Println("pausing publishing due to flow request from server")
publisher.logger.Printf("pausing publishing due to flow request from server")
if ok { if ok {
publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlow = false
} else { } else {
publisher.disablePublishDueToFlow = true publisher.disablePublishDueToFlow = true
} }
publisher.disablePublishDueToFlowMux.Unlock() publisher.disablePublishDueToFlowMux.Unlock()
publisher.logger.Println("resuming publishing due to flow request from server")
publisher.logger.Printf("resuming publishing due to flow request from server")
} }
} }

Loading…
Cancel
Save