Browse Source

fixing reconnect issues and updating close API

pull/73/head
wagslane 4 years ago
parent
commit
ea9be35725
8 changed files with 60 additions and 78 deletions
  1. +11
    -4
      README.md
  2. +14
    -28
      channel.go
  3. +9
    -0
      config.go
  4. +6
    -25
      consume.go
  5. +7
    -4
      examples/consumer/main.go
  6. +1
    -2
      examples/logger/main.go
  7. +7
    -7
      examples/publisher/main.go
  8. +5
    -8
      publish.go

+ 11
- 4
README.md View File

@ -32,10 +32,14 @@ go get github.com/wagslane/go-rabbitmq
### Default options
```go
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", amqp091.Config{})
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
@ -55,12 +59,13 @@ if err != nil {
```go
consumer, err := rabbitmq.NewConsumer(
"amqp://user:pass@localhost",
amqp091.Config{},
rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
@ -87,10 +92,11 @@ if err != nil {
### Default options
```go
publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", amqp091.Config{})
publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
log.Fatal(err)
@ -102,10 +108,11 @@ if err != nil {
```go
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://user:pass@localhost",
amqp091.Config{},
rabbitmq.Config{},
// can pass nothing for no logging
rabbitmq.WithPublisherOptionsLogging,
)
defer publisher.Close()
if err != nil {
log.Fatal(err)
}


+ 14
- 28
channel.go View File

@ -2,7 +2,6 @@ package rabbitmq
import (
"errors"
"strings"
"sync"
"time"
@ -14,13 +13,13 @@ type channelManager struct {
url string
channel *amqp.Channel
connection *amqp.Connection
amqpConfig amqp.Config
amqpConfig Config
channelMux *sync.RWMutex
notifyCancelOrClose chan error
reconnectInterval time.Duration
}
func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
func newChannelManager(url string, conf Config, log Logger, reconnectInterval time.Duration) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
@ -40,8 +39,8 @@ func newChannelManager(url string, conf amqp.Config, log Logger, reconnectInterv
return &chManager, nil
}
func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.DialConfig(url, conf)
func getNewChannel(url string, conf Config) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.DialConfig(url, amqp.Config(conf))
if err != nil {
return nil, nil, err
}
@ -59,31 +58,13 @@ func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channe
func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1))
select {
case err := <-notifyCloseChan:
if err != nil && err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
} else if err != nil && err.Reason == "EOF" {
chManager.logger.Printf("attempting to reconnect to amqp server after eof")
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after eof")
chManager.notifyCancelOrClose <- err
} else if err != nil && strings.Contains(err.Error(), "timeout") {
chManager.logger.Printf("attempting to reconnect to amqp server after timeout")
if err != nil {
chManager.logger.Printf("attempting to reconnect to amqp server from error: %v", err)
chManager.reconnectLoop()
chManager.logger.Printf("successfully reconnected to amqp server after timeout")
chManager.logger.Printf("successfully reconnected to amqp server")
chManager.notifyCancelOrClose <- err
} else if err != nil {
chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client: %v", err)
} else if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
}
if err != nil {
chManager.logger.Printf("not attempting to reconnect to amqp server because closure was initiated by the client")
}
if err == nil {
chManager.logger.Printf("amqp channel closed gracefully")
@ -128,12 +109,17 @@ func (chManager *channelManager) reconnect() error {
return nil
}
// close safely closes the current channel
// close safely closes the current channel and connection
func (chManager *channelManager) close() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
err := chManager.connection.Close()
err := chManager.channel.Close()
if err != nil {
return err
}
err = chManager.connection.Close()
if err != nil {
return err
}


+ 9
- 0
config.go View File

@ -0,0 +1,9 @@
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// Config wraps amqp.Config
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
type Config amqp.Config

+ 6
- 25
consume.go View File

@ -45,7 +45,7 @@ type Delivery struct {
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{
Logging: true,
Logger: &stdLogger{},
@ -132,30 +132,11 @@ func (consumer Consumer) StartConsuming(
return nil
}
// Disconnect disconnects both the channel and the connection.
// This method doesn't throw a reconnect, and should be used when finishing a program.
// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior
// such as messages being processed, but not being acknowledged, thus being requeued by the broker
func (consumer Consumer) Disconnect() {
consumer.chManager.channel.Close()
consumer.chManager.connection.Close()
}
// StopConsuming stops the consumption of messages.
// The consumer should be discarded as it's not safe for re-use.
// This method sends a basic.cancel notification.
// The consumerName is the name or delivery tag of the amqp consumer we want to cancel.
// When noWait is true, do not wait for the server to acknowledge the cancel.
// Only use this when you are certain there are no deliveries in flight that
// require an acknowledgment, otherwise they will arrive and be dropped in the
// client without an ack, and will not be redelivered to other consumers.
// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation
// it's imperative for you to set the name when creating the consumer, if you want to use this function later
// a simple uuid4 should do the trick, since it should be unique.
// If you start many consumers, you should store the name of the consumers when creating them, such that you can
// use them in a for to stop all the consumers.
func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
consumer.chManager.channel.Cancel(consumerName, noWait)
// Close cleans up resources and closes the consumer.
// The consumer is not safe for reuse
func (consumer Consumer) Close() error {
consumer.chManager.logger.Printf("closing consumer...")
return consumer.chManager.close()
}
// startGoroutines declares the queue if it doesn't exist,


+ 7
- 4
examples/consumer/main.go View File

@ -7,7 +7,6 @@ import (
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
@ -15,17 +14,21 @@ var consumerName = "example"
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", amqp.Config{},
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Fatal(err)
}
}()
// wait for server to acknowledge the cancel
const noWait = false
defer consumer.Disconnect()
defer consumer.StopConsuming(consumerName, noWait)
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {


+ 1
- 2
examples/logger/main.go View File

@ -3,7 +3,6 @@ package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
@ -19,7 +18,7 @@ func main() {
mylogger := &customLogger{}
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {


+ 7
- 7
examples/publisher/main.go View File

@ -8,18 +8,23 @@ import (
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := publisher.Close()
if err != nil {
log.Fatal(err)
}
}()
returns := publisher.NotifyReturn()
go func() {
@ -67,11 +72,6 @@ func main() {
}
case <-done:
fmt.Println("stopping publisher")
err := publisher.StopPublishing()
if err != nil {
log.Fatal(err)
}
fmt.Println("publisher stopped")
return
}
}


+ 5
- 8
publish.go View File

@ -84,7 +84,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) {
options := &PublisherOptions{
Logging: true,
Logger: &stdLogger{},
@ -195,14 +195,11 @@ func (publisher *Publisher) Publish(
return nil
}
// StopPublishing stops the publishing of messages.
// Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) StopPublishing() error {
err := publisher.chManager.close()
if err != nil {
return err
}
return nil
func (publisher Publisher) Close() error {
publisher.chManager.logger.Printf("closing publisher...")
return publisher.chManager.close()
}
func (publisher *Publisher) startNotifyFlowHandler() {


Loading…
Cancel
Save