Browse Source

Merge pull request #158 from thepabloaguilar/update-amqp091

update `amqp091-go` from `v1.7.0` to `v1.9.0`
pull/208/head
Lane Wagner 2 years ago
committed by GitHub
parent
commit
10483c01f1
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
16 changed files with 506 additions and 131 deletions
  1. +1
    -1
      go.mod
  2. +4
    -0
      go.sum
  3. +46
    -0
      vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md
  4. +16
    -2
      vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md
  5. +21
    -0
      vendor/github.com/rabbitmq/amqp091-go/Makefile
  6. +11
    -0
      vendor/github.com/rabbitmq/amqp091-go/RELEASE.md
  7. +24
    -16
      vendor/github.com/rabbitmq/amqp091-go/allocator.go
  8. +4
    -4
      vendor/github.com/rabbitmq/amqp091-go/certs.sh
  9. +139
    -8
      vendor/github.com/rabbitmq/amqp091-go/channel.go
  10. +22
    -1
      vendor/github.com/rabbitmq/amqp091-go/confirms.go
  11. +86
    -19
      vendor/github.com/rabbitmq/amqp091-go/connection.go
  12. +27
    -0
      vendor/github.com/rabbitmq/amqp091-go/consumers.go
  13. +64
    -64
      vendor/github.com/rabbitmq/amqp091-go/spec091.go
  14. +39
    -13
      vendor/github.com/rabbitmq/amqp091-go/types.go
  15. +1
    -2
      vendor/github.com/rabbitmq/amqp091-go/write.go
  16. +1
    -1
      vendor/modules.txt

+ 1
- 1
go.mod View File

@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq
go 1.20
require github.com/rabbitmq/amqp091-go v1.7.0
require github.com/rabbitmq/amqp091-go v1.9.0

+ 4
- 0
go.sum View File

@ -6,6 +6,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -13,6 +15,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=


+ 46
- 0
vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md View File

@ -1,5 +1,51 @@
# Changelog
## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1)
**Fixed bugs:**
- Fixed incorrect version reported in client properties [52ce2efd03c53dcf77d5496977da46840e9abd24](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24)
**Merged pull requests:**
- Fix Example Client not reconnecting [\#186](https://github.com/rabbitmq/amqp091-go/pull/186) ([frankfil](https://github.com/frankfil))
## [v1.8.0](https://github.com/rabbitmq/amqp091-go/tree/v1.8.0) (2023-03-21)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.7.0...v1.8.0)
**Closed issues:**
- memory leak [\#179](https://github.com/rabbitmq/amqp091-go/issues/179)
- the publishWithContext interface will not return when it times out [\#178](https://github.com/rabbitmq/amqp091-go/issues/178)
**Merged pull requests:**
- Fix race condition on confirms [\#183](https://github.com/rabbitmq/amqp091-go/pull/183) ([calloway-jacob](https://github.com/calloway-jacob))
- Add a CloseDeadline function to Connection [\#181](https://github.com/rabbitmq/amqp091-go/pull/181) ([Zerpet](https://github.com/Zerpet))
- Fix memory leaks [\#180](https://github.com/rabbitmq/amqp091-go/pull/180) ([GXKe](https://github.com/GXKe))
- Bump go.uber.org/goleak from 1.2.0 to 1.2.1 [\#177](https://github.com/rabbitmq/amqp091-go/pull/177) ([dependabot[bot]](https://github.com/apps/dependabot))
## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0)
**Closed issues:**
- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170)
- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167)
- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160)
**Merged pull requests:**
- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken))
- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet))
- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm))
- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken))
- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt))
## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1)


+ 16
- 2
vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md View File

@ -9,11 +9,13 @@ Here is the recommended workflow:
1. Run Static Checks
1. Run integration tests (see below)
1. **Implement tests**
1. Implement fixs
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Implement fixes
1. Commit your changes. Use a [good, descriptive, commit message][good-commit].
1. Push to a branch (`git push -u origin my-new-feature`)
1. Submit a pull request
[good-commit]: https://cbea.ms/git-commit/
## Running Static Checks
golangci-lint must be installed to run the static checks. See [installation
@ -43,6 +45,18 @@ The integration tests can be run via:
make tests
```
Some tests require access to `rabbitmqctl` CLI. Use the environment variable
`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests.
If you have Docker available in your machine, you can run:
```shell
make tests-docker
```
This target will start a RabbitMQ container, run the test suite with the environment
variable setup, and stop RabbitMQ container after a successful run.
All integration tests should use the `integrationConnection(...)` test
helpers defined in `integration_test.go` to setup the integration environment
and logging.

+ 21
- 0
vendor/github.com/rabbitmq/amqp091-go/Makefile View File

@ -19,6 +19,11 @@ fmt: ## Run go fmt against code
tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test
go test -race -v -tags integration $(GO_TEST_FLAGS)
.PHONY: tests-docker
tests-docker: rabbitmq-server
RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS)
$(MAKE) stop-rabbitmq-server
.PHONY: check
check:
golangci-lint run ./...
@ -34,3 +39,19 @@ rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be
.PHONY: stop-rabbitmq-server
stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker stop $(CONTAINER_NAME)
certs:
./certs.sh
.PHONY: certs-rm
certs-rm:
rm -r ./certs/
.PHONY: rabbitmq-server-tls
rabbitmq-server-tls: | certs ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit
docker run --detach --rm --name $(CONTAINER_NAME) \
--publish 5672:5672 --publish 5671:5671 --publish 15672:15672 \
--mount type=bind,src=./certs/server,dst=/certs \
--mount type=bind,src=./certs/ca/cacert.pem,dst=/certs/cacert.pem,readonly \
--mount type=bind,src=./rabbitmq-confs/tls/90-tls.conf,dst=/etc/rabbitmq/conf.d/90-tls.conf \
--pull always rabbitmq:3-management

+ 11
- 0
vendor/github.com/rabbitmq/amqp091-go/RELEASE.md View File

@ -1,3 +1,14 @@
# Guide to release a new version
1. Update the `buildVersion` constant in [connection.go](https://github.com/rabbitmq/amqp091-go/blob/4886c35d10b273bd374e3ed2356144ad41d27940/connection.go#L31)
2. Commit and push. Include the version in the commit message e.g. [this commit](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24)
3. Create a new [GitHub Release](https://github.com/rabbitmq/amqp091-go/releases). Create a new tag as `v<MAJOR>.<MINOR>.<PATCH>`
1. Use auto-generate release notes feature in GitHub
4. Generate the change log, see [Changelog Generation](#changelog-generation)
5. Review the changelog. Watch out for issues closed as "not-fixed" or without a PR
6. Commit and Push. Pro-tip: include `[skip ci]` in the commit message to skip the CI run, since it's only documentation
7. Send an announcement to the mailing list. Take inspiration from [this message](https://groups.google.com/g/rabbitmq-users/c/EBGYGOWiSgs/m/0sSFuAGICwAJ)
## Changelog Generation
```


+ 24
- 16
vendor/github.com/rabbitmq/amqp091-go/allocator.go View File

@ -18,10 +18,10 @@ const (
// allocator maintains a bitset of allocated numbers.
type allocator struct {
pool *big.Int
last int
low int
high int
pool *big.Int
follow int
low int
high int
}
// NewAllocator reserves and frees integers out of a range between low and
@ -31,10 +31,10 @@ type allocator struct {
// sizeof(big.Word)
func newAllocator(low, high int) *allocator {
return &allocator{
pool: big.NewInt(0),
last: low,
low: low,
high: high,
pool: big.NewInt(0),
follow: low,
low: low,
high: high,
}
}
@ -69,21 +69,29 @@ func (a allocator) String() string {
// O(N) worst case runtime where N is allocated, but usually O(1) due to a
// rolling index into the oldest allocation.
func (a *allocator) next() (int, bool) {
wrapped := a.last
wrapped := a.follow
defer func() {
// make a.follow point to next value
if a.follow == a.high {
a.follow = a.low
} else {
a.follow += 1
}
}()
// Find trailing bit
for ; a.last <= a.high; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow <= a.high; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}
// Find preceding free'd pool
a.last = a.low
a.follow = a.low
for ; a.last < wrapped; a.last++ {
if a.reserve(a.last) {
return a.last, true
for ; a.follow < wrapped; a.follow++ {
if a.reserve(a.follow) {
return a.follow, true
}
}


+ 4
- 4
vendor/github.com/rabbitmq/amqp091-go/certs.sh View File

@ -71,12 +71,12 @@ keyUsage = keyCertSign, cRLSign
[ client_ca_extensions ]
basicConstraints = CA:false
keyUsage = digitalSignature
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.2
[ server_ca_extensions ]
basicConstraints = CA:false
keyUsage = keyEncipherment
keyUsage = keyEncipherment,digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.1
subjectAltName = @alt_names
@ -106,7 +106,7 @@ openssl req \
-new \
-nodes \
-config openssl.cnf \
-subj "/CN=127.0.0.1/O=server/" \
-subj "/CN=localhost/O=server/" \
-key $root/server/key.pem \
-out $root/server/req.pem \
-outform PEM
@ -115,7 +115,7 @@ openssl req \
-new \
-nodes \
-config openssl.cnf \
-subj "/CN=127.0.0.1/O=client/" \
-subj "/CN=localhost/O=client/" \
-key $root/client/key.pem \
-out $root/client/req.pem \
-outform PEM


+ 139
- 8
vendor/github.com/rabbitmq/amqp091-go/channel.go View File

@ -41,6 +41,7 @@ type Channel struct {
// closed is set to 1 when the channel has been closed - see Channel.send()
closed int32
close chan struct{}
// true when we will never notify again
noNotify bool
@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel {
confirms: newConfirms(),
recv: (*Channel).recvMethod,
errors: make(chan *Error, 1),
close: make(chan struct{}),
}
}
@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) {
}
close(ch.errors)
close(ch.close)
ch.noNotify = true
})
}
@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) {
// deliveries are in flight and a no-wait cancel has happened
default:
ch.rpc <- msg
select {
case <-ch.close:
return
case ch.rpc <- msg:
}
}
}
@ -468,6 +475,10 @@ code set to '200'.
It is safe to call this method multiple times.
*/
func (ch *Channel) Close() error {
if ch.IsClosed() {
return nil
}
defer ch.connection.closeChannel(ch, nil)
return ch.call(
&channelClose{ReplyCode: replySuccess},
@ -1085,7 +1096,8 @@ Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will
be dropped.
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed.
@ -1126,6 +1138,121 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal,
return deliveries, nil
}
/*
ConsumeWithContext immediately starts delivering queued messages.
This function is similar to Channel.Consume, and accepts a context to control
consumer lifecycle. When the context passed to this function is canceled, the
consumer associated with the deliveries channel will be canceled too. When the
context passed to this function is cancelled, the deliveries channel will be closed.
An application is advised to keep on receiving messages from the delivery channel
until the channel is empty. This is specially important to avoid memory leaks from
unconsumed messages from the delivery channel.
Begin receiving on the returned chan Delivery before any other operation on the
Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel,
Connection.Close, Channel.Close, context is cancelled, or an AMQP exception
occurs. Consumers must range over the chan to ensure all deliveries are
received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
call Delivery.Ack after it has successfully processed the delivery. If the
consumer is cancelled or the channel or connection is closed any unacknowledged
deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all
consumers on this channel. If you wish to eventually cancel the consumer, use
the same non-empty identifier in Channel.Cancel. An empty string will cause
the library to generate a unique identity. The consumer identity will be
included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge
deliveries to this consumer prior to writing the delivery to the network. When
autoAck is true, the consumer should not call Delivery.Ack. Automatically
acknowledging deliveries means that some deliveries may get lost if the
consumer is unable to process them after the server delivers them.
See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer
from this queue. When exclusive is false, the server will fairly distribute
deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and
Channel.Consume so not to have TCP pushback on publishing affect the ability to
consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and
immediately begin deliveries. If it is not possible to consume, a channel
exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue
or server.
Inflight messages, limited by Channel.Qos will be buffered until received from
the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will
be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped
messages in this way won't be lost.
*/
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.
if err := args.Validate(); err != nil {
return nil, err
}
if consumer == "" {
consumer = uniqueConsumerTag()
}
req := &basicConsume{
Queue: queue,
ConsumerTag: consumer,
NoLocal: noLocal,
NoAck: autoAck,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &basicConsumeOk{}
select {
default:
case <-ctx.Done():
return nil, ctx.Err()
}
deliveries := make(chan Delivery)
ch.consumers.add(consumer, deliveries)
if err := ch.call(req, res); err != nil {
ch.consumers.cancel(consumer)
return nil, err
}
go func() {
select {
case <-ch.consumers.closed:
return
case <-ctx.Done():
if ch != nil {
_ = ch.Cancel(consumer, false)
}
}
}()
return deliveries, nil
}
/*
ExchangeDeclare declares an exchange on the server. If the exchange does not
already exist, the server will create it. If the exchange exists, the server
@ -1167,7 +1294,7 @@ Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
durable, so queues that bind to these pre-declared exchanges must also be
durable.
Exchanges declared as `internal` do not accept accept publishings. Internal
Exchanges declared as `internal` do not accept publishings. Internal
exchanges are useful when you wish to implement inter-exchange topologies
that should not be exposed to users of the broker.
@ -1435,6 +1562,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
ch.m.Lock()
defer ch.m.Unlock()
var dc *DeferredConfirmation
if ch.confirming {
dc = ch.confirms.publish()
}
if err := ch.send(&basicPublish{
Exchange: exchange,
RoutingKey: key,
@ -1457,14 +1589,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
AppId: msg.AppId,
},
}); err != nil {
if ch.confirming {
ch.confirms.unpublish()
}
return nil, err
}
if ch.confirming {
return ch.confirms.Publish(), nil
}
return nil, nil
return dc, nil
}
/*


+ 22
- 1
vendor/github.com/rabbitmq/amqp091-go/confirms.go View File

@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) {
}
// Publish increments the publishing counter
func (c *confirms) Publish() *DeferredConfirmation {
func (c *confirms) publish() *DeferredConfirmation {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()
@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation {
return c.deferredConfirmations.Add(c.published)
}
// unpublish decrements the publishing counter and removes the
// DeferredConfirmation. It must be called immediately after a publish fails.
func (c *confirms) unpublish() {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()
c.deferredConfirmations.remove(c.published)
c.published--
}
// confirm confirms one publishing, increments the expecting delivery tag, and
// removes bookkeeping for that delivery tag.
func (c *confirms) confirm(confirmation Confirmation) {
@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
return dc
}
// remove is only used to drop a tag whose publish failed
func (d *deferredConfirmations) remove(tag uint64) {
d.m.Lock()
defer d.m.Unlock()
dc, found := d.confirmations[tag]
if !found {
return
}
close(dc.done)
delete(d.confirmations, tag)
}
func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
d.m.Lock()
defer d.m.Unlock()


+ 86
- 19
vendor/github.com/rabbitmq/amqp091-go/connection.go View File

@ -28,7 +28,7 @@ const (
defaultHeartbeat = 10 * time.Second
defaultConnectionTimeout = 30 * time.Second
defaultProduct = "AMQP 0.9.1 Client"
buildVersion = "1.6.0"
buildVersion = "1.9.0"
platform = "golang"
// Safer default that makes channel leaks a lot easier to spot
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
@ -112,6 +112,8 @@ type Connection struct {
blocks []chan Blocking
errors chan *Error
// if connection is closed should close this chan
close chan struct{}
Config Config // The negotiated Config after connection.open
@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
rpc: make(chan message),
sends: make(chan time.Time),
errors: make(chan *Error, 1),
close: make(chan struct{}),
deadlines: make(chan readDeadliner, 1),
}
go c.reader(conn)
@ -399,12 +402,47 @@ func (c *Connection) Close() error {
)
}
// CloseDeadline requests and waits for the response to close this AMQP connection.
//
// Accepts a deadline for waiting the server response. The deadline is passed
// to the low-level connection i.e. network socket.
//
// Regardless of the error returned, the connection is considered closed, and it
// should not be used after calling this function.
//
// In the event of an I/O timeout, connection-closed listeners are NOT informed.
//
// After returning from this call, all resources associated with this connection,
// including the underlying io, Channels, Notify listeners and Channel consumers
// will also be closed.
func (c *Connection) CloseDeadline(deadline time.Time) error {
if c.IsClosed() {
return ErrClosed
}
defer c.shutdown(nil)
err := c.setDeadline(deadline)
if err != nil {
return err
}
return c.call(
&connectionClose{
ReplyCode: replySuccess,
ReplyText: "kthxbai",
},
&connectionCloseOk{},
)
}
func (c *Connection) closeWith(err *Error) error {
if c.IsClosed() {
return ErrClosed
}
defer c.shutdown(err)
return c.call(
&connectionClose{
ReplyCode: uint16(err.Code),
@ -420,6 +458,18 @@ func (c *Connection) IsClosed() bool {
return atomic.LoadInt32(&c.closed) == 1
}
// setDeadline is a wrapper to type assert Connection.conn and set an I/O
// deadline in the underlying TCP connection socket, by calling
// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails,
// although this should never happen.
func (c *Connection) setDeadline(t time.Time) error {
con, ok := c.conn.(net.Conn)
if !ok {
return errInvalidTypeAssertion
}
return con.SetDeadline(t)
}
func (c *Connection) send(f frame) error {
if c.IsClosed() {
return ErrClosed
@ -550,6 +600,8 @@ func (c *Connection) shutdown(err *Error) {
}
c.conn.Close()
// reader exit
close(c.close)
c.channels = nil
c.allocator = nil
@ -587,15 +639,23 @@ func (c *Connection) dispatch0(f frame) {
c <- Blocking{Active: false}
}
default:
c.rpc <- m
select {
case <-c.close:
return
case c.rpc <- m:
}
}
case *heartbeatFrame:
// kthx - all reads reset our deadline. so we can drop this
default:
// lolwat - channel0 only responds to methods and heartbeats
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
}()
}
}
@ -642,9 +702,12 @@ func (c *Connection) dispatchClosed(f frame) {
// we are already closed, so do nothing
default:
// unexpected method on closed channel
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
// closeWith use call don't block reader
go func() {
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
}()
}
}
}
@ -766,13 +829,16 @@ func (c *Connection) allocateChannel() (*Channel, error) {
// releaseChannel removes a channel from the registry as the final part of the
// channel lifecycle
func (c *Connection) releaseChannel(id uint16) {
func (c *Connection) releaseChannel(ch *Channel) {
c.m.Lock()
defer c.m.Unlock()
if !c.IsClosed() {
delete(c.channels, id)
c.allocator.release(int(id))
got, ok := c.channels[ch.id]
if ok && got == ch {
delete(c.channels, ch.id)
c.allocator.release(int(ch.id))
}
}
}
@ -784,7 +850,7 @@ func (c *Connection) openChannel() (*Channel, error) {
}
if err := ch.open(); err != nil {
c.releaseChannel(ch.id)
c.releaseChannel(ch)
return nil, err
}
return ch, nil
@ -795,7 +861,7 @@ func (c *Connection) openChannel() (*Channel, error) {
// this connection.
func (c *Connection) closeChannel(ch *Channel, e *Error) {
ch.shutdown(e)
c.releaseChannel(ch.id)
c.releaseChannel(ch)
}
/*
@ -816,13 +882,14 @@ func (c *Connection) call(req message, res ...message) error {
}
}
msg, ok := <-c.rpc
if !ok {
err, errorsChanIsOpen := <-c.errors
if !errorsChanIsOpen {
return ErrClosed
var msg message
select {
case e, ok := <-c.errors:
if ok {
return e
}
return err
return ErrClosed
case msg = <-c.rpc:
}
// Try to match one of the result types


+ 27
- 0
vendor/github.com/rabbitmq/amqp091-go/consumers.go View File

@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
}
case out <- *queue[0]:
/*
* https://github.com/rabbitmq/amqp091-go/issues/179
* https://github.com/rabbitmq/amqp091-go/pull/180
*
* Comment from @lars-t-hansen:
*
* Given Go's slice semantics, and barring any information
* available to the compiler that proves that queue is the only
* pointer to the memory it references, the only meaning that
* queue = queue[1:] can have is basically queue += sizeof(queue
* element), ie, it bumps a pointer. Looking at the generated
* code for a simple example (on ARM64 in this case) bears this
* out. So what we're left with is an array that we have a
* pointer into the middle of. When the GC traces this pointer,
* it too does not know whether the array has multiple
* referents, and so its only sensible choice is to find the
* beginning of the array, and if the array is not already
* visited, mark every element in it, including the "dead"
* pointer.
*
* (Depending on the program dynamics, an element may eventually
* be appended to the queue when the queue is at capacity, and
* in this case the live elements are copied into a new array
* and the old array is left to be GC'd eventually, along with
* the dead object. But that can take time.)
*/
queue[0] = nil
queue = queue[1:]
}
}


+ 64
- 64
vendor/github.com/rabbitmq/amqp091-go/spec091.go View File

@ -2817,7 +2817,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // connection start
//fmt.Println("NextMethod: class:10 method:10")
// fmt.Println("NextMethod: class:10 method:10")
method := &connectionStart{}
if err = method.read(r.r); err != nil {
return
@ -2825,7 +2825,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // connection start-ok
//fmt.Println("NextMethod: class:10 method:11")
// fmt.Println("NextMethod: class:10 method:11")
method := &connectionStartOk{}
if err = method.read(r.r); err != nil {
return
@ -2833,7 +2833,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // connection secure
//fmt.Println("NextMethod: class:10 method:20")
// fmt.Println("NextMethod: class:10 method:20")
method := &connectionSecure{}
if err = method.read(r.r); err != nil {
return
@ -2841,7 +2841,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // connection secure-ok
//fmt.Println("NextMethod: class:10 method:21")
// fmt.Println("NextMethod: class:10 method:21")
method := &connectionSecureOk{}
if err = method.read(r.r); err != nil {
return
@ -2849,7 +2849,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 30: // connection tune
//fmt.Println("NextMethod: class:10 method:30")
// fmt.Println("NextMethod: class:10 method:30")
method := &connectionTune{}
if err = method.read(r.r); err != nil {
return
@ -2857,7 +2857,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 31: // connection tune-ok
//fmt.Println("NextMethod: class:10 method:31")
// fmt.Println("NextMethod: class:10 method:31")
method := &connectionTuneOk{}
if err = method.read(r.r); err != nil {
return
@ -2865,7 +2865,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 40: // connection open
//fmt.Println("NextMethod: class:10 method:40")
// fmt.Println("NextMethod: class:10 method:40")
method := &connectionOpen{}
if err = method.read(r.r); err != nil {
return
@ -2873,7 +2873,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 41: // connection open-ok
//fmt.Println("NextMethod: class:10 method:41")
// fmt.Println("NextMethod: class:10 method:41")
method := &connectionOpenOk{}
if err = method.read(r.r); err != nil {
return
@ -2881,7 +2881,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 50: // connection close
//fmt.Println("NextMethod: class:10 method:50")
// fmt.Println("NextMethod: class:10 method:50")
method := &connectionClose{}
if err = method.read(r.r); err != nil {
return
@ -2889,7 +2889,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 51: // connection close-ok
//fmt.Println("NextMethod: class:10 method:51")
// fmt.Println("NextMethod: class:10 method:51")
method := &connectionCloseOk{}
if err = method.read(r.r); err != nil {
return
@ -2897,7 +2897,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 60: // connection blocked
//fmt.Println("NextMethod: class:10 method:60")
// fmt.Println("NextMethod: class:10 method:60")
method := &connectionBlocked{}
if err = method.read(r.r); err != nil {
return
@ -2905,7 +2905,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 61: // connection unblocked
//fmt.Println("NextMethod: class:10 method:61")
// fmt.Println("NextMethod: class:10 method:61")
method := &connectionUnblocked{}
if err = method.read(r.r); err != nil {
return
@ -2913,7 +2913,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 70: // connection update-secret
//fmt.Println("NextMethod: class:10 method:70")
// fmt.Println("NextMethod: class:10 method:70")
method := &connectionUpdateSecret{}
if err = method.read(r.r); err != nil {
return
@ -2921,7 +2921,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 71: // connection update-secret-ok
//fmt.Println("NextMethod: class:10 method:71")
// fmt.Println("NextMethod: class:10 method:71")
method := &connectionUpdateSecretOk{}
if err = method.read(r.r); err != nil {
return
@ -2936,7 +2936,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // channel open
//fmt.Println("NextMethod: class:20 method:10")
// fmt.Println("NextMethod: class:20 method:10")
method := &channelOpen{}
if err = method.read(r.r); err != nil {
return
@ -2944,7 +2944,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // channel open-ok
//fmt.Println("NextMethod: class:20 method:11")
// fmt.Println("NextMethod: class:20 method:11")
method := &channelOpenOk{}
if err = method.read(r.r); err != nil {
return
@ -2952,7 +2952,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // channel flow
//fmt.Println("NextMethod: class:20 method:20")
// fmt.Println("NextMethod: class:20 method:20")
method := &channelFlow{}
if err = method.read(r.r); err != nil {
return
@ -2960,7 +2960,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // channel flow-ok
//fmt.Println("NextMethod: class:20 method:21")
// fmt.Println("NextMethod: class:20 method:21")
method := &channelFlowOk{}
if err = method.read(r.r); err != nil {
return
@ -2968,7 +2968,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 40: // channel close
//fmt.Println("NextMethod: class:20 method:40")
// fmt.Println("NextMethod: class:20 method:40")
method := &channelClose{}
if err = method.read(r.r); err != nil {
return
@ -2976,7 +2976,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 41: // channel close-ok
//fmt.Println("NextMethod: class:20 method:41")
// fmt.Println("NextMethod: class:20 method:41")
method := &channelCloseOk{}
if err = method.read(r.r); err != nil {
return
@ -2991,7 +2991,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // exchange declare
//fmt.Println("NextMethod: class:40 method:10")
// fmt.Println("NextMethod: class:40 method:10")
method := &exchangeDeclare{}
if err = method.read(r.r); err != nil {
return
@ -2999,7 +2999,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // exchange declare-ok
//fmt.Println("NextMethod: class:40 method:11")
// fmt.Println("NextMethod: class:40 method:11")
method := &exchangeDeclareOk{}
if err = method.read(r.r); err != nil {
return
@ -3007,7 +3007,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // exchange delete
//fmt.Println("NextMethod: class:40 method:20")
// fmt.Println("NextMethod: class:40 method:20")
method := &exchangeDelete{}
if err = method.read(r.r); err != nil {
return
@ -3015,7 +3015,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // exchange delete-ok
//fmt.Println("NextMethod: class:40 method:21")
// fmt.Println("NextMethod: class:40 method:21")
method := &exchangeDeleteOk{}
if err = method.read(r.r); err != nil {
return
@ -3023,7 +3023,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 30: // exchange bind
//fmt.Println("NextMethod: class:40 method:30")
// fmt.Println("NextMethod: class:40 method:30")
method := &exchangeBind{}
if err = method.read(r.r); err != nil {
return
@ -3031,7 +3031,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 31: // exchange bind-ok
//fmt.Println("NextMethod: class:40 method:31")
// fmt.Println("NextMethod: class:40 method:31")
method := &exchangeBindOk{}
if err = method.read(r.r); err != nil {
return
@ -3039,7 +3039,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 40: // exchange unbind
//fmt.Println("NextMethod: class:40 method:40")
// fmt.Println("NextMethod: class:40 method:40")
method := &exchangeUnbind{}
if err = method.read(r.r); err != nil {
return
@ -3047,7 +3047,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 51: // exchange unbind-ok
//fmt.Println("NextMethod: class:40 method:51")
// fmt.Println("NextMethod: class:40 method:51")
method := &exchangeUnbindOk{}
if err = method.read(r.r); err != nil {
return
@ -3062,7 +3062,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // queue declare
//fmt.Println("NextMethod: class:50 method:10")
// fmt.Println("NextMethod: class:50 method:10")
method := &queueDeclare{}
if err = method.read(r.r); err != nil {
return
@ -3070,7 +3070,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // queue declare-ok
//fmt.Println("NextMethod: class:50 method:11")
// fmt.Println("NextMethod: class:50 method:11")
method := &queueDeclareOk{}
if err = method.read(r.r); err != nil {
return
@ -3078,7 +3078,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // queue bind
//fmt.Println("NextMethod: class:50 method:20")
// fmt.Println("NextMethod: class:50 method:20")
method := &queueBind{}
if err = method.read(r.r); err != nil {
return
@ -3086,7 +3086,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // queue bind-ok
//fmt.Println("NextMethod: class:50 method:21")
// fmt.Println("NextMethod: class:50 method:21")
method := &queueBindOk{}
if err = method.read(r.r); err != nil {
return
@ -3094,7 +3094,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 50: // queue unbind
//fmt.Println("NextMethod: class:50 method:50")
// fmt.Println("NextMethod: class:50 method:50")
method := &queueUnbind{}
if err = method.read(r.r); err != nil {
return
@ -3102,7 +3102,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 51: // queue unbind-ok
//fmt.Println("NextMethod: class:50 method:51")
// fmt.Println("NextMethod: class:50 method:51")
method := &queueUnbindOk{}
if err = method.read(r.r); err != nil {
return
@ -3110,7 +3110,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 30: // queue purge
//fmt.Println("NextMethod: class:50 method:30")
// fmt.Println("NextMethod: class:50 method:30")
method := &queuePurge{}
if err = method.read(r.r); err != nil {
return
@ -3118,7 +3118,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 31: // queue purge-ok
//fmt.Println("NextMethod: class:50 method:31")
// fmt.Println("NextMethod: class:50 method:31")
method := &queuePurgeOk{}
if err = method.read(r.r); err != nil {
return
@ -3126,7 +3126,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 40: // queue delete
//fmt.Println("NextMethod: class:50 method:40")
// fmt.Println("NextMethod: class:50 method:40")
method := &queueDelete{}
if err = method.read(r.r); err != nil {
return
@ -3134,7 +3134,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 41: // queue delete-ok
//fmt.Println("NextMethod: class:50 method:41")
// fmt.Println("NextMethod: class:50 method:41")
method := &queueDeleteOk{}
if err = method.read(r.r); err != nil {
return
@ -3149,7 +3149,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // basic qos
//fmt.Println("NextMethod: class:60 method:10")
// fmt.Println("NextMethod: class:60 method:10")
method := &basicQos{}
if err = method.read(r.r); err != nil {
return
@ -3157,7 +3157,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // basic qos-ok
//fmt.Println("NextMethod: class:60 method:11")
// fmt.Println("NextMethod: class:60 method:11")
method := &basicQosOk{}
if err = method.read(r.r); err != nil {
return
@ -3165,7 +3165,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // basic consume
//fmt.Println("NextMethod: class:60 method:20")
// fmt.Println("NextMethod: class:60 method:20")
method := &basicConsume{}
if err = method.read(r.r); err != nil {
return
@ -3173,7 +3173,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // basic consume-ok
//fmt.Println("NextMethod: class:60 method:21")
// fmt.Println("NextMethod: class:60 method:21")
method := &basicConsumeOk{}
if err = method.read(r.r); err != nil {
return
@ -3181,7 +3181,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 30: // basic cancel
//fmt.Println("NextMethod: class:60 method:30")
// fmt.Println("NextMethod: class:60 method:30")
method := &basicCancel{}
if err = method.read(r.r); err != nil {
return
@ -3189,7 +3189,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 31: // basic cancel-ok
//fmt.Println("NextMethod: class:60 method:31")
// fmt.Println("NextMethod: class:60 method:31")
method := &basicCancelOk{}
if err = method.read(r.r); err != nil {
return
@ -3197,7 +3197,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 40: // basic publish
//fmt.Println("NextMethod: class:60 method:40")
// fmt.Println("NextMethod: class:60 method:40")
method := &basicPublish{}
if err = method.read(r.r); err != nil {
return
@ -3205,7 +3205,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 50: // basic return
//fmt.Println("NextMethod: class:60 method:50")
// fmt.Println("NextMethod: class:60 method:50")
method := &basicReturn{}
if err = method.read(r.r); err != nil {
return
@ -3213,7 +3213,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 60: // basic deliver
//fmt.Println("NextMethod: class:60 method:60")
// fmt.Println("NextMethod: class:60 method:60")
method := &basicDeliver{}
if err = method.read(r.r); err != nil {
return
@ -3221,7 +3221,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 70: // basic get
//fmt.Println("NextMethod: class:60 method:70")
// fmt.Println("NextMethod: class:60 method:70")
method := &basicGet{}
if err = method.read(r.r); err != nil {
return
@ -3229,7 +3229,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 71: // basic get-ok
//fmt.Println("NextMethod: class:60 method:71")
// fmt.Println("NextMethod: class:60 method:71")
method := &basicGetOk{}
if err = method.read(r.r); err != nil {
return
@ -3237,7 +3237,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 72: // basic get-empty
//fmt.Println("NextMethod: class:60 method:72")
// fmt.Println("NextMethod: class:60 method:72")
method := &basicGetEmpty{}
if err = method.read(r.r); err != nil {
return
@ -3245,7 +3245,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 80: // basic ack
//fmt.Println("NextMethod: class:60 method:80")
// fmt.Println("NextMethod: class:60 method:80")
method := &basicAck{}
if err = method.read(r.r); err != nil {
return
@ -3253,7 +3253,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 90: // basic reject
//fmt.Println("NextMethod: class:60 method:90")
// fmt.Println("NextMethod: class:60 method:90")
method := &basicReject{}
if err = method.read(r.r); err != nil {
return
@ -3261,7 +3261,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 100: // basic recover-async
//fmt.Println("NextMethod: class:60 method:100")
// fmt.Println("NextMethod: class:60 method:100")
method := &basicRecoverAsync{}
if err = method.read(r.r); err != nil {
return
@ -3269,7 +3269,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 110: // basic recover
//fmt.Println("NextMethod: class:60 method:110")
// fmt.Println("NextMethod: class:60 method:110")
method := &basicRecover{}
if err = method.read(r.r); err != nil {
return
@ -3277,7 +3277,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 111: // basic recover-ok
//fmt.Println("NextMethod: class:60 method:111")
// fmt.Println("NextMethod: class:60 method:111")
method := &basicRecoverOk{}
if err = method.read(r.r); err != nil {
return
@ -3285,7 +3285,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 120: // basic nack
//fmt.Println("NextMethod: class:60 method:120")
// fmt.Println("NextMethod: class:60 method:120")
method := &basicNack{}
if err = method.read(r.r); err != nil {
return
@ -3300,7 +3300,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // tx select
//fmt.Println("NextMethod: class:90 method:10")
// fmt.Println("NextMethod: class:90 method:10")
method := &txSelect{}
if err = method.read(r.r); err != nil {
return
@ -3308,7 +3308,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // tx select-ok
//fmt.Println("NextMethod: class:90 method:11")
// fmt.Println("NextMethod: class:90 method:11")
method := &txSelectOk{}
if err = method.read(r.r); err != nil {
return
@ -3316,7 +3316,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 20: // tx commit
//fmt.Println("NextMethod: class:90 method:20")
// fmt.Println("NextMethod: class:90 method:20")
method := &txCommit{}
if err = method.read(r.r); err != nil {
return
@ -3324,7 +3324,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 21: // tx commit-ok
//fmt.Println("NextMethod: class:90 method:21")
// fmt.Println("NextMethod: class:90 method:21")
method := &txCommitOk{}
if err = method.read(r.r); err != nil {
return
@ -3332,7 +3332,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 30: // tx rollback
//fmt.Println("NextMethod: class:90 method:30")
// fmt.Println("NextMethod: class:90 method:30")
method := &txRollback{}
if err = method.read(r.r); err != nil {
return
@ -3340,7 +3340,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 31: // tx rollback-ok
//fmt.Println("NextMethod: class:90 method:31")
// fmt.Println("NextMethod: class:90 method:31")
method := &txRollbackOk{}
if err = method.read(r.r); err != nil {
return
@ -3355,7 +3355,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
switch mf.MethodId {
case 10: // confirm select
//fmt.Println("NextMethod: class:85 method:10")
// fmt.Println("NextMethod: class:85 method:10")
method := &confirmSelect{}
if err = method.read(r.r); err != nil {
return
@ -3363,7 +3363,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err
mf.Method = method
case 11: // confirm select-ok
//fmt.Println("NextMethod: class:85 method:11")
// fmt.Println("NextMethod: class:85 method:11")
method := &confirmSelectOk{}
if err = method.read(r.r); err != nil {
return


+ 39
- 13
vendor/github.com/rabbitmq/amqp091-go/types.go View File

@ -11,6 +11,8 @@ import (
"time"
)
// DefaultExchange is the default direct exchange that binds every queue by its
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""
// Constants for standard AMQP 0-9-1 exchange types.
@ -63,6 +65,11 @@ var (
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
)
// internal errors used inside the library
var (
errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true}
)
// Error captures the code and reason a channel or connection has been closed
// by the server.
type Error struct {
@ -209,29 +216,39 @@ type Decimal struct {
// Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs].
//
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// [QueueTypeArg] queue argument is used to declare quorum and stream queues.
// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and
// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more
// information.
//
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
// Queues can define their [max length] using [QueueMaxLenArg] and
// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using
// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default),
// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX].
//
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg].
// This will set a time-to-live for messages in the queue.
//
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the
// maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg],
// to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is
// 500_000_000 bytes ~= 500 megabytes
//
// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue
// argument. This is the timeout for a consumer to acknowledge a message. The
// value is the time in milliseconds. The timeout is evaluated periodically,
// at one minute intervals. Values lower than one minute are not supported.
// See the [consumer timeout] guide for more information.
//
// [Single Active Consumer] on quorum and classic queues can be configured
// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is
// false by default.
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html
@ -239,6 +256,8 @@ type Decimal struct {
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout
// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer
const (
QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length"
@ -249,6 +268,11 @@ const (
QueueTTLArg = "x-expires"
StreamMaxAgeArg = "x-max-age"
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
// QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2.
QueueVersionArg = "x-queue-version"
// ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument.
ConsumerTimeoutArg = "x-consumer-timeout"
SingleActiveConsumerArg = "x-single-active-consumer"
)
// Values for queue arguments. Use as values for queue arguments during queue declaration.
@ -260,6 +284,8 @@ const (
// amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000,
// }
//
// Refer to [Channel.QueueDeclare] for more examples.
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"


+ 1
- 2
vendor/github.com/rabbitmq/amqp091-go/write.go View File

@ -72,7 +72,6 @@ func (f *heartbeatFrame) write(w io.Writer) (err error) {
// short short long long short remainder...
func (f *headerFrame) write(w io.Writer) (err error) {
var payload bytes.Buffer
var zeroTime time.Time
if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
return
@ -118,7 +117,7 @@ func (f *headerFrame) write(w io.Writer) (err error) {
if len(f.Properties.MessageId) > 0 {
mask = mask | flagMessageId
}
if f.Properties.Timestamp != zeroTime {
if !f.Properties.Timestamp.IsZero() {
mask = mask | flagTimestamp
}
if len(f.Properties.Type) > 0 {


+ 1
- 1
vendor/modules.txt View File

@ -1,3 +1,3 @@
# github.com/rabbitmq/amqp091-go v1.7.0
# github.com/rabbitmq/amqp091-go v1.9.0
## explicit; go 1.16
github.com/rabbitmq/amqp091-go

Loading…
Cancel
Save