| @ -1,21 +0,0 @@ | |||||
| services: | |||||
| ready: # stub service to ensure compose up waits until rabbitmq is healthy | |||||
| image: "busybox" | |||||
| depends_on: | |||||
| rabbitmq: | |||||
| condition: service_healthy | |||||
| rabbitmq: | |||||
| image: "rabbitmq:3-management-alpine" | |||||
| environment: | |||||
| RABBITMQ_ERLANG_COOKIE: SWQOKODSQALRPCLNMEQG | |||||
| RABBITMQ_DEFAULT_USER: rabbitmq | |||||
| RABBITMQ_DEFAULT_PASS: wagslane | |||||
| ports: | |||||
| - "15672:15672" | |||||
| - "5672:5672" | |||||
| healthcheck: | |||||
| test: ["CMD-SHELL", "rabbitmq-diagnostics -q check_port_connectivity"] | |||||
| interval: 5s | |||||
| timeout: 5s | |||||
| retries: 5 | |||||
| @ -1,263 +0,0 @@ | |||||
| package integration_test | |||||
| import ( | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "math/rand" | |||||
| "testing" | |||||
| "time" | |||||
| "github.com/streadway/amqp" | |||||
| "github.com/stretchr/testify/require" | |||||
| "github.com/wagslane/go-rabbitmq" | |||||
| ) | |||||
| const ( | |||||
| rabbitmqUser = "rabbitmq" | |||||
| rabbitmqPass = "wagslane" | |||||
| mgmtUrl = "http://localhost:15672" | |||||
| ) | |||||
| var ( | |||||
| defaultUrl = fmt.Sprintf("amqp://%s:%s@localhost", rabbitmqUser, rabbitmqPass) | |||||
| messageDelay = 250 * time.Millisecond | |||||
| reconnectInterval = 1 * time.Second | |||||
| ) | |||||
| type TestMessage struct { | |||||
| Text string | |||||
| } | |||||
| func TestBasic(t *testing.T) { | |||||
| require := require.New(t) | |||||
| require.True(true) | |||||
| consumerConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumerConn.Close() | |||||
| receivedMessages := []rabbitmq.Delivery{} | |||||
| exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) | |||||
| consumer, err := rabbitmq.NewConsumer( | |||||
| consumerConn, | |||||
| func(d rabbitmq.Delivery) rabbitmq.Action { | |||||
| receivedMessages = append(receivedMessages, d) | |||||
| return rabbitmq.Ack | |||||
| }, | |||||
| "test", | |||||
| rabbitmq.WithConsumerOptionsRoutingKey("test"), | |||||
| rabbitmq.WithConsumerOptionsExchangeName(exchange), | |||||
| rabbitmq.WithConsumerOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumer.Close() | |||||
| pusblisherConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer pusblisherConn.Close() | |||||
| publisher, err := rabbitmq.NewPublisher( | |||||
| pusblisherConn, | |||||
| rabbitmq.WithPublisherOptionsLogging, | |||||
| rabbitmq.WithPublisherOptionsExchangeName(exchange), | |||||
| rabbitmq.WithPublisherOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer publisher.Close() | |||||
| msg := publishTestMessage(require, publisher) | |||||
| publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| require.Len(receivedMessages, 2) | |||||
| var receivedMsg TestMessage | |||||
| require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) | |||||
| require.Equal(msg, receivedMsg) | |||||
| } | |||||
| func TestBasicReconnect(t *testing.T) { | |||||
| require := require.New(t) | |||||
| require.True(true) | |||||
| consumerConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumerConn.Close() | |||||
| receivedMessages := []rabbitmq.Delivery{} | |||||
| exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) | |||||
| consumer, err := rabbitmq.NewConsumer( | |||||
| consumerConn, | |||||
| func(d rabbitmq.Delivery) rabbitmq.Action { | |||||
| receivedMessages = append(receivedMessages, d) | |||||
| return rabbitmq.Ack | |||||
| }, | |||||
| "test", | |||||
| rabbitmq.WithConsumerOptionsRoutingKey("test"), | |||||
| rabbitmq.WithConsumerOptionsExchangeName(exchange), | |||||
| rabbitmq.WithConsumerOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumer.Close() | |||||
| pusblisherConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer pusblisherConn.Close() | |||||
| publisher, err := rabbitmq.NewPublisher( | |||||
| pusblisherConn, | |||||
| rabbitmq.WithPublisherOptionsLogging, | |||||
| rabbitmq.WithPublisherOptionsExchangeName(exchange), | |||||
| rabbitmq.WithPublisherOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer publisher.Close() | |||||
| waitForConnections(require, 2) | |||||
| msg := publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| terminateConnections(require) | |||||
| waitForConnections(require, 2) | |||||
| publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| require.Len(receivedMessages, 2) | |||||
| var receivedMsg TestMessage | |||||
| require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) | |||||
| require.Equal(msg, receivedMsg) | |||||
| } | |||||
| func TestBasicReconnectBrokenChannel(t *testing.T) { | |||||
| require := require.New(t) | |||||
| require.True(true) | |||||
| consumerConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumerConn.Close() | |||||
| receivedMessages := []rabbitmq.Delivery{} | |||||
| exchange := fmt.Sprintf("test-%d", rand.Intn(10000)) | |||||
| consumer, err := rabbitmq.NewConsumer( | |||||
| consumerConn, | |||||
| func(d rabbitmq.Delivery) rabbitmq.Action { | |||||
| receivedMessages = append(receivedMessages, d) | |||||
| return rabbitmq.Ack | |||||
| }, | |||||
| "test", | |||||
| rabbitmq.WithConsumerOptionsRoutingKey("test"), | |||||
| rabbitmq.WithConsumerOptionsExchangeName(exchange), | |||||
| rabbitmq.WithConsumerOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer consumer.Close() | |||||
| pusblisherConn, err := rabbitmq.NewConn( | |||||
| defaultUrl, | |||||
| rabbitmq.WithConnectionOptionsLogging, | |||||
| rabbitmq.WithConnectionOptionsReconnectInterval(reconnectInterval), | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer pusblisherConn.Close() | |||||
| publisher, err := rabbitmq.NewPublisher( | |||||
| pusblisherConn, | |||||
| rabbitmq.WithPublisherOptionsLogging, | |||||
| rabbitmq.WithPublisherOptionsExchangeName(exchange), | |||||
| rabbitmq.WithPublisherOptionsExchangeDeclare, | |||||
| ) | |||||
| require.NoError(err) | |||||
| defer publisher.Close() | |||||
| waitForConnections(require, 2) | |||||
| msg := publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| terminateConnections(require) | |||||
| // Simulate a channel failure, but registering the same queue with a different type | |||||
| // this will cause an error | |||||
| // | |||||
| // Delete the queue after some time, which should re-establish the connection | |||||
| time.Sleep(reconnectInterval / 2) | |||||
| conn, err := amqp.DialConfig(defaultUrl, amqp.Config{}) | |||||
| require.NoError(err) | |||||
| defer conn.Close() | |||||
| channel, err := conn.Channel() | |||||
| require.NoError(err) | |||||
| _, err = channel.QueueDelete("test", false, false, false) | |||||
| require.NoError(err) | |||||
| _, err = channel.QueueDeclare( | |||||
| "test", // name of the queue | |||||
| true, // durable | |||||
| false, // delete when unused | |||||
| true, // exclusive | |||||
| false, // noWait | |||||
| nil, // arguments | |||||
| ) | |||||
| require.NoError(err) | |||||
| time.Sleep(reconnectInterval * 2) | |||||
| _, err = channel.QueueDelete("test", false, false, false) | |||||
| require.NoError(err) | |||||
| waitForConnections(require, 2) | |||||
| publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| publishTestMessage(require, publisher) | |||||
| time.Sleep(messageDelay) | |||||
| require.Len(receivedMessages, 3) | |||||
| var receivedMsg TestMessage | |||||
| require.NoError(json.Unmarshal(receivedMessages[0].Body, &receivedMsg)) | |||||
| require.Equal(msg, receivedMsg) | |||||
| } | |||||
| @ -1,86 +0,0 @@ | |||||
| package integration_test | |||||
| import ( | |||||
| "encoding/json" | |||||
| "fmt" | |||||
| "io" | |||||
| "net/http" | |||||
| "time" | |||||
| "github.com/stretchr/testify/require" | |||||
| "github.com/wagslane/go-rabbitmq" | |||||
| ) | |||||
| func waitForConnections(require *require.Assertions, expected int) { | |||||
| maxWait := 30 | |||||
| for i := 0; i < maxWait; i++ { | |||||
| req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody) | |||||
| require.NoError(err) | |||||
| req.SetBasicAuth(rabbitmqUser, rabbitmqPass) | |||||
| res, err := http.DefaultClient.Do(req) | |||||
| require.NoError(err) | |||||
| defer res.Body.Close() | |||||
| require.Equal(200, res.StatusCode) | |||||
| resBody, err := io.ReadAll(res.Body) | |||||
| require.NoError(err) | |||||
| connections := []map[string]interface{}{} | |||||
| require.NoError(json.Unmarshal(resBody, &connections)) | |||||
| if len(connections) == expected { | |||||
| return | |||||
| } | |||||
| time.Sleep(1 * time.Second) | |||||
| } | |||||
| require.Fail("waitForConnections timed out") | |||||
| } | |||||
| func terminateConnections(require *require.Assertions) { | |||||
| req, err := http.NewRequest(http.MethodGet, mgmtUrl+"/api/connections", http.NoBody) | |||||
| require.NoError(err) | |||||
| req.SetBasicAuth(rabbitmqUser, rabbitmqPass) | |||||
| res, err := http.DefaultClient.Do(req) | |||||
| require.NoError(err) | |||||
| defer res.Body.Close() | |||||
| require.Equal(200, res.StatusCode) | |||||
| resBody, err := io.ReadAll(res.Body) | |||||
| require.NoError(err) | |||||
| connections := []map[string]interface{}{} | |||||
| require.NoError(json.Unmarshal(resBody, &connections)) | |||||
| require.Len(connections, 2) | |||||
| for _, connection := range connections { | |||||
| name := connection["name"].(string) | |||||
| fmt.Println("connection", name) | |||||
| req, err := http.NewRequest(http.MethodDelete, mgmtUrl+"/api/connections/"+name, http.NoBody) | |||||
| require.NoError(err) | |||||
| req.SetBasicAuth(rabbitmqUser, rabbitmqPass) | |||||
| res, err := http.DefaultClient.Do(req) | |||||
| require.NoError(err) | |||||
| require.Equal(204, res.StatusCode) | |||||
| } | |||||
| } | |||||
| func publishTestMessage(require *require.Assertions, publisher *rabbitmq.Publisher) TestMessage { | |||||
| msg := TestMessage{Text: "Test"} | |||||
| msgBytes, err := json.Marshal(msg) | |||||
| require.NoError(err) | |||||
| require.NoError(publisher.Publish(msgBytes, []string{"test"})) | |||||
| return msg | |||||
| } | |||||
| @ -0,0 +1,6 @@ | |||||
| certs/* | |||||
| spec/spec | |||||
| examples/simple-consumer/simple-consumer | |||||
| examples/simple-producer/simple-producer | |||||
| .idea/ | |||||
| @ -0,0 +1,3 @@ | |||||
| run: | |||||
| build-tags: | |||||
| - integration | |||||
| @ -0,0 +1,249 @@ | |||||
| # Changelog | |||||
| ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) | |||||
| **Merged pull requests:** | |||||
| - Update Makefile targets related to RabbitMQ [\#163](https://github.com/rabbitmq/amqp091-go/pull/163) ([Zerpet](https://github.com/Zerpet)) | |||||
| ## [v1.6.1-rc.2](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1-rc.2) (2023-01-31) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.1...v1.6.1-rc.2) | |||||
| **Merged pull requests:** | |||||
| - Do not overly protect writes [\#162](https://github.com/rabbitmq/amqp091-go/pull/162) ([lukebakken](https://github.com/lukebakken)) | |||||
| ## [v1.6.1-rc.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1-rc.1) (2023-01-31) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.0...v1.6.1-rc.1) | |||||
| **Closed issues:** | |||||
| - Calling Channel\(\) on an empty connection panics [\#148](https://github.com/rabbitmq/amqp091-go/issues/148) | |||||
| **Merged pull requests:** | |||||
| - Ensure flush happens and correctly lock connection for a series of unflushed writes [\#161](https://github.com/rabbitmq/amqp091-go/pull/161) ([lukebakken](https://github.com/lukebakken)) | |||||
| ## [v1.6.0](https://github.com/rabbitmq/amqp091-go/tree/v1.6.0) (2023-01-20) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.5.0...v1.6.0) | |||||
| **Implemented enhancements:** | |||||
| - Add constants for Queue arguments [\#145](https://github.com/rabbitmq/amqp091-go/pull/145) ([Zerpet](https://github.com/Zerpet)) | |||||
| **Closed issues:** | |||||
| - README not up to date [\#154](https://github.com/rabbitmq/amqp091-go/issues/154) | |||||
| - Allow re-using default connection config \(custom properties\) [\#152](https://github.com/rabbitmq/amqp091-go/issues/152) | |||||
| - Rename package name to amqp in V2 [\#151](https://github.com/rabbitmq/amqp091-go/issues/151) | |||||
| - Helper types to declare quorum queues [\#144](https://github.com/rabbitmq/amqp091-go/issues/144) | |||||
| - Inefficient use of buffers reduces potential throughput for basicPublish with small messages. [\#141](https://github.com/rabbitmq/amqp091-go/issues/141) | |||||
| - bug, close cause panic [\#130](https://github.com/rabbitmq/amqp091-go/issues/130) | |||||
| - Publishing Headers are unable to store Table with slice values [\#125](https://github.com/rabbitmq/amqp091-go/issues/125) | |||||
| - Example client can deadlock in Close due to unconsumed confirmations [\#122](https://github.com/rabbitmq/amqp091-go/issues/122) | |||||
| - SAC not working properly [\#106](https://github.com/rabbitmq/amqp091-go/issues/106) | |||||
| **Merged pull requests:** | |||||
| - Add automatic CHANGELOG.md generation [\#158](https://github.com/rabbitmq/amqp091-go/pull/158) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Supply library-defined props with NewConnectionProperties [\#157](https://github.com/rabbitmq/amqp091-go/pull/157) ([slagiewka](https://github.com/slagiewka)) | |||||
| - Fix linter warnings [\#156](https://github.com/rabbitmq/amqp091-go/pull/156) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Remove outdated information from README [\#155](https://github.com/rabbitmq/amqp091-go/pull/155) ([scriptcoded](https://github.com/scriptcoded)) | |||||
| - Add example producer using DeferredConfirm [\#149](https://github.com/rabbitmq/amqp091-go/pull/149) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Ensure code is formatted [\#147](https://github.com/rabbitmq/amqp091-go/pull/147) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Fix inefficient use of buffers that reduces the potential throughput of basicPublish [\#142](https://github.com/rabbitmq/amqp091-go/pull/142) ([fadams](https://github.com/fadams)) | |||||
| - Do not embed context in DeferredConfirmation [\#140](https://github.com/rabbitmq/amqp091-go/pull/140) ([tie](https://github.com/tie)) | |||||
| - Add constant for default exchange [\#139](https://github.com/rabbitmq/amqp091-go/pull/139) ([marlongerson](https://github.com/marlongerson)) | |||||
| - Fix indentation and remove unnecessary instructions [\#138](https://github.com/rabbitmq/amqp091-go/pull/138) ([alraujo](https://github.com/alraujo)) | |||||
| - Remove unnecessary instruction [\#135](https://github.com/rabbitmq/amqp091-go/pull/135) ([alraujo](https://github.com/alraujo)) | |||||
| - Fix example client to avoid deadlock in Close [\#123](https://github.com/rabbitmq/amqp091-go/pull/123) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Bump go.uber.org/goleak from 1.1.12 to 1.2.0 [\#116](https://github.com/rabbitmq/amqp091-go/pull/116) ([dependabot[bot]](https://github.com/apps/dependabot)) | |||||
| ## [v1.5.0](https://github.com/rabbitmq/amqp091-go/tree/v1.5.0) (2022-09-07) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.4.0...v1.5.0) | |||||
| **Implemented enhancements:** | |||||
| - Provide a friendly way to set connection name [\#105](https://github.com/rabbitmq/amqp091-go/issues/105) | |||||
| **Closed issues:** | |||||
| - Support connection.update-secret [\#107](https://github.com/rabbitmq/amqp091-go/issues/107) | |||||
| - Example Client: Implementation of a Consumer with reconnection support [\#40](https://github.com/rabbitmq/amqp091-go/issues/40) | |||||
| **Merged pull requests:** | |||||
| - use PublishWithContext instead of Publish [\#115](https://github.com/rabbitmq/amqp091-go/pull/115) ([Gsantomaggio](https://github.com/Gsantomaggio)) | |||||
| - Add support for connection.update-secret [\#114](https://github.com/rabbitmq/amqp091-go/pull/114) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Remove warning on RabbitMQ tutorials in go [\#113](https://github.com/rabbitmq/amqp091-go/pull/113) ([ChunyiLyu](https://github.com/ChunyiLyu)) | |||||
| - Update AMQP Spec [\#110](https://github.com/rabbitmq/amqp091-go/pull/110) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Add an example of reliable consumer [\#109](https://github.com/rabbitmq/amqp091-go/pull/109) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Add convenience function to set connection name [\#108](https://github.com/rabbitmq/amqp091-go/pull/108) ([Zerpet](https://github.com/Zerpet)) | |||||
| ## [v1.4.0](https://github.com/rabbitmq/amqp091-go/tree/v1.4.0) (2022-07-19) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.4...v1.4.0) | |||||
| **Closed issues:** | |||||
| - target machine actively refused connection [\#99](https://github.com/rabbitmq/amqp091-go/issues/99) | |||||
| - 504 channel/connection is not open error occurred in multiple connection with same rabbitmq service [\#97](https://github.com/rabbitmq/amqp091-go/issues/97) | |||||
| - Add possible cancel of DeferredConfirmation [\#92](https://github.com/rabbitmq/amqp091-go/issues/92) | |||||
| - Documentation [\#89](https://github.com/rabbitmq/amqp091-go/issues/89) | |||||
| - Channel Close gets stuck after closing a connection \(via management UI\) [\#88](https://github.com/rabbitmq/amqp091-go/issues/88) | |||||
| - this library has same issue [\#83](https://github.com/rabbitmq/amqp091-go/issues/83) | |||||
| - Provide a logging interface [\#81](https://github.com/rabbitmq/amqp091-go/issues/81) | |||||
| - 1.4.0 release checklist [\#77](https://github.com/rabbitmq/amqp091-go/issues/77) | |||||
| - Data race in the client example [\#72](https://github.com/rabbitmq/amqp091-go/issues/72) | |||||
| - reader go routine hangs and leaks when Connection.Close\(\) is called multiple times [\#69](https://github.com/rabbitmq/amqp091-go/issues/69) | |||||
| - Support auto-reconnect and cluster [\#65](https://github.com/rabbitmq/amqp091-go/issues/65) | |||||
| - Connection/Channel Deadlock [\#32](https://github.com/rabbitmq/amqp091-go/issues/32) | |||||
| - Closing connection and/or channel hangs NotifyPublish is used [\#21](https://github.com/rabbitmq/amqp091-go/issues/21) | |||||
| - Consumer channel isn't closed in the event of unexpected disconnection [\#18](https://github.com/rabbitmq/amqp091-go/issues/18) | |||||
| **Merged pull requests:** | |||||
| - fix race condition with context close and confirm at the same time on DeferredConfirmation. [\#101](https://github.com/rabbitmq/amqp091-go/pull/101) ([sapk](https://github.com/sapk)) | |||||
| - Add build TLS config from URI [\#98](https://github.com/rabbitmq/amqp091-go/pull/98) ([reddec](https://github.com/reddec)) | |||||
| - Use context for Publish methods [\#96](https://github.com/rabbitmq/amqp091-go/pull/96) ([sapk](https://github.com/sapk)) | |||||
| - Added function to get the remote peer's IP address \(conn.RemoteAddr\(\)\) [\#95](https://github.com/rabbitmq/amqp091-go/pull/95) ([rabb1t](https://github.com/rabb1t)) | |||||
| - Update connection documentation [\#90](https://github.com/rabbitmq/amqp091-go/pull/90) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Revert test to demonstrate actual bug [\#87](https://github.com/rabbitmq/amqp091-go/pull/87) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Minor improvements to examples [\#86](https://github.com/rabbitmq/amqp091-go/pull/86) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Do not skip flaky test in CI [\#85](https://github.com/rabbitmq/amqp091-go/pull/85) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Add logging [\#84](https://github.com/rabbitmq/amqp091-go/pull/84) ([lukebakken](https://github.com/lukebakken)) | |||||
| - Add a win32 build [\#82](https://github.com/rabbitmq/amqp091-go/pull/82) ([lukebakken](https://github.com/lukebakken)) | |||||
| - channel: return nothing instead of always a nil-error in receive methods [\#80](https://github.com/rabbitmq/amqp091-go/pull/80) ([fho](https://github.com/fho)) | |||||
| - update the contributing & readme files, improve makefile [\#79](https://github.com/rabbitmq/amqp091-go/pull/79) ([fho](https://github.com/fho)) | |||||
| - Fix lint errors [\#78](https://github.com/rabbitmq/amqp091-go/pull/78) ([lukebakken](https://github.com/lukebakken)) | |||||
| - ci: run golangci-lint [\#76](https://github.com/rabbitmq/amqp091-go/pull/76) ([fho](https://github.com/fho)) | |||||
| - ci: run test via make & remove travis CI config [\#75](https://github.com/rabbitmq/amqp091-go/pull/75) ([fho](https://github.com/fho)) | |||||
| - ci: run tests with race detector [\#74](https://github.com/rabbitmq/amqp091-go/pull/74) ([fho](https://github.com/fho)) | |||||
| - Detect go routine leaks in integration testcases [\#73](https://github.com/rabbitmq/amqp091-go/pull/73) ([fho](https://github.com/fho)) | |||||
| - connection: fix: reader go-routine is leaked on connection close [\#70](https://github.com/rabbitmq/amqp091-go/pull/70) ([fho](https://github.com/fho)) | |||||
| - adding best practises for NotifyPublish for issue\_21 scenario [\#68](https://github.com/rabbitmq/amqp091-go/pull/68) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - Update Go version [\#67](https://github.com/rabbitmq/amqp091-go/pull/67) ([Zerpet](https://github.com/Zerpet)) | |||||
| - Regenerate certs with SHA256 to fix test with Go 1.18+ [\#66](https://github.com/rabbitmq/amqp091-go/pull/66) ([anthonyfok](https://github.com/anthonyfok)) | |||||
| ## [v1.3.4](https://github.com/rabbitmq/amqp091-go/tree/v1.3.4) (2022-04-01) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.3...v1.3.4) | |||||
| **Merged pull requests:** | |||||
| - bump version to 1.3.4 [\#63](https://github.com/rabbitmq/amqp091-go/pull/63) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - updating doc [\#62](https://github.com/rabbitmq/amqp091-go/pull/62) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| ## [v1.3.3](https://github.com/rabbitmq/amqp091-go/tree/v1.3.3) (2022-04-01) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.2...v1.3.3) | |||||
| **Closed issues:** | |||||
| - Add Client Version [\#49](https://github.com/rabbitmq/amqp091-go/issues/49) | |||||
| - OpenTelemetry Propagation [\#22](https://github.com/rabbitmq/amqp091-go/issues/22) | |||||
| **Merged pull requests:** | |||||
| - bump buildVersion for release [\#61](https://github.com/rabbitmq/amqp091-go/pull/61) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - adding documentation for notifyClose best pratices [\#60](https://github.com/rabbitmq/amqp091-go/pull/60) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - adding documentation on NotifyClose of connection and channel to enfo… [\#59](https://github.com/rabbitmq/amqp091-go/pull/59) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| ## [v1.3.2](https://github.com/rabbitmq/amqp091-go/tree/v1.3.2) (2022-03-28) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.1...v1.3.2) | |||||
| **Closed issues:** | |||||
| - Potential race condition in Connection module [\#31](https://github.com/rabbitmq/amqp091-go/issues/31) | |||||
| **Merged pull requests:** | |||||
| - bump versioning to 1.3.2 [\#58](https://github.com/rabbitmq/amqp091-go/pull/58) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| ## [v1.3.1](https://github.com/rabbitmq/amqp091-go/tree/v1.3.1) (2022-03-25) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.3.0...v1.3.1) | |||||
| **Closed issues:** | |||||
| - Possible deadlock on DeferredConfirmation.Wait\(\) [\#46](https://github.com/rabbitmq/amqp091-go/issues/46) | |||||
| - Call to Delivery.Ack blocks indefinitely in case of disconnection [\#19](https://github.com/rabbitmq/amqp091-go/issues/19) | |||||
| - Unexpacted behavor of channel.IsClosed\(\) [\#14](https://github.com/rabbitmq/amqp091-go/issues/14) | |||||
| - A possible dead lock in connection close notification Go channel [\#11](https://github.com/rabbitmq/amqp091-go/issues/11) | |||||
| **Merged pull requests:** | |||||
| - These ones were the ones testing Open scenarios. The issue is that Op… [\#57](https://github.com/rabbitmq/amqp091-go/pull/57) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - changing defaultVersion to buildVersion and create a simple change\_ve… [\#54](https://github.com/rabbitmq/amqp091-go/pull/54) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - adding integration test for issue 11 [\#50](https://github.com/rabbitmq/amqp091-go/pull/50) ([DanielePalaia](https://github.com/DanielePalaia)) | |||||
| - Remove the old link product [\#48](https://github.com/rabbitmq/amqp091-go/pull/48) ([Gsantomaggio](https://github.com/Gsantomaggio)) | |||||
| - Fix deadlock on DeferredConfirmations [\#47](https://github.com/rabbitmq/amqp091-go/pull/47) ([SpencerTorres](https://github.com/SpencerTorres)) | |||||
| - Example client: Rename Stream\(\) to Consume\(\) to avoid confusion with RabbitMQ streams [\#39](https://github.com/rabbitmq/amqp091-go/pull/39) ([andygrunwald](https://github.com/andygrunwald)) | |||||
| - Example client: Rename `name` to `queueName` to make the usage clear and explicit [\#38](https://github.com/rabbitmq/amqp091-go/pull/38) ([andygrunwald](https://github.com/andygrunwald)) | |||||
| - Client example: Renamed concept "Session" to "Client" [\#37](https://github.com/rabbitmq/amqp091-go/pull/37) ([andygrunwald](https://github.com/andygrunwald)) | |||||
| - delete unuseful code [\#36](https://github.com/rabbitmq/amqp091-go/pull/36) ([liutaot](https://github.com/liutaot)) | |||||
| - Client Example: Fix closing order [\#35](https://github.com/rabbitmq/amqp091-go/pull/35) ([andygrunwald](https://github.com/andygrunwald)) | |||||
| - Client example: Use instance logger instead of global logger [\#34](https://github.com/rabbitmq/amqp091-go/pull/34) ([andygrunwald](https://github.com/andygrunwald)) | |||||
| ## [v1.3.0](https://github.com/rabbitmq/amqp091-go/tree/v1.3.0) (2022-01-13) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.2.0...v1.3.0) | |||||
| **Closed issues:** | |||||
| - documentation of changes triggering version updates [\#29](https://github.com/rabbitmq/amqp091-go/issues/29) | |||||
| - Persistent messages folder [\#27](https://github.com/rabbitmq/amqp091-go/issues/27) | |||||
| **Merged pull requests:** | |||||
| - Expose a method to enable out-of-order Publisher Confirms [\#33](https://github.com/rabbitmq/amqp091-go/pull/33) ([benmoss](https://github.com/benmoss)) | |||||
| - Fix Signed 8-bit headers being treated as unsigned [\#26](https://github.com/rabbitmq/amqp091-go/pull/26) ([alex-goodisman](https://github.com/alex-goodisman)) | |||||
| ## [v1.2.0](https://github.com/rabbitmq/amqp091-go/tree/v1.2.0) (2021-11-17) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.1.0...v1.2.0) | |||||
| **Closed issues:** | |||||
| - No access to this vhost [\#24](https://github.com/rabbitmq/amqp091-go/issues/24) | |||||
| - copyright issue? [\#12](https://github.com/rabbitmq/amqp091-go/issues/12) | |||||
| - A possible dead lock when publishing message with confirmation [\#10](https://github.com/rabbitmq/amqp091-go/issues/10) | |||||
| - Semver release [\#7](https://github.com/rabbitmq/amqp091-go/issues/7) | |||||
| **Merged pull requests:** | |||||
| - Fix deadlock between publishing and receiving confirms [\#25](https://github.com/rabbitmq/amqp091-go/pull/25) ([benmoss](https://github.com/benmoss)) | |||||
| - Add GetNextPublishSeqNo for channel in confirm mode [\#23](https://github.com/rabbitmq/amqp091-go/pull/23) ([kamal-github](https://github.com/kamal-github)) | |||||
| - Added support for cert-only login without user and password [\#20](https://github.com/rabbitmq/amqp091-go/pull/20) ([mihaitodor](https://github.com/mihaitodor)) | |||||
| ## [v1.1.0](https://github.com/rabbitmq/amqp091-go/tree/v1.1.0) (2021-09-21) | |||||
| [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/ebd83429aa8cb06fa569473f623e87675f96d3a9...v1.1.0) | |||||
| **Closed issues:** | |||||
| - AMQPLAIN authentication does not work [\#15](https://github.com/rabbitmq/amqp091-go/issues/15) | |||||
| **Merged pull requests:** | |||||
| - Fix AMQPLAIN authentication mechanism [\#16](https://github.com/rabbitmq/amqp091-go/pull/16) ([hodbn](https://github.com/hodbn)) | |||||
| - connection: clarify documented behavior of NotifyClose [\#13](https://github.com/rabbitmq/amqp091-go/pull/13) ([pabigot](https://github.com/pabigot)) | |||||
| - Add a link to pkg.go.dev API docs [\#9](https://github.com/rabbitmq/amqp091-go/pull/9) ([benmoss](https://github.com/benmoss)) | |||||
| - add test go version 1.16.x and 1.17.x [\#8](https://github.com/rabbitmq/amqp091-go/pull/8) ([k4n4ry](https://github.com/k4n4ry)) | |||||
| - fix typos [\#6](https://github.com/rabbitmq/amqp091-go/pull/6) ([h44z](https://github.com/h44z)) | |||||
| - Heartbeat interval should be timeout/2 [\#5](https://github.com/rabbitmq/amqp091-go/pull/5) ([ifo20](https://github.com/ifo20)) | |||||
| - Exporting Channel State [\#4](https://github.com/rabbitmq/amqp091-go/pull/4) ([eibrunorodrigues](https://github.com/eibrunorodrigues)) | |||||
| - Add codeql analysis [\#3](https://github.com/rabbitmq/amqp091-go/pull/3) ([MirahImage](https://github.com/MirahImage)) | |||||
| - Add PR github action. [\#2](https://github.com/rabbitmq/amqp091-go/pull/2) ([MirahImage](https://github.com/MirahImage)) | |||||
| - Update Copyright Statement [\#1](https://github.com/rabbitmq/amqp091-go/pull/1) ([rlewis24](https://github.com/rlewis24)) | |||||
| \* *This Changelog was automatically generated by [github_changelog_generator](https://github.com/github-changelog-generator/github-changelog-generator)* | |||||
| @ -0,0 +1,77 @@ | |||||
| # Contributor Covenant Code of Conduct | |||||
| ## Our Pledge | |||||
| In the interest of fostering an open and welcoming environment, we as | |||||
| contributors and maintainers pledge to making participation in RabbitMQ Operator project and | |||||
| our community a harassment-free experience for everyone, regardless of age, body | |||||
| size, disability, ethnicity, sex characteristics, gender identity and expression, | |||||
| level of experience, education, socio-economic status, nationality, personal | |||||
| appearance, race, religion, or sexual identity and orientation. | |||||
| ## Our Standards | |||||
| Examples of behavior that contributes to creating a positive environment | |||||
| include: | |||||
| * Using welcoming and inclusive language | |||||
| * Being respectful of differing viewpoints and experiences | |||||
| * Gracefully accepting constructive criticism | |||||
| * Focusing on what is best for the community | |||||
| * Showing empathy towards other community members | |||||
| Examples of unacceptable behavior by participants include: | |||||
| * The use of sexualized language or imagery and unwelcome sexual attention or | |||||
| advances | |||||
| * Trolling, insulting/derogatory comments, and personal or political attacks | |||||
| * Public or private harassment | |||||
| * Publishing others' private information, such as a physical or electronic | |||||
| address, without explicit permission | |||||
| * Other conduct which could reasonably be considered inappropriate in a | |||||
| professional setting | |||||
| ## Our Responsibilities | |||||
| Project maintainers are responsible for clarifying the standards of acceptable | |||||
| behavior and are expected to take appropriate and fair corrective action in | |||||
| response to any instances of unacceptable behavior. | |||||
| Project maintainers have the right and responsibility to remove, edit, or | |||||
| reject comments, commits, code, wiki edits, issues, and other contributions | |||||
| that are not aligned to this Code of Conduct, or to ban temporarily or | |||||
| permanently any contributor for other behaviors that they deem inappropriate, | |||||
| threatening, offensive, or harmful. | |||||
| ## Scope | |||||
| This Code of Conduct applies both within project spaces and in public spaces | |||||
| when an individual is representing the project or its community. Examples of | |||||
| representing a project or community include using an official project e-mail | |||||
| address, posting via an official social media account, or acting as an appointed | |||||
| representative at an online or offline event. Representation of a project may be | |||||
| further defined and clarified by project maintainers. | |||||
| ## Enforcement | |||||
| Instances of abusive, harassing, or otherwise unacceptable behavior may be | |||||
| reported by contacting the project team at oss-coc@vmware.com. All | |||||
| complaints will be reviewed and investigated and will result in a response that | |||||
| is deemed necessary and appropriate to the circumstances. The project team is | |||||
| obligated to maintain confidentiality with regard to the reporter of an incident. | |||||
| Further details of specific enforcement policies may be posted separately. | |||||
| Project maintainers who do not follow or enforce the Code of Conduct in good | |||||
| faith may face temporary or permanent repercussions as determined by other | |||||
| members of the project's leadership. | |||||
| ## Attribution | |||||
| This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, | |||||
| available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html | |||||
| [homepage]: https://www.contributor-covenant.org | |||||
| For answers to common questions about this code of conduct, see | |||||
| https://www.contributor-covenant.org/faq | |||||
| @ -0,0 +1,48 @@ | |||||
| # Contributing | |||||
| ## Workflow | |||||
| Here is the recommended workflow: | |||||
| 1. Fork this repository, **github.com/rabbitmq/amqp091-go** | |||||
| 1. Create your feature branch (`git checkout -b my-new-feature`) | |||||
| 1. Run Static Checks | |||||
| 1. Run integration tests (see below) | |||||
| 1. **Implement tests** | |||||
| 1. Implement fixs | |||||
| 1. Commit your changes (`git commit -am 'Add some feature'`) | |||||
| 1. Push to a branch (`git push -u origin my-new-feature`) | |||||
| 1. Submit a pull request | |||||
| ## Running Static Checks | |||||
| golangci-lint must be installed to run the static checks. See [installation | |||||
| docs](https://golangci-lint.run/usage/install/) for more information. | |||||
| The static checks can be run via: | |||||
| ```shell | |||||
| make checks | |||||
| ``` | |||||
| ## Running Tests | |||||
| ### Integration Tests | |||||
| Running the Integration tests require: | |||||
| * A running RabbitMQ node with all defaults: | |||||
| [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) | |||||
| * That the server is either reachable via `amqp://guest:guest@127.0.0.1:5672/` | |||||
| or the environment variable `AMQP_URL` set to it's URL | |||||
| (e.g.: `export AMQP_URL="amqp://guest:verysecretpasswd@rabbitmq-host:5772/`) | |||||
| The integration tests can be run via: | |||||
| ```shell | |||||
| make tests | |||||
| ``` | |||||
| All integration tests should use the `integrationConnection(...)` test | |||||
| helpers defined in `integration_test.go` to setup the integration environment | |||||
| and logging. | |||||
| @ -0,0 +1,25 @@ | |||||
| AMQP 0-9-1 Go Client | |||||
| Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| Redistribution and use in source and binary forms, with or without | |||||
| modification, are permitted provided that the following conditions are met: | |||||
| Redistributions of source code must retain the above copyright notice, this | |||||
| list of conditions and the following disclaimer. | |||||
| Redistributions in binary form must reproduce the above copyright notice, this | |||||
| list of conditions and the following disclaimer in the documentation and/or | |||||
| other materials provided with the distribution. | |||||
| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |||||
| ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |||||
| WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |||||
| DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |||||
| FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |||||
| DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |||||
| SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |||||
| CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | |||||
| OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |||||
| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |||||
| @ -0,0 +1,36 @@ | |||||
| .DEFAULT_GOAL := list | |||||
| # Insert a comment starting with '##' after a target, and it will be printed by 'make' and 'make list' | |||||
| .PHONY: list | |||||
| list: ## list Makefile targets | |||||
| @echo "The most used targets: \n" | |||||
| @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' | |||||
| .PHONY: check-fmt | |||||
| check-fmt: ## Ensure code is formatted | |||||
| gofmt -l -d . # For the sake of debugging | |||||
| test -z "$$(gofmt -l .)" | |||||
| .PHONY: fmt | |||||
| fmt: ## Run go fmt against code | |||||
| go fmt ./... | |||||
| .PHONY: tests | |||||
| tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test | |||||
| go test -race -v -tags integration $(GO_TEST_FLAGS) | |||||
| .PHONY: check | |||||
| check: | |||||
| golangci-lint run ./... | |||||
| CONTAINER_NAME ?= amqp091-go-rabbitmq | |||||
| .PHONY: rabbitmq-server | |||||
| rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit | |||||
| docker run --detach --rm --name $(CONTAINER_NAME) \ | |||||
| --publish 5672:5672 --publish 15672:15672 \ | |||||
| --pull always rabbitmq:3-management | |||||
| .PHONY: stop-rabbitmq-server | |||||
| stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit | |||||
| docker stop $(CONTAINER_NAME) | |||||
| @ -0,0 +1,105 @@ | |||||
| # Go RabbitMQ Client Library | |||||
| [](https://github.com/rabbitmq/amqp091-go/actions/workflows/tests.yml) | |||||
| [](https://pkg.go.dev/github.com/rabbitmq/amqp091-go) | |||||
| [](https://goreportcard.com/report/github.com/rabbitmq/amqp091-go) | |||||
| This is a Go AMQP 0.9.1 client maintained by the [RabbitMQ core team](https://github.com/rabbitmq). | |||||
| It was [originally developed by Sean Treadway](https://github.com/streadway/amqp). | |||||
| ## Differences from streadway/amqp | |||||
| Some things are different compared to the original client, | |||||
| others haven't changed. | |||||
| ### Package Name | |||||
| This library uses a different package name. If moving from `streadway/amqp`, | |||||
| using an alias may reduce the number of changes needed: | |||||
| ``` go | |||||
| amqp "github.com/rabbitmq/amqp091-go" | |||||
| ``` | |||||
| ### License | |||||
| This client uses the same 2-clause BSD license as the original project. | |||||
| ### Public API Evolution | |||||
| This client retains key API elements as practically possible. | |||||
| It is, however, open to reasonable breaking public API changes suggested by the community. | |||||
| We don't have the "no breaking public API changes ever" rule and fully recognize | |||||
| that a good client API evolves over time. | |||||
| ## Project Maturity | |||||
| This project is based on a mature Go client that's been around for over a decade. | |||||
| ## Supported Go Versions | |||||
| This client supports two most recent Go release series. | |||||
| ## Supported RabbitMQ Versions | |||||
| This project supports RabbitMQ versions starting with `2.0` but primarily tested | |||||
| against [currently supported RabbitMQ release series](https://www.rabbitmq.com/versions.html). | |||||
| Some features and behaviours may be server version-specific. | |||||
| ## Goals | |||||
| Provide a functional interface that closely represents the AMQP 0.9.1 model | |||||
| targeted to RabbitMQ as a server. This includes the minimum necessary to | |||||
| interact the semantics of the protocol. | |||||
| ## Non-goals | |||||
| Things not intended to be supported. | |||||
| * Auto reconnect and re-synchronization of client and server topologies. | |||||
| * Reconnection would require understanding the error paths when the | |||||
| topology cannot be declared on reconnect. This would require a new set | |||||
| of types and code paths that are best suited at the call-site of this | |||||
| package. AMQP has a dynamic topology that needs all peers to agree. If | |||||
| this doesn't happen, the behavior is undefined. Instead of producing a | |||||
| possible interface with undefined behavior, this package is designed to | |||||
| be simple for the caller to implement the necessary connection-time | |||||
| topology declaration so that reconnection is trivial and encapsulated in | |||||
| the caller's application code. | |||||
| * AMQP Protocol negotiation for forward or backward compatibility. | |||||
| * 0.9.1 is stable and widely deployed. AMQP 1.0 is a divergent | |||||
| specification (a different protocol) and belongs to a different library. | |||||
| * Anything other than PLAIN and EXTERNAL authentication mechanisms. | |||||
| * Keeping the mechanisms interface modular makes it possible to extend | |||||
| outside of this package. If other mechanisms prove to be popular, then | |||||
| we would accept patches to include them in this package. | |||||
| * Support for [`basic.return` and `basic.ack` frame ordering](https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed). | |||||
| This client uses Go channels for certain protocol events and ordering between | |||||
| events sent to two different channels generally cannot be guaranteed. | |||||
| ## Usage | |||||
| See the [_examples](_examples) subdirectory for simple producers and consumers executables. | |||||
| If you have a use-case in mind which isn't well-represented by the examples, | |||||
| please file an issue. | |||||
| ## Documentation | |||||
| * [Godoc API reference](http://godoc.org/github.com/rabbitmq/amqp091-go) | |||||
| * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) | |||||
| ## Contributing | |||||
| Pull requests are very much welcomed. Create your pull request on a non-main | |||||
| branch, make sure a test or example is included that covers your change, and | |||||
| your commits represent coherent changes that include a reason for the change. | |||||
| See [CONTRIBUTING.md](CONTRIBUTING.md) for more information. | |||||
| ## License | |||||
| BSD 2 clause, see LICENSE for more details. | |||||
| @ -0,0 +1,5 @@ | |||||
| ## Changelog Generation | |||||
| ``` | |||||
| github_changelog_generator --token GITHUB-TOKEN -u rabbitmq -p amqp091-go --no-unreleased --release-branch main | |||||
| ``` | |||||
| @ -0,0 +1,111 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "bytes" | |||||
| "fmt" | |||||
| "math/big" | |||||
| ) | |||||
| const ( | |||||
| free = 0 | |||||
| allocated = 1 | |||||
| ) | |||||
| // allocator maintains a bitset of allocated numbers. | |||||
| type allocator struct { | |||||
| pool *big.Int | |||||
| last int | |||||
| low int | |||||
| high int | |||||
| } | |||||
| // NewAllocator reserves and frees integers out of a range between low and | |||||
| // high. | |||||
| // | |||||
| // O(N) worst case space used, where N is maximum allocated, divided by | |||||
| // sizeof(big.Word) | |||||
| func newAllocator(low, high int) *allocator { | |||||
| return &allocator{ | |||||
| pool: big.NewInt(0), | |||||
| last: low, | |||||
| low: low, | |||||
| high: high, | |||||
| } | |||||
| } | |||||
| // String returns a string describing the contents of the allocator like | |||||
| // "allocator[low..high] reserved..until" | |||||
| // | |||||
| // O(N) where N is high-low | |||||
| func (a allocator) String() string { | |||||
| b := &bytes.Buffer{} | |||||
| fmt.Fprintf(b, "allocator[%d..%d]", a.low, a.high) | |||||
| for low := a.low; low <= a.high; low++ { | |||||
| high := low | |||||
| for a.reserved(high) && high <= a.high { | |||||
| high++ | |||||
| } | |||||
| if high > low+1 { | |||||
| fmt.Fprintf(b, " %d..%d", low, high-1) | |||||
| } else if high > low { | |||||
| fmt.Fprintf(b, " %d", high-1) | |||||
| } | |||||
| low = high | |||||
| } | |||||
| return b.String() | |||||
| } | |||||
| // Next reserves and returns the next available number out of the range between | |||||
| // low and high. If no number is available, false is returned. | |||||
| // | |||||
| // O(N) worst case runtime where N is allocated, but usually O(1) due to a | |||||
| // rolling index into the oldest allocation. | |||||
| func (a *allocator) next() (int, bool) { | |||||
| wrapped := a.last | |||||
| // Find trailing bit | |||||
| for ; a.last <= a.high; a.last++ { | |||||
| if a.reserve(a.last) { | |||||
| return a.last, true | |||||
| } | |||||
| } | |||||
| // Find preceding free'd pool | |||||
| a.last = a.low | |||||
| for ; a.last < wrapped; a.last++ { | |||||
| if a.reserve(a.last) { | |||||
| return a.last, true | |||||
| } | |||||
| } | |||||
| return 0, false | |||||
| } | |||||
| // reserve claims the bit if it is not already claimed, returning true if | |||||
| // successfully claimed. | |||||
| func (a *allocator) reserve(n int) bool { | |||||
| if a.reserved(n) { | |||||
| return false | |||||
| } | |||||
| a.pool.SetBit(a.pool, n-a.low, allocated) | |||||
| return true | |||||
| } | |||||
| // reserved returns true if the integer has been allocated | |||||
| func (a *allocator) reserved(n int) bool { | |||||
| return a.pool.Bit(n-a.low) == allocated | |||||
| } | |||||
| // release frees the use of the number for another allocation | |||||
| func (a *allocator) release(n int) { | |||||
| a.pool.SetBit(a.pool, n-a.low, free) | |||||
| } | |||||
| @ -0,0 +1,83 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "bytes" | |||||
| "fmt" | |||||
| ) | |||||
| // Authentication interface provides a means for different SASL authentication | |||||
| // mechanisms to be used during connection tuning. | |||||
| type Authentication interface { | |||||
| Mechanism() string | |||||
| Response() string | |||||
| } | |||||
| // PlainAuth is a similar to Basic Auth in HTTP. | |||||
| type PlainAuth struct { | |||||
| Username string | |||||
| Password string | |||||
| } | |||||
| // Mechanism returns "PLAIN" | |||||
| func (auth *PlainAuth) Mechanism() string { | |||||
| return "PLAIN" | |||||
| } | |||||
| // Response returns the null character delimited encoding for the SASL PLAIN Mechanism. | |||||
| func (auth *PlainAuth) Response() string { | |||||
| return fmt.Sprintf("\000%s\000%s", auth.Username, auth.Password) | |||||
| } | |||||
| // AMQPlainAuth is similar to PlainAuth | |||||
| type AMQPlainAuth struct { | |||||
| Username string | |||||
| Password string | |||||
| } | |||||
| // Mechanism returns "AMQPLAIN" | |||||
| func (auth *AMQPlainAuth) Mechanism() string { | |||||
| return "AMQPLAIN" | |||||
| } | |||||
| // Response returns an AMQP encoded credentials table, without the field table size. | |||||
| func (auth *AMQPlainAuth) Response() string { | |||||
| var buf bytes.Buffer | |||||
| table := Table{"LOGIN": auth.Username, "PASSWORD": auth.Password} | |||||
| if err := writeTable(&buf, table); err != nil { | |||||
| return "" | |||||
| } | |||||
| return buf.String()[4:] | |||||
| } | |||||
| // ExternalAuth for RabbitMQ-auth-mechanism-ssl. | |||||
| type ExternalAuth struct { | |||||
| } | |||||
| // Mechanism returns "EXTERNAL" | |||||
| func (*ExternalAuth) Mechanism() string { | |||||
| return "EXTERNAL" | |||||
| } | |||||
| // Response returns an AMQP encoded credentials table, without the field table size. | |||||
| func (*ExternalAuth) Response() string { | |||||
| return "\000*\000*" | |||||
| } | |||||
| // Finds the first mechanism preferred by the client that the server supports. | |||||
| func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { | |||||
| for _, auth = range client { | |||||
| for _, mech := range serverMechanisms { | |||||
| if auth.Mechanism() == mech { | |||||
| return auth, true | |||||
| } | |||||
| } | |||||
| } | |||||
| return | |||||
| } | |||||
| @ -0,0 +1,159 @@ | |||||
| #!/bin/sh | |||||
| # | |||||
| # Creates the CA, server and client certs to be used by tls_test.go | |||||
| # http://www.rabbitmq.com/ssl.html | |||||
| # | |||||
| # Copy stdout into the const section of tls_test.go or use for RabbitMQ | |||||
| # | |||||
| root=$PWD/certs | |||||
| if [ -f $root/ca/serial ]; then | |||||
| echo >&2 "Previous installation found" | |||||
| echo >&2 "Remove $root/ca and rerun to overwrite" | |||||
| exit 1 | |||||
| fi | |||||
| mkdir -p $root/ca/private | |||||
| mkdir -p $root/ca/certs | |||||
| mkdir -p $root/server | |||||
| mkdir -p $root/client | |||||
| cd $root/ca | |||||
| chmod 700 private | |||||
| touch index.txt | |||||
| echo 'unique_subject = no' > index.txt.attr | |||||
| echo '01' > serial | |||||
| echo >openssl.cnf ' | |||||
| [ ca ] | |||||
| default_ca = testca | |||||
| [ testca ] | |||||
| dir = . | |||||
| certificate = $dir/cacert.pem | |||||
| database = $dir/index.txt | |||||
| new_certs_dir = $dir/certs | |||||
| private_key = $dir/private/cakey.pem | |||||
| serial = $dir/serial | |||||
| default_crl_days = 7 | |||||
| default_days = 3650 | |||||
| default_md = sha256 | |||||
| policy = testca_policy | |||||
| x509_extensions = certificate_extensions | |||||
| [ testca_policy ] | |||||
| commonName = supplied | |||||
| stateOrProvinceName = optional | |||||
| countryName = optional | |||||
| emailAddress = optional | |||||
| organizationName = optional | |||||
| organizationalUnitName = optional | |||||
| [ certificate_extensions ] | |||||
| basicConstraints = CA:false | |||||
| [ req ] | |||||
| default_bits = 2048 | |||||
| default_keyfile = ./private/cakey.pem | |||||
| default_md = sha256 | |||||
| prompt = yes | |||||
| distinguished_name = root_ca_distinguished_name | |||||
| x509_extensions = root_ca_extensions | |||||
| [ root_ca_distinguished_name ] | |||||
| commonName = hostname | |||||
| [ root_ca_extensions ] | |||||
| basicConstraints = CA:true | |||||
| keyUsage = keyCertSign, cRLSign | |||||
| [ client_ca_extensions ] | |||||
| basicConstraints = CA:false | |||||
| keyUsage = digitalSignature | |||||
| extendedKeyUsage = 1.3.6.1.5.5.7.3.2 | |||||
| [ server_ca_extensions ] | |||||
| basicConstraints = CA:false | |||||
| keyUsage = keyEncipherment | |||||
| extendedKeyUsage = 1.3.6.1.5.5.7.3.1 | |||||
| subjectAltName = @alt_names | |||||
| [ alt_names ] | |||||
| IP.1 = 127.0.0.1 | |||||
| ' | |||||
| openssl req \ | |||||
| -x509 \ | |||||
| -nodes \ | |||||
| -config openssl.cnf \ | |||||
| -newkey rsa:2048 \ | |||||
| -days 3650 \ | |||||
| -subj "/CN=MyTestCA/" \ | |||||
| -out cacert.pem \ | |||||
| -outform PEM | |||||
| openssl x509 \ | |||||
| -in cacert.pem \ | |||||
| -out cacert.cer \ | |||||
| -outform DER | |||||
| openssl genrsa -out $root/server/key.pem 2048 | |||||
| openssl genrsa -out $root/client/key.pem 2048 | |||||
| openssl req \ | |||||
| -new \ | |||||
| -nodes \ | |||||
| -config openssl.cnf \ | |||||
| -subj "/CN=127.0.0.1/O=server/" \ | |||||
| -key $root/server/key.pem \ | |||||
| -out $root/server/req.pem \ | |||||
| -outform PEM | |||||
| openssl req \ | |||||
| -new \ | |||||
| -nodes \ | |||||
| -config openssl.cnf \ | |||||
| -subj "/CN=127.0.0.1/O=client/" \ | |||||
| -key $root/client/key.pem \ | |||||
| -out $root/client/req.pem \ | |||||
| -outform PEM | |||||
| openssl ca \ | |||||
| -config openssl.cnf \ | |||||
| -in $root/server/req.pem \ | |||||
| -out $root/server/cert.pem \ | |||||
| -notext \ | |||||
| -batch \ | |||||
| -extensions server_ca_extensions | |||||
| openssl ca \ | |||||
| -config openssl.cnf \ | |||||
| -in $root/client/req.pem \ | |||||
| -out $root/client/cert.pem \ | |||||
| -notext \ | |||||
| -batch \ | |||||
| -extensions client_ca_extensions | |||||
| cat <<-END | |||||
| const caCert = \` | |||||
| `cat $root/ca/cacert.pem` | |||||
| \` | |||||
| const serverCert = \` | |||||
| `cat $root/server/cert.pem` | |||||
| \` | |||||
| const serverKey = \` | |||||
| `cat $root/server/key.pem` | |||||
| \` | |||||
| const clientCert = \` | |||||
| `cat $root/client/cert.pem` | |||||
| \` | |||||
| const clientKey = \` | |||||
| `cat $root/client/key.pem` | |||||
| \` | |||||
| END | |||||
| @ -0,0 +1,4 @@ | |||||
| #!/bin/bash | |||||
| echo $1 > VERSION | |||||
| sed -i -e "s/.*buildVersion = \"*.*/buildVersion = \"$1\"/" ./connection.go | |||||
| go fmt ./... | |||||
| @ -0,0 +1,217 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "context" | |||||
| "sync" | |||||
| ) | |||||
| // confirms resequences and notifies one or multiple publisher confirmation listeners | |||||
| type confirms struct { | |||||
| m sync.Mutex | |||||
| listeners []chan Confirmation | |||||
| sequencer map[uint64]Confirmation | |||||
| deferredConfirmations *deferredConfirmations | |||||
| published uint64 | |||||
| publishedMut sync.Mutex | |||||
| expecting uint64 | |||||
| } | |||||
| // newConfirms allocates a confirms | |||||
| func newConfirms() *confirms { | |||||
| return &confirms{ | |||||
| sequencer: map[uint64]Confirmation{}, | |||||
| deferredConfirmations: newDeferredConfirmations(), | |||||
| published: 0, | |||||
| expecting: 1, | |||||
| } | |||||
| } | |||||
| func (c *confirms) Listen(l chan Confirmation) { | |||||
| c.m.Lock() | |||||
| defer c.m.Unlock() | |||||
| c.listeners = append(c.listeners, l) | |||||
| } | |||||
| // Publish increments the publishing counter | |||||
| func (c *confirms) Publish() *DeferredConfirmation { | |||||
| c.publishedMut.Lock() | |||||
| defer c.publishedMut.Unlock() | |||||
| c.published++ | |||||
| return c.deferredConfirmations.Add(c.published) | |||||
| } | |||||
| // confirm confirms one publishing, increments the expecting delivery tag, and | |||||
| // removes bookkeeping for that delivery tag. | |||||
| func (c *confirms) confirm(confirmation Confirmation) { | |||||
| delete(c.sequencer, c.expecting) | |||||
| c.expecting++ | |||||
| for _, l := range c.listeners { | |||||
| l <- confirmation | |||||
| } | |||||
| } | |||||
| // resequence confirms any out of order delivered confirmations | |||||
| func (c *confirms) resequence() { | |||||
| c.publishedMut.Lock() | |||||
| defer c.publishedMut.Unlock() | |||||
| for c.expecting <= c.published { | |||||
| sequenced, found := c.sequencer[c.expecting] | |||||
| if !found { | |||||
| return | |||||
| } | |||||
| c.confirm(sequenced) | |||||
| } | |||||
| } | |||||
| // One confirms one publishing and all following in the publishing sequence | |||||
| func (c *confirms) One(confirmed Confirmation) { | |||||
| c.m.Lock() | |||||
| defer c.m.Unlock() | |||||
| c.deferredConfirmations.Confirm(confirmed) | |||||
| if c.expecting == confirmed.DeliveryTag { | |||||
| c.confirm(confirmed) | |||||
| } else { | |||||
| c.sequencer[confirmed.DeliveryTag] = confirmed | |||||
| } | |||||
| c.resequence() | |||||
| } | |||||
| // Multiple confirms all publishings up until the delivery tag | |||||
| func (c *confirms) Multiple(confirmed Confirmation) { | |||||
| c.m.Lock() | |||||
| defer c.m.Unlock() | |||||
| c.deferredConfirmations.ConfirmMultiple(confirmed) | |||||
| for c.expecting <= confirmed.DeliveryTag { | |||||
| c.confirm(Confirmation{c.expecting, confirmed.Ack}) | |||||
| } | |||||
| c.resequence() | |||||
| } | |||||
| // Cleans up the confirms struct and its dependencies. | |||||
| // Closes all listeners, discarding any out of sequence confirmations | |||||
| func (c *confirms) Close() error { | |||||
| c.m.Lock() | |||||
| defer c.m.Unlock() | |||||
| c.deferredConfirmations.Close() | |||||
| for _, l := range c.listeners { | |||||
| close(l) | |||||
| } | |||||
| c.listeners = nil | |||||
| return nil | |||||
| } | |||||
| type deferredConfirmations struct { | |||||
| m sync.Mutex | |||||
| confirmations map[uint64]*DeferredConfirmation | |||||
| } | |||||
| func newDeferredConfirmations() *deferredConfirmations { | |||||
| return &deferredConfirmations{ | |||||
| confirmations: map[uint64]*DeferredConfirmation{}, | |||||
| } | |||||
| } | |||||
| func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { | |||||
| d.m.Lock() | |||||
| defer d.m.Unlock() | |||||
| dc := &DeferredConfirmation{DeliveryTag: tag} | |||||
| dc.done = make(chan struct{}) | |||||
| d.confirmations[tag] = dc | |||||
| return dc | |||||
| } | |||||
| func (d *deferredConfirmations) Confirm(confirmation Confirmation) { | |||||
| d.m.Lock() | |||||
| defer d.m.Unlock() | |||||
| dc, found := d.confirmations[confirmation.DeliveryTag] | |||||
| if !found { | |||||
| // We should never receive a confirmation for a tag that hasn't | |||||
| // been published, but a test causes this to happen. | |||||
| return | |||||
| } | |||||
| dc.setAck(confirmation.Ack) | |||||
| delete(d.confirmations, confirmation.DeliveryTag) | |||||
| } | |||||
| func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { | |||||
| d.m.Lock() | |||||
| defer d.m.Unlock() | |||||
| for k, v := range d.confirmations { | |||||
| if k <= confirmation.DeliveryTag { | |||||
| v.setAck(confirmation.Ack) | |||||
| delete(d.confirmations, k) | |||||
| } | |||||
| } | |||||
| } | |||||
| // Close nacks all pending DeferredConfirmations being blocked by dc.Wait(). | |||||
| func (d *deferredConfirmations) Close() { | |||||
| d.m.Lock() | |||||
| defer d.m.Unlock() | |||||
| for k, v := range d.confirmations { | |||||
| v.setAck(false) | |||||
| delete(d.confirmations, k) | |||||
| } | |||||
| } | |||||
| // setAck sets the acknowledgement status of the confirmation. Note that it must | |||||
| // not be called more than once. | |||||
| func (d *DeferredConfirmation) setAck(ack bool) { | |||||
| d.ack = ack | |||||
| close(d.done) | |||||
| } | |||||
| // Done returns the channel that can be used to wait for the publisher | |||||
| // confirmation. | |||||
| func (d *DeferredConfirmation) Done() <-chan struct{} { | |||||
| return d.done | |||||
| } | |||||
| // Acked returns the publisher confirmation in a non-blocking manner. It returns | |||||
| // false if the confirmation was not acknowledged yet or received negative | |||||
| // acknowledgement. | |||||
| func (d *DeferredConfirmation) Acked() bool { | |||||
| select { | |||||
| case <-d.done: | |||||
| default: | |||||
| return false | |||||
| } | |||||
| return d.ack | |||||
| } | |||||
| // Wait blocks until the publisher confirmation. It returns true if the server | |||||
| // successfully received the publishing. | |||||
| func (d *DeferredConfirmation) Wait() bool { | |||||
| <-d.done | |||||
| return d.ack | |||||
| } | |||||
| // WaitContext waits until the publisher confirmation. It returns true if the | |||||
| // server successfully received the publishing. If the context expires before | |||||
| // that, ctx.Err() is returned. | |||||
| func (d *DeferredConfirmation) WaitContext(ctx context.Context) (bool, error) { | |||||
| select { | |||||
| case <-ctx.Done(): | |||||
| return false, ctx.Err() | |||||
| case <-d.done: | |||||
| } | |||||
| return d.ack, nil | |||||
| } | |||||
| @ -0,0 +1,142 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "os" | |||||
| "strconv" | |||||
| "sync" | |||||
| "sync/atomic" | |||||
| ) | |||||
| var consumerSeq uint64 | |||||
| const consumerTagLengthMax = 0xFF // see writeShortstr | |||||
| func uniqueConsumerTag() string { | |||||
| return commandNameBasedUniqueConsumerTag(os.Args[0]) | |||||
| } | |||||
| func commandNameBasedUniqueConsumerTag(commandName string) string { | |||||
| tagPrefix := "ctag-" | |||||
| tagInfix := commandName | |||||
| tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10) | |||||
| if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax { | |||||
| tagInfix = "streadway/amqp" | |||||
| } | |||||
| return tagPrefix + tagInfix + tagSuffix | |||||
| } | |||||
| type consumerBuffers map[string]chan *Delivery | |||||
| // Concurrent type that manages the consumerTag -> | |||||
| // ingress consumerBuffer mapping | |||||
| type consumers struct { | |||||
| sync.WaitGroup // one for buffer | |||||
| closed chan struct{} // signal buffer | |||||
| sync.Mutex // protects below | |||||
| chans consumerBuffers | |||||
| } | |||||
| func makeConsumers() *consumers { | |||||
| return &consumers{ | |||||
| closed: make(chan struct{}), | |||||
| chans: make(consumerBuffers), | |||||
| } | |||||
| } | |||||
| func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { | |||||
| defer close(out) | |||||
| defer subs.Done() | |||||
| var inflight = in | |||||
| var queue []*Delivery | |||||
| for delivery := range in { | |||||
| queue = append(queue, delivery) | |||||
| for len(queue) > 0 { | |||||
| select { | |||||
| case <-subs.closed: | |||||
| // closed before drained, drop in-flight | |||||
| return | |||||
| case delivery, consuming := <-inflight: | |||||
| if consuming { | |||||
| queue = append(queue, delivery) | |||||
| } else { | |||||
| inflight = nil | |||||
| } | |||||
| case out <- *queue[0]: | |||||
| queue = queue[1:] | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| // On key conflict, close the previous channel. | |||||
| func (subs *consumers) add(tag string, consumer chan Delivery) { | |||||
| subs.Lock() | |||||
| defer subs.Unlock() | |||||
| if prev, found := subs.chans[tag]; found { | |||||
| close(prev) | |||||
| } | |||||
| in := make(chan *Delivery) | |||||
| subs.chans[tag] = in | |||||
| subs.Add(1) | |||||
| go subs.buffer(in, consumer) | |||||
| } | |||||
| func (subs *consumers) cancel(tag string) (found bool) { | |||||
| subs.Lock() | |||||
| defer subs.Unlock() | |||||
| ch, found := subs.chans[tag] | |||||
| if found { | |||||
| delete(subs.chans, tag) | |||||
| close(ch) | |||||
| } | |||||
| return found | |||||
| } | |||||
| func (subs *consumers) close() { | |||||
| subs.Lock() | |||||
| defer subs.Unlock() | |||||
| close(subs.closed) | |||||
| for tag, ch := range subs.chans { | |||||
| delete(subs.chans, tag) | |||||
| close(ch) | |||||
| } | |||||
| subs.Wait() | |||||
| } | |||||
| // Sends a delivery to a the consumer identified by `tag`. | |||||
| // If unbuffered channels are used for Consume this method | |||||
| // could block all deliveries until the consumer | |||||
| // receives on the other end of the channel. | |||||
| func (subs *consumers) send(tag string, msg *Delivery) bool { | |||||
| subs.Lock() | |||||
| defer subs.Unlock() | |||||
| buffer, found := subs.chans[tag] | |||||
| if found { | |||||
| buffer <- msg | |||||
| } | |||||
| return found | |||||
| } | |||||
| @ -0,0 +1,173 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "errors" | |||||
| "time" | |||||
| ) | |||||
| var errDeliveryNotInitialized = errors.New("delivery not initialized") | |||||
| // Acknowledger notifies the server of successful or failed consumption of | |||||
| // deliveries via identifier found in the Delivery.DeliveryTag field. | |||||
| // | |||||
| // Applications can provide mock implementations in tests of Delivery handlers. | |||||
| type Acknowledger interface { | |||||
| Ack(tag uint64, multiple bool) error | |||||
| Nack(tag uint64, multiple bool, requeue bool) error | |||||
| Reject(tag uint64, requeue bool) error | |||||
| } | |||||
| // Delivery captures the fields for a previously delivered message resident in | |||||
| // a queue to be delivered by the server to a consumer from Channel.Consume or | |||||
| // Channel.Get. | |||||
| type Delivery struct { | |||||
| Acknowledger Acknowledger // the channel from which this delivery arrived | |||||
| Headers Table // Application or header exchange table | |||||
| // Properties | |||||
| ContentType string // MIME content type | |||||
| ContentEncoding string // MIME content encoding | |||||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | |||||
| Priority uint8 // queue implementation use - 0 to 9 | |||||
| CorrelationId string // application use - correlation identifier | |||||
| ReplyTo string // application use - address to reply to (ex: RPC) | |||||
| Expiration string // implementation use - message expiration spec | |||||
| MessageId string // application use - message identifier | |||||
| Timestamp time.Time // application use - message timestamp | |||||
| Type string // application use - message type name | |||||
| UserId string // application use - creating user - should be authenticated user | |||||
| AppId string // application use - creating application id | |||||
| // Valid only with Channel.Consume | |||||
| ConsumerTag string | |||||
| // Valid only with Channel.Get | |||||
| MessageCount uint32 | |||||
| DeliveryTag uint64 | |||||
| Redelivered bool | |||||
| Exchange string // basic.publish exchange | |||||
| RoutingKey string // basic.publish routing key | |||||
| Body []byte | |||||
| } | |||||
| func newDelivery(channel *Channel, msg messageWithContent) *Delivery { | |||||
| props, body := msg.getContent() | |||||
| delivery := Delivery{ | |||||
| Acknowledger: channel, | |||||
| Headers: props.Headers, | |||||
| ContentType: props.ContentType, | |||||
| ContentEncoding: props.ContentEncoding, | |||||
| DeliveryMode: props.DeliveryMode, | |||||
| Priority: props.Priority, | |||||
| CorrelationId: props.CorrelationId, | |||||
| ReplyTo: props.ReplyTo, | |||||
| Expiration: props.Expiration, | |||||
| MessageId: props.MessageId, | |||||
| Timestamp: props.Timestamp, | |||||
| Type: props.Type, | |||||
| UserId: props.UserId, | |||||
| AppId: props.AppId, | |||||
| Body: body, | |||||
| } | |||||
| // Properties for the delivery types | |||||
| switch m := msg.(type) { | |||||
| case *basicDeliver: | |||||
| delivery.ConsumerTag = m.ConsumerTag | |||||
| delivery.DeliveryTag = m.DeliveryTag | |||||
| delivery.Redelivered = m.Redelivered | |||||
| delivery.Exchange = m.Exchange | |||||
| delivery.RoutingKey = m.RoutingKey | |||||
| case *basicGetOk: | |||||
| delivery.MessageCount = m.MessageCount | |||||
| delivery.DeliveryTag = m.DeliveryTag | |||||
| delivery.Redelivered = m.Redelivered | |||||
| delivery.Exchange = m.Exchange | |||||
| delivery.RoutingKey = m.RoutingKey | |||||
| } | |||||
| return &delivery | |||||
| } | |||||
| /* | |||||
| Ack delegates an acknowledgement through the Acknowledger interface that the | |||||
| client or server has finished work on a delivery. | |||||
| All deliveries in AMQP must be acknowledged. If you called Channel.Consume | |||||
| with autoAck true then the server will be automatically ack each message and | |||||
| this method should not be called. Otherwise, you must call Delivery.Ack after | |||||
| you have successfully processed this delivery. | |||||
| When multiple is true, this delivery and all prior unacknowledged deliveries | |||||
| on the same channel will be acknowledged. This is useful for batch processing | |||||
| of deliveries. | |||||
| An error will indicate that the acknowledge could not be delivered to the | |||||
| channel it was sent from. | |||||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||||
| delivery that is not automatically acknowledged. | |||||
| */ | |||||
| func (d Delivery) Ack(multiple bool) error { | |||||
| if d.Acknowledger == nil { | |||||
| return errDeliveryNotInitialized | |||||
| } | |||||
| return d.Acknowledger.Ack(d.DeliveryTag, multiple) | |||||
| } | |||||
| /* | |||||
| Reject delegates a negatively acknowledgement through the Acknowledger interface. | |||||
| When requeue is true, queue this message to be delivered to a consumer on a | |||||
| different channel. When requeue is false or the server is unable to queue this | |||||
| message, it will be dropped. | |||||
| If you are batch processing deliveries, and your server supports it, prefer | |||||
| Delivery.Nack. | |||||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||||
| delivery that is not automatically acknowledged. | |||||
| */ | |||||
| func (d Delivery) Reject(requeue bool) error { | |||||
| if d.Acknowledger == nil { | |||||
| return errDeliveryNotInitialized | |||||
| } | |||||
| return d.Acknowledger.Reject(d.DeliveryTag, requeue) | |||||
| } | |||||
| /* | |||||
| Nack negatively acknowledge the delivery of message(s) identified by the | |||||
| delivery tag from either the client or server. | |||||
| When multiple is true, nack messages up to and including delivered messages up | |||||
| until the delivery tag delivered on the same channel. | |||||
| When requeue is true, request the server to deliver this message to a different | |||||
| consumer. If it is not possible or requeue is false, the message will be | |||||
| dropped or delivered to a server configured dead-letter queue. | |||||
| This method must not be used to select or requeue messages the client wishes | |||||
| not to handle, rather it is to inform the server that the client is incapable | |||||
| of handling this message at this time. | |||||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||||
| delivery that is not automatically acknowledged. | |||||
| */ | |||||
| func (d Delivery) Nack(multiple, requeue bool) error { | |||||
| if d.Acknowledger == nil { | |||||
| return errDeliveryNotInitialized | |||||
| } | |||||
| return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | |||||
| } | |||||
| @ -0,0 +1,165 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| /* | |||||
| Package amqp091 is an AMQP 0.9.1 client with RabbitMQ extensions | |||||
| Understand the AMQP 0.9.1 messaging model by reviewing these links first. Much | |||||
| of the terminology in this library directly relates to AMQP concepts. | |||||
| Resources | |||||
| http://www.rabbitmq.com/tutorials/amqp-concepts.html | |||||
| http://www.rabbitmq.com/getstarted.html | |||||
| http://www.rabbitmq.com/amqp-0-9-1-reference.html | |||||
| # Design | |||||
| Most other broker clients publish to queues, but in AMQP, clients publish | |||||
| Exchanges instead. AMQP is programmable, meaning that both the producers and | |||||
| consumers agree on the configuration of the broker, instead of requiring an | |||||
| operator or system configuration that declares the logical topology in the | |||||
| broker. The routing between producers and consumer queues is via Bindings. | |||||
| These bindings form the logical topology of the broker. | |||||
| In this library, a message sent from publisher is called a "Publishing" and a | |||||
| message received to a consumer is called a "Delivery". The fields of | |||||
| Publishings and Deliveries are close but not exact mappings to the underlying | |||||
| wire format to maintain stronger types. Many other libraries will combine | |||||
| message properties with message headers. In this library, the message well | |||||
| known properties are strongly typed fields on the Publishings and Deliveries, | |||||
| whereas the user defined headers are in the Headers field. | |||||
| The method naming closely matches the protocol's method name with positional | |||||
| parameters mapping to named protocol message fields. The motivation here is to | |||||
| present a comprehensive view over all possible interactions with the server. | |||||
| Generally, methods that map to protocol methods of the "basic" class will be | |||||
| elided in this interface, and "select" methods of various channel mode selectors | |||||
| will be elided for example Channel.Confirm and Channel.Tx. | |||||
| The library is intentionally designed to be synchronous, where responses for | |||||
| each protocol message are required to be received in an RPC manner. Some | |||||
| methods have a noWait parameter like Channel.QueueDeclare, and some methods are | |||||
| asynchronous like Channel.Publish. The error values should still be checked for | |||||
| these methods as they will indicate IO failures like when the underlying | |||||
| connection closes. | |||||
| # Asynchronous Events | |||||
| Clients of this library may be interested in receiving some of the protocol | |||||
| messages other than Deliveries like basic.ack methods while a channel is in | |||||
| confirm mode. | |||||
| The Notify* methods with Connection and Channel receivers model the pattern of | |||||
| asynchronous events like closes due to exceptions, or messages that are sent out | |||||
| of band from an RPC call like basic.ack or basic.flow. | |||||
| Any asynchronous events, including Deliveries and Publishings must always have | |||||
| a receiver until the corresponding chans are closed. Without asynchronous | |||||
| receivers, the synchronous methods will block. | |||||
| # Use Case | |||||
| It's important as a client to an AMQP topology to ensure the state of the | |||||
| broker matches your expectations. For both publish and consume use cases, | |||||
| make sure you declare the queues, exchanges and bindings you expect to exist | |||||
| prior to calling [Channel.PublishWithContext] or [Channel.Consume]. | |||||
| // Connections start with amqp.Dial() typically from a command line argument | |||||
| // or environment variable. | |||||
| connection, err := amqp.Dial(os.Getenv("AMQP_URL")) | |||||
| // To cleanly shutdown by flushing kernel buffers, make sure to close and | |||||
| // wait for the response. | |||||
| defer connection.Close() | |||||
| // Most operations happen on a channel. If any error is returned on a | |||||
| // channel, the channel will no longer be valid, throw it away and try with | |||||
| // a different channel. If you use many channels, it's useful for the | |||||
| // server to | |||||
| channel, err := connection.Channel() | |||||
| // Declare your topology here, if it doesn't exist, it will be created, if | |||||
| // it existed already and is not what you expect, then that's considered an | |||||
| // error. | |||||
| // Use your connection on this topology with either Publish or Consume, or | |||||
| // inspect your queues with QueueInspect. It's unwise to mix Publish and | |||||
| // Consume to let TCP do its job well. | |||||
| # SSL/TLS - Secure connections | |||||
| When Dial encounters an amqps:// scheme, it will use the zero value of a | |||||
| tls.Config. This will only perform server certificate and host verification. | |||||
| Use DialTLS when you wish to provide a client certificate (recommended), | |||||
| include a private certificate authority's certificate in the cert chain for | |||||
| server validity, or run insecure by not verifying the server certificate dial | |||||
| your own connection. DialTLS will use the provided tls.Config when it | |||||
| encounters an amqps:// scheme and will dial a plain connection when it | |||||
| encounters an amqp:// scheme. | |||||
| SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html | |||||
| # Best practises for Connection and Channel notifications: | |||||
| In order to be notified when a connection or channel gets closed, both | |||||
| structures offer the possibility to register channels using | |||||
| [Channel.NotifyClose] and [Connection.NotifyClose] functions: | |||||
| notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error)) | |||||
| No errors will be sent in case of a graceful connection close. In case of a | |||||
| non-graceful closure due to e.g. network issue, or forced connection closure | |||||
| from the Management UI, the error will be notified synchronously by the library. | |||||
| The error is sent synchronously to the channel, so that the flow will wait until | |||||
| the receiver consumes from the channel. To avoid deadlocks in the library, it is | |||||
| necessary to consume from the channels. This could be done inside a | |||||
| different goroutine with a select listening on the two channels inside a for | |||||
| loop like: | |||||
| go func() { | |||||
| for notifyConnClose != nil || notifyChanClose != nil { | |||||
| select { | |||||
| case err, ok := <-notifyConnClose: | |||||
| if !ok { | |||||
| notifyConnClose = nil | |||||
| } else { | |||||
| fmt.Printf("connection closed, error %s", err) | |||||
| } | |||||
| case err, ok := <-notifyChanClose: | |||||
| if !ok { | |||||
| notifyChanClose = nil | |||||
| } else { | |||||
| fmt.Printf("channel closed, error %s", err) | |||||
| } | |||||
| } | |||||
| } | |||||
| }() | |||||
| Another approach is to use buffered channels: | |||||
| notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1)) | |||||
| The library sends to notification channels just once. After sending a notification | |||||
| to all channels, the library closes all registered notification channels. After | |||||
| receiving a notification, the application should create and register a new channel. | |||||
| # Best practises for NotifyPublish notifications: | |||||
| Using [Channel.NotifyPublish] allows the caller of the library to be notified, | |||||
| through a go channel, when a message has been received and confirmed by the | |||||
| broker. It's advisable to wait for all Confirmations to arrive before calling | |||||
| [Channel.Close] or [Connection.Close]. It is also necessary to consume from this | |||||
| channel until it gets closed. The library sends synchronously to the registered channel. | |||||
| It is advisable to use a buffered channel, with capacity set to the maximum acceptable | |||||
| number of unconfirmed messages. | |||||
| It is important to consume from the confirmation channel at all times, in order to avoid | |||||
| deadlocks in the library. | |||||
| */ | |||||
| package amqp091 | |||||
| @ -0,0 +1,23 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| //go:build gofuzz | |||||
| // +build gofuzz | |||||
| package amqp091 | |||||
| import "bytes" | |||||
| func Fuzz(data []byte) int { | |||||
| r := reader{bytes.NewReader(data)} | |||||
| frame, err := r.ReadFrame() | |||||
| if err != nil { | |||||
| if frame != nil { | |||||
| panic("frame is not nil") | |||||
| } | |||||
| return 0 | |||||
| } | |||||
| return 1 | |||||
| } | |||||
| @ -0,0 +1,2 @@ | |||||
| #!/bin/sh | |||||
| go run spec/gen.go < spec/amqp0-9-1.stripped.extended.xml | gofmt > spec091.go | |||||
| @ -0,0 +1,23 @@ | |||||
| // Copyright (c) 2022 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| type Logging interface { | |||||
| Printf(format string, v ...interface{}) | |||||
| } | |||||
| var Logger Logging = NullLogger{} | |||||
| // Enables logging using a custom Logging instance. Note that this is | |||||
| // not thread safe and should be called at application start | |||||
| func SetLogger(logger Logging) { | |||||
| Logger = logger | |||||
| } | |||||
| type NullLogger struct { | |||||
| } | |||||
| func (l NullLogger) Printf(format string, v ...interface{}) { | |||||
| } | |||||
| @ -0,0 +1,450 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "bytes" | |||||
| "encoding/binary" | |||||
| "errors" | |||||
| "io" | |||||
| "time" | |||||
| ) | |||||
| /* | |||||
| ReadFrame reads a frame from an input stream and returns an interface that can be cast into | |||||
| one of the following: | |||||
| methodFrame | |||||
| PropertiesFrame | |||||
| bodyFrame | |||||
| heartbeatFrame | |||||
| 2.3.5 frame Details | |||||
| All frames consist of a header (7 octets), a payload of arbitrary size, and a | |||||
| 'frame-end' octet that detects malformed frames: | |||||
| 0 1 3 7 size+7 size+8 | |||||
| +------+---------+-------------+ +------------+ +-----------+ | |||||
| | type | channel | size | | payload | | frame-end | | |||||
| +------+---------+-------------+ +------------+ +-----------+ | |||||
| octet short long size octets octet | |||||
| To read a frame, we: | |||||
| 1. Read the header and check the frame type and channel. | |||||
| 2. Depending on the frame type, we read the payload and process it. | |||||
| 3. Read the frame end octet. | |||||
| In realistic implementations where performance is a concern, we would use | |||||
| “read-ahead buffering” or | |||||
| “gathering reads” to avoid doing three separate system calls to read a frame. | |||||
| */ | |||||
| func (r *reader) ReadFrame() (frame frame, err error) { | |||||
| var scratch [7]byte | |||||
| if _, err = io.ReadFull(r.r, scratch[:7]); err != nil { | |||||
| return | |||||
| } | |||||
| typ := scratch[0] | |||||
| channel := binary.BigEndian.Uint16(scratch[1:3]) | |||||
| size := binary.BigEndian.Uint32(scratch[3:7]) | |||||
| switch typ { | |||||
| case frameMethod: | |||||
| if frame, err = r.parseMethodFrame(channel, size); err != nil { | |||||
| return | |||||
| } | |||||
| case frameHeader: | |||||
| if frame, err = r.parseHeaderFrame(channel, size); err != nil { | |||||
| return | |||||
| } | |||||
| case frameBody: | |||||
| if frame, err = r.parseBodyFrame(channel, size); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| case frameHeartbeat: | |||||
| if frame, err = r.parseHeartbeatFrame(channel, size); err != nil { | |||||
| return | |||||
| } | |||||
| default: | |||||
| return nil, ErrFrame | |||||
| } | |||||
| if _, err = io.ReadFull(r.r, scratch[:1]); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| if scratch[0] != frameEnd { | |||||
| return nil, ErrFrame | |||||
| } | |||||
| return | |||||
| } | |||||
| func readShortstr(r io.Reader) (v string, err error) { | |||||
| var length uint8 | |||||
| if err = binary.Read(r, binary.BigEndian, &length); err != nil { | |||||
| return | |||||
| } | |||||
| bytes := make([]byte, length) | |||||
| if _, err = io.ReadFull(r, bytes); err != nil { | |||||
| return | |||||
| } | |||||
| return string(bytes), nil | |||||
| } | |||||
| func readLongstr(r io.Reader) (v string, err error) { | |||||
| var length uint32 | |||||
| if err = binary.Read(r, binary.BigEndian, &length); err != nil { | |||||
| return | |||||
| } | |||||
| // slices can't be longer than max int32 value | |||||
| if length > (^uint32(0) >> 1) { | |||||
| return | |||||
| } | |||||
| bytes := make([]byte, length) | |||||
| if _, err = io.ReadFull(r, bytes); err != nil { | |||||
| return | |||||
| } | |||||
| return string(bytes), nil | |||||
| } | |||||
| func readDecimal(r io.Reader) (v Decimal, err error) { | |||||
| if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| } | |||||
| func readTimestamp(r io.Reader) (v time.Time, err error) { | |||||
| var sec int64 | |||||
| if err = binary.Read(r, binary.BigEndian, &sec); err != nil { | |||||
| return | |||||
| } | |||||
| return time.Unix(sec, 0), nil | |||||
| } | |||||
| /* | |||||
| 'A': []interface{} | |||||
| 'D': Decimal | |||||
| 'F': Table | |||||
| 'I': int32 | |||||
| 'S': string | |||||
| 'T': time.Time | |||||
| 'V': nil | |||||
| 'b': int8 | |||||
| 'B': byte | |||||
| 'd': float64 | |||||
| 'f': float32 | |||||
| 'l': int64 | |||||
| 's': int16 | |||||
| 't': bool | |||||
| 'x': []byte | |||||
| */ | |||||
| func readField(r io.Reader) (v interface{}, err error) { | |||||
| var typ byte | |||||
| if err = binary.Read(r, binary.BigEndian, &typ); err != nil { | |||||
| return | |||||
| } | |||||
| switch typ { | |||||
| case 't': | |||||
| var value uint8 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value != 0, nil | |||||
| case 'B': | |||||
| var value [1]byte | |||||
| if _, err = io.ReadFull(r, value[0:1]); err != nil { | |||||
| return | |||||
| } | |||||
| return value[0], nil | |||||
| case 'b': | |||||
| var value int8 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 's': | |||||
| var value int16 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 'I': | |||||
| var value int32 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 'l': | |||||
| var value int64 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 'f': | |||||
| var value float32 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 'd': | |||||
| var value float64 | |||||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||||
| return | |||||
| } | |||||
| return value, nil | |||||
| case 'D': | |||||
| return readDecimal(r) | |||||
| case 'S': | |||||
| return readLongstr(r) | |||||
| case 'A': | |||||
| return readArray(r) | |||||
| case 'T': | |||||
| return readTimestamp(r) | |||||
| case 'F': | |||||
| return readTable(r) | |||||
| case 'x': | |||||
| var len int32 | |||||
| if err = binary.Read(r, binary.BigEndian, &len); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| value := make([]byte, len) | |||||
| if _, err = io.ReadFull(r, value); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return value, err | |||||
| case 'V': | |||||
| return nil, nil | |||||
| } | |||||
| return nil, ErrSyntax | |||||
| } | |||||
| /* | |||||
| Field tables are long strings that contain packed name-value pairs. The | |||||
| name-value pairs are encoded as short string defining the name, and octet | |||||
| defining the values type and then the value itself. The valid field types for | |||||
| tables are an extension of the native integer, bit, string, and timestamp | |||||
| types, and are shown in the grammar. Multi-octet integer fields are always | |||||
| held in network byte order. | |||||
| */ | |||||
| func readTable(r io.Reader) (table Table, err error) { | |||||
| var nested bytes.Buffer | |||||
| var str string | |||||
| if str, err = readLongstr(r); err != nil { | |||||
| return | |||||
| } | |||||
| nested.Write([]byte(str)) | |||||
| table = make(Table) | |||||
| for nested.Len() > 0 { | |||||
| var key string | |||||
| var value interface{} | |||||
| if key, err = readShortstr(&nested); err != nil { | |||||
| return | |||||
| } | |||||
| if value, err = readField(&nested); err != nil { | |||||
| return | |||||
| } | |||||
| table[key] = value | |||||
| } | |||||
| return | |||||
| } | |||||
| func readArray(r io.Reader) ([]interface{}, error) { | |||||
| var ( | |||||
| size uint32 | |||||
| err error | |||||
| ) | |||||
| if err = binary.Read(r, binary.BigEndian, &size); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| var ( | |||||
| lim = &io.LimitedReader{R: r, N: int64(size)} | |||||
| arr []interface{} | |||||
| field interface{} | |||||
| ) | |||||
| for { | |||||
| if field, err = readField(lim); err != nil { | |||||
| if err == io.EOF { | |||||
| break | |||||
| } | |||||
| return nil, err | |||||
| } | |||||
| arr = append(arr, field) | |||||
| } | |||||
| return arr, nil | |||||
| } | |||||
| // Checks if this bit mask matches the flags bitset | |||||
| func hasProperty(mask uint16, prop int) bool { | |||||
| return int(mask)&prop > 0 | |||||
| } | |||||
| func (r *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) { | |||||
| hf := &headerFrame{ | |||||
| ChannelId: channel, | |||||
| } | |||||
| if err = binary.Read(r.r, binary.BigEndian, &hf.ClassId); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Read(r.r, binary.BigEndian, &hf.weight); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Size); err != nil { | |||||
| return | |||||
| } | |||||
| var flags uint16 | |||||
| if err = binary.Read(r.r, binary.BigEndian, &flags); err != nil { | |||||
| return | |||||
| } | |||||
| if hasProperty(flags, flagContentType) { | |||||
| if hf.Properties.ContentType, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagContentEncoding) { | |||||
| if hf.Properties.ContentEncoding, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagHeaders) { | |||||
| if hf.Properties.Headers, err = readTable(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagDeliveryMode) { | |||||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagPriority) { | |||||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.Priority); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagCorrelationId) { | |||||
| if hf.Properties.CorrelationId, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagReplyTo) { | |||||
| if hf.Properties.ReplyTo, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagExpiration) { | |||||
| if hf.Properties.Expiration, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagMessageId) { | |||||
| if hf.Properties.MessageId, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagTimestamp) { | |||||
| if hf.Properties.Timestamp, err = readTimestamp(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagType) { | |||||
| if hf.Properties.Type, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagUserId) { | |||||
| if hf.Properties.UserId, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagAppId) { | |||||
| if hf.Properties.AppId, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(flags, flagReserved1) { | |||||
| if hf.Properties.reserved1, err = readShortstr(r.r); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| return hf, nil | |||||
| } | |||||
| func (r *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) { | |||||
| bf := &bodyFrame{ | |||||
| ChannelId: channel, | |||||
| Body: make([]byte, size), | |||||
| } | |||||
| if _, err = io.ReadFull(r.r, bf.Body); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return bf, nil | |||||
| } | |||||
| var errHeartbeatPayload = errors.New("Heartbeats should not have a payload") | |||||
| func (r *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) { | |||||
| hf := &heartbeatFrame{ | |||||
| ChannelId: channel, | |||||
| } | |||||
| if size > 0 { | |||||
| return nil, errHeartbeatPayload | |||||
| } | |||||
| return hf, nil | |||||
| } | |||||
| @ -0,0 +1,64 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "time" | |||||
| ) | |||||
| // Return captures a flattened struct of fields returned by the server when a | |||||
| // Publishing is unable to be delivered either due to the `mandatory` flag set | |||||
| // and no route found, or `immediate` flag set and no free consumer. | |||||
| type Return struct { | |||||
| ReplyCode uint16 // reason | |||||
| ReplyText string // description | |||||
| Exchange string // basic.publish exchange | |||||
| RoutingKey string // basic.publish routing key | |||||
| // Properties | |||||
| ContentType string // MIME content type | |||||
| ContentEncoding string // MIME content encoding | |||||
| Headers Table // Application or header exchange table | |||||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | |||||
| Priority uint8 // queue implementation use - 0 to 9 | |||||
| CorrelationId string // application use - correlation identifier | |||||
| ReplyTo string // application use - address to to reply to (ex: RPC) | |||||
| Expiration string // implementation use - message expiration spec | |||||
| MessageId string // application use - message identifier | |||||
| Timestamp time.Time // application use - message timestamp | |||||
| Type string // application use - message type name | |||||
| UserId string // application use - creating user id | |||||
| AppId string // application use - creating application | |||||
| Body []byte | |||||
| } | |||||
| func newReturn(msg basicReturn) *Return { | |||||
| props, body := msg.getContent() | |||||
| return &Return{ | |||||
| ReplyCode: msg.ReplyCode, | |||||
| ReplyText: msg.ReplyText, | |||||
| Exchange: msg.Exchange, | |||||
| RoutingKey: msg.RoutingKey, | |||||
| Headers: props.Headers, | |||||
| ContentType: props.ContentType, | |||||
| ContentEncoding: props.ContentEncoding, | |||||
| DeliveryMode: props.DeliveryMode, | |||||
| Priority: props.Priority, | |||||
| CorrelationId: props.CorrelationId, | |||||
| ReplyTo: props.ReplyTo, | |||||
| Expiration: props.Expiration, | |||||
| MessageId: props.MessageId, | |||||
| Timestamp: props.Timestamp, | |||||
| Type: props.Type, | |||||
| UserId: props.UserId, | |||||
| AppId: props.AppId, | |||||
| Body: body, | |||||
| } | |||||
| } | |||||
| @ -0,0 +1,509 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "fmt" | |||||
| "io" | |||||
| "time" | |||||
| ) | |||||
| const DefaultExchange = "" | |||||
| // Constants for standard AMQP 0-9-1 exchange types. | |||||
| const ( | |||||
| ExchangeDirect = "direct" | |||||
| ExchangeFanout = "fanout" | |||||
| ExchangeTopic = "topic" | |||||
| ExchangeHeaders = "headers" | |||||
| ) | |||||
| var ( | |||||
| // ErrClosed is returned when the channel or connection is not open | |||||
| ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"} | |||||
| // ErrChannelMax is returned when Connection.Channel has been called enough | |||||
| // times that all channel IDs have been exhausted in the client or the | |||||
| // server. | |||||
| ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"} | |||||
| // ErrSASL is returned from Dial when the authentication mechanism could not | |||||
| // be negotiated. | |||||
| ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"} | |||||
| // ErrCredentials is returned when the authenticated client is not authorized | |||||
| // to any vhost. | |||||
| ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"} | |||||
| // ErrVhost is returned when the authenticated user is not permitted to | |||||
| // access the requested Vhost. | |||||
| ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"} | |||||
| // ErrSyntax is hard protocol error, indicating an unsupported protocol, | |||||
| // implementation or encoding. | |||||
| ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"} | |||||
| // ErrFrame is returned when the protocol frame cannot be read from the | |||||
| // server, indicating an unsupported protocol or unsupported frame type. | |||||
| ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"} | |||||
| // ErrCommandInvalid is returned when the server sends an unexpected response | |||||
| // to this requested message type. This indicates a bug in this client. | |||||
| ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"} | |||||
| // ErrUnexpectedFrame is returned when something other than a method or | |||||
| // heartbeat frame is delivered to the Connection, indicating a bug in the | |||||
| // client. | |||||
| ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"} | |||||
| // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. | |||||
| ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} | |||||
| ) | |||||
| // Error captures the code and reason a channel or connection has been closed | |||||
| // by the server. | |||||
| type Error struct { | |||||
| Code int // constant code from the specification | |||||
| Reason string // description of the error | |||||
| Server bool // true when initiated from the server, false when from this library | |||||
| Recover bool // true when this error can be recovered by retrying later or with different parameters | |||||
| } | |||||
| func newError(code uint16, text string) *Error { | |||||
| return &Error{ | |||||
| Code: int(code), | |||||
| Reason: text, | |||||
| Recover: isSoftExceptionCode(int(code)), | |||||
| Server: true, | |||||
| } | |||||
| } | |||||
| func (e Error) Error() string { | |||||
| return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason) | |||||
| } | |||||
| // Used by header frames to capture routing and header information | |||||
| type properties struct { | |||||
| ContentType string // MIME content type | |||||
| ContentEncoding string // MIME content encoding | |||||
| Headers Table // Application or header exchange table | |||||
| DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2) | |||||
| Priority uint8 // queue implementation use - 0 to 9 | |||||
| CorrelationId string // application use - correlation identifier | |||||
| ReplyTo string // application use - address to to reply to (ex: RPC) | |||||
| Expiration string // implementation use - message expiration spec | |||||
| MessageId string // application use - message identifier | |||||
| Timestamp time.Time // application use - message timestamp | |||||
| Type string // application use - message type name | |||||
| UserId string // application use - creating user id | |||||
| AppId string // application use - creating application | |||||
| reserved1 string // was cluster-id - process for buffer consumption | |||||
| } | |||||
| // DeliveryMode. Transient means higher throughput but messages will not be | |||||
| // restored on broker restart. The delivery mode of publishings is unrelated | |||||
| // to the durability of the queues they reside on. Transient messages will | |||||
| // not be restored to durable queues, persistent messages will be restored to | |||||
| // durable queues and lost on non-durable queues during server restart. | |||||
| // | |||||
| // This remains typed as uint8 to match Publishing.DeliveryMode. Other | |||||
| // delivery modes specific to custom queue implementations are not enumerated | |||||
| // here. | |||||
| const ( | |||||
| Transient uint8 = 1 | |||||
| Persistent uint8 = 2 | |||||
| ) | |||||
| // The property flags are an array of bits that indicate the presence or | |||||
| // absence of each property value in sequence. The bits are ordered from most | |||||
| // high to low - bit 15 indicates the first property. | |||||
| const ( | |||||
| flagContentType = 0x8000 | |||||
| flagContentEncoding = 0x4000 | |||||
| flagHeaders = 0x2000 | |||||
| flagDeliveryMode = 0x1000 | |||||
| flagPriority = 0x0800 | |||||
| flagCorrelationId = 0x0400 | |||||
| flagReplyTo = 0x0200 | |||||
| flagExpiration = 0x0100 | |||||
| flagMessageId = 0x0080 | |||||
| flagTimestamp = 0x0040 | |||||
| flagType = 0x0020 | |||||
| flagUserId = 0x0010 | |||||
| flagAppId = 0x0008 | |||||
| flagReserved1 = 0x0004 | |||||
| ) | |||||
| // Queue captures the current server state of the queue on the server returned | |||||
| // from Channel.QueueDeclare or Channel.QueueInspect. | |||||
| type Queue struct { | |||||
| Name string // server confirmed or generated name | |||||
| Messages int // count of messages not awaiting acknowledgment | |||||
| Consumers int // number of consumers receiving deliveries | |||||
| } | |||||
| // Publishing captures the client message sent to the server. The fields | |||||
| // outside of the Headers table included in this struct mirror the underlying | |||||
| // fields in the content frame. They use native types for convenience and | |||||
| // efficiency. | |||||
| type Publishing struct { | |||||
| // Application or exchange specific fields, | |||||
| // the headers exchange will inspect this field. | |||||
| Headers Table | |||||
| // Properties | |||||
| ContentType string // MIME content type | |||||
| ContentEncoding string // MIME content encoding | |||||
| DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) | |||||
| Priority uint8 // 0 to 9 | |||||
| CorrelationId string // correlation identifier | |||||
| ReplyTo string // address to to reply to (ex: RPC) | |||||
| Expiration string // message expiration spec | |||||
| MessageId string // message identifier | |||||
| Timestamp time.Time // message timestamp | |||||
| Type string // message type name | |||||
| UserId string // creating user id - ex: "guest" | |||||
| AppId string // creating application id | |||||
| // The application specific payload of the message | |||||
| Body []byte | |||||
| } | |||||
| // Blocking notifies the server's TCP flow control of the Connection. When a | |||||
| // server hits a memory or disk alarm it will block all connections until the | |||||
| // resources are reclaimed. Use NotifyBlock on the Connection to receive these | |||||
| // events. | |||||
| type Blocking struct { | |||||
| Active bool // TCP pushback active/inactive on server | |||||
| Reason string // Server reason for activation | |||||
| } | |||||
| // DeferredConfirmation represents a future publisher confirm for a message. It | |||||
| // allows users to directly correlate a publishing to a confirmation. These are | |||||
| // returned from PublishWithDeferredConfirm on Channels. | |||||
| type DeferredConfirmation struct { | |||||
| DeliveryTag uint64 | |||||
| done chan struct{} | |||||
| ack bool | |||||
| } | |||||
| // Confirmation notifies the acknowledgment or negative acknowledgement of a | |||||
| // publishing identified by its delivery tag. Use NotifyPublish on the Channel | |||||
| // to consume these events. | |||||
| type Confirmation struct { | |||||
| DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode | |||||
| Ack bool // True when the server successfully received the publishing | |||||
| } | |||||
| // Decimal matches the AMQP decimal type. Scale is the number of decimal | |||||
| // digits Scale == 2, Value == 12345, Decimal == 123.45 | |||||
| type Decimal struct { | |||||
| Scale uint8 | |||||
| Value int32 | |||||
| } | |||||
| // Most common queue argument keys in queue declaration. For a comprehensive list | |||||
| // of queue arguments, visit [RabbitMQ Queue docs]. | |||||
| // | |||||
| // QueueTypeArg queue argument is used to declare quorum and stream queues. | |||||
| // Accepted values are QueueTypeClassic (default), QueueTypeQuorum and | |||||
| // QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their | |||||
| // Classic Queues counterparts. Check [feature comparison] docs for more | |||||
| // information. | |||||
| // | |||||
| // Queues can define their [max length] using QueueMaxLenArg and | |||||
| // QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using | |||||
| // QueueOverflowArg. Accepted values are QueueOverflowDropHead (default), | |||||
| // QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX. | |||||
| // | |||||
| // [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an | |||||
| // unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg. | |||||
| // This will set a time-to-live for **messages** in the queue. | |||||
| // | |||||
| // [Stream retention] can be configured using StreamMaxLenBytesArg, to set the | |||||
| // maximum size of the stream. Please note that stream queues always keep, at | |||||
| // least, one segment. [Stream retention] can also be set using StreamMaxAgeArg, | |||||
| // to set time-based retention. Values are string with unit suffix. Valid | |||||
| // suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment | |||||
| // size can be set using StreamMaxSegmentSizeBytesArg. The default value is | |||||
| // 500_000_000 bytes ~= 500 megabytes | |||||
| // | |||||
| // [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html | |||||
| // [Stream retention]: https://rabbitmq.com/streams.html#retention | |||||
| // [max length]: https://rabbitmq.com/maxlength.html | |||||
| // [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl | |||||
| // [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl | |||||
| // [Quorum Queues]: https://rabbitmq.com/quorum-queues.html | |||||
| // [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison | |||||
| const ( | |||||
| QueueTypeArg = "x-queue-type" | |||||
| QueueMaxLenArg = "x-max-length" | |||||
| QueueMaxLenBytesArg = "x-max-length-bytes" | |||||
| StreamMaxLenBytesArg = "x-max-length-bytes" | |||||
| QueueOverflowArg = "x-overflow" | |||||
| QueueMessageTTLArg = "x-message-ttl" | |||||
| QueueTTLArg = "x-expires" | |||||
| StreamMaxAgeArg = "x-max-age" | |||||
| StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes" | |||||
| ) | |||||
| // Values for queue arguments. Use as values for queue arguments during queue declaration. | |||||
| // The following argument table will create a classic queue, with max length set to 100 messages, | |||||
| // and a queue TTL of 30 minutes. | |||||
| // | |||||
| // args := amqp.Table{ | |||||
| // amqp.QueueTypeArg: QueueTypeClassic, | |||||
| // amqp.QueueMaxLenArg: 100, | |||||
| // amqp.QueueTTLArg: 1800000, | |||||
| // } | |||||
| const ( | |||||
| QueueTypeClassic = "classic" | |||||
| QueueTypeQuorum = "quorum" | |||||
| QueueTypeStream = "stream" | |||||
| QueueOverflowDropHead = "drop-head" | |||||
| QueueOverflowRejectPublish = "reject-publish" | |||||
| QueueOverflowRejectPublishDLX = "reject-publish-dlx" | |||||
| ) | |||||
| // Table stores user supplied fields of the following types: | |||||
| // | |||||
| // bool | |||||
| // byte | |||||
| // int8 | |||||
| // float32 | |||||
| // float64 | |||||
| // int | |||||
| // int16 | |||||
| // int32 | |||||
| // int64 | |||||
| // nil | |||||
| // string | |||||
| // time.Time | |||||
| // amqp.Decimal | |||||
| // amqp.Table | |||||
| // []byte | |||||
| // []interface{} - containing above types | |||||
| // | |||||
| // Functions taking a table will immediately fail when the table contains a | |||||
| // value of an unsupported type. | |||||
| // | |||||
| // The caller must be specific in which precision of integer it wishes to | |||||
| // encode. | |||||
| // | |||||
| // Use a type assertion when reading values from a table for type conversion. | |||||
| // | |||||
| // RabbitMQ expects int32 for integer values. | |||||
| type Table map[string]interface{} | |||||
| func validateField(f interface{}) error { | |||||
| switch fv := f.(type) { | |||||
| case nil, bool, byte, int8, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: | |||||
| return nil | |||||
| case []interface{}: | |||||
| for _, v := range fv { | |||||
| if err := validateField(v); err != nil { | |||||
| return fmt.Errorf("in array %s", err) | |||||
| } | |||||
| } | |||||
| return nil | |||||
| case Table: | |||||
| for k, v := range fv { | |||||
| if err := validateField(v); err != nil { | |||||
| return fmt.Errorf("table field %q %s", k, err) | |||||
| } | |||||
| } | |||||
| return nil | |||||
| } | |||||
| return fmt.Errorf("value %T not supported", f) | |||||
| } | |||||
| // Validate returns and error if any Go types in the table are incompatible with AMQP types. | |||||
| func (t Table) Validate() error { | |||||
| return validateField(t) | |||||
| } | |||||
| // Sets the connection name property. This property can be used in | |||||
| // amqp.Config to set a custom connection name during amqp.DialConfig(). This | |||||
| // can be helpful to identify specific connections in RabbitMQ, for debugging or | |||||
| // tracing purposes. | |||||
| func (t Table) SetClientConnectionName(connName string) { | |||||
| t["connection_name"] = connName | |||||
| } | |||||
| type message interface { | |||||
| id() (uint16, uint16) | |||||
| wait() bool | |||||
| read(io.Reader) error | |||||
| write(io.Writer) error | |||||
| } | |||||
| type messageWithContent interface { | |||||
| message | |||||
| getContent() (properties, []byte) | |||||
| setContent(properties, []byte) | |||||
| } | |||||
| /* | |||||
| The base interface implemented as: | |||||
| 2.3.5 frame Details | |||||
| All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects | |||||
| malformed frames: | |||||
| 0 1 3 7 size+7 size+8 | |||||
| +------+---------+-------------+ +------------+ +-----------+ | |||||
| | type | channel | size | | payload | | frame-end | | |||||
| +------+---------+-------------+ +------------+ +-----------+ | |||||
| octet short long size octets octet | |||||
| To read a frame, we: | |||||
| 1. Read the header and check the frame type and channel. | |||||
| 2. Depending on the frame type, we read the payload and process it. | |||||
| 3. Read the frame end octet. | |||||
| In realistic implementations where performance is a concern, we would use | |||||
| “read-ahead buffering” or “gathering reads” to avoid doing three separate | |||||
| system calls to read a frame. | |||||
| */ | |||||
| type frame interface { | |||||
| write(io.Writer) error | |||||
| channel() uint16 | |||||
| } | |||||
| /* | |||||
| Perform any updates on the channel immediately after the frame is decoded while the | |||||
| connection mutex is held. | |||||
| */ | |||||
| func updateChannel(f frame, channel *Channel) { | |||||
| if mf, isMethodFrame := f.(*methodFrame); isMethodFrame { | |||||
| if _, isChannelClose := mf.Method.(*channelClose); isChannelClose { | |||||
| channel.setClosed() | |||||
| } | |||||
| } | |||||
| } | |||||
| type reader struct { | |||||
| r io.Reader | |||||
| } | |||||
| type writer struct { | |||||
| w io.Writer | |||||
| } | |||||
| // Implements the frame interface for Connection RPC | |||||
| type protocolHeader struct{} | |||||
| func (protocolHeader) write(w io.Writer) error { | |||||
| _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}) | |||||
| return err | |||||
| } | |||||
| func (protocolHeader) channel() uint16 { | |||||
| panic("only valid as initial handshake") | |||||
| } | |||||
| /* | |||||
| Method frames carry the high-level protocol commands (which we call "methods"). | |||||
| One method frame carries one command. The method frame payload has this format: | |||||
| 0 2 4 | |||||
| +----------+-----------+-------------- - - | |||||
| | class-id | method-id | arguments... | |||||
| +----------+-----------+-------------- - - | |||||
| short short ... | |||||
| To process a method frame, we: | |||||
| 1. Read the method frame payload. | |||||
| 2. Unpack it into a structure. A given method always has the same structure, | |||||
| so we can unpack the method rapidly. 3. Check that the method is allowed in | |||||
| the current context. | |||||
| 4. Check that the method arguments are valid. | |||||
| 5. Execute the method. | |||||
| Method frame bodies are constructed as a list of AMQP data fields (bits, | |||||
| integers, strings and string tables). The marshalling code is trivially | |||||
| generated directly from the protocol specifications, and can be very rapid. | |||||
| */ | |||||
| type methodFrame struct { | |||||
| ChannelId uint16 | |||||
| ClassId uint16 | |||||
| MethodId uint16 | |||||
| Method message | |||||
| } | |||||
| func (f *methodFrame) channel() uint16 { return f.ChannelId } | |||||
| /* | |||||
| Heartbeating is a technique designed to undo one of TCP/IP's features, namely | |||||
| its ability to recover from a broken physical connection by closing only after | |||||
| a quite long time-out. In some scenarios we need to know very rapidly if a | |||||
| peer is disconnected or not responding for other reasons (e.g. it is looping). | |||||
| Since heartbeating can be done at a low level, we implement this as a special | |||||
| type of frame that peers exchange at the transport level, rather than as a | |||||
| class method. | |||||
| */ | |||||
| type heartbeatFrame struct { | |||||
| ChannelId uint16 | |||||
| } | |||||
| func (f *heartbeatFrame) channel() uint16 { return f.ChannelId } | |||||
| /* | |||||
| Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally | |||||
| defined as carrying content. When a peer sends such a method frame, it always | |||||
| follows it with a content header and zero or more content body frames. | |||||
| A content header frame has this format: | |||||
| 0 2 4 12 14 | |||||
| +----------+--------+-----------+----------------+------------- - - | |||||
| | class-id | weight | body size | property flags | property list... | |||||
| +----------+--------+-----------+----------------+------------- - - | |||||
| short short long long short remainder... | |||||
| We place content body in distinct frames (rather than including it in the | |||||
| method) so that AMQP may support "zero copy" techniques in which content is | |||||
| never marshalled or encoded. We place the content properties in their own | |||||
| frame so that recipients can selectively discard contents they do not want to | |||||
| process | |||||
| */ | |||||
| type headerFrame struct { | |||||
| ChannelId uint16 | |||||
| ClassId uint16 | |||||
| weight uint16 | |||||
| Size uint64 | |||||
| Properties properties | |||||
| } | |||||
| func (f *headerFrame) channel() uint16 { return f.ChannelId } | |||||
| /* | |||||
| Content is the application data we carry from client-to-client via the AMQP | |||||
| server. Content is, roughly speaking, a set of properties plus a binary data | |||||
| part. The set of allowed properties are defined by the Basic class, and these | |||||
| form the "content header frame". The data can be any size, and MAY be broken | |||||
| into several (or many) chunks, each forming a "content body frame". | |||||
| Looking at the frames for a specific channel, as they pass on the wire, we | |||||
| might see something like this: | |||||
| [method] | |||||
| [method] [header] [body] [body] | |||||
| [method] | |||||
| ... | |||||
| */ | |||||
| type bodyFrame struct { | |||||
| ChannelId uint16 | |||||
| Body []byte | |||||
| } | |||||
| func (f *bodyFrame) channel() uint16 { return f.ChannelId } | |||||
| @ -0,0 +1,196 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "errors" | |||||
| "net" | |||||
| "net/url" | |||||
| "strconv" | |||||
| "strings" | |||||
| ) | |||||
| var errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'") | |||||
| var errURIWhitespace = errors.New("URI must not contain whitespace") | |||||
| var schemePorts = map[string]int{ | |||||
| "amqp": 5672, | |||||
| "amqps": 5671, | |||||
| } | |||||
| var defaultURI = URI{ | |||||
| Scheme: "amqp", | |||||
| Host: "localhost", | |||||
| Port: 5672, | |||||
| Username: "guest", | |||||
| Password: "guest", | |||||
| Vhost: "/", | |||||
| } | |||||
| // URI represents a parsed AMQP URI string. | |||||
| type URI struct { | |||||
| Scheme string | |||||
| Host string | |||||
| Port int | |||||
| Username string | |||||
| Password string | |||||
| Vhost string | |||||
| CertFile string // client TLS auth - path to certificate (PEM) | |||||
| CACertFile string // client TLS auth - path to CA certificate (PEM) | |||||
| KeyFile string // client TLS auth - path to private key (PEM) | |||||
| ServerName string // client TLS auth - server name | |||||
| } | |||||
| // ParseURI attempts to parse the given AMQP URI according to the spec. | |||||
| // See http://www.rabbitmq.com/uri-spec.html. | |||||
| // | |||||
| // Default values for the fields are: | |||||
| // | |||||
| // Scheme: amqp | |||||
| // Host: localhost | |||||
| // Port: 5672 | |||||
| // Username: guest | |||||
| // Password: guest | |||||
| // Vhost: / | |||||
| // | |||||
| // Supports TLS query parameters. See https://www.rabbitmq.com/uri-query-parameters.html | |||||
| // | |||||
| // certfile: <path/to/client_cert.pem> | |||||
| // keyfile: <path/to/client_key.pem> | |||||
| // cacertfile: <path/to/ca.pem> | |||||
| // server_name_indication: <server name> | |||||
| // | |||||
| // If cacertfile is not provided, system CA certificates will be used. | |||||
| // Mutual TLS (client auth) will be enabled only in case keyfile AND certfile provided. | |||||
| // | |||||
| // If Config.TLSClientConfig is set, TLS parameters from URI will be ignored. | |||||
| func ParseURI(uri string) (URI, error) { | |||||
| builder := defaultURI | |||||
| if strings.Contains(uri, " ") { | |||||
| return builder, errURIWhitespace | |||||
| } | |||||
| u, err := url.Parse(uri) | |||||
| if err != nil { | |||||
| return builder, err | |||||
| } | |||||
| defaultPort, okScheme := schemePorts[u.Scheme] | |||||
| if okScheme { | |||||
| builder.Scheme = u.Scheme | |||||
| } else { | |||||
| return builder, errURIScheme | |||||
| } | |||||
| host := u.Hostname() | |||||
| port := u.Port() | |||||
| if host != "" { | |||||
| builder.Host = host | |||||
| } | |||||
| if port != "" { | |||||
| port32, err := strconv.ParseInt(port, 10, 32) | |||||
| if err != nil { | |||||
| return builder, err | |||||
| } | |||||
| builder.Port = int(port32) | |||||
| } else { | |||||
| builder.Port = defaultPort | |||||
| } | |||||
| if u.User != nil { | |||||
| builder.Username = u.User.Username() | |||||
| if password, ok := u.User.Password(); ok { | |||||
| builder.Password = password | |||||
| } | |||||
| } | |||||
| if u.Path != "" { | |||||
| if strings.HasPrefix(u.Path, "/") { | |||||
| if u.Host == "" && strings.HasPrefix(u.Path, "///") { | |||||
| // net/url doesn't handle local context authorities and leaves that up | |||||
| // to the scheme handler. In our case, we translate amqp:/// into the | |||||
| // default host and whatever the vhost should be | |||||
| if len(u.Path) > 3 { | |||||
| builder.Vhost = u.Path[3:] | |||||
| } | |||||
| } else if len(u.Path) > 1 { | |||||
| builder.Vhost = u.Path[1:] | |||||
| } | |||||
| } else { | |||||
| builder.Vhost = u.Path | |||||
| } | |||||
| } | |||||
| // see https://www.rabbitmq.com/uri-query-parameters.html | |||||
| params := u.Query() | |||||
| builder.CertFile = params.Get("certfile") | |||||
| builder.KeyFile = params.Get("keyfile") | |||||
| builder.CACertFile = params.Get("cacertfile") | |||||
| builder.ServerName = params.Get("server_name_indication") | |||||
| return builder, nil | |||||
| } | |||||
| // PlainAuth returns a PlainAuth structure based on the parsed URI's | |||||
| // Username and Password fields. | |||||
| func (uri URI) PlainAuth() *PlainAuth { | |||||
| return &PlainAuth{ | |||||
| Username: uri.Username, | |||||
| Password: uri.Password, | |||||
| } | |||||
| } | |||||
| // AMQPlainAuth returns a PlainAuth structure based on the parsed URI's | |||||
| // Username and Password fields. | |||||
| func (uri URI) AMQPlainAuth() *AMQPlainAuth { | |||||
| return &AMQPlainAuth{ | |||||
| Username: uri.Username, | |||||
| Password: uri.Password, | |||||
| } | |||||
| } | |||||
| func (uri URI) String() string { | |||||
| authority, err := url.Parse("") | |||||
| if err != nil { | |||||
| return err.Error() | |||||
| } | |||||
| authority.Scheme = uri.Scheme | |||||
| if uri.Username != defaultURI.Username || uri.Password != defaultURI.Password { | |||||
| authority.User = url.User(uri.Username) | |||||
| if uri.Password != defaultURI.Password { | |||||
| authority.User = url.UserPassword(uri.Username, uri.Password) | |||||
| } | |||||
| } | |||||
| if defaultPort, found := schemePorts[uri.Scheme]; !found || defaultPort != uri.Port { | |||||
| authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) | |||||
| } else { | |||||
| // JoinHostPort() automatically add brackets to the host if it's | |||||
| // an IPv6 address. | |||||
| // | |||||
| // If not port is specified, JoinHostPort() return an IP address in the | |||||
| // form of "[::1]:", so we use TrimSuffix() to remove the extra ":". | |||||
| authority.Host = strings.TrimSuffix(net.JoinHostPort(uri.Host, ""), ":") | |||||
| } | |||||
| if uri.Vhost != defaultURI.Vhost { | |||||
| // Make sure net/url does not double escape, e.g. | |||||
| // "%2F" does not become "%252F". | |||||
| authority.Path = uri.Vhost | |||||
| authority.RawPath = url.QueryEscape(uri.Vhost) | |||||
| } else { | |||||
| authority.Path = "/" | |||||
| } | |||||
| return authority.String() | |||||
| } | |||||
| @ -0,0 +1,427 @@ | |||||
| // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. | |||||
| // Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. | |||||
| // Use of this source code is governed by a BSD-style | |||||
| // license that can be found in the LICENSE file. | |||||
| package amqp091 | |||||
| import ( | |||||
| "bufio" | |||||
| "bytes" | |||||
| "encoding/binary" | |||||
| "errors" | |||||
| "io" | |||||
| "math" | |||||
| "time" | |||||
| ) | |||||
| func (w *writer) WriteFrameNoFlush(frame frame) (err error) { | |||||
| err = frame.write(w.w) | |||||
| return | |||||
| } | |||||
| func (w *writer) WriteFrame(frame frame) (err error) { | |||||
| if err = frame.write(w.w); err != nil { | |||||
| return | |||||
| } | |||||
| if buf, ok := w.w.(*bufio.Writer); ok { | |||||
| err = buf.Flush() | |||||
| } | |||||
| return | |||||
| } | |||||
| func (f *methodFrame) write(w io.Writer) (err error) { | |||||
| var payload bytes.Buffer | |||||
| if f.Method == nil { | |||||
| return errors.New("malformed frame: missing method") | |||||
| } | |||||
| class, method := f.Method.id() | |||||
| if err = binary.Write(&payload, binary.BigEndian, class); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Write(&payload, binary.BigEndian, method); err != nil { | |||||
| return | |||||
| } | |||||
| if err = f.Method.write(&payload); err != nil { | |||||
| return | |||||
| } | |||||
| return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes()) | |||||
| } | |||||
| // Heartbeat | |||||
| // | |||||
| // Payload is empty | |||||
| func (f *heartbeatFrame) write(w io.Writer) (err error) { | |||||
| return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{}) | |||||
| } | |||||
| // CONTENT HEADER | |||||
| // 0 2 4 12 14 | |||||
| // +----------+--------+-----------+----------------+------------- - - | |||||
| // | class-id | weight | body size | property flags | property list... | |||||
| // +----------+--------+-----------+----------------+------------- - - | |||||
| // | |||||
| // short short long long short remainder... | |||||
| func (f *headerFrame) write(w io.Writer) (err error) { | |||||
| var payload bytes.Buffer | |||||
| var zeroTime time.Time | |||||
| if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil { | |||||
| return | |||||
| } | |||||
| if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil { | |||||
| return | |||||
| } | |||||
| // First pass will build the mask to be serialized, second pass will serialize | |||||
| // each of the fields that appear in the mask. | |||||
| var mask uint16 | |||||
| if len(f.Properties.ContentType) > 0 { | |||||
| mask = mask | flagContentType | |||||
| } | |||||
| if len(f.Properties.ContentEncoding) > 0 { | |||||
| mask = mask | flagContentEncoding | |||||
| } | |||||
| if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 { | |||||
| mask = mask | flagHeaders | |||||
| } | |||||
| if f.Properties.DeliveryMode > 0 { | |||||
| mask = mask | flagDeliveryMode | |||||
| } | |||||
| if f.Properties.Priority > 0 { | |||||
| mask = mask | flagPriority | |||||
| } | |||||
| if len(f.Properties.CorrelationId) > 0 { | |||||
| mask = mask | flagCorrelationId | |||||
| } | |||||
| if len(f.Properties.ReplyTo) > 0 { | |||||
| mask = mask | flagReplyTo | |||||
| } | |||||
| if len(f.Properties.Expiration) > 0 { | |||||
| mask = mask | flagExpiration | |||||
| } | |||||
| if len(f.Properties.MessageId) > 0 { | |||||
| mask = mask | flagMessageId | |||||
| } | |||||
| if f.Properties.Timestamp != zeroTime { | |||||
| mask = mask | flagTimestamp | |||||
| } | |||||
| if len(f.Properties.Type) > 0 { | |||||
| mask = mask | flagType | |||||
| } | |||||
| if len(f.Properties.UserId) > 0 { | |||||
| mask = mask | flagUserId | |||||
| } | |||||
| if len(f.Properties.AppId) > 0 { | |||||
| mask = mask | flagAppId | |||||
| } | |||||
| if err = binary.Write(&payload, binary.BigEndian, mask); err != nil { | |||||
| return | |||||
| } | |||||
| if hasProperty(mask, flagContentType) { | |||||
| if err = writeShortstr(&payload, f.Properties.ContentType); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagContentEncoding) { | |||||
| if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagHeaders) { | |||||
| if err = writeTable(&payload, f.Properties.Headers); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagDeliveryMode) { | |||||
| if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagPriority) { | |||||
| if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagCorrelationId) { | |||||
| if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagReplyTo) { | |||||
| if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagExpiration) { | |||||
| if err = writeShortstr(&payload, f.Properties.Expiration); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagMessageId) { | |||||
| if err = writeShortstr(&payload, f.Properties.MessageId); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagTimestamp) { | |||||
| if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagType) { | |||||
| if err = writeShortstr(&payload, f.Properties.Type); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagUserId) { | |||||
| if err = writeShortstr(&payload, f.Properties.UserId); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| if hasProperty(mask, flagAppId) { | |||||
| if err = writeShortstr(&payload, f.Properties.AppId); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes()) | |||||
| } | |||||
| // Body | |||||
| // | |||||
| // Payload is one byterange from the full body who's size is declared in the | |||||
| // Header frame | |||||
| func (f *bodyFrame) write(w io.Writer) (err error) { | |||||
| return writeFrame(w, frameBody, f.ChannelId, f.Body) | |||||
| } | |||||
| func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) { | |||||
| end := []byte{frameEnd} | |||||
| size := uint(len(payload)) | |||||
| _, err = w.Write([]byte{ | |||||
| typ, | |||||
| byte((channel & 0xff00) >> 8), | |||||
| byte((channel & 0x00ff) >> 0), | |||||
| byte((size & 0xff000000) >> 24), | |||||
| byte((size & 0x00ff0000) >> 16), | |||||
| byte((size & 0x0000ff00) >> 8), | |||||
| byte((size & 0x000000ff) >> 0), | |||||
| }) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(payload); err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(end); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| } | |||||
| func writeShortstr(w io.Writer, s string) (err error) { | |||||
| b := []byte(s) | |||||
| var length = uint8(len(b)) | |||||
| if err = binary.Write(w, binary.BigEndian, length); err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(b[:length]); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| } | |||||
| func writeLongstr(w io.Writer, s string) (err error) { | |||||
| b := []byte(s) | |||||
| var length = uint32(len(b)) | |||||
| if err = binary.Write(w, binary.BigEndian, length); err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(b[:length]); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| } | |||||
| /* | |||||
| 'A': []interface{} | |||||
| 'D': Decimal | |||||
| 'F': Table | |||||
| 'I': int32 | |||||
| 'S': string | |||||
| 'T': time.Time | |||||
| 'V': nil | |||||
| 'b': int8 | |||||
| 'B': byte | |||||
| 'd': float64 | |||||
| 'f': float32 | |||||
| 'l': int64 | |||||
| 's': int16 | |||||
| 't': bool | |||||
| 'x': []byte | |||||
| */ | |||||
| func writeField(w io.Writer, value interface{}) (err error) { | |||||
| var buf [9]byte | |||||
| var enc []byte | |||||
| switch v := value.(type) { | |||||
| case bool: | |||||
| buf[0] = 't' | |||||
| if v { | |||||
| buf[1] = byte(1) | |||||
| } else { | |||||
| buf[1] = byte(0) | |||||
| } | |||||
| enc = buf[:2] | |||||
| case byte: | |||||
| buf[0] = 'B' | |||||
| buf[1] = v | |||||
| enc = buf[:2] | |||||
| case int8: | |||||
| buf[0] = 'b' | |||||
| buf[1] = uint8(v) | |||||
| enc = buf[:2] | |||||
| case int16: | |||||
| buf[0] = 's' | |||||
| binary.BigEndian.PutUint16(buf[1:3], uint16(v)) | |||||
| enc = buf[:3] | |||||
| case int: | |||||
| buf[0] = 'I' | |||||
| binary.BigEndian.PutUint32(buf[1:5], uint32(v)) | |||||
| enc = buf[:5] | |||||
| case int32: | |||||
| buf[0] = 'I' | |||||
| binary.BigEndian.PutUint32(buf[1:5], uint32(v)) | |||||
| enc = buf[:5] | |||||
| case int64: | |||||
| buf[0] = 'l' | |||||
| binary.BigEndian.PutUint64(buf[1:9], uint64(v)) | |||||
| enc = buf[:9] | |||||
| case float32: | |||||
| buf[0] = 'f' | |||||
| binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v)) | |||||
| enc = buf[:5] | |||||
| case float64: | |||||
| buf[0] = 'd' | |||||
| binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v)) | |||||
| enc = buf[:9] | |||||
| case Decimal: | |||||
| buf[0] = 'D' | |||||
| buf[1] = v.Scale | |||||
| binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value)) | |||||
| enc = buf[:6] | |||||
| case string: | |||||
| buf[0] = 'S' | |||||
| binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) | |||||
| enc = append(buf[:5], []byte(v)...) | |||||
| case []interface{}: // field-array | |||||
| buf[0] = 'A' | |||||
| sec := new(bytes.Buffer) | |||||
| for _, val := range v { | |||||
| if err = writeField(sec, val); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len())) | |||||
| if _, err = w.Write(buf[:5]); err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(sec.Bytes()); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| case time.Time: | |||||
| buf[0] = 'T' | |||||
| binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix())) | |||||
| enc = buf[:9] | |||||
| case Table: | |||||
| if _, err = w.Write([]byte{'F'}); err != nil { | |||||
| return | |||||
| } | |||||
| return writeTable(w, v) | |||||
| case []byte: | |||||
| buf[0] = 'x' | |||||
| binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) | |||||
| if _, err = w.Write(buf[0:5]); err != nil { | |||||
| return | |||||
| } | |||||
| if _, err = w.Write(v); err != nil { | |||||
| return | |||||
| } | |||||
| return | |||||
| case nil: | |||||
| buf[0] = 'V' | |||||
| enc = buf[:1] | |||||
| default: | |||||
| return ErrFieldType | |||||
| } | |||||
| _, err = w.Write(enc) | |||||
| return | |||||
| } | |||||
| func writeTable(w io.Writer, table Table) (err error) { | |||||
| var buf bytes.Buffer | |||||
| for key, val := range table { | |||||
| if err = writeShortstr(&buf, key); err != nil { | |||||
| return | |||||
| } | |||||
| if err = writeField(&buf, val); err != nil { | |||||
| return | |||||
| } | |||||
| } | |||||
| return writeLongstr(w, buf.String()) | |||||
| } | |||||
| @ -0,0 +1,3 @@ | |||||
| # github.com/rabbitmq/amqp091-go v1.7.0 | |||||
| ## explicit; go 1.16 | |||||
| github.com/rabbitmq/amqp091-go | |||||