Browse Source

upd: remove logs

pull/157/head
nickolajgrishuk 2 years ago
parent
commit
8ffbe6c293
17 changed files with 22 additions and 259 deletions
  1. +3
    -2
      connection.go
  2. +0
    -16
      connection_options.go
  3. +5
    -21
      consume.go
  4. +0
    -16
      consumer_options.go
  5. +1
    -4
      examples/consumer/main.go
  6. +0
    -66
      examples/logger/main.go
  7. +1
    -4
      examples/multiconsumer/main.go
  8. +1
    -6
      examples/multipublisher/main.go
  9. +0
    -2
      examples/publisher/main.go
  10. +1
    -5
      examples/publisher_confirm/main.go
  11. +3
    -19
      internal/channelmanager/channel_manager.go
  12. +3
    -17
      internal/connectionmanager/connection_manager.go
  13. +0
    -11
      internal/logger/logger.go
  14. +0
    -41
      logger.go
  15. +4
    -9
      publish.go
  16. +0
    -4
      publish_flow_block.go
  17. +0
    -16
      publisher_options.go

+ 3
- 2
connection.go View File

@ -3,6 +3,7 @@ package rabbitmq
import ( import (
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager" "github.com/wagslane/go-rabbitmq/internal/connectionmanager"
"log"
) )
// Conn manages the connection to a rabbit cluster // Conn manages the connection to a rabbit cluster
@ -29,7 +30,7 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
optionFunc(options) optionFunc(options)
} }
manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.ReconnectInterval)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -48,7 +49,7 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
func (conn *Conn) handleRestarts() { func (conn *Conn) handleRestarts() {
for err := range conn.reconnectErrCh { for err := range conn.reconnectErrCh {
conn.options.Logger.Infof("successful connection recovery from: %v", err)
log.Printf("successful connection recovery from: %v", err)
} }
} }


+ 0
- 16
connection_options.go View File

@ -5,7 +5,6 @@ import "time"
// ConnectionOptions are used to describe how a new consumer will be created. // ConnectionOptions are used to describe how a new consumer will be created.
type ConnectionOptions struct { type ConnectionOptions struct {
ReconnectInterval time.Duration ReconnectInterval time.Duration
Logger Logger
Config Config Config Config
} }
@ -13,7 +12,6 @@ type ConnectionOptions struct {
func getDefaultConnectionOptions() ConnectionOptions { func getDefaultConnectionOptions() ConnectionOptions {
return ConnectionOptions{ return ConnectionOptions{
ReconnectInterval: time.Second * 5, ReconnectInterval: time.Second * 5,
Logger: stdDebugLogger{},
Config: Config{}, Config: Config{},
} }
} }
@ -25,20 +23,6 @@ func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options
} }
} }
// WithConnectionOptionsLogging sets logging to true on the consumer options
// and sets the
func WithConnectionOptionsLogging(options *ConnectionOptions) {
options.Logger = stdDebugLogger{}
}
// WithConnectionOptionsLogger sets logging to true on the consumer options
// and sets the
func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions) {
return func(options *ConnectionOptions) {
options.Logger = log
}
}
// WithConnectionOptionsConfig sets the Config used in the connection // WithConnectionOptionsConfig sets the Config used in the connection
func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) { func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) {
return func(options *ConnectionOptions) { return func(options *ConnectionOptions) {


+ 5
- 21
consume.go View File

@ -60,7 +60,7 @@ func NewConsumer(
return nil, errors.New("connection manager can't be nil") return nil, errors.New("connection manager can't be nil")
} }
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, conn.connectionManager.ReconnectInterval)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -90,7 +90,6 @@ func (consumer *Consumer) Run(handler Handler) error {
} }
for err := range consumer.reconnectErrCh { for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines( err = consumer.startGoroutines(
handler, handler,
consumer.options, consumer.options,
@ -113,12 +112,8 @@ func (consumer *Consumer) Close() {
consumer.isClosed = true consumer.isClosed = true
// close the channel so that rabbitmq server knows that the // close the channel so that rabbitmq server knows that the
// consumer has been stopped. // consumer has been stopped.
err := consumer.chanManager.Close()
if err != nil {
consumer.options.Logger.Warnf("error while closing the channel: %v", err)
}
consumer.chanManager.Close()
consumer.options.Logger.Infof("closing consumer...")
go func() { go func() {
consumer.closeConnectionToManagerCh <- struct{}{} consumer.closeConnectionToManagerCh <- struct{}{}
}() }()
@ -170,7 +165,6 @@ func (consumer *Consumer) startGoroutines(
for i := 0; i < options.Concurrency; i++ { for i := 0; i < options.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, options, handler) go handlerGoroutine(consumer, msgs, options, handler)
} }
consumer.options.Logger.Infof("Processing messages on %v goroutines", options.Concurrency)
return nil return nil
} }
@ -193,21 +187,11 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
switch handler(Delivery{msg}) { switch handler(Delivery{msg}) {
case Ack: case Ack:
err := msg.Ack(false)
if err != nil {
consumer.options.Logger.Errorf("can't ack message: %v", err)
}
msg.Ack(false)
case NackDiscard: case NackDiscard:
err := msg.Nack(false, false)
if err != nil {
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
msg.Nack(false, false)
case NackRequeue: case NackRequeue:
err := msg.Nack(false, true)
if err != nil {
consumer.options.Logger.Errorf("can't nack message: %v", err)
}
msg.Nack(false, true)
} }
} }
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
} }

+ 0
- 16
consumer_options.go View File

@ -2,7 +2,6 @@ package rabbitmq
import ( import (
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// getDefaultConsumerOptions describes the options that will be used when a value isn't provided // getDefaultConsumerOptions describes the options that will be used when a value isn't provided
@ -28,7 +27,6 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
}, },
ExchangeOptions: []ExchangeOptions{}, ExchangeOptions: []ExchangeOptions{},
Concurrency: 1, Concurrency: 1,
Logger: stdDebugLogger{},
QOSPrefetch: 10, QOSPrefetch: 10,
QOSGlobal: false, QOSGlobal: false,
} }
@ -66,7 +64,6 @@ type ConsumerOptions struct {
QueueOptions QueueOptions QueueOptions QueueOptions
ExchangeOptions []ExchangeOptions ExchangeOptions []ExchangeOptions
Concurrency int Concurrency int
Logger logger.Logger
QOSPrefetch int QOSPrefetch int
QOSGlobal bool QOSGlobal bool
} }
@ -282,19 +279,6 @@ func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) {
options.RabbitConsumerOptions.NoWait = true options.RabbitConsumerOptions.NoWait = true
} }
// WithConsumerOptionsLogging uses a default logger that writes to std out
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logger = &stdDebugLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logger = log
}
}
// WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that // WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput. // many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time. // This doesn't affect the handler, messages are still processed one at a time.


+ 1
- 4
examples/consumer/main.go View File

@ -11,10 +11,7 @@ import (
) )
func main() { func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }


+ 0
- 66
examples/logger/main.go View File

@ -1,66 +0,0 @@
package main
import (
"context"
"log"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
// errorLogger is used in WithPublisherOptionsLogger to create a custom logger
// that only logs ERROR and FATAL log levels
type errorLogger struct{}
func (l errorLogger) Fatalf(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...)
}
func (l errorLogger) Errorf(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...)
}
func (l errorLogger) Warnf(format string, v ...interface{}) {
}
func (l errorLogger) Infof(format string, v ...interface{}) {
}
func (l errorLogger) Debugf(format string, v ...interface{}) {
}
func main() {
mylogger := &errorLogger{}
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
publisher, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {
log.Fatal(err)
}
err = publisher.PublishWithContext(
context.Background(),
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
}
publisher.NotifyReturn(func(r rabbitmq.Return) {
log.Printf("message returned from server: %s", string(r.Body))
})
}

+ 1
- 4
examples/multiconsumer/main.go View File

@ -12,10 +12,7 @@ import (
) )
func main() { func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }


+ 1
- 6
examples/multipublisher/main.go View File

@ -13,10 +13,7 @@ import (
) )
func main() { func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -24,7 +21,6 @@ func main() {
publisher, err := rabbitmq.NewPublisher( publisher, err := rabbitmq.NewPublisher(
conn, conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare, rabbitmq.WithPublisherOptionsExchangeDeclare,
) )
@ -43,7 +39,6 @@ func main() {
publisher2, err := rabbitmq.NewPublisher( publisher2, err := rabbitmq.NewPublisher(
conn, conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare, rabbitmq.WithPublisherOptionsExchangeDeclare,
) )


+ 0
- 2
examples/publisher/main.go View File

@ -15,7 +15,6 @@ import (
func main() { func main() {
conn, err := rabbitmq.NewConn( conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost", "amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
) )
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -24,7 +23,6 @@ func main() {
publisher, err := rabbitmq.NewPublisher( publisher, err := rabbitmq.NewPublisher(
conn, conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare, rabbitmq.WithPublisherOptionsExchangeDeclare,
) )


+ 1
- 5
examples/publisher_confirm/main.go View File

@ -13,10 +13,7 @@ import (
) )
func main() { func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -24,7 +21,6 @@ func main() {
publisher, err := rabbitmq.NewPublisher( publisher, err := rabbitmq.NewPublisher(
conn, conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"), rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare, rabbitmq.WithPublisherOptionsExchangeDeclare,
rabbitmq.WithPublisherOptionsConfirm, rabbitmq.WithPublisherOptionsConfirm,


+ 3
- 19
internal/channelmanager/channel_manager.go View File

@ -8,12 +8,10 @@ import (
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager" "github.com/wagslane/go-rabbitmq/internal/connectionmanager"
"github.com/wagslane/go-rabbitmq/internal/dispatcher" "github.com/wagslane/go-rabbitmq/internal/dispatcher"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// ChannelManager - // ChannelManager -
type ChannelManager struct { type ChannelManager struct {
logger logger.Logger
channel *amqp.Channel channel *amqp.Channel
connManager *connectionmanager.ConnectionManager connManager *connectionmanager.ConnectionManager
channelMux *sync.RWMutex channelMux *sync.RWMutex
@ -24,14 +22,13 @@ type ChannelManager struct {
} }
// NewChannelManager creates a new connection manager // NewChannelManager creates a new connection manager
func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) {
func NewChannelManager(connManager *connectionmanager.ConnectionManager, reconnectInterval time.Duration) (*ChannelManager, error) {
ch, err := getNewChannel(connManager) ch, err := getNewChannel(connManager)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chanManager := ChannelManager{ chanManager := ChannelManager{
logger: log,
connManager: connManager, connManager: connManager,
channel: ch, channel: ch,
channelMux: &sync.RWMutex{}, channelMux: &sync.RWMutex{},
@ -66,18 +63,11 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() {
select { select {
case err := <-notifyCloseChan: case err := <-notifyCloseChan:
if err != nil { if err != nil {
chanManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err)
chanManager.reconnectLoop() chanManager.reconnectLoop()
chanManager.logger.Warnf("successfully reconnected to amqp server")
chanManager.dispatcher.Dispatch(err) chanManager.dispatcher.Dispatch(err)
} }
if err == nil {
chanManager.logger.Infof("amqp channel closed gracefully")
}
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chanManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err)
chanManager.reconnectLoop() chanManager.reconnectLoop()
chanManager.logger.Warnf("successfully reconnected to amqp server after cancel")
chanManager.dispatcher.Dispatch(errors.New(err)) chanManager.dispatcher.Dispatch(errors.New(err))
} }
} }
@ -98,12 +88,9 @@ func (chanManager *ChannelManager) incrementReconnectionCount() {
// reconnectLoop continuously attempts to reconnect // reconnectLoop continuously attempts to reconnect
func (chanManager *ChannelManager) reconnectLoop() { func (chanManager *ChannelManager) reconnectLoop() {
for { for {
chanManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chanManager.reconnectInterval)
time.Sleep(chanManager.reconnectInterval) time.Sleep(chanManager.reconnectInterval)
err := chanManager.reconnect() err := chanManager.reconnect()
if err != nil {
chanManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else {
if err == nil {
chanManager.incrementReconnectionCount() chanManager.incrementReconnectionCount()
go chanManager.startNotifyCancelOrClosed() go chanManager.startNotifyCancelOrClosed()
return return
@ -120,9 +107,7 @@ func (chanManager *ChannelManager) reconnect() error {
return err return err
} }
if err = chanManager.channel.Close(); err != nil {
chanManager.logger.Warnf("error closing channel while reconnecting: %v", err)
}
chanManager.channel.Close()
chanManager.channel = newChannel chanManager.channel = newChannel
return nil return nil
@ -130,7 +115,6 @@ func (chanManager *ChannelManager) reconnect() error {
// Close safely closes the current channel and connection // Close safely closes the current channel and connection
func (chanManager *ChannelManager) Close() error { func (chanManager *ChannelManager) Close() error {
chanManager.logger.Infof("closing channel manager...")
chanManager.channelMux.Lock() chanManager.channelMux.Lock()
defer chanManager.channelMux.Unlock() defer chanManager.channelMux.Unlock()


+ 3
- 17
internal/connectionmanager/connection_manager.go View File

@ -6,12 +6,10 @@ import (
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/dispatcher" "github.com/wagslane/go-rabbitmq/internal/dispatcher"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// ConnectionManager - // ConnectionManager -
type ConnectionManager struct { type ConnectionManager struct {
logger logger.Logger
url string url string
connection *amqp.Connection connection *amqp.Connection
amqpConfig amqp.Config amqpConfig amqp.Config
@ -23,13 +21,12 @@ type ConnectionManager struct {
} }
// NewConnectionManager creates a new connection manager // NewConnectionManager creates a new connection manager
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
func NewConnectionManager(url string, conf amqp.Config, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := amqp.DialConfig(url, amqp.Config(conf)) conn, err := amqp.DialConfig(url, amqp.Config(conf))
if err != nil { if err != nil {
return nil, err return nil, err
} }
connManager := ConnectionManager{ connManager := ConnectionManager{
logger: log,
url: url, url: url,
connection: conn, connection: conn,
amqpConfig: conf, amqpConfig: conf,
@ -45,7 +42,6 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon
// Close safely closes the current channel and connection // Close safely closes the current channel and connection
func (connManager *ConnectionManager) Close() error { func (connManager *ConnectionManager) Close() error {
connManager.logger.Infof("closing connection manager...")
connManager.connectionMux.Lock() connManager.connectionMux.Lock()
defer connManager.connectionMux.Unlock() defer connManager.connectionMux.Unlock()
@ -82,14 +78,9 @@ func (connManager *ConnectionManager) startNotifyClose() {
err := <-notifyCloseChan err := <-notifyCloseChan
if err != nil { if err != nil {
connManager.logger.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err)
connManager.reconnectLoop() connManager.reconnectLoop()
connManager.logger.Warnf("successfully reconnected to amqp server")
connManager.dispatcher.Dispatch(err) connManager.dispatcher.Dispatch(err)
} }
if err == nil {
connManager.logger.Infof("amqp connection closed gracefully")
}
} }
// GetReconnectionCount - // GetReconnectionCount -
@ -108,12 +99,9 @@ func (connManager *ConnectionManager) incrementReconnectionCount() {
// reconnectLoop continuously attempts to reconnect // reconnectLoop continuously attempts to reconnect
func (connManager *ConnectionManager) reconnectLoop() { func (connManager *ConnectionManager) reconnectLoop() {
for { for {
connManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", connManager.ReconnectInterval)
time.Sleep(connManager.ReconnectInterval) time.Sleep(connManager.ReconnectInterval)
err := connManager.reconnect() err := connManager.reconnect()
if err != nil {
connManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else {
if err == nil {
connManager.incrementReconnectionCount() connManager.incrementReconnectionCount()
go connManager.startNotifyClose() go connManager.startNotifyClose()
return return
@ -130,9 +118,7 @@ func (connManager *ConnectionManager) reconnect() error {
return err return err
} }
if err = connManager.connection.Close(); err != nil {
connManager.logger.Warnf("error closing connection while reconnecting: %v", err)
}
connManager.connection.Close()
connManager.connection = newConn connManager.connection = newConn
return nil return nil


+ 0
- 11
internal/logger/logger.go View File

@ -1,11 +0,0 @@
package logger
// Logger is describes a logging structure. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface {
Fatalf(string, ...interface{})
Errorf(string, ...interface{})
Warnf(string, ...interface{})
Infof(string, ...interface{})
Debugf(string, ...interface{})
}

+ 0
- 41
logger.go View File

@ -1,41 +0,0 @@
package rabbitmq
import (
"fmt"
"log"
"github.com/wagslane/go-rabbitmq/internal/logger"
)
// Logger is describes a logging structure. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger logger.Logger
const loggingPrefix = "gorabbit"
type stdDebugLogger struct{}
// Fatalf -
func (l stdDebugLogger) Fatalf(format string, v ...interface{}) {
log.Fatalf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
}
// Errorf -
func (l stdDebugLogger) Errorf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
}
// Warnf -
func (l stdDebugLogger) Warnf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
}
// Infof -
func (l stdDebugLogger) Infof(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
}
// Debugf -
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
}

+ 4
- 9
publish.go View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"sync" "sync"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
@ -78,7 +79,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
return nil, errors.New("connection manager can't be nil") return nil, errors.New("connection manager can't be nil")
} }
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, conn.connectionManager.ReconnectInterval)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -112,11 +113,9 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
go func() { go func() {
for err := range publisher.reconnectErrCh { for err := range publisher.reconnectErrCh {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
log.Printf("successful publisher recovery from: %v", err)
err := publisher.startup() err := publisher.startup()
if err != nil { if err != nil {
publisher.options.Logger.Fatalf("error on startup for publisher after cancel or close: %v", err)
publisher.options.Logger.Fatalf("publisher closing, unable to recover")
return return
} }
publisher.startReturnHandler() publisher.startReturnHandler()
@ -281,11 +280,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
func (publisher *Publisher) Close() { func (publisher *Publisher) Close() {
// close the channel so that rabbitmq server knows that the // close the channel so that rabbitmq server knows that the
// publisher has been stopped. // publisher has been stopped.
err := publisher.chanManager.Close()
if err != nil {
publisher.options.Logger.Warnf("error while closing the channel: %v", err)
}
publisher.options.Logger.Infof("closing publisher...")
publisher.chanManager.Close()
go func() { go func() {
publisher.closeConnectionToManagerCh <- struct{}{} publisher.closeConnectionToManagerCh <- struct{}{}
}() }()


+ 0
- 4
publish_flow_block.go View File

@ -13,11 +13,9 @@ func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range notifyFlowChan { for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlowMux.Lock()
if ok { if ok {
publisher.options.Logger.Warnf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true publisher.disablePublishDueToFlow = true
} else { } else {
publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlow = false
publisher.options.Logger.Warnf("resuming publishing due to flow request from server")
} }
publisher.disablePublishDueToFlowMux.Unlock() publisher.disablePublishDueToFlowMux.Unlock()
} }
@ -32,11 +30,9 @@ func (publisher *Publisher) startNotifyBlockedHandler() {
for b := range blockings { for b := range blockings {
publisher.disablePublishDueToBlockedMux.Lock() publisher.disablePublishDueToBlockedMux.Lock()
if b.Active { if b.Active {
publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true publisher.disablePublishDueToBlocked = true
} else { } else {
publisher.disablePublishDueToBlocked = false publisher.disablePublishDueToBlocked = false
publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server")
} }
publisher.disablePublishDueToBlockedMux.Unlock() publisher.disablePublishDueToBlockedMux.Unlock()
} }


+ 0
- 16
publisher_options.go View File

@ -6,7 +6,6 @@ import amqp "github.com/rabbitmq/amqp091-go"
// Logger is a custom logging interface. // Logger is a custom logging interface.
type PublisherOptions struct { type PublisherOptions struct {
ExchangeOptions ExchangeOptions ExchangeOptions ExchangeOptions
Logger Logger
ConfirmMode bool ConfirmMode bool
} }
@ -24,25 +23,10 @@ func getDefaultPublisherOptions() PublisherOptions {
Args: Table{}, Args: Table{},
Declare: false, Declare: false,
}, },
Logger: stdDebugLogger{},
ConfirmMode: false, ConfirmMode: false,
} }
} }
// WithPublisherOptionsLogging sets logging to true on the publisher options
// and sets the
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logger = &stdDebugLogger{}
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
return func(options *PublisherOptions) {
options.Logger = log
}
}
// WithPublisherOptionsExchangeName sets the exchange name // WithPublisherOptionsExchangeName sets the exchange name
func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) {
return func(options *PublisherOptions) { return func(options *PublisherOptions) {


Loading…
Cancel
Save