Browse Source

update `amqp091-go` from `v1.7.0` to `v1.9.0`

pull/158/head
Pablo Aguilar 2 years ago
parent
commit
85bdd9e9e7
No known key found for this signature in database GPG Key ID: 262821CED9B63938
16 changed files with 506 additions and 131 deletions
  1. +1
    -1
      go.mod
  2. +4
    -0
      go.sum
  3. +46
    -0
      vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md
  4. +16
    -2
      vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md
  5. +21
    -0
      vendor/github.com/rabbitmq/amqp091-go/Makefile
  6. +11
    -0
      vendor/github.com/rabbitmq/amqp091-go/RELEASE.md
  7. +24
    -16
      vendor/github.com/rabbitmq/amqp091-go/allocator.go
  8. +4
    -4
      vendor/github.com/rabbitmq/amqp091-go/certs.sh
  9. +139
    -8
      vendor/github.com/rabbitmq/amqp091-go/channel.go
  10. +22
    -1
      vendor/github.com/rabbitmq/amqp091-go/confirms.go
  11. +86
    -19
      vendor/github.com/rabbitmq/amqp091-go/connection.go
  12. +27
    -0
      vendor/github.com/rabbitmq/amqp091-go/consumers.go
  13. +64
    -64
      vendor/github.com/rabbitmq/amqp091-go/spec091.go
  14. +39
    -13
      vendor/github.com/rabbitmq/amqp091-go/types.go
  15. +1
    -2
      vendor/github.com/rabbitmq/amqp091-go/write.go
  16. +1
    -1
      vendor/modules.txt

+ 1
- 1
go.mod View File

@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq
go 1.20 go 1.20
require github.com/rabbitmq/amqp091-go v1.7.0
require github.com/rabbitmq/amqp091-go v1.9.0

+ 4
- 0
go.sum View File

@ -6,6 +6,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -13,6 +15,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=


+ 46
- 0
vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md View File

@ -1,5 +1,51 @@
# Changelog # Changelog
## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1)
**Fixed bugs:**
- Fixed incorrect version reported in client properties [52ce2efd03c53dcf77d5496977da46840e9abd24](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24)
**Merged pull requests:**
- Fix Example Client not reconnecting [\#186](https://github.com/rabbitmq/amqp091-go/pull/186) ([frankfil](https://github.com/frankfil))
## [v1.8.0](https://github.com/rabbitmq/amqp091-go/tree/v1.8.0) (2023-03-21)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.7.0...v1.8.0)
**Closed issues:**
- memory leak [\#179](https://github.com/rabbitmq/amqp091-go/issues/179)
- the publishWithContext interface will not return when it times out [\#178](https://github.com/rabbitmq/amqp091-go/issues/178)
**Merged pull requests:**
- Fix race condition on confirms [\#183](https://github.com/rabbitmq/amqp091-go/pull/183) ([calloway-jacob](https://github.com/calloway-jacob))
- Add a CloseDeadline function to Connection [\#181](https://github.com/rabbitmq/amqp091-go/pull/181) ([Zerpet](https://github.com/Zerpet))
- Fix memory leaks [\#180](https://github.com/rabbitmq/amqp091-go/pull/180) ([GXKe](https://github.com/GXKe))
- Bump go.uber.org/goleak from 1.2.0 to 1.2.1 [\#177](https://github.com/rabbitmq/amqp091-go/pull/177) ([dependabot[bot]](https://github.com/apps/dependabot))
## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0)
**Closed issues:**
- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170)
- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167)
- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160)
**Merged pull requests:**
- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken))
- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet))
- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm))
- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken))
- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt))
## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1)


+ 16
- 2
vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md View File

@ -9,11 +9,13 @@ Here is the recommended workflow:
1. Run Static Checks 1. Run Static Checks
1. Run integration tests (see below) 1. Run integration tests (see below)
1. **Implement tests** 1. **Implement tests**
1. Implement fixs
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Implement fixes
1. Commit your changes. Use a [good, descriptive, commit message][good-commit].
1. Push to a branch (`git push -u origin my-new-feature`) 1. Push to a branch (`git push -u origin my-new-feature`)
1. Submit a pull request 1. Submit a pull request
[good-commit]: https://cbea.ms/git-commit/
## Running Static Checks ## Running Static Checks
golangci-lint must be installed to run the static checks. See [installation golangci-lint must be installed to run the static checks. See [installation
@ -43,6 +45,18 @@ The integration tests can be run via:
make tests make tests
``` ```
Some tests require access to `rabbitmqctl` CLI. Use the environment variable
`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests.
If you have Docker available in your machine, you can run:
```shell
make tests-docker
```
This target will start a RabbitMQ container, run the test suite with the environment
variable setup, and stop RabbitMQ container after a successful run.
All integration tests should use the `integrationConnection(...)` test All integration tests should use the `integrationConnection(...)` test
helpers defined in `integration_test.go` to setup the integration environment helpers defined in `integration_test.go` to setup the integration environment
and logging. and logging.

+ 21
- 0
vendor/github.com/rabbitmq/amqp091-go/Makefile View File

@ -19,6 +19,11 @@ fmt: ## Run go fmt against code
tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test
go test -race -v -tags integration $(GO_TEST_FLAGS) go test -race -v -tags integration $(GO_TEST_FLAGS)
.PHONY: tests-docker
tests-docker: rabbitmq-server
RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS)
$(MAKE) stop-rabbitmq-server
.PHONY: check .PHONY: check
check: check:
golangci-lint run ./... golangci-lint run ./...
@ -34,3 +39,19 @@ rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be
.PHONY: stop-rabbitmq-server .PHONY: stop-rabbitmq-server
stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker stop $(CONTAINER_NAME) docker stop $(CONTAINER_NAME)
certs:
./certs.sh
.PHONY: certs-rm
certs-rm:
rm -r ./certs/
.PHONY: rabbitmq-server-tls
rabbitmq-server-tls: | certs ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker run --detach --rm --name $(CONTAINER_NAME) \
--publish 5672:5672 --publish 5671:5671 --publish 15672:15672 \
--mount type=bind,src=./certs/server,dst=/certs \
--mount type=bind,src=./certs/ca/cacert.pem,dst=/certs/cacert.pem,readonly \
--mount type=bind,src=./rabbitmq-confs/tls/90-tls.conf,dst=/etc/rabbitmq/conf.d/90-tls.conf \
--pull always rabbitmq:3-management

+ 11
- 0
vendor/github.com/rabbitmq/amqp091-go/RELEASE.md View File

@ -1,3 +1,14 @@
# Guide to release a new version
1. Update the `buildVersion` constant in [connection.go](https://github.com/rabbitmq/amqp091-go/blob/4886c35d10b273bd374e3ed2356144ad41d27940/connection.go#L31)
2. Commit and push. Include the version in the commit message e.g. [this commit](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24)
3. Create a new [GitHub Release](https://github.com/rabbitmq/amqp091-go/releases). Create a new tag as `v<MAJOR>.<MINOR>.<PATCH>`
1. Use auto-generate release notes feature in GitHub
4. Generate the change log, see [Changelog Generation](#changelog-generation)
5. Review the changelog. Watch out for issues closed as "not-fixed" or without a PR
6. Commit and Push. Pro-tip: include `[skip ci]` in the commit message to skip the CI run, since it's only documentation
7. Send an announcement to the mailing list. Take inspiration from [this message](https://groups.google.com/g/rabbitmq-users/c/EBGYGOWiSgs/m/0sSFuAGICwAJ)
## Changelog Generation ## Changelog Generation
``` ```


+ 24
- 16
vendor/github.com/rabbitmq/amqp091-go/allocator.go View File

@ -18,10 +18,10 @@ const (
// allocator maintains a bitset of allocated numbers. // allocator maintains a bitset of allocated numbers.
type allocator struct { type allocator struct {
pool *big.Int
last int
low int
high int
pool *big.Int
follow int
low int
high int
} }
// NewAllocator reserves and frees integers out of a range between low and // NewAllocator reserves and frees integers out of a range between low and
@ -31,10 +31,10 @@ type allocator struct {
// sizeof(big.Word) // sizeof(big.Word)
func newAllocator(low, high int) *allocator { func newAllocator(low, high int) *allocator {
return &allocator{ return &allocator{
pool: big.NewInt(0),
last: low,
low: low,
high: high,
pool: big.NewInt(0),
follow: low,
low: low,
high: high,
} }
} }
@ -69,21 +69,29 @@ func (a allocator) String() string {
// O(N) worst case runtime where N is allocated, but usually O(1) due to a // O(N) worst case runtime where N is allocated, but usually O(1) due to a
// rolling index into the oldest allocation. // rolling index into the oldest allocation.
func (a *allocator) next() (int, bool) { func (a *allocator) next() (int, bool) {
wrapped := a.last
wrapped := a.follow
defer func() {
// make a.follow point to next value
if a.follow == a.high {
a.follow = a.low
} else {
a.follow += 1
}
}()
// Find trailing bit // Find trailing bit
for ; a.last <= a.high; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow <= a.high; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
} }
} }
// Find preceding free'd pool // Find preceding free'd pool
a.last = a.low
a.follow = a.low
for ; a.last < wrapped; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow < wrapped; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
} }
} }


+ 4
- 4
vendor/github.com/rabbitmq/amqp091-go/certs.sh View File

@ -71,12 +71,12 @@ keyUsage = keyCertSign, cRLSign
[ client_ca_extensions ] [ client_ca_extensions ]
basicConstraints = CA:false basicConstraints = CA:false
keyUsage = digitalSignature
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.2 extendedKeyUsage = 1.3.6.1.5.5.7.3.2
[ server_ca_extensions ] [ server_ca_extensions ]
basicConstraints = CA:false basicConstraints = CA:false
keyUsage = keyEncipherment
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.1 extendedKeyUsage = 1.3.6.1.5.5.7.3.1
subjectAltName = @alt_names subjectAltName = @alt_names
@ -106,7 +106,7 @@ openssl req \
-new \ -new \
-nodes \ -nodes \
-config openssl.cnf \ -config openssl.cnf \
-subj "/CN=127.0.0.1/O=server/" \
-subj "/CN=localhost/O=server/" \
-key $root/server/key.pem \ -key $root/server/key.pem \
-out $root/server/req.pem \ -out $root/server/req.pem \
-outform PEM -outform PEM
@ -115,7 +115,7 @@ openssl req \
-new \ -new \
-nodes \ -nodes \
-config openssl.cnf \ -config openssl.cnf \
-subj "/CN=127.0.0.1/O=client/" \
-subj "/CN=localhost/O=client/" \
-key $root/client/key.pem \ -key $root/client/key.pem \
-out $root/client/req.pem \ -out $root/client/req.pem \
-outform PEM -outform PEM


+ 139
- 8
vendor/github.com/rabbitmq/amqp091-go/channel.go View File

@ -41,6 +41,7 @@ type Channel struct {
// closed is set to 1 when the channel has been closed - see Channel.send() // closed is set to 1 when the channel has been closed - see Channel.send()
closed int32 closed int32
close chan struct{}
// true when we will never notify again // true when we will never notify again
noNotify bool noNotify bool
@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel {
confirms: newConfirms(), confirms: newConfirms(),
recv: (*Channel).recvMethod, recv: (*Channel).recvMethod,
errors: make(chan *Error, 1), errors: make(chan *Error, 1),
close: make(chan struct{}),
} }
} }
@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) {
} }
close(ch.errors) close(ch.errors)
close(ch.close)
ch.noNotify = true ch.noNotify = true
}) })
} }
@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) {
// deliveries are in flight and a no-wait cancel has happened // deliveries are in flight and a no-wait cancel has happened
default: default:
ch.rpc <- msg
select {
case <-ch.close:
return
case ch.rpc <- msg:
}
} }
} }
@ -468,6 +475,10 @@ code set to '200'.
It is safe to call this method multiple times. It is safe to call this method multiple times.
*/ */
func (ch *Channel) Close() error { func (ch *Channel) Close() error {
if ch.IsClosed() {
return nil
}
defer ch.connection.closeChannel(ch, nil) defer ch.connection.closeChannel(ch, nil)
return ch.call( return ch.call(
&channelClose{ReplyCode: replySuccess}, &channelClose{ReplyCode: replySuccess},
@ -1085,7 +1096,8 @@ Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan. the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will When the Channel or Connection is closed, all buffered and inflight messages will
be dropped.
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
When the consumer tag is cancelled, all inflight messages will be delivered until When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed. the returned chan is closed.
@ -1126,6 +1138,121 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal,
return deliveries, nil return deliveries, nil
} }
/*
ConsumeWithContext immediately starts delivering queued messages.
This function is similar to Channel.Consume, and accepts a context to control
consumer lifecycle. When the context passed to this function is canceled, the
consumer associated with the deliveries channel will be canceled too. When the
context passed to this function is cancelled, the deliveries channel will be closed.
An application is advised to keep on receiving messages from the delivery channel
until the channel is empty. This is specially important to avoid memory leaks from
unconsumed messages from the delivery channel.
Begin receiving on the returned chan Delivery before any other operation on the
Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel,
Connection.Close, Channel.Close, context is cancelled, or an AMQP exception
occurs. Consumers must range over the chan to ensure all deliveries are
received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
call Delivery.Ack after it has successfully processed the delivery. If the
consumer is cancelled or the channel or connection is closed any unacknowledged
deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all
consumers on this channel. If you wish to eventually cancel the consumer, use
the same non-empty identifier in Channel.Cancel. An empty string will cause
the library to generate a unique identity. The consumer identity will be
included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack. Automatically
acknowledging deliveries means that some deliveries may get lost if the
consumer is unable to process them after the server delivers them.
See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer
from this queue. When exclusive is false, the server will fairly distribute
deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and
Channel.Consume so not to have TCP pushback on publishing affect the ability to
consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and
immediately begin deliveries. If it is not possible to consume, a channel
exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue
or server.
Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
*/
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.
if err := args.Validate(); err != nil {
return nil, err
}
if consumer == "" {
consumer = uniqueConsumerTag()
}
req := &basicConsume{
Queue: queue,
ConsumerTag: consumer,
NoLocal: noLocal,
NoAck: autoAck,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &basicConsumeOk{}
select {
default:
case <-ctx.Done():
return nil, ctx.Err()
}
deliveries := make(chan Delivery)
ch.consumers.add(consumer, deliveries)
if err := ch.call(req, res); err != nil {
ch.consumers.cancel(consumer)
return nil, err
}
go func() {
select {
case <-ch.consumers.closed:
return
case <-ctx.Done():
if ch != nil {
_ = ch.Cancel(consumer, false)
}
}
}()
return deliveries, nil
}
/* /*
ExchangeDeclare declares an exchange on the server. If the exchange does not ExchangeDeclare declares an exchange on the server. If the exchange does not
already exist, the server will create it. If the exchange exists, the server already exist, the server will create it. If the exchange exists, the server
@ -1167,7 +1294,7 @@ Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
durable, so queues that bind to these pre-declared exchanges must also be durable, so queues that bind to these pre-declared exchanges must also be
durable. durable.
Exchanges declared as `internal` do not accept accept publishings. Internal
Exchanges declared as `internal` do not accept publishings. Internal
exchanges are useful when you wish to implement inter-exchange topologies exchanges are useful when you wish to implement inter-exchange topologies
that should not be exposed to users of the broker. that should not be exposed to users of the broker.
@ -1435,6 +1562,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
ch.m.Lock() ch.m.Lock()
defer ch.m.Unlock() defer ch.m.Unlock()
var dc *DeferredConfirmation
if ch.confirming {
dc = ch.confirms.publish()
}
if err := ch.send(&basicPublish{ if err := ch.send(&basicPublish{
Exchange: exchange, Exchange: exchange,
RoutingKey: key, RoutingKey: key,
@ -1457,14 +1589,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
AppId: msg.AppId, AppId: msg.AppId,
}, },
}); err != nil { }); err != nil {
if ch.confirming {
ch.confirms.unpublish()
}
return nil, err return nil, err
} }
if ch.confirming {
return ch.confirms.Publish(), nil
}
return nil, nil
return dc, nil
} }
/* /*


+ 22
- 1
vendor/github.com/rabbitmq/amqp091-go/confirms.go View File

@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) {
} }
// Publish increments the publishing counter // Publish increments the publishing counter
func (c *confirms) Publish() *DeferredConfirmation {
func (c *confirms) publish() *DeferredConfirmation {
c.publishedMut.Lock() c.publishedMut.Lock()
defer c.publishedMut.Unlock() defer c.publishedMut.Unlock()
@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation {
return c.deferredConfirmations.Add(c.published) return c.deferredConfirmations.Add(c.published)
} }
// unpublish decrements the publishing counter and removes the
// DeferredConfirmation. It must be called immediately after a publish fails.
func (c *confirms) unpublish() {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()
c.deferredConfirmations.remove(c.published)
c.published--
}
// confirm confirms one publishing, increments the expecting delivery tag, and // confirm confirms one publishing, increments the expecting delivery tag, and
// removes bookkeeping for that delivery tag. // removes bookkeeping for that delivery tag.
func (c *confirms) confirm(confirmation Confirmation) { func (c *confirms) confirm(confirmation Confirmation) {
@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
return dc return dc
} }
// remove is only used to drop a tag whose publish failed
func (d *deferredConfirmations) remove(tag uint64) {
d.m.Lock()
defer d.m.Unlock()
dc, found := d.confirmations[tag]
if !found {
return
}
close(dc.done)
delete(d.confirmations, tag)
}
func (d *deferredConfirmations) Confirm(confirmation Confirmation) { func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
d.m.Lock() d.m.Lock()
defer d.m.Unlock() defer d.m.Unlock()


+ 86
- 19
vendor/github.com/rabbitmq/amqp091-go/connection.go View File

@ -28,7 +28,7 @@ const (
defaultHeartbeat = 10 * time.Second defaultHeartbeat = 10 * time.Second
defaultConnectionTimeout = 30 * time.Second defaultConnectionTimeout = 30 * time.Second
defaultProduct = "AMQP 0.9.1 Client" defaultProduct = "AMQP 0.9.1 Client"
buildVersion = "1.6.0"
buildVersion = "1.9.0"
platform = "golang" platform = "golang"
// Safer default that makes channel leaks a lot easier to spot // Safer default that makes channel leaks a lot easier to spot
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
@ -112,6 +112,8 @@ type Connection struct {
blocks []chan Blocking blocks []chan Blocking
errors chan *Error errors chan *Error
// if connection is closed should close this chan
close chan struct{}
Config Config // The negotiated Config after connection.open Config Config // The negotiated Config after connection.open
@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
rpc: make(chan message), rpc: make(chan message),
sends: make(chan time.Time), sends: make(chan time.Time),
errors: make(chan *Error, 1), errors: make(chan *Error, 1),
close: make(chan struct{}),
deadlines: make(chan readDeadliner, 1), deadlines: make(chan readDeadliner, 1),
} }
go c.reader(conn) go c.reader(conn)
@ -399,12 +402,47 @@ func (c *Connection) Close() error {
) )
} }
// CloseDeadline requests and waits for the response to close this AMQP connection.
//
// Accepts a deadline for waiting the server response. The deadline is passed
// to the low-level connection i.e. network socket.
//
// Regardless of the error returned, the connection is considered closed, and it
// should not be used after calling this function.
//
// In the event of an I/O timeout, connection-closed listeners are NOT informed.
//
// After returning from this call, all resources associated with this connection,
// including the underlying io, Channels, Notify listeners and Channel consumers
// will also be closed.
func (c *Connection) CloseDeadline(deadline time.Time) error {
if c.IsClosed() {
return ErrClosed
}
defer c.shutdown(nil)
err := c.setDeadline(deadline)
if err != nil {
return err
}
return c.call(
&connectionClose{
ReplyCode: replySuccess,
ReplyText: "kthxbai",
},
&connectionCloseOk{},
)
}
func (c *Connection) closeWith(err *Error) error { func (c *Connection) closeWith(err *Error) error {
if c.IsClosed() { if c.IsClosed() {
return ErrClosed return ErrClosed
} }
defer c.shutdown(err) defer c.shutdown(err)
return c.call( return c.call(
&connectionClose{ &connectionClose{
ReplyCode: uint16(err.Code), ReplyCode: uint16(err.Code),
@ -420,6 +458,18 @@ func (c *Connection) IsClosed() bool {
return atomic.LoadInt32(&c.closed) == 1 return atomic.LoadInt32(&c.closed) == 1
} }
// setDeadline is a wrapper to type assert Connection.conn and set an I/O
// deadline in the underlying TCP connection socket, by calling
// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails,
// although this should never happen.
func (c *Connection) setDeadline(t time.Time) error {
con, ok := c.conn.(net.Conn)
if !ok {
return errInvalidTypeAssertion
}
return con.SetDeadline(t)
}
func (c *Connection) send(f frame) error { func (c *Connection) send(f frame) error {
if c.IsClosed() { if c.IsClosed() {
return ErrClosed return ErrClosed
@ -550,6 +600,8 @@ func (c *Connection) shutdown(err *Error) {
} }
c.conn.Close() c.conn.Close()
// reader exit
close(c.close)
c.channels = nil c.channels = nil
c.allocator = nil c.allocator = nil
@ -587,15 +639,23 @@ func (c *Connection) dispatch0(f frame) {
c <- Blocking{Active: false} c <- Blocking{Active: false}
} }
default: default:
c.rpc <- m
select {
case <-c.close:
return
case c.rpc <- m:
}
} }
case *heartbeatFrame: case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this // kthx - all reads reset our deadline. so we can drop this
default: default:
// lolwat - channel0 only responds to methods and heartbeats // lolwat - channel0 only responds to methods and heartbeats
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
}()
} }
} }
@ -642,9 +702,12 @@ func (c *Connection) dispatchClosed(f frame) {
// we are already closed, so do nothing // we are already closed, so do nothing
default: default:
// unexpected method on closed channel // unexpected method on closed channel
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
}()
} }
} }
} }
@ -766,13 +829,16 @@ func (c *Connection) allocateChannel() (*Channel, error) {
// releaseChannel removes a channel from the registry as the final part of the // releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle // channel lifecycle
func (c *Connection) releaseChannel(id uint16) {
func (c *Connection) releaseChannel(ch *Channel) {
c.m.Lock() c.m.Lock()
defer c.m.Unlock() defer c.m.Unlock()
if !c.IsClosed() { if !c.IsClosed() {
delete(c.channels, id)
c.allocator.release(int(id))
got, ok := c.channels[ch.id]
if ok && got == ch {
delete(c.channels, ch.id)
c.allocator.release(int(ch.id))
}
} }
} }
@ -784,7 +850,7 @@ func (c *Connection) openChannel() (*Channel, error) {
} }
if err := ch.open(); err != nil { if err := ch.open(); err != nil {
c.releaseChannel(ch.id)
c.releaseChannel(ch)
return nil, err return nil, err
} }
return ch, nil return ch, nil
@ -795,7 +861,7 @@ func (c *Connection) openChannel() (*Channel, error) {
// this connection. // this connection.
func (c *Connection) closeChannel(ch *Channel, e *Error) { func (c *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e) ch.shutdown(e)
c.releaseChannel(ch.id)
c.releaseChannel(ch)
} }
/* /*
@ -816,13 +882,14 @@ func (c *Connection) call(req message, res ...message) error {
} }
} }
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
var msg message
select {
case e, ok := <-c.errors:
if ok {
return e
} }
return err
return ErrClosed
case msg = <-c.rpc:
} }
// Try to match one of the result types // Try to match one of the result types


+ 27
- 0
vendor/github.com/rabbitmq/amqp091-go/consumers.go View File

@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
} }
case out <- *queue[0]: case out <- *queue[0]:
/*
* https://github.com/rabbitmq/amqp091-go/issues/179
* https://github.com/rabbitmq/amqp091-go/pull/180
*
* Comment from @lars-t-hansen:
*
* Given Go's slice semantics, and barring any information
* available to the compiler that proves that queue is the only
* pointer to the memory it references, the only meaning that
* queue = queue[1:] can have is basically queue += sizeof(queue
* element), ie, it bumps a pointer. Looking at the generated
* code for a simple example (on ARM64 in this case) bears this
* out. So what we're left with is an array that we have a
* pointer into the middle of. When the GC traces this pointer,
* it too does not know whether the array has multiple
* referents, and so its only sensible choice is to find the
* beginning of the array, and if the array is not already
* visited, mark every element in it, including the "dead"
* pointer.
*
* (Depending on the program dynamics, an element may eventually
* be appended to the queue when the queue is at capacity, and
* in this case the live elements are copied into a new array
* and the old array is left to be GC'd eventually, along with
* the dead object. But that can take time.)
*/
queue[0] = nil
queue = queue[1:] queue = queue[1:]
} }
} }


+ 64
- 64
vendor/github.com/rabbitmq/amqp091-go/spec091.go View File

@ -2817,7 +2817,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // connection start case 10: // connection start
//fmt.Println("NextMethod: class:10 method:10")
// fmt.Println("NextMethod: class:10 method:10")
method := &connectionStart{} method := &connectionStart{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2825,7 +2825,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // connection start-ok case 11: // connection start-ok
//fmt.Println("NextMethod: class:10 method:11")
// fmt.Println("NextMethod: class:10 method:11")
method := &connectionStartOk{} method := &connectionStartOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2833,7 +2833,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // connection secure case 20: // connection secure
//fmt.Println("NextMethod: class:10 method:20")
// fmt.Println("NextMethod: class:10 method:20")
method := &connectionSecure{} method := &connectionSecure{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2841,7 +2841,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // connection secure-ok case 21: // connection secure-ok
//fmt.Println("NextMethod: class:10 method:21")
// fmt.Println("NextMethod: class:10 method:21")
method := &connectionSecureOk{} method := &connectionSecureOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2849,7 +2849,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 30: // connection tune case 30: // connection tune
//fmt.Println("NextMethod: class:10 method:30")
// fmt.Println("NextMethod: class:10 method:30")
method := &connectionTune{} method := &connectionTune{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2857,7 +2857,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 31: // connection tune-ok case 31: // connection tune-ok
//fmt.Println("NextMethod: class:10 method:31")
// fmt.Println("NextMethod: class:10 method:31")
method := &connectionTuneOk{} method := &connectionTuneOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2865,7 +2865,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 40: // connection open case 40: // connection open
//fmt.Println("NextMethod: class:10 method:40")
// fmt.Println("NextMethod: class:10 method:40")
method := &connectionOpen{} method := &connectionOpen{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2873,7 +2873,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 41: // connection open-ok case 41: // connection open-ok
//fmt.Println("NextMethod: class:10 method:41")
// fmt.Println("NextMethod: class:10 method:41")
method := &connectionOpenOk{} method := &connectionOpenOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2881,7 +2881,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 50: // connection close case 50: // connection close
//fmt.Println("NextMethod: class:10 method:50")
// fmt.Println("NextMethod: class:10 method:50")
method := &connectionClose{} method := &connectionClose{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2889,7 +2889,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 51: // connection close-ok case 51: // connection close-ok
//fmt.Println("NextMethod: class:10 method:51")
// fmt.Println("NextMethod: class:10 method:51")
method := &connectionCloseOk{} method := &connectionCloseOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2897,7 +2897,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 60: // connection blocked case 60: // connection blocked
//fmt.Println("NextMethod: class:10 method:60")
// fmt.Println("NextMethod: class:10 method:60")
method := &connectionBlocked{} method := &connectionBlocked{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2905,7 +2905,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 61: // connection unblocked case 61: // connection unblocked
//fmt.Println("NextMethod: class:10 method:61")
// fmt.Println("NextMethod: class:10 method:61")
method := &connectionUnblocked{} method := &connectionUnblocked{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2913,7 +2913,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 70: // connection update-secret case 70: // connection update-secret
//fmt.Println("NextMethod: class:10 method:70")
// fmt.Println("NextMethod: class:10 method:70")
method := &connectionUpdateSecret{} method := &connectionUpdateSecret{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2921,7 +2921,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 71: // connection update-secret-ok case 71: // connection update-secret-ok
//fmt.Println("NextMethod: class:10 method:71")
// fmt.Println("NextMethod: class:10 method:71")
method := &connectionUpdateSecretOk{} method := &connectionUpdateSecretOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2936,7 +2936,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // channel open case 10: // channel open
//fmt.Println("NextMethod: class:20 method:10")
// fmt.Println("NextMethod: class:20 method:10")
method := &channelOpen{} method := &channelOpen{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2944,7 +2944,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // channel open-ok case 11: // channel open-ok
//fmt.Println("NextMethod: class:20 method:11")
// fmt.Println("NextMethod: class:20 method:11")
method := &channelOpenOk{} method := &channelOpenOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2952,7 +2952,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // channel flow case 20: // channel flow
//fmt.Println("NextMethod: class:20 method:20")
// fmt.Println("NextMethod: class:20 method:20")
method := &channelFlow{} method := &channelFlow{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2960,7 +2960,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // channel flow-ok case 21: // channel flow-ok
//fmt.Println("NextMethod: class:20 method:21")
// fmt.Println("NextMethod: class:20 method:21")
method := &channelFlowOk{} method := &channelFlowOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2968,7 +2968,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 40: // channel close case 40: // channel close
//fmt.Println("NextMethod: class:20 method:40")
// fmt.Println("NextMethod: class:20 method:40")
method := &channelClose{} method := &channelClose{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2976,7 +2976,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 41: // channel close-ok case 41: // channel close-ok
//fmt.Println("NextMethod: class:20 method:41")
// fmt.Println("NextMethod: class:20 method:41")
method := &channelCloseOk{} method := &channelCloseOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2991,7 +2991,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // exchange declare case 10: // exchange declare
//fmt.Println("NextMethod: class:40 method:10")
// fmt.Println("NextMethod: class:40 method:10")
method := &exchangeDeclare{} method := &exchangeDeclare{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -2999,7 +2999,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // exchange declare-ok case 11: // exchange declare-ok
//fmt.Println("NextMethod: class:40 method:11")
// fmt.Println("NextMethod: class:40 method:11")
method := &exchangeDeclareOk{} method := &exchangeDeclareOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3007,7 +3007,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // exchange delete case 20: // exchange delete
//fmt.Println("NextMethod: class:40 method:20")
// fmt.Println("NextMethod: class:40 method:20")
method := &exchangeDelete{} method := &exchangeDelete{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3015,7 +3015,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // exchange delete-ok case 21: // exchange delete-ok
//fmt.Println("NextMethod: class:40 method:21")
// fmt.Println("NextMethod: class:40 method:21")
method := &exchangeDeleteOk{} method := &exchangeDeleteOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3023,7 +3023,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 30: // exchange bind case 30: // exchange bind
//fmt.Println("NextMethod: class:40 method:30")
// fmt.Println("NextMethod: class:40 method:30")
method := &exchangeBind{} method := &exchangeBind{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3031,7 +3031,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 31: // exchange bind-ok case 31: // exchange bind-ok
//fmt.Println("NextMethod: class:40 method:31")
// fmt.Println("NextMethod: class:40 method:31")
method := &exchangeBindOk{} method := &exchangeBindOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3039,7 +3039,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 40: // exchange unbind case 40: // exchange unbind
//fmt.Println("NextMethod: class:40 method:40")
// fmt.Println("NextMethod: class:40 method:40")
method := &exchangeUnbind{} method := &exchangeUnbind{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3047,7 +3047,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 51: // exchange unbind-ok case 51: // exchange unbind-ok
//fmt.Println("NextMethod: class:40 method:51")
// fmt.Println("NextMethod: class:40 method:51")
method := &exchangeUnbindOk{} method := &exchangeUnbindOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3062,7 +3062,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // queue declare case 10: // queue declare
//fmt.Println("NextMethod: class:50 method:10")
// fmt.Println("NextMethod: class:50 method:10")
method := &queueDeclare{} method := &queueDeclare{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3070,7 +3070,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // queue declare-ok case 11: // queue declare-ok
//fmt.Println("NextMethod: class:50 method:11")
// fmt.Println("NextMethod: class:50 method:11")
method := &queueDeclareOk{} method := &queueDeclareOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3078,7 +3078,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // queue bind case 20: // queue bind
//fmt.Println("NextMethod: class:50 method:20")
// fmt.Println("NextMethod: class:50 method:20")
method := &queueBind{} method := &queueBind{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3086,7 +3086,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // queue bind-ok case 21: // queue bind-ok
//fmt.Println("NextMethod: class:50 method:21")
// fmt.Println("NextMethod: class:50 method:21")
method := &queueBindOk{} method := &queueBindOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3094,7 +3094,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 50: // queue unbind case 50: // queue unbind
//fmt.Println("NextMethod: class:50 method:50")
// fmt.Println("NextMethod: class:50 method:50")
method := &queueUnbind{} method := &queueUnbind{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3102,7 +3102,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 51: // queue unbind-ok case 51: // queue unbind-ok
//fmt.Println("NextMethod: class:50 method:51")
// fmt.Println("NextMethod: class:50 method:51")
method := &queueUnbindOk{} method := &queueUnbindOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3110,7 +3110,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 30: // queue purge case 30: // queue purge
//fmt.Println("NextMethod: class:50 method:30")
// fmt.Println("NextMethod: class:50 method:30")
method := &queuePurge{} method := &queuePurge{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3118,7 +3118,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 31: // queue purge-ok case 31: // queue purge-ok
//fmt.Println("NextMethod: class:50 method:31")
// fmt.Println("NextMethod: class:50 method:31")
method := &queuePurgeOk{} method := &queuePurgeOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3126,7 +3126,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 40: // queue delete case 40: // queue delete
//fmt.Println("NextMethod: class:50 method:40")
// fmt.Println("NextMethod: class:50 method:40")
method := &queueDelete{} method := &queueDelete{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3134,7 +3134,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 41: // queue delete-ok case 41: // queue delete-ok
//fmt.Println("NextMethod: class:50 method:41")
// fmt.Println("NextMethod: class:50 method:41")
method := &queueDeleteOk{} method := &queueDeleteOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3149,7 +3149,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // basic qos case 10: // basic qos
//fmt.Println("NextMethod: class:60 method:10")
// fmt.Println("NextMethod: class:60 method:10")
method := &basicQos{} method := &basicQos{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3157,7 +3157,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // basic qos-ok case 11: // basic qos-ok
//fmt.Println("NextMethod: class:60 method:11")
// fmt.Println("NextMethod: class:60 method:11")
method := &basicQosOk{} method := &basicQosOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3165,7 +3165,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // basic consume case 20: // basic consume
//fmt.Println("NextMethod: class:60 method:20")
// fmt.Println("NextMethod: class:60 method:20")
method := &basicConsume{} method := &basicConsume{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3173,7 +3173,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // basic consume-ok case 21: // basic consume-ok
//fmt.Println("NextMethod: class:60 method:21")
// fmt.Println("NextMethod: class:60 method:21")
method := &basicConsumeOk{} method := &basicConsumeOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3181,7 +3181,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 30: // basic cancel case 30: // basic cancel
//fmt.Println("NextMethod: class:60 method:30")
// fmt.Println("NextMethod: class:60 method:30")
method := &basicCancel{} method := &basicCancel{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3189,7 +3189,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 31: // basic cancel-ok case 31: // basic cancel-ok
//fmt.Println("NextMethod: class:60 method:31")
// fmt.Println("NextMethod: class:60 method:31")
method := &basicCancelOk{} method := &basicCancelOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3197,7 +3197,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 40: // basic publish case 40: // basic publish
//fmt.Println("NextMethod: class:60 method:40")
// fmt.Println("NextMethod: class:60 method:40")
method := &basicPublish{} method := &basicPublish{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3205,7 +3205,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 50: // basic return case 50: // basic return
//fmt.Println("NextMethod: class:60 method:50")
// fmt.Println("NextMethod: class:60 method:50")
method := &basicReturn{} method := &basicReturn{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3213,7 +3213,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 60: // basic deliver case 60: // basic deliver
//fmt.Println("NextMethod: class:60 method:60")
// fmt.Println("NextMethod: class:60 method:60")
method := &basicDeliver{} method := &basicDeliver{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3221,7 +3221,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 70: // basic get case 70: // basic get
//fmt.Println("NextMethod: class:60 method:70")
// fmt.Println("NextMethod: class:60 method:70")
method := &basicGet{} method := &basicGet{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3229,7 +3229,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 71: // basic get-ok case 71: // basic get-ok
//fmt.Println("NextMethod: class:60 method:71")
// fmt.Println("NextMethod: class:60 method:71")
method := &basicGetOk{} method := &basicGetOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3237,7 +3237,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 72: // basic get-empty case 72: // basic get-empty
//fmt.Println("NextMethod: class:60 method:72")
// fmt.Println("NextMethod: class:60 method:72")
method := &basicGetEmpty{} method := &basicGetEmpty{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3245,7 +3245,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 80: // basic ack case 80: // basic ack
//fmt.Println("NextMethod: class:60 method:80")
// fmt.Println("NextMethod: class:60 method:80")
method := &basicAck{} method := &basicAck{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3253,7 +3253,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 90: // basic reject case 90: // basic reject
//fmt.Println("NextMethod: class:60 method:90")
// fmt.Println("NextMethod: class:60 method:90")
method := &basicReject{} method := &basicReject{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3261,7 +3261,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 100: // basic recover-async case 100: // basic recover-async
//fmt.Println("NextMethod: class:60 method:100")
// fmt.Println("NextMethod: class:60 method:100")
method := &basicRecoverAsync{} method := &basicRecoverAsync{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3269,7 +3269,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 110: // basic recover case 110: // basic recover
//fmt.Println("NextMethod: class:60 method:110")
// fmt.Println("NextMethod: class:60 method:110")
method := &basicRecover{} method := &basicRecover{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3277,7 +3277,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 111: // basic recover-ok case 111: // basic recover-ok
//fmt.Println("NextMethod: class:60 method:111")
// fmt.Println("NextMethod: class:60 method:111")
method := &basicRecoverOk{} method := &basicRecoverOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3285,7 +3285,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 120: // basic nack case 120: // basic nack
//fmt.Println("NextMethod: class:60 method:120")
// fmt.Println("NextMethod: class:60 method:120")
method := &basicNack{} method := &basicNack{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3300,7 +3300,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // tx select case 10: // tx select
//fmt.Println("NextMethod: class:90 method:10")
// fmt.Println("NextMethod: class:90 method:10")
method := &txSelect{} method := &txSelect{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3308,7 +3308,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // tx select-ok case 11: // tx select-ok
//fmt.Println("NextMethod: class:90 method:11")
// fmt.Println("NextMethod: class:90 method:11")
method := &txSelectOk{} method := &txSelectOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3316,7 +3316,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 20: // tx commit case 20: // tx commit
//fmt.Println("NextMethod: class:90 method:20")
// fmt.Println("NextMethod: class:90 method:20")
method := &txCommit{} method := &txCommit{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3324,7 +3324,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 21: // tx commit-ok case 21: // tx commit-ok
//fmt.Println("NextMethod: class:90 method:21")
// fmt.Println("NextMethod: class:90 method:21")
method := &txCommitOk{} method := &txCommitOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3332,7 +3332,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 30: // tx rollback case 30: // tx rollback
//fmt.Println("NextMethod: class:90 method:30")
// fmt.Println("NextMethod: class:90 method:30")
method := &txRollback{} method := &txRollback{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3340,7 +3340,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 31: // tx rollback-ok case 31: // tx rollback-ok
//fmt.Println("NextMethod: class:90 method:31")
// fmt.Println("NextMethod: class:90 method:31")
method := &txRollbackOk{} method := &txRollbackOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3355,7 +3355,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId { switch mf.MethodId {
case 10: // confirm select case 10: // confirm select
//fmt.Println("NextMethod: class:85 method:10")
// fmt.Println("NextMethod: class:85 method:10")
method := &confirmSelect{} method := &confirmSelect{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return
@ -3363,7 +3363,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method mf.Method = method
case 11: // confirm select-ok case 11: // confirm select-ok
//fmt.Println("NextMethod: class:85 method:11")
// fmt.Println("NextMethod: class:85 method:11")
method := &confirmSelectOk{} method := &confirmSelectOk{}
if err = method.read(r.r); err != nil { if err = method.read(r.r); err != nil {
return return


+ 39
- 13
vendor/github.com/rabbitmq/amqp091-go/types.go View File

@ -11,6 +11,8 @@ import (
"time" "time"
) )
// DefaultExchange is the default direct exchange that binds every queue by its
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = "" const DefaultExchange = ""
// Constants for standard AMQP 0-9-1 exchange types. // Constants for standard AMQP 0-9-1 exchange types.
@ -63,6 +65,11 @@ var (
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
) )
// internal errors used inside the library
var (
errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true}
)
// Error captures the code and reason a channel or connection has been closed // Error captures the code and reason a channel or connection has been closed
// by the server. // by the server.
type Error struct { type Error struct {
@ -209,29 +216,39 @@ type Decimal struct {
// Most common queue argument keys in queue declaration. For a comprehensive list // Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs]. // of queue arguments, visit [RabbitMQ Queue docs].
// //
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// [QueueTypeArg] queue argument is used to declare quorum and stream queues.
// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and
// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more // Classic Queues counterparts. Check [feature comparison] docs for more
// information. // information.
// //
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
// Queues can define their [max length] using [QueueMaxLenArg] and
// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using
// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default),
// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX].
// //
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg].
// This will set a time-to-live for messages in the queue.
// //
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the
// maximum size of the stream. Please note that stream queues always keep, at // maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg],
// to set time-based retention. Values are string with unit suffix. Valid // to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment // suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is
// 500_000_000 bytes ~= 500 megabytes // 500_000_000 bytes ~= 500 megabytes
// //
// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue
// argument. This is the timeout for a consumer to acknowledge a message. The
// value is the time in milliseconds. The timeout is evaluated periodically,
// at one minute intervals. Values lower than one minute are not supported.
// See the [consumer timeout] guide for more information.
//
// [Single Active Consumer] on quorum and classic queues can be configured
// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is
// false by default.
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html // [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention // [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html // [max length]: https://rabbitmq.com/maxlength.html
@ -239,6 +256,8 @@ type Decimal struct {
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl // [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html // [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison // [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout
// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer
const ( const (
QueueTypeArg = "x-queue-type" QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length" QueueMaxLenArg = "x-max-length"
@ -249,6 +268,11 @@ const (
QueueTTLArg = "x-expires" QueueTTLArg = "x-expires"
StreamMaxAgeArg = "x-max-age" StreamMaxAgeArg = "x-max-age"
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes" StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
// QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2.
QueueVersionArg = "x-queue-version"
// ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument.
ConsumerTimeoutArg = "x-consumer-timeout"
SingleActiveConsumerArg = "x-single-active-consumer"
) )
// Values for queue arguments. Use as values for queue arguments during queue declaration. // Values for queue arguments. Use as values for queue arguments during queue declaration.
@ -260,6 +284,8 @@ const (
// amqp.QueueMaxLenArg: 100, // amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000, // amqp.QueueTTLArg: 1800000,
// } // }
//
// Refer to [Channel.QueueDeclare] for more examples.
const ( const (
QueueTypeClassic = "classic" QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum" QueueTypeQuorum = "quorum"


+ 1
- 2
vendor/github.com/rabbitmq/amqp091-go/write.go View File

@ -72,7 +72,6 @@ func (f *heartbeatFrame) write(w io.Writer) (err error) {
// short short long long short remainder... // short short long long short remainder...
func (f *headerFrame) write(w io.Writer) (err error) { func (f *headerFrame) write(w io.Writer) (err error) {
var payload bytes.Buffer var payload bytes.Buffer
var zeroTime time.Time
if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
return return
@ -118,7 +117,7 @@ func (f *headerFrame) write(w io.Writer) (err error) {
if len(f.Properties.MessageId) > 0 { if len(f.Properties.MessageId) > 0 {
mask = mask | flagMessageId mask = mask | flagMessageId
} }
if f.Properties.Timestamp != zeroTime {
if !f.Properties.Timestamp.IsZero() {
mask = mask | flagTimestamp mask = mask | flagTimestamp
} }
if len(f.Properties.Type) > 0 { if len(f.Properties.Type) > 0 {


+ 1
- 1
vendor/modules.txt View File

@ -1,3 +1,3 @@
# github.com/rabbitmq/amqp091-go v1.7.0
# github.com/rabbitmq/amqp091-go v1.9.0
## explicit; go 1.16 ## explicit; go 1.16
github.com/rabbitmq/amqp091-go github.com/rabbitmq/amqp091-go

Loading…
Cancel
Save