From df42cff33a06917e73297c83c07f2ba0ce20c7ed Mon Sep 17 00:00:00 2001 From: WiRight Date: Mon, 24 Apr 2023 09:46:05 +0300 Subject: [PATCH] rename to github.com/DizoftTeam/go-rabbitmq --- README.md | 4 +- connection.go | 2 +- consume.go | 2 +- consumer_options.go | 2 +- declare.go | 2 +- examples/consumer/main.go | 2 +- examples/logger/main.go | 2 +- examples/multiconsumer/main.go | 2 +- examples/multipublisher/main.go | 2 +- examples/publisher/main.go | 2 +- go.mod | 4 +- go.sum | 34 ++------------ internal/channelmanager/channel_manager.go | 6 +-- .../connectionmanager/connection_manager.go | 4 +- logger.go | 2 +- publish.go | 4 +- .../rabbitmq/amqp091-go/CHANGELOG.md | 18 +++++++ .../rabbitmq/amqp091-go/CONTRIBUTING.md | 18 ++++++- .../github.com/rabbitmq/amqp091-go/Makefile | 5 ++ .../github.com/rabbitmq/amqp091-go/channel.go | 14 ++++-- .../rabbitmq/amqp091-go/confirms.go | 23 ++++++++- .../rabbitmq/amqp091-go/connection.go | 47 +++++++++++++++++++ .../rabbitmq/amqp091-go/consumers.go | 27 +++++++++++ .../github.com/rabbitmq/amqp091-go/types.go | 5 ++ vendor/modules.txt | 2 +- 25 files changed, 175 insertions(+), 60 deletions(-) diff --git a/README.md b/README.md index 00e9ce9..cddec6e 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that pr Supported by [Boot.dev](https://boot.dev) -[![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg) +[![](https://godoc.org/github.com/DizoftTeam/go-rabbitmq?status.svg)](https://godoc.org/github.com/DizoftTeam/go-rabbitmq)![Deploy](https://github.com/DizoftTeam/go-rabbitmq/workflows/Tests/badge.svg) ## Motivation @@ -25,7 +25,7 @@ The goal with `go-rabbitmq` is to provide *most* (but not all) of the nitty-grit Inside a Go module: ```bash -go get github.com/wagslane/go-rabbitmq +go get github.com/DizoftTeam/go-rabbitmq ``` ## 🚀 Quick Start Consumer diff --git a/connection.go b/connection.go index 97b8bb4..32a81a8 100644 --- a/connection.go +++ b/connection.go @@ -1,8 +1,8 @@ package rabbitmq import ( + "github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // Conn manages the connection to a rabbit cluster diff --git a/consume.go b/consume.go index d1f802f..b3b682f 100644 --- a/consume.go +++ b/consume.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" + "github.com/DizoftTeam/go-rabbitmq/internal/channelmanager" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" ) // Action is an action that occurs after processed this delivery diff --git a/consumer_options.go b/consumer_options.go index 80f2979..5b7a313 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -1,8 +1,8 @@ package rabbitmq import ( + "github.com/DizoftTeam/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // getDefaultConsumerOptions describes the options that will be used when a value isn't provided diff --git a/declare.go b/declare.go index 86abe85..7379d4c 100644 --- a/declare.go +++ b/declare.go @@ -1,7 +1,7 @@ package rabbitmq import ( - "github.com/wagslane/go-rabbitmq/internal/channelmanager" + "github.com/DizoftTeam/go-rabbitmq/internal/channelmanager" ) func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 7c68733..2a7ed75 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -7,7 +7,7 @@ import ( "os/signal" "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/DizoftTeam/go-rabbitmq" ) func main() { diff --git a/examples/logger/main.go b/examples/logger/main.go index 8620e4c..ecb31c5 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -4,7 +4,7 @@ import ( "context" "log" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/DizoftTeam/go-rabbitmq" ) // errorLogger is used in WithPublisherOptionsLogger to create a custom logger diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 571af8b..e481052 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -7,7 +7,7 @@ import ( "os/signal" "syscall" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/DizoftTeam/go-rabbitmq" ) func main() { diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 5121a3f..12d966d 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/DizoftTeam/go-rabbitmq" ) func main() { diff --git a/examples/publisher/main.go b/examples/publisher/main.go index d07cc27..b871640 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + rabbitmq "github.com/DizoftTeam/go-rabbitmq" ) func main() { diff --git a/go.mod b/go.mod index 3c21622..3aba60f 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ -module github.com/wagslane/go-rabbitmq +module github.com/DizoftTeam/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.7.0 +require github.com/rabbitmq/amqp091-go v1.8.0 diff --git a/go.sum b/go.sum index e627709..787640a 100644 --- a/go.sum +++ b/go.sum @@ -4,40 +4,14 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= -github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM= +github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..81a9759 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -5,10 +5,10 @@ import ( "sync" "time" + "github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager" + "github.com/DizoftTeam/go-rabbitmq/internal/dispatcher" + "github.com/DizoftTeam/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ChannelManager - diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index fce1f2b..bfbd513 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -4,9 +4,9 @@ import ( "sync" "time" + "github.com/DizoftTeam/go-rabbitmq/internal/dispatcher" + "github.com/DizoftTeam/go-rabbitmq/internal/logger" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/dispatcher" - "github.com/wagslane/go-rabbitmq/internal/logger" ) // ConnectionManager - diff --git a/logger.go b/logger.go index 2c3f231..4960129 100644 --- a/logger.go +++ b/logger.go @@ -4,7 +4,7 @@ import ( "fmt" "log" - "github.com/wagslane/go-rabbitmq/internal/logger" + "github.com/DizoftTeam/go-rabbitmq/internal/logger" ) // Logger is describes a logging structure. It can be set using diff --git a/publish.go b/publish.go index 8954018..bee7828 100644 --- a/publish.go +++ b/publish.go @@ -6,9 +6,9 @@ import ( "fmt" "sync" + "github.com/DizoftTeam/go-rabbitmq/internal/channelmanager" + "github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager" amqp "github.com/rabbitmq/amqp091-go" - "github.com/wagslane/go-rabbitmq/internal/channelmanager" - "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // DeliveryMode. Transient means higher throughput but messages will not be diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md index 1165775..b59599d 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -1,5 +1,23 @@ # Changelog +## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0) + +**Closed issues:** + +- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170) +- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167) +- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160) + +**Merged pull requests:** + +- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken)) +- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet)) +- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm)) +- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken)) +- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt)) + ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) diff --git a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md index ed1b971..ec86fe5 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md @@ -9,11 +9,13 @@ Here is the recommended workflow: 1. Run Static Checks 1. Run integration tests (see below) 1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) +1. Implement fixes +1. Commit your changes. Use a [good, descriptive, commit message][good-commit]. 1. Push to a branch (`git push -u origin my-new-feature`) 1. Submit a pull request +[good-commit]: https://cbea.ms/git-commit/ + ## Running Static Checks golangci-lint must be installed to run the static checks. See [installation @@ -43,6 +45,18 @@ The integration tests can be run via: make tests ``` +Some tests require access to `rabbitmqctl` CLI. Use the environment variable +`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests. + +If you have Docker available in your machine, you can run: + +```shell +make tests-docker +``` + +This target will start a RabbitMQ container, run the test suite with the environment +variable setup, and stop RabbitMQ container after a successful run. + All integration tests should use the `integrationConnection(...)` test helpers defined in `integration_test.go` to setup the integration environment and logging. diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile index 7342731..69e9e2b 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/Makefile +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -19,6 +19,11 @@ fmt: ## Run go fmt against code tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test go test -race -v -tags integration $(GO_TEST_FLAGS) +.PHONY: tests-docker +tests-docker: rabbitmq-server + RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS) + $(MAKE) stop-rabbitmq-server + .PHONY: check check: golangci-lint run ./... diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index 8ba9bab..ae6f2d1 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -1435,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex ch.m.Lock() defer ch.m.Unlock() + var dc *DeferredConfirmation + if ch.confirming { + dc = ch.confirms.publish() + } + if err := ch.send(&basicPublish{ Exchange: exchange, RoutingKey: key, @@ -1457,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex AppId: msg.AppId, }, }); err != nil { + if ch.confirming { + ch.confirms.unpublish() + } return nil, err } - if ch.confirming { - return ch.confirms.Publish(), nil - } - - return nil, nil + return dc, nil } /* diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index f9973b7..577e042 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) { } // Publish increments the publishing counter -func (c *confirms) Publish() *DeferredConfirmation { +func (c *confirms) publish() *DeferredConfirmation { c.publishedMut.Lock() defer c.publishedMut.Unlock() @@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation { return c.deferredConfirmations.Add(c.published) } +// unpublish decrements the publishing counter and removes the +// DeferredConfirmation. It must be called immediately after a publish fails. +func (c *confirms) unpublish() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + c.deferredConfirmations.remove(c.published) + c.published-- +} + // confirm confirms one publishing, increments the expecting delivery tag, and // removes bookkeeping for that delivery tag. func (c *confirms) confirm(confirmation Confirmation) { @@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { return dc } +// remove is only used to drop a tag whose publish failed +func (d *deferredConfirmations) remove(tag uint64) { + d.m.Lock() + defer d.m.Unlock() + dc, found := d.confirmations[tag] + if !found { + return + } + close(dc.done) + delete(d.confirmations, tag) +} + func (d *deferredConfirmations) Confirm(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index def2260..abe4b02 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -399,12 +399,47 @@ func (c *Connection) Close() error { ) } +// CloseDeadline requests and waits for the response to close this AMQP connection. +// +// Accepts a deadline for waiting the server response. The deadline is passed +// to the low-level connection i.e. network socket. +// +// Regardless of the error returned, the connection is considered closed, and it +// should not be used after calling this function. +// +// In the event of an I/O timeout, connection-closed listeners are NOT informed. +// +// After returning from this call, all resources associated with this connection, +// including the underlying io, Channels, Notify listeners and Channel consumers +// will also be closed. +func (c *Connection) CloseDeadline(deadline time.Time) error { + if c.IsClosed() { + return ErrClosed + } + + defer c.shutdown(nil) + + err := c.setDeadline(deadline) + if err != nil { + return err + } + + return c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) +} + func (c *Connection) closeWith(err *Error) error { if c.IsClosed() { return ErrClosed } defer c.shutdown(err) + return c.call( &connectionClose{ ReplyCode: uint16(err.Code), @@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool { return atomic.LoadInt32(&c.closed) == 1 } +// setDeadline is a wrapper to type assert Connection.conn and set an I/O +// deadline in the underlying TCP connection socket, by calling +// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails, +// although this should never happen. +func (c *Connection) setDeadline(t time.Time) error { + con, ok := c.conn.(net.Conn) + if !ok { + return errInvalidTypeAssertion + } + return con.SetDeadline(t) +} + func (c *Connection) send(f frame) error { if c.IsClosed() { return ErrClosed diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index 8c23fad..c352fec 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { } case out <- *queue[0]: + /* + * https://github.com/rabbitmq/amqp091-go/issues/179 + * https://github.com/rabbitmq/amqp091-go/pull/180 + * + * Comment from @lars-t-hansen: + * + * Given Go's slice semantics, and barring any information + * available to the compiler that proves that queue is the only + * pointer to the memory it references, the only meaning that + * queue = queue[1:] can have is basically queue += sizeof(queue + * element), ie, it bumps a pointer. Looking at the generated + * code for a simple example (on ARM64 in this case) bears this + * out. So what we're left with is an array that we have a + * pointer into the middle of. When the GC traces this pointer, + * it too does not know whether the array has multiple + * referents, and so its only sensible choice is to find the + * beginning of the array, and if the array is not already + * visited, mark every element in it, including the "dead" + * pointer. + * + * (Depending on the program dynamics, an element may eventually + * be appended to the queue when the queue is at capacity, and + * in this case the live elements are copied into a new array + * and the old array is left to be GC'd eventually, along with + * the dead object. But that can take time.) + */ + queue[0] = nil queue = queue[1:] } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 427eefb..e8d8986 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -63,6 +63,11 @@ var ( ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ) +// internal errors used inside the library +var ( + errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} +) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8765d0d..ca577cd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v1.7.0 +# github.com/rabbitmq/amqp091-go v1.8.0 ## explicit; go 1.16 github.com/rabbitmq/amqp091-go