Browse Source

logger

pull/80/head
wagslane 4 years ago
parent
commit
9d2ba192df
6 changed files with 77 additions and 47 deletions
  1. +7
    -7
      channel.go
  2. +12
    -17
      consume.go
  3. +21
    -6
      examples/logger/main.go
  4. +27
    -4
      logger.go
  5. +6
    -9
      publish.go
  6. +4
    -4
      publish_flow_block.go

+ 7
- 7
channel.go View File

@ -62,18 +62,18 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
select {
case err := <-notifyCloseChan:
if err != nil {
chManager.logger.Printf("attempting to reconnect to amqp server after close with error: %v", err)
chManager.logger.ErrorF("attempting to reconnect to amqp server after close with error: %v", err)
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server")
chManager.logger.WarnF("successfully reconnected to amqp server")
chManager.notifyCancelOrClose <- err
}
if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
chManager.logger.InfoF("amqp channel closed gracefully")
}
case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.logger.ErrorF("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after cancel")
chManager.logger.WarnF("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err)
}
}
@ -81,11 +81,11 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
// reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() {
for {
chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
chManager.logger.InfoF("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect()
if err != nil {
chManager.logger.Printf("error reconnecting to amqp server: %v", err)
chManager.logger.ErrorF("error reconnecting to amqp server: %v", err)
} else {
chManager.reconnectionCount++
go chManager.startNotifyCancelOrClosed()


+ 12
- 17
consume.go View File

@ -29,10 +29,8 @@ type Consumer struct {
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
// Logger specifies a custom Logger interface implementation.
type ConsumerOptions struct {
Logging bool
Logger Logger
ReconnectInterval time.Duration
}
@ -47,8 +45,7 @@ type Delivery struct {
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{
Logging: true,
Logger: &stdLogger{},
Logger: &stdDebugLogger{},
ReconnectInterval: time.Second * 5,
}
for _, optionFunc := range optionFuncs {
@ -74,17 +71,15 @@ func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(
}
}
// WithConsumerOptionsLogging sets a logger to log to stdout
// WithConsumerOptionsLogging uses a default logger that writes to std out
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
options.Logger = &stdLogger{}
options.Logger = &stdDebugLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logging = true
options.Logger = log
}
}
@ -117,7 +112,7 @@ func (consumer Consumer) StartConsuming(
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("successful recovery from: %v", err)
consumer.logger.InfoF("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
queue,
@ -125,7 +120,7 @@ func (consumer Consumer) StartConsuming(
*options,
)
if err != nil {
consumer.logger.Printf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.logger.ErrorF("error restarting consumer goroutines after cancel or close: %v", err)
}
}
}()
@ -135,7 +130,7 @@ func (consumer Consumer) StartConsuming(
// Close cleans up resources and closes the consumer.
// The consumer is not safe for reuse
func (consumer Consumer) Close() error {
consumer.chManager.logger.Printf("closing consumer...")
consumer.chManager.logger.InfoF("closing consumer...")
return consumer.chManager.close()
}
@ -223,7 +218,7 @@ func (consumer Consumer) startGoroutines(
for i := 0; i < consumeOptions.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, consumeOptions, handler)
}
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
consumer.logger.InfoF("Processing messages on %v goroutines", consumeOptions.Concurrency)
return nil
}
@ -237,19 +232,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio
case Ack:
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
consumer.logger.ErrorF("can't ack message: %v", err)
}
case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
consumer.logger.ErrorF("can't nack message: %v", err)
}
case NackRequeue:
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
consumer.logger.ErrorF("can't nack message: %v", err)
}
}
}
consumer.logger.Printf("rabbit consumer goroutine closed")
consumer.logger.InfoF("rabbit consumer goroutine closed")
}

+ 21
- 6
examples/logger/main.go View File

