diff --git a/Makefile b/Makefile index 82b7598..8a16013 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,16 @@ -all: test vet staticcheck +all: prepare-integration test vet staticcheck clean-integration test: - go test ./... + go test -v ./... vet: go vet ./... staticcheck: staticcheck ./... + +prepare-integration: + docker compose up -d + +clean-integration: + docker compose down -v diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..3d21e05 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,21 @@ +services: + ready: # stub service to ensure compose up waits until rabbitmq is healthy + image: "busybox" + depends_on: + rabbitmq: + condition: service_healthy + + rabbitmq: + image: "rabbitmq:3-management-alpine" + environment: + RABBITMQ_ERLANG_COOKIE: SWQOKODSQALRPCLNMEQG + RABBITMQ_DEFAULT_USER: rabbitmq + RABBITMQ_DEFAULT_PASS: wagslane + ports: + - "15672:15672" + - "5672:5672" + healthcheck: + test: ["CMD-SHELL", "rabbitmq-diagnostics -q check_port_connectivity"] + interval: 5s + timeout: 5s + retries: 5 diff --git a/consume.go b/consume.go index 248eee3..3b4e850 100644 --- a/consume.go +++ b/consume.go @@ -88,7 +88,7 @@ func NewConsumer( go func() { for err := range consumer.reconnectErrCh { - consumer.options.Logger.Infof("successful recovery from: %v", err) + consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( handler, *options, diff --git a/go.mod b/go.mod index 3c21622..49f6bb6 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,14 @@ module github.com/wagslane/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.7.0 +require ( + github.com/rabbitmq/amqp091-go v1.7.0 + github.com/streadway/amqp v1.0.0 + github.com/stretchr/testify v1.8.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index e627709..76b45a9 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,21 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= @@ -39,6 +46,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/integration/main_test.go b/integration/main_test.go new file mode 100644 index 0000000..d92b91a --- /dev/null +++ b/integration/main_test.go @@ -0,0 +1,263 @@ +package integration_test + +import ( + "encoding/json" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/streadway/amqp" + "github.com/stretchr/testify/require" + "github.com/wagslane/go-rabbitmq" +) + +const ( + rabbitmqUser = "rabbitmq" + rabbitmqPass = "wagslane" + mgmtUrl = "http://localhost:15672" +) + +var ( + defaultUrl = fmt.Sprintf("amqp://%s:%s@localhost", rabbitmqUser, rabbitmqPass) + messageDelay = 250 * time.Millisecond + reconnectInterval = 1 * time.Second +) + +type TestMessage struct { + Text string +} + +func TestBasic(t *testing.T) { + require := require.New(t) + require.True(true) + + consumerConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer consumerConn.Close() + + receivedMessages := []rabbitmq.Delivery{} + + exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) + + consumer, err := rabbitmq.NewConsumer( + consumerConn, + func(d rabbitmq.Delivery) rabbitmq.Action { + receivedMessages = append(receivedMessages, d) + return rabbitmq.Ack + }, + "test", + rabbitmq.WithConsumerOptionsRoutingKey("test"), + rabbitmq.WithConsumerOptionsExchangeName(exchange), + rabbitmq.WithConsumerOptionsExchangeDeclare, + ) + require.NoError(err) + defer consumer.Close() + + pusblisherConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer pusblisherConn.Close() + + publisher, err := rabbitmq.NewPublisher( + pusblisherConn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(exchange), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + require.NoError(err) + defer publisher.Close() + + msg := publishTestMessage(require, publisher) + publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + require.Len(receivedMessages, 2) + + var receivedMsg TestMessage + + require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) + require.Equal(msg, receivedMsg) +} + +func TestBasicReconnect(t *testing.T) { + require := require.New(t) + require.True(true) + + consumerConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer consumerConn.Close() + + receivedMessages := []rabbitmq.Delivery{} + + exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) + + consumer, err := rabbitmq.NewConsumer( + consumerConn, + func(d rabbitmq.Delivery) rabbitmq.Action { + receivedMessages = append(receivedMessages, d) + return rabbitmq.Ack + }, + "test", + rabbitmq.WithConsumerOptionsRoutingKey("test"), + rabbitmq.WithConsumerOptionsExchangeName(exchange), + rabbitmq.WithConsumerOptionsExchangeDeclare, + ) + require.NoError(err) + defer consumer.Close() + + pusblisherConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer pusblisherConn.Close() + + publisher, err := rabbitmq.NewPublisher( + pusblisherConn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(exchange), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + require.NoError(err) + defer publisher.Close() + + waitForConnections(require, 2) + + msg := publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + terminateConnections(require) + + waitForConnections(require, 2) + + publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + require.Len(receivedMessages, 2) + + var receivedMsg TestMessage + + require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) + require.Equal(msg, receivedMsg) +} + +func TestBasicReconnectBrokenChannel(t *testing.T) { + require := require.New(t) + require.True(true) + + consumerConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer consumerConn.Close() + + receivedMessages := []rabbitmq.Delivery{} + + exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) + + consumer, err := rabbitmq.NewConsumer( + consumerConn, + func(d rabbitmq.Delivery) rabbitmq.Action { + receivedMessages = append(receivedMessages, d) + return rabbitmq.Ack + }, + "test", + rabbitmq.WithConsumerOptionsRoutingKey("test"), + rabbitmq.WithConsumerOptionsExchangeName(exchange), + rabbitmq.WithConsumerOptionsExchangeDeclare, + ) + require.NoError(err) + defer consumer.Close() + + pusblisherConn, err := rabbitmq.NewConn( + defaultUrl, + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), + ) + require.NoError(err) + defer pusblisherConn.Close() + + publisher, err := rabbitmq.NewPublisher( + pusblisherConn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName(exchange), + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + require.NoError(err) + defer publisher.Close() + + waitForConnections(require, 2) + + msg := publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + terminateConnections(require) + + // Simulate a channel failure, but registering the same queue with a different type + // this will cause an error + // + // Delete the queue after some time, which should re-establish the connection + time.Sleep(reconnectInterval / 2) + + conn, err := amqp.DialConfig(defaultUrl, amqp.Config{}) + require.NoError(err) + + defer conn.Close() + + channel, err := conn.Channel() + require.NoError(err) + + _, err = channel.QueueDelete("test", false, false, false) + require.NoError(err) + + _, err = channel.QueueDeclare( + "test", // name of the queue + true, // durable + false, // delete when unused + true, // exclusive + false, // noWait + nil, // arguments + ) + require.NoError(err) + + time.Sleep(reconnectInterval * 2) + + _, err = channel.QueueDelete("test", false, false, false) + require.NoError(err) + + waitForConnections(require, 2) + + publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + publishTestMessage(require, publisher) + + time.Sleep(messageDelay) + + require.Len(receivedMessages, 3) + + var receivedMsg TestMessage + + require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) + require.Equal(msg, receivedMsg) +} diff --git a/integration/utils_test.go b/integration/utils_test.go new file mode 100644 index 0000000..df5b20c --- /dev/null +++ b/integration/utils_test.go @@ -0,0 +1,86 @@ +package integration_test + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/stretchr/testify/require" + "github.com/wagslane/go-rabbitmq" +) + +func waitForConnections(require *require.Assertions, expected int) { + maxWait := 30 + for i := 0; i < maxWait; i++ { + req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody) + require.NoError(err) + + req.SetBasicAuth(rabbitmqUser, rabbitmqPass) + + res, err := http.DefaultClient.Do(req) + require.NoError(err) + + defer res.Body.Close() + + require.Equal(200, res.StatusCode) + resBody, err := io.ReadAll(res.Body) + require.NoError(err) + + connections := []map[string]interface{}{} + require.NoError(json.Unmarshal(resBody, &connections)) + + if len(connections) == expected { + return + } + time.Sleep(1 * time.Second) + } + + require.Fail("waitForConnections timed out") +} + +func terminateConnections(require *require.Assertions) { + req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody) + require.NoError(err) + + req.SetBasicAuth(rabbitmqUser, rabbitmqPass) + + res, err := http.DefaultClient.Do(req) + require.NoError(err) + + defer res.Body.Close() + + require.Equal(200, res.StatusCode) + resBody, err := io.ReadAll(res.Body) + require.NoError(err) + + connections := []map[string]interface{}{} + require.NoError(json.Unmarshal(resBody, &connections)) + + require.Len(connections, 2) + + for _, connection := range connections { + name := connection["name"].(string) + fmt.Println("connection", name) + + req, err := http.NewRequest(http.MethodDelete, mgmtUrl+"/api/connections/"+name, http.NoBody) + require.NoError(err) + req.SetBasicAuth(rabbitmqUser, rabbitmqPass) + + res, err := http.DefaultClient.Do(req) + require.NoError(err) + require.Equal(204, res.StatusCode) + } +} + +func publishTestMessage(require *require.Assertions, publisher *rabbitmq.Publisher) TestMessage { + msg := TestMessage{Text: "Test"} + + msgBytes, err := json.Marshal(msg) + require.NoError(err) + + require.NoError(publisher.Publish(msgBytes, []string{"test"})) + + return msg +}