diff --git a/connection.go b/connection.go index 97b8bb4..b15c741 100644 --- a/connection.go +++ b/connection.go @@ -3,6 +3,7 @@ package rabbitmq import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" + "log" ) // Conn manages the connection to a rabbit cluster @@ -29,7 +30,7 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) optionFunc(options) } - manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.ReconnectInterval) if err != nil { return nil, err } @@ -48,7 +49,7 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) func (conn *Conn) handleRestarts() { for err := range conn.reconnectErrCh { - conn.options.Logger.Infof("successful connection recovery from: %v", err) + log.Printf("successful connection recovery from: %v", err) } } diff --git a/connection_options.go b/connection_options.go index aa47a55..c2035d6 100644 --- a/connection_options.go +++ b/connection_options.go @@ -5,7 +5,6 @@ import "time" // ConnectionOptions are used to describe how a new consumer will be created. type ConnectionOptions struct { ReconnectInterval time.Duration - Logger Logger Config Config } @@ -13,7 +12,6 @@ type ConnectionOptions struct { func getDefaultConnectionOptions() ConnectionOptions { return ConnectionOptions{ ReconnectInterval: time.Second * 5, - Logger: stdDebugLogger{}, Config: Config{}, } } @@ -25,20 +23,6 @@ func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options } } -// WithConnectionOptionsLogging sets logging to true on the consumer options -// and sets the -func WithConnectionOptionsLogging(options *ConnectionOptions) { - options.Logger = stdDebugLogger{} -} - -// WithConnectionOptionsLogger sets logging to true on the consumer options -// and sets the -func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions) { - return func(options *ConnectionOptions) { - options.Logger = log - } -} - // WithConnectionOptionsConfig sets the Config used in the connection func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) { return func(options *ConnectionOptions) { diff --git a/consume.go b/consume.go index 4517c5d..f019787 100644 --- a/consume.go +++ b/consume.go @@ -60,7 +60,7 @@ func NewConsumer( return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } @@ -90,7 +90,6 @@ func (consumer *Consumer) Run(handler Handler) error { } for err := range consumer.reconnectErrCh { - consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( handler, consumer.options, @@ -113,12 +112,8 @@ func (consumer *Consumer) Close() { consumer.isClosed = true // close the channel so that rabbitmq server knows that the // consumer has been stopped. - err := consumer.chanManager.Close() - if err != nil { - consumer.options.Logger.Warnf("error while closing the channel: %v", err) - } + consumer.chanManager.Close() - consumer.options.Logger.Infof("closing consumer...") go func() { consumer.closeConnectionToManagerCh <- struct{}{} }() @@ -170,7 +165,6 @@ func (consumer *Consumer) startGoroutines( for i := 0; i < options.Concurrency; i++ { go handlerGoroutine(consumer, msgs, options, handler) } - consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency) return nil } @@ -193,21 +187,11 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti switch handler(Delivery{msg}) { case Ack: - err := msg.Ack(false) - if err != nil { - consumer.options.Logger.Errorf("can't ack message: %v", err) - } + msg.Ack(false) case NackDiscard: - err := msg.Nack(false, false) - if err != nil { - consumer.options.Logger.Errorf("can't nack message: %v", err) - } + msg.Nack(false, false) case NackRequeue: - err := msg.Nack(false, true) - if err != nil { - consumer.options.Logger.Errorf("can't nack message: %v", err) - } + msg.Nack(false, true) } } - consumer.options.Logger.Infof("rabbit consumer goroutine closed") } diff --git a/consumer_options.go b/consumer_options.go index 7de85cb..3d38f46 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -2,7 +2,6 @@ package rabbitmq import ( amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // getDefaultConsumerOptions describes the options that will be used when a value isn't provided @@ -28,7 +27,6 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { }, ExchangeOptions: []ExchangeOptions{}, Concurrency: 1, - Logger: stdDebugLogger{}, QOSPrefetch: 10, QOSGlobal: false, } @@ -66,7 +64,6 @@ type ConsumerOptions struct { QueueOptions QueueOptions ExchangeOptions []ExchangeOptions Concurrency int - Logger logger.Logger QOSPrefetch int QOSGlobal bool } @@ -282,19 +279,6 @@ func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) { options.RabbitConsumerOptions.NoWait = true } -// WithConsumerOptionsLogging uses a default logger that writes to std out -func WithConsumerOptionsLogging(options *ConsumerOptions) { - options.Logger = &stdDebugLogger{} -} - -// WithConsumerOptionsLogger sets logging to a custom interface. -// Use WithConsumerOptionsLogging to just log to stdout. -func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { - return func(options *ConsumerOptions) { - options.Logger = log - } -} - // WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that // many messages will be fetched from the server in advance to help with throughput. // This doesn't affect the handler, messages are still processed one at a time. diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 3912b18..8795f41 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -11,10 +11,7 @@ import ( ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) + conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost") if err != nil { log.Fatal(err) } diff --git a/examples/logger/main.go b/examples/logger/main.go deleted file mode 100644 index 1618e91..0000000 --- a/examples/logger/main.go +++ /dev/null @@ -1,66 +0,0 @@ -package main - -import ( - "context" - "log" - - rabbitmq "github.com/wagslane/go-rabbitmq" -) - -// errorLogger is used in WithPublisherOptionsLogger to create a custom logger -// that only logs ERROR and FATAL log levels -type errorLogger struct{} - -func (l errorLogger) Fatalf(format string, v ...interface{}) { - log.Printf("mylogger: "+format, v...) -} - -func (l errorLogger) Errorf(format string, v ...interface{}) { - log.Printf("mylogger: "+format, v...) -} - -func (l errorLogger) Warnf(format string, v ...interface{}) { -} - -func (l errorLogger) Infof(format string, v ...interface{}) { -} - -func (l errorLogger) Debugf(format string, v ...interface{}) { -} - -func main() { - mylogger := &errorLogger{} - - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - publisher, err := rabbitmq.NewPublisher( - conn, - rabbitmq.WithPublisherOptionsLogger(mylogger), - ) - if err != nil { - log.Fatal(err) - } - err = publisher.PublishWithContext( - context.Background(), - []byte("hello, world"), - []string{"my_routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), - ) - if err != nil { - log.Fatal(err) - } - - publisher.NotifyReturn(func(r rabbitmq.Return) { - log.Printf("message returned from server: %s", string(r.Body)) - }) -} diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 90a4cc6..9f3c9cc 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -12,10 +12,7 @@ import ( ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) + conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost") if err != nil { log.Fatal(err) } diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 5121a3f..5c2e227 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -13,10 +13,7 @@ import ( ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) + conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost") if err != nil { log.Fatal(err) } @@ -24,7 +21,6 @@ func main() { publisher, err := rabbitmq.NewPublisher( conn, - rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeDeclare, ) @@ -43,7 +39,6 @@ func main() { publisher2, err := rabbitmq.NewPublisher( conn, - rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeDeclare, ) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index d07cc27..77baf45 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -15,7 +15,6 @@ import ( func main() { conn, err := rabbitmq.NewConn( "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, ) if err != nil { log.Fatal(err) @@ -24,7 +23,6 @@ func main() { publisher, err := rabbitmq.NewPublisher( conn, - rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeDeclare, ) diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go index f5aecaf..604999e 100644 --- a/examples/publisher_confirm/main.go +++ b/examples/publisher_confirm/main.go @@ -13,10 +13,7 @@ import ( ) func main() { - conn, err := rabbitmq.NewConn( - "amqp://guest:guest@localhost", - rabbitmq.WithConnectionOptionsLogging, - ) + conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost") if err != nil { log.Fatal(err) } @@ -24,7 +21,6 @@ func main() { publisher, err := rabbitmq.NewPublisher( conn, - rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeDeclare, rabbitmq.WithPublisherOptionsConfirm, diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..d349c78 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -8,12 +8,10 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ChannelManager - type ChannelManager struct { - logger logger.Logger channel *amqp.Channel connManager *connectionmanager.ConnectionManager channelMux *sync.RWMutex @@ -24,14 +22,13 @@ type ChannelManager struct { } // NewChannelManager creates a new connection manager -func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { +func NewChannelManager(connManager *connectionmanager.ConnectionManager, reconnectInterval time.Duration) (*ChannelManager, error) { ch, err := getNewChannel(connManager) if err != nil { return nil, err } chanManager := ChannelManager{ - logger: log, connManager: connManager, channel: ch, channelMux: &sync.RWMutex{}, @@ -66,18 +63,11 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { select { case err := <-notifyCloseChan: if err != nil { - chanManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err) chanManager.reconnectLoop() - chanManager.logger.Warnf("successfully reconnected to amqp server") chanManager.dispatcher.Dispatch(err) } - if err == nil { - chanManager.logger.Infof("amqp channel closed gracefully") - } case err := <-notifyCancelChan: - chanManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err) chanManager.reconnectLoop() - chanManager.logger.Warnf("successfully reconnected to amqp server after cancel") chanManager.dispatcher.Dispatch(errors.New(err)) } } @@ -98,12 +88,9 @@ func (chanManager *ChannelManager) incrementReconnectionCount() { // reconnectLoop continuously attempts to reconnect func (chanManager *ChannelManager) reconnectLoop() { for { - chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval) time.Sleep(chanManager.reconnectInterval) err := chanManager.reconnect() - if err != nil { - chanManager.logger.Errorf("error reconnecting to amqp server: %v", err) - } else { + if err == nil { chanManager.incrementReconnectionCount() go chanManager.startNotifyCancelOrClosed() return @@ -120,9 +107,7 @@ func (chanManager *ChannelManager) reconnect() error { return err } - if err = chanManager.channel.Close(); err != nil { - chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) - } + chanManager.channel.Close() chanManager.channel = newChannel return nil @@ -130,7 +115,6 @@ func (chanManager *ChannelManager) reconnect() error { // Close safely closes the current channel and connection func (chanManager *ChannelManager) Close() error { - chanManager.logger.Infof("closing channel manager...") chanManager.channelMux.Lock() defer chanManager.channelMux.Unlock() diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index fce1f2b..153e7b2 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -6,12 +6,10 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ConnectionManager - type ConnectionManager struct { - logger logger.Logger url string connection *amqp.Connection amqpConfig amqp.Config @@ -23,13 +21,12 @@ type ConnectionManager struct { } // NewConnectionManager creates a new connection manager -func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { +func NewConnectionManager(url string, conf amqp.Config, reconnectInterval time.Duration) (*ConnectionManager, error) { conn, err := amqp.DialConfig(url, amqp.Config(conf)) if err != nil { return nil, err } connManager := ConnectionManager{ - logger: log, url: url, connection: conn, amqpConfig: conf, @@ -45,7 +42,6 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon // Close safely closes the current channel and connection func (connManager *ConnectionManager) Close() error { - connManager.logger.Infof("closing connection manager...") connManager.connectionMux.Lock() defer connManager.connectionMux.Unlock() @@ -82,14 +78,9 @@ func (connManager *ConnectionManager) startNotifyClose() { err := <-notifyCloseChan if err != nil { - connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err) connManager.reconnectLoop() - connManager.logger.Warnf("successfully reconnected to amqp server") connManager.dispatcher.Dispatch(err) } - if err == nil { - connManager.logger.Infof("amqp connection closed gracefully") - } } // GetReconnectionCount - @@ -108,12 +99,9 @@ func (connManager *ConnectionManager) incrementReconnectionCount() { // reconnectLoop continuously attempts to reconnect func (connManager *ConnectionManager) reconnectLoop() { for { - connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval) time.Sleep(connManager.ReconnectInterval) err := connManager.reconnect() - if err != nil { - connManager.logger.Errorf("error reconnecting to amqp server: %v", err) - } else { + if err == nil { connManager.incrementReconnectionCount() go connManager.startNotifyClose() return @@ -130,9 +118,7 @@ func (connManager *ConnectionManager) reconnect() error { return err } - if err = connManager.connection.Close(); err != nil { - connManager.logger.Warnf("error closing connection while reconnecting: %v", err) - } + connManager.connection.Close() connManager.connection = newConn return nil diff --git a/internal/logger/logger.go b/internal/logger/logger.go deleted file mode 100644 index 5da321f..0000000 --- a/internal/logger/logger.go +++ /dev/null @@ -1,11 +0,0 @@ -package logger - -// Logger is describes a logging structure. It can be set using -// WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). -type Logger interface { - Fatalf(string, ...interface{}) - Errorf(string, ...interface{}) - Warnf(string, ...interface{}) - Infof(string, ...interface{}) - Debugf(string, ...interface{}) -} diff --git a/logger.go b/logger.go deleted file mode 100644 index 34ef793..0000000 --- a/logger.go +++ /dev/null @@ -1,41 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "log" - - "github.com/wagslane/go-rabbitmq/internal/logger" -) - -// Logger is describes a logging structure. It can be set using -// WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). -type Logger logger.Logger - -const loggingPrefix = "gorabbit" - -type stdDebugLogger struct{} - -// Fatalf - -func (l stdDebugLogger) Fatalf(format string, v ...interface{}) { - log.Fatalf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...) -} - -// Errorf - -func (l stdDebugLogger) Errorf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) -} - -// Warnf - -func (l stdDebugLogger) Warnf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) -} - -// Infof - -func (l stdDebugLogger) Infof(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) -} - -// Debugf - -func (l stdDebugLogger) Debugf(format string, v ...interface{}) { - log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) -} diff --git a/publish.go b/publish.go index a58b48d..58519e6 100644 --- a/publish.go +++ b/publish.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log" "sync" amqp "github.com/rabbitmq/amqp091-go" @@ -78,7 +79,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } @@ -112,11 +113,9 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe go func() { for err := range publisher.reconnectErrCh { - publisher.options.Logger.Infof("successful publisher recovery from: %v", err) + log.Printf("successful publisher recovery from: %v", err) err := publisher.startup() if err != nil { - publisher.options.Logger.Fatalf("error on startup for publisher after cancel or close: %v", err) - publisher.options.Logger.Fatalf("publisher closing, unable to recover") return } publisher.startReturnHandler() @@ -281,11 +280,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( func (publisher *Publisher) Close() { // close the channel so that rabbitmq server knows that the // publisher has been stopped. - err := publisher.chanManager.Close() - if err != nil { - publisher.options.Logger.Warnf("error while closing the channel: %v", err) - } - publisher.options.Logger.Infof("closing publisher...") + publisher.chanManager.Close() go func() { publisher.closeConnectionToManagerCh <- struct{}{} }() diff --git a/publish_flow_block.go b/publish_flow_block.go index 5033037..e4e6588 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -13,11 +13,9 @@ func (publisher *Publisher) startNotifyFlowHandler() { for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { - publisher.options.Logger.Warnf("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true } else { publisher.disablePublishDueToFlow = false - publisher.options.Logger.Warnf("resuming publishing due to flow request from server") } publisher.disablePublishDueToFlowMux.Unlock() } @@ -32,11 +30,9 @@ func (publisher *Publisher) startNotifyBlockedHandler() { for b := range blockings { publisher.disablePublishDueToBlockedMux.Lock() if b.Active { - publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server") publisher.disablePublishDueToBlocked = true } else { publisher.disablePublishDueToBlocked = false - publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server") } publisher.disablePublishDueToBlockedMux.Unlock() } diff --git a/publisher_options.go b/publisher_options.go index 17d42cf..99a39b1 100644 --- a/publisher_options.go +++ b/publisher_options.go @@ -6,7 +6,6 @@ import amqp "github.com/rabbitmq/amqp091-go" // Logger is a custom logging interface. type PublisherOptions struct { ExchangeOptions ExchangeOptions - Logger Logger ConfirmMode bool } @@ -24,25 +23,10 @@ func getDefaultPublisherOptions() PublisherOptions { Args: Table{}, Declare: false, }, - Logger: stdDebugLogger{}, ConfirmMode: false, } } -// WithPublisherOptionsLogging sets logging to true on the publisher options -// and sets the -func WithPublisherOptionsLogging(options *PublisherOptions) { - options.Logger = &stdDebugLogger{} -} - -// WithPublisherOptionsLogger sets logging to a custom interface. -// Use WithPublisherOptionsLogging to just log to stdout. -func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { - return func(options *PublisherOptions) { - options.Logger = log - } -} - // WithPublisherOptionsExchangeName sets the exchange name func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { return func(options *PublisherOptions) {