@ -6,16 +6,31 @@ import (
rabbitmq "github.com/wagslane/go-rabbitmq"
)
// customLogger is used in WithPublisherOptionsLogger to create a custom logger.
type customLogger struct{}
// errorLogger is used in WithPublisherOptionsLogger to create a custom logger
// that only logs ERROR and FATAL log levels
type errorLogger struct{}
// Printf is the only method needed in the Logger interface to function properly.
func (c *customLogger) Printf(fmt string, args ...interface{}) {
log.Printf("mylogger: "+fmt, args...)
func (l errorLogger) FatalF(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...)
}
func (l errorLogger) ErrorF(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...)
}
func (l errorLogger) WarnF(format string, v ...interface{}) {
}
func (l errorLogger) InfoF(format string, v ...interface{}) {
}
func (l errorLogger) DebugF(format string, v ...interface{}) {
}
func (l errorLogger) TraceF(format string, v ...interface{}) {}
func main() {
mylogger := &customLogger{}
mylogger := &errorLogger{}
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},


+ 27
- 4
logger.go View File

@ -8,14 +8,37 @@ import (
// Logger is the interface to send logs to. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Printf(string, ...interface{})
FatalF(string, ...interface{})
ErrorF(string, ...interface{})
WarnF(string, ...interface{})
InfoF(string, ...interface{})
DebugF(string, ...interface{})
TraceF(string, ...interface{})
}
const loggingPrefix = "gorabbit"
// stdLogger logs to stdout using go's default logger.
type stdLogger struct{}
// stdDebugLogger logs to stdout up to the `DebugF` level
type stdDebugLogger struct{}
func (l stdLogger) Printf(format string, v ...interface{}) {
func (l stdDebugLogger) FatalF(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
func (l stdDebugLogger) ErrorF(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
func (l stdDebugLogger) WarnF(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
func (l stdDebugLogger) InfoF(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
func (l stdDebugLogger) DebugF(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...)
}
func (l stdDebugLogger) TraceF(format string, v ...interface{}) {}

+ 6
- 9
publish.go View File

@ -54,9 +54,8 @@ type Publisher struct {
}
// PublisherOptions are used to describe a publisher's configuration.
// Logging set to true will enable the consumer to print to stdout
// Logger is a custom logging interface.
type PublisherOptions struct {
Logging bool
Logger Logger
ReconnectInterval time.Duration
}
@ -70,16 +69,15 @@ func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
// and sets the
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logging = true
options.Logger = &stdLogger{}
options.Logger = &stdDebugLogger{}
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logging = true
options.Logger = log
}
}
@ -91,8 +89,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
options := &PublisherOptions{
Logging: true,
Logger: &stdLogger{},
Logger: &stdDebugLogger{},
ReconnectInterval: time.Second * 5,
}
for _, optionFunc := range optionFuncs {
@ -124,7 +121,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Printf("successful publisher recovery from: %v", err)
publisher.options.Logger.InfoF("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
@ -211,7 +208,7 @@ func (publisher *Publisher) Publish(
// Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) Close() error {
publisher.chManager.logger.Printf("closing publisher...")
publisher.chManager.logger.InfoF("closing publisher...")
return publisher.chManager.close()
}


+ 4
- 4
publish_flow_block.go View File

@ -13,11 +13,11 @@ func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.options.Logger.Printf("pausing publishing due to flow request from server")
publisher.options.Logger.WarnF("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Printf("resuming publishing due to flow request from server")
publisher.options.Logger.WarnF("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMux.Unlock()
}
@ -32,11 +32,11 @@ func (publisher *Publisher) startNotifyBlockedHandler() {
for b := range blockings {
publisher.disablePublishDueToBlockedMux.Lock()
if b.Active {
publisher.options.Logger.Printf("pausing publishing due to TCP blocking from server")
publisher.options.Logger.WarnF("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true
} else {
publisher.disablePublishDueToBlocked = false
publisher.options.Logger.Printf("resuming publishing due to TCP blocking from server")
publisher.options.Logger.WarnF("resuming publishing due to TCP blocking from server")
}
publisher.disablePublishDueToBlockedMux.Unlock()
}


Loading…
Cancel
Save