Browse Source

recovery from channel failure

pull/110/head
Johannes Würbach 3 years ago
parent
commit
dc73f616da
No known key found for this signature in database GPG Key ID: 74DB0F4D956CCCE3
7 changed files with 399 additions and 4 deletions
  1. +8
    -2
      Makefile
  2. +21
    -0
      compose.yaml
  3. +1
    -1
      consume.go
  4. +11
    -1
      go.mod
  5. +9
    -0
      go.sum
  6. +263
    -0
      integration/main_test.go
  7. +86
    -0
      integration/utils_test.go

+ 8
- 2
Makefile View File

@ -1,10 +1,16 @@
all: test vet staticcheck
all: prepare-integration test vet staticcheck clean-integration
test:
go test ./...
go test -v ./...
vet:
go vet ./...
staticcheck:
staticcheck ./...
prepare-integration:
docker compose up -d
clean-integration:
docker compose down -v

+ 21
- 0
compose.yaml View File

@ -0,0 +1,21 @@
services:
ready: # stub service to ensure compose up waits until rabbitmq is healthy
image: "busybox"
depends_on:
rabbitmq:
condition: service_healthy
rabbitmq:
image: "rabbitmq:3-management-alpine"
environment:
RABBITMQ_ERLANG_COOKIE: SWQOKODSQALRPCLNMEQG
RABBITMQ_DEFAULT_USER: rabbitmq
RABBITMQ_DEFAULT_PASS: wagslane
ports:
- "15672:15672"
- "5672:5672"
healthcheck:
test: ["CMD-SHELL", "rabbitmq-diagnostics -q check_port_connectivity"]
interval: 5s
timeout: 5s
retries: 5

+ 1
- 1
consume.go View File

@ -88,7 +88,7 @@ func NewConsumer(
go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful recovery from: %v", err)
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,


+ 11
- 1
go.mod View File

@ -2,4 +2,14 @@ module github.com/wagslane/go-rabbitmq
go 1.20
require github.com/rabbitmq/amqp091-go v1.7.0
require (
github.com/rabbitmq/amqp091-go v1.7.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

+ 9
- 0
go.sum View File

@ -1,14 +1,21 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
@ -39,6 +46,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 263
- 0
integration/main_test.go View File

@ -0,0 +1,263 @@
package integration_test
import (
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"
"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
"github.com/wagslane/go-rabbitmq"
)
const (
rabbitmqUser = "rabbitmq"
rabbitmqPass = "wagslane"
mgmtUrl = "http://localhost:15672"
)
var (
defaultUrl = fmt.Sprintf("amqp://%s:%s@localhost", rabbitmqUser, rabbitmqPass)
messageDelay = 250 * time.Millisecond
reconnectInterval = 1 * time.Second
)
type TestMessage struct {
Text string
}
func TestBasic(t *testing.T) {
require := require.New(t)
require.True(true)
consumerConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer consumerConn.Close()
receivedMessages := []rabbitmq.Delivery{}
exchange := fmt.Sprintf("test-%d", rand.Intn(10000))
consumer, err := rabbitmq.NewConsumer(
consumerConn,
func(d rabbitmq.Delivery) rabbitmq.Action {
receivedMessages = append(receivedMessages, d)
return rabbitmq.Ack
},
"test",
rabbitmq.WithConsumerOptionsRoutingKey("test"),
rabbitmq.WithConsumerOptionsExchangeName(exchange),
rabbitmq.WithConsumerOptionsExchangeDeclare,
)
require.NoError(err)
defer consumer.Close()
pusblisherConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer pusblisherConn.Close()
publisher, err := rabbitmq.NewPublisher(
pusblisherConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName(exchange),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
require.NoError(err)
defer publisher.Close()
msg := publishTestMessage(require, publisher)
publishTestMessage(require, publisher)
time.Sleep(messageDelay)
require.Len(receivedMessages, 2)
var receivedMsg TestMessage
require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg))
require.Equal(msg, receivedMsg)
}
func TestBasicReconnect(t *testing.T) {
require := require.New(t)
require.True(true)
consumerConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer consumerConn.Close()
receivedMessages := []rabbitmq.Delivery{}
exchange := fmt.Sprintf("test-%d", rand.Intn(10000))
consumer, err := rabbitmq.NewConsumer(
consumerConn,
func(d rabbitmq.Delivery) rabbitmq.Action {
receivedMessages = append(receivedMessages, d)
return rabbitmq.Ack
},
"test",
rabbitmq.WithConsumerOptionsRoutingKey("test"),
rabbitmq.WithConsumerOptionsExchangeName(exchange),
rabbitmq.WithConsumerOptionsExchangeDeclare,
)
require.NoError(err)
defer consumer.Close()
pusblisherConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer pusblisherConn.Close()
publisher, err := rabbitmq.NewPublisher(
pusblisherConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName(exchange),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
require.NoError(err)
defer publisher.Close()
waitForConnections(require, 2)
msg := publishTestMessage(require, publisher)
time.Sleep(messageDelay)
terminateConnections(require)
waitForConnections(require, 2)
publishTestMessage(require, publisher)
time.Sleep(messageDelay)
require.Len(receivedMessages, 2)
var receivedMsg TestMessage
require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg))
require.Equal(msg, receivedMsg)
}
func TestBasicReconnectBrokenChannel(t *testing.T) {
require := require.New(t)
require.True(true)
consumerConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer consumerConn.Close()
receivedMessages := []rabbitmq.Delivery{}
exchange := fmt.Sprintf("test-%d", rand.Intn(10000))
consumer, err := rabbitmq.NewConsumer(
consumerConn,
func(d rabbitmq.Delivery) rabbitmq.Action {
receivedMessages = append(receivedMessages, d)
return rabbitmq.Ack
},
"test",
rabbitmq.WithConsumerOptionsRoutingKey("test"),
rabbitmq.WithConsumerOptionsExchangeName(exchange),
rabbitmq.WithConsumerOptionsExchangeDeclare,
)
require.NoError(err)
defer consumer.Close()
pusblisherConn, err := rabbitmq.NewConn(
defaultUrl,
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval),
)
require.NoError(err)
defer pusblisherConn.Close()
publisher, err := rabbitmq.NewPublisher(
pusblisherConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName(exchange),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
require.NoError(err)
defer publisher.Close()
waitForConnections(require, 2)
msg := publishTestMessage(require, publisher)
time.Sleep(messageDelay)
terminateConnections(require)
// Simulate a channel failure, but registering the same queue with a different type
// this will cause an error
//
// Delete the queue after some time, which should re-establish the connection
time.Sleep(reconnectInterval / 2)
conn, err := amqp.DialConfig(defaultUrl, amqp.Config{})
require.NoError(err)
defer conn.Close()
channel, err := conn.Channel()
require.NoError(err)
_, err = channel.QueueDelete("test", false, false, false)
require.NoError(err)
_, err = channel.QueueDeclare(
"test", // name of the queue
true, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
require.NoError(err)
time.Sleep(reconnectInterval * 2)
_, err = channel.QueueDelete("test", false, false, false)
require.NoError(err)
waitForConnections(require, 2)
publishTestMessage(require, publisher)
time.Sleep(messageDelay)
publishTestMessage(require, publisher)
time.Sleep(messageDelay)
require.Len(receivedMessages, 3)
var receivedMsg TestMessage
require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg))
require.Equal(msg, receivedMsg)
}

+ 86
- 0
integration/utils_test.go View File

@ -0,0 +1,86 @@
package integration_test
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/stretchr/testify/require"
"github.com/wagslane/go-rabbitmq"
)
func waitForConnections(require *require.Assertions, expected int) {
maxWait := 30
for i := 0; i < maxWait; i++ {
req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody)
require.NoError(err)
req.SetBasicAuth(rabbitmqUser, rabbitmqPass)
res, err := http.DefaultClient.Do(req)
require.NoError(err)
defer res.Body.Close()
require.Equal(200, res.StatusCode)
resBody, err := io.ReadAll(res.Body)
require.NoError(err)
connections := []map[string]interface{}{}
require.NoError(json.Unmarshal(resBody, &connections))
if len(connections) == expected {
return
}
time.Sleep(1 * time.Second)
}
require.Fail("waitForConnections timed out")
}
func terminateConnections(require *require.Assertions) {
req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody)
require.NoError(err)
req.SetBasicAuth(rabbitmqUser, rabbitmqPass)
res, err := http.DefaultClient.Do(req)
require.NoError(err)
defer res.Body.Close()
require.Equal(200, res.StatusCode)
resBody, err := io.ReadAll(res.Body)
require.NoError(err)
connections := []map[string]interface{}{}
require.NoError(json.Unmarshal(resBody, &connections))
require.Len(connections, 2)
for _, connection := range connections {
name := connection["name"].(string)
fmt.Println("connection", name)
req, err := http.NewRequest(http.MethodDelete, mgmtUrl+"/api/connections/"+name, http.NoBody)
require.NoError(err)
req.SetBasicAuth(rabbitmqUser, rabbitmqPass)
res, err := http.DefaultClient.Do(req)
require.NoError(err)
require.Equal(204, res.StatusCode)
}
}
func publishTestMessage(require *require.Assertions, publisher *rabbitmq.Publisher) TestMessage {
msg := TestMessage{Text: "Test"}
msgBytes, err := json.Marshal(msg)
require.NoError(err)
require.NoError(publisher.Publish(msgBytes, []string{"test"}))
return msg
}

Loading…
Cancel
Save