diff --git a/README.md b/README.md index 1d5f8b9..6e6ffe5 100644 --- a/README.md +++ b/README.md @@ -32,10 +32,14 @@ go get github.com/wagslane/go-rabbitmq ### Default options ```go -consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", amqp091.Config{}) +consumer, err := rabbitmq.NewConsumer( + "amqp://guest:guest@localhost", rabbitmq.Config{}, + rabbitmq.WithConsumerOptionsLogging, +) if err != nil { log.Fatal(err) } +defer consumer.Close() err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) @@ -55,12 +59,13 @@ if err != nil { ```go consumer, err := rabbitmq.NewConsumer( "amqp://user:pass@localhost", - amqp091.Config{}, + rabbitmq.Config{}, rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) } +defer consumer.Close() err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) @@ -87,10 +92,11 @@ if err != nil { ### Default options ```go -publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", amqp091.Config{}) +publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", rabbitmq.Config{}) if err != nil { log.Fatal(err) } +defer publisher.Close() err = publisher.Publish([]byte("hello, world"), []string{"routing_key"}) if err != nil { log.Fatal(err) @@ -102,10 +108,11 @@ if err != nil { ```go publisher, returns, err := rabbitmq.NewPublisher( "amqp://user:pass@localhost", - amqp091.Config{}, + rabbitmq.Config{}, // can pass nothing for no logging rabbitmq.WithPublisherOptionsLogging, ) +defer publisher.Close() if err != nil { log.Fatal(err) } diff --git a/channel.go b/channel.go index 25246cb..b353989 100644 --- a/channel.go +++ b/channel.go @@ -2,7 +2,6 @@ package rabbitmq import ( "errors" - "strings" "sync" "time" @@ -14,13 +13,13 @@ type channelManager struct { url string channel *amqp.Channel connection *amqp.Connection - amqpConfig amqp.Config + amqpConfig Config channelMux *sync.RWMutex notifyCancelOrClose chan error reconnectInterval time.Duration } -func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { +func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) { conn, ch, err := getNewChannel(url, conf) if err != nil { return nil, err @@ -40,8 +39,8 @@ func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterv return &chManager, nil } -func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) { - amqpConn, err := amqp.DialConfig(url, conf) +func getNewChannel(url string, conf Config) (*amqp.Connection, *amqp.Channel, error) { + amqpConn, err := amqp.DialConfig(url, amqp.Config(conf)) if err != nil { return nil, nil, err } @@ -59,31 +58,13 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe func (chManager *channelManager) startNotifyCancelOrClosed() { notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1)) notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1)) - select { case err := <-notifyCloseChan: - if err != nil && err.Server { - chManager.logger.Printf("attempting to reconnect to amqp server after close") - chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server after close") - chManager.notifyCancelOrClose <- err - } else if err != nil && err.Reason == "EOF" { - chManager.logger.Printf("attempting to reconnect to amqp server after eof") - chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server after eof") - chManager.notifyCancelOrClose <- err - } else if err != nil && strings.Contains(err.Error(), "timeout") { - chManager.logger.Printf("attempting to reconnect to amqp server after timeout") + if err != nil { + chManager.logger.Printf("attempting to reconnect to amqp server from error: %v", err) chManager.reconnectLoop() - chManager.logger.Printf("successfully reconnected to amqp server after timeout") + chManager.logger.Printf("successfully reconnected to amqp server") chManager.notifyCancelOrClose <- err - } else if err != nil { - chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client: %v", err) - } else if err == nil { - chManager.logger.Printf("amqp channel closed gracefully") - } - if err != nil { - chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client") } if err == nil { chManager.logger.Printf("amqp channel closed gracefully") @@ -128,12 +109,17 @@ func (chManager *channelManager) reconnect() error { return nil } -// close safely closes the current channel +// close safely closes the current channel and connection func (chManager *channelManager) close() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - err := chManager.connection.Close() + err := chManager.channel.Close() + if err != nil { + return err + } + + err = chManager.connection.Close() if err != nil { return err } diff --git a/config.go b/config.go new file mode 100644 index 0000000..b55b986 --- /dev/null +++ b/config.go @@ -0,0 +1,9 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +// Config wraps amqp.Config +// Config is used in DialConfig and Open to specify the desired tuning +// parameters used during a connection open handshake. The negotiated tuning +// will be stored in the returned connection's Config field. +type Config amqp.Config diff --git a/consume.go b/consume.go index 99203c4..276f75b 100644 --- a/consume.go +++ b/consume.go @@ -45,7 +45,7 @@ type Delivery struct { } // NewConsumer returns a new Consumer connected to the given rabbitmq server -func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { +func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { options := &ConsumerOptions{ Logging: true, Logger: &stdLogger{}, @@ -132,30 +132,11 @@ func (consumer Consumer) StartConsuming( return nil } -// Disconnect disconnects both the channel and the connection. -// This method doesn't throw a reconnect, and should be used when finishing a program. -// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior -// such as messages being processed, but not being acknowledged, thus being requeued by the broker -func (consumer Consumer) Disconnect() { - consumer.chManager.channel.Close() - consumer.chManager.connection.Close() -} - -// StopConsuming stops the consumption of messages. -// The consumer should be discarded as it's not safe for re-use. -// This method sends a basic.cancel notification. -// The consumerName is the name or delivery tag of the amqp consumer we want to cancel. -// When noWait is true, do not wait for the server to acknowledge the cancel. -// Only use this when you are certain there are no deliveries in flight that -// require an acknowledgment, otherwise they will arrive and be dropped in the -// client without an ack, and will not be redelivered to other consumers. -// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation -// it's imperative for you to set the name when creating the consumer, if you want to use this function later -// a simple uuid4 should do the trick, since it should be unique. -// If you start many consumers, you should store the name of the consumers when creating them, such that you can -// use them in a for to stop all the consumers. -func (consumer Consumer) StopConsuming(consumerName string, noWait bool) { - consumer.chManager.channel.Cancel(consumerName, noWait) +// Close cleans up resources and closes the consumer. +// The consumer is not safe for reuse +func (consumer Consumer) Close() error { + consumer.chManager.logger.Printf("closing consumer...") + return consumer.chManager.close() } // startGoroutines declares the queue if it doesn't exist, diff --git a/examples/consumer/main.go b/examples/consumer/main.go index d9e63cf..b6c1b13 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -7,7 +7,6 @@ import ( "os/signal" "syscall" - amqp "github.com/rabbitmq/amqp091-go" rabbitmq "github.com/wagslane/go-rabbitmq" ) @@ -15,17 +14,21 @@ var consumerName = "example" func main() { consumer, err := rabbitmq.NewConsumer( - "amqp://guest:guest@localhost", amqp.Config{}, + "amqp://guest:guest@localhost", rabbitmq.Config{}, rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) } + defer func() { + err := consumer.Close() + if err != nil { + log.Fatal(err) + } + }() // wait for server to acknowledge the cancel const noWait = false - defer consumer.Disconnect() - defer consumer.StopConsuming(consumerName, noWait) err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { diff --git a/examples/logger/main.go b/examples/logger/main.go index 62f56ee..e082792 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -3,7 +3,6 @@ package main import ( "log" - amqp "github.com/rabbitmq/amqp091-go" rabbitmq "github.com/wagslane/go-rabbitmq" ) @@ -19,7 +18,7 @@ func main() { mylogger := &customLogger{} publisher, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", amqp.Config{}, + "amqp://guest:guest@localhost", rabbitmq.Config{}, rabbitmq.WithPublisherOptionsLogger(mylogger), ) if err != nil { diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 0e222a3..5ec6732 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -8,18 +8,23 @@ import ( "syscall" "time" - amqp "github.com/rabbitmq/amqp091-go" rabbitmq "github.com/wagslane/go-rabbitmq" ) func main() { publisher, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", amqp.Config{}, + "amqp://guest:guest@localhost", rabbitmq.Config{}, rabbitmq.WithPublisherOptionsLogging, ) if err != nil { log.Fatal(err) } + defer func() { + err := publisher.Close() + if err != nil { + log.Fatal(err) + } + }() returns := publisher.NotifyReturn() go func() { @@ -67,11 +72,6 @@ func main() { } case <-done: fmt.Println("stopping publisher") - err := publisher.StopPublishing() - if err != nil { - log.Fatal(err) - } - fmt.Println("publisher stopped") return } } diff --git a/publish.go b/publish.go index 70ad138..3b480b1 100644 --- a/publish.go +++ b/publish.go @@ -84,7 +84,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { +func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { options := &PublisherOptions{ Logging: true, Logger: &stdLogger{}, @@ -195,14 +195,11 @@ func (publisher *Publisher) Publish( return nil } -// StopPublishing stops the publishing of messages. +// Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use -func (publisher Publisher) StopPublishing() error { - err := publisher.chManager.close() - if err != nil { - return err - } - return nil +func (publisher Publisher) Close() error { + publisher.chManager.logger.Printf("closing publisher...") + return publisher.chManager.close() } func (publisher *Publisher) startNotifyFlowHandler() {