Browse Source

update dep

pull/168/head
wagslane 1 year ago
parent
commit
786f782071
10 changed files with 305 additions and 137 deletions
  1. +1
    -1
      go.mod
  2. +3
    -48
      go.sum
  3. +68
    -0
      vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md
  4. +24
    -32
      vendor/github.com/rabbitmq/amqp091-go/channel.go
  5. +66
    -13
      vendor/github.com/rabbitmq/amqp091-go/connection.go
  6. +14
    -19
      vendor/github.com/rabbitmq/amqp091-go/doc.go
  7. +14
    -0
      vendor/github.com/rabbitmq/amqp091-go/gen.ps1
  8. +45
    -12
      vendor/github.com/rabbitmq/amqp091-go/types.go
  9. +68
    -10
      vendor/github.com/rabbitmq/amqp091-go/uri.go
  10. +2
    -2
      vendor/modules.txt

+ 1
- 1
go.mod View File

@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq
go 1.20
require github.com/rabbitmq/amqp091-go v1.9.0
require github.com/rabbitmq/amqp091-go v1.10.0

+ 3
- 48
go.sum View File

@ -1,48 +1,3 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

+ 68
- 0
vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md View File

@ -1,5 +1,73 @@
# Changelog
## [v1.10.0](https://github.com/rabbitmq/amqp091-go/tree/v1.10.0) (2024-05-08)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.9.0...v1.10.0)
**Implemented enhancements:**
- Undeprecate non-context publish functions [\#259](https://github.com/rabbitmq/amqp091-go/pull/259) ([Zerpet](https://github.com/Zerpet))
- Update Go directive [\#257](https://github.com/rabbitmq/amqp091-go/pull/257) ([Zerpet](https://github.com/Zerpet))
**Fixed bugs:**
- republishing on reconnect bug in the example [\#249](https://github.com/rabbitmq/amqp091-go/issues/249)
- Channel Notify Close not receive event when connection is closed by RMQ server. [\#241](https://github.com/rabbitmq/amqp091-go/issues/241)
- Inconsistent documentation [\#231](https://github.com/rabbitmq/amqp091-go/issues/231)
- Data race in the client example [\#72](https://github.com/rabbitmq/amqp091-go/issues/72)
- Fix string function of URI [\#258](https://github.com/rabbitmq/amqp091-go/pull/258) ([Zerpet](https://github.com/Zerpet))
**Closed issues:**
- Documentation needed \(`PublishWithContext` does not use context\) [\#195](https://github.com/rabbitmq/amqp091-go/issues/195)
- concurrent dispatch data race [\#226](https://github.com/rabbitmq/amqp091-go/issues/226)
**Merged pull requests:**
- Fix data race in example [\#260](https://github.com/rabbitmq/amqp091-go/pull/260) ([Zerpet](https://github.com/Zerpet))
- Address CodeQL warning [\#252](https://github.com/rabbitmq/amqp091-go/pull/252) ([lukebakken](https://github.com/lukebakken))
- Add support for additional AMQP URI query parameters [\#251](https://github.com/rabbitmq/amqp091-go/pull/251) ([vilius-g](https://github.com/vilius-g))
- Example fix [\#250](https://github.com/rabbitmq/amqp091-go/pull/250) ([Boris-Plato](https://github.com/Boris-Plato))
- Increasing the code coverage [\#248](https://github.com/rabbitmq/amqp091-go/pull/248) ([edercarloscosta](https://github.com/edercarloscosta))
- Use correct mutex to guard confirms.published [\#240](https://github.com/rabbitmq/amqp091-go/pull/240) ([hjr265](https://github.com/hjr265))
- Documenting Publishing.Expiration usage [\#232](https://github.com/rabbitmq/amqp091-go/pull/232) ([niksteff](https://github.com/niksteff))
- fix comment typo in example\_client\_test.go [\#228](https://github.com/rabbitmq/amqp091-go/pull/228) ([wisaTong](https://github.com/wisaTong))
- Bump go.uber.org/goleak from 1.2.1 to 1.3.0 [\#227](https://github.com/rabbitmq/amqp091-go/pull/227) ([dependabot[bot]](https://github.com/apps/dependabot))
## [v1.9.0](https://github.com/rabbitmq/amqp091-go/tree/v1.9.0) (2023-10-02)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.1...v1.9.0)
**Implemented enhancements:**
- Use of buffered delivery channels when prefetch\_count is not null [\#200](https://github.com/rabbitmq/amqp091-go/issues/200)
**Fixed bugs:**
- connection block when write connection reset by peer [\#222](https://github.com/rabbitmq/amqp091-go/issues/222)
- Test failure on 32bit architectures [\#202](https://github.com/rabbitmq/amqp091-go/issues/202)
**Closed issues:**
- Add a constant to set consumer timeout as queue argument [\#201](https://github.com/rabbitmq/amqp091-go/issues/201)
- Add a constant for CQ version [\#199](https://github.com/rabbitmq/amqp091-go/issues/199)
- Examples may need to be updated after \#140 [\#153](https://github.com/rabbitmq/amqp091-go/issues/153)
**Merged pull requests:**
- Update spec091.go [\#224](https://github.com/rabbitmq/amqp091-go/pull/224) ([pinkfish](https://github.com/pinkfish))
- Closes 222 [\#223](https://github.com/rabbitmq/amqp091-go/pull/223) ([yywing](https://github.com/yywing))
- Update write.go [\#221](https://github.com/rabbitmq/amqp091-go/pull/221) ([pinkfish](https://github.com/pinkfish))
- Bump versions [\#219](https://github.com/rabbitmq/amqp091-go/pull/219) ([lukebakken](https://github.com/lukebakken))
- remove extra word 'accept' from ExchangeDeclare description [\#217](https://github.com/rabbitmq/amqp091-go/pull/217) ([a-sabzian](https://github.com/a-sabzian))
- Misc Windows CI updates [\#216](https://github.com/rabbitmq/amqp091-go/pull/216) ([lukebakken](https://github.com/lukebakken))
- Stop using deprecated Publish function [\#207](https://github.com/rabbitmq/amqp091-go/pull/207) ([Zerpet](https://github.com/Zerpet))
- Constant for consumer timeout queue argument [\#206](https://github.com/rabbitmq/amqp091-go/pull/206) ([Zerpet](https://github.com/Zerpet))
- Add a constant for CQ v2 queue argument [\#205](https://github.com/rabbitmq/amqp091-go/pull/205) ([Zerpet](https://github.com/Zerpet))
- Fix example for 32-bit compatibility [\#204](https://github.com/rabbitmq/amqp091-go/pull/204) ([Zerpet](https://github.com/Zerpet))
- Fix to increase timeout milliseconds since it's too tight [\#203](https://github.com/rabbitmq/amqp091-go/pull/203) ([t2y](https://github.com/t2y))
- Add Channel.ConsumeWithContext to be able to cancel delivering [\#192](https://github.com/rabbitmq/amqp091-go/pull/192) ([t2y](https://github.com/t2y))
## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1)


+ 24
- 32
vendor/github.com/rabbitmq/amqp091-go/channel.go View File

@ -7,7 +7,6 @@ package amqp091
import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"
@ -971,9 +970,6 @@ func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table
/*
QueueUnbind removes a binding between an exchange and queue matching the key and
arguments.
It is possible to send and empty string for the exchange name which means to
unbind the queue from the default exchange.
*/
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
if err := args.Validate(); err != nil {
@ -1487,17 +1483,17 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
Deprecated: Use PublishWithContext instead.
*/
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg)
_, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
return err
}
/*
PublishWithContext sends a Publishing from the client to an exchange on the server.
NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
because every declared queue gets an implicit route to the default exchange.
@ -1527,34 +1523,17 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
return err
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
return ch.Publish(exchange, key, mandatory, immediate, msg)
}
/*
PublishWithDeferredConfirm behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
Deprecated: Use PublishWithDeferredConfirmWithContext instead.
PublishWithDeferredConfirm behaves identically to Publish, but additionally
returns a DeferredConfirmation, allowing the caller to wait on the publisher
confirmation for this message. If the channel has not been put into confirm
mode, the DeferredConfirmation will be nil.
*/
func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg)
}
/*
PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
if ctx == nil {
return nil, errors.New("amqp091-go: nil Context")
}
if err := msg.Headers.Validate(); err != nil {
return nil, err
}
@ -1598,6 +1577,19 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
return dc, nil
}
/*
PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a
DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
to this function is not honoured.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
}
/*
Get synchronously receives a single Delivery from the head of a queue from the
server to the client. In almost all cases, using Channel.Consume will be
@ -1829,8 +1821,8 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error {
// GetNextPublishSeqNo returns the sequence number of the next message to be
// published, when in confirm mode.
func (ch *Channel) GetNextPublishSeqNo() uint64 {
ch.confirms.m.Lock()
defer ch.confirms.m.Unlock()
ch.confirms.publishedMut.Lock()
defer ch.confirms.publishedMut.Unlock()
return ch.confirms.published + 1
}

+ 66
- 13
vendor/github.com/rabbitmq/amqp091-go/connection.go View File

@ -28,11 +28,11 @@ const (
defaultHeartbeat = 10 * time.Second
defaultConnectionTimeout = 30 * time.Second
defaultProduct = "AMQP 0.9.1 Client"
buildVersion = "1.9.0"
buildVersion = "1.10.0"
platform = "golang"
// Safer default that makes channel leaks a lot easier to spot
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
defaultChannelMax = (2 << 10) - 1
defaultChannelMax = uint16((2 << 10) - 1)
defaultLocale = "en_US"
)
@ -49,7 +49,7 @@ type Config struct {
// bindings on the server. Dial sets this to the path parsed from the URL.
Vhost string
ChannelMax int // 0 max channels means 2^16 - 1
ChannelMax uint16 // 0 max channels means 2^16 - 1
FrameSize int // 0 max bytes means unlimited
Heartbeat time.Duration // less than 1s uses the server's interval
@ -157,8 +157,7 @@ func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (ne
// scheme. It is equivalent to calling DialTLS(amqp, nil).
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
Locale: defaultLocale,
})
}
@ -169,7 +168,6 @@ func Dial(url string) (*Connection, error) {
// DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
TLSClientConfig: amqps,
Locale: defaultLocale,
})
@ -186,7 +184,6 @@ func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
// amqps:// scheme.
func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
TLSClientConfig: amqps,
SASL: []Authentication{&ExternalAuth{}},
})
@ -195,7 +192,9 @@ func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) {
// DialConfig accepts a string in the AMQP URI format and a configuration for
// the transport and connection setup, returning a new Connection. Defaults to
// a server heartbeat interval of 10 seconds and sets the initial read deadline
// to 30 seconds.
// to 30 seconds. The heartbeat interval specified in the AMQP URI takes precedence
// over the value specified in the config. To disable heartbeats, you must use
// the AMQP URI and set heartbeat=0 there.
func DialConfig(url string, config Config) (*Connection, error) {
var err error
var conn net.Conn
@ -206,18 +205,50 @@ func DialConfig(url string, config Config) (*Connection, error) {
}
if config.SASL == nil {
config.SASL = []Authentication{uri.PlainAuth()}
if uri.AuthMechanism != nil {
for _, identifier := range uri.AuthMechanism {
switch strings.ToUpper(identifier) {
case "PLAIN":
config.SASL = append(config.SASL, uri.PlainAuth())
case "AMQPLAIN":
config.SASL = append(config.SASL, uri.AMQPlainAuth())
case "EXTERNAL":
config.SASL = append(config.SASL, &ExternalAuth{})
default:
return nil, fmt.Errorf("unsupported auth_mechanism: %v", identifier)
}
}
} else {
config.SASL = []Authentication{uri.PlainAuth()}
}
}
if config.Vhost == "" {
config.Vhost = uri.Vhost
}
if uri.Heartbeat.hasValue {
config.Heartbeat = uri.Heartbeat.value
} else {
if config.Heartbeat == 0 {
config.Heartbeat = defaultHeartbeat
}
}
if config.ChannelMax == 0 {
config.ChannelMax = uri.ChannelMax
}
connectionTimeout := defaultConnectionTimeout
if uri.ConnectionTimeout != 0 {
connectionTimeout = time.Duration(uri.ConnectionTimeout) * time.Millisecond
}
addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
dialer := config.Dial
if dialer == nil {
dialer = DefaultDial(defaultConnectionTimeout)
dialer = DefaultDial(connectionTimeout)
}
conn, err = dialer("tcp", addr)
@ -991,13 +1022,13 @@ func (c *Connection) openTune(config Config, auth Authentication) error {
// When the server and client both use default 0, then the max channel is
// only limited by uint16.
c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
c.Config.ChannelMax = pickUInt16(config.ChannelMax, tune.ChannelMax)
if c.Config.ChannelMax == 0 {
c.Config.ChannelMax = defaultChannelMax
}
c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
c.Config.ChannelMax = minUInt16(c.Config.ChannelMax, maxChannelMax)
c.allocator = newAllocator(1, c.Config.ChannelMax)
c.allocator = newAllocator(1, int(c.Config.ChannelMax))
c.m.Unlock()
@ -1104,6 +1135,13 @@ func max(a, b int) int {
return b
}
func maxUInt16(a, b uint16) uint16 {
if a > b {
return a
}
return b
}
func min(a, b int) int {
if a < b {
return a
@ -1111,6 +1149,21 @@ func min(a, b int) int {
return b
}
func minUInt16(a, b uint16) uint16 {
if a < b {
return a
}
return b
}
func pickUInt16(client, server uint16) uint16 {
if client == 0 || server == 0 {
return maxUInt16(client, server)
} else {
return minUInt16(client, server)
}
}
func pick(client, server int) int {
if client == 0 || server == 0 {
return max(client, server)


+ 14
- 19
vendor/github.com/rabbitmq/amqp091-go/doc.go View File

@ -95,12 +95,11 @@ prior to calling [Channel.PublishWithContext] or [Channel.Consume].
When Dial encounters an amqps:// scheme, it will use the zero value of a
tls.Config. This will only perform server certificate and host verification.
Use DialTLS when you wish to provide a client certificate (recommended),
include a private certificate authority's certificate in the cert chain for
server validity, or run insecure by not verifying the server certificate dial
your own connection. DialTLS will use the provided tls.Config when it
encounters an amqps:// scheme and will dial a plain connection when it
encounters an amqp:// scheme.
Use DialTLS when you wish to provide a client certificate (recommended), include
a private certificate authority's certificate in the cert chain for server
validity, or run insecure by not verifying the server certificate. DialTLS will
use the provided tls.Config when it encounters an amqps:// scheme and will dial
a plain connection when it encounters an amqp:// scheme.
SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html
@ -110,17 +109,18 @@ In order to be notified when a connection or channel gets closed, both
structures offer the possibility to register channels using
[Channel.NotifyClose] and [Connection.NotifyClose] functions:
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error))
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
No errors will be sent in case of a graceful connection close. In case of a
non-graceful closure due to e.g. network issue, or forced connection closure
from the Management UI, the error will be notified synchronously by the library.
The error is sent synchronously to the channel, so that the flow will wait until
the receiver consumes from the channel. To avoid deadlocks in the library, it is
necessary to consume from the channels. This could be done inside a
different goroutine with a select listening on the two channels inside a for
loop like:
The library sends to notification channels just once. After sending a
notification to all channels, the library closes all registered notification
channels. After receiving a notification, the application should create and
register a new channel. To avoid deadlocks in the library, it is necessary to
consume from the channels. This could be done inside a different goroutine with
a select listening on the two channels inside a for loop like:
go func() {
for notifyConnClose != nil || notifyChanClose != nil {
@ -141,13 +141,8 @@ loop like:
}
}()
Another approach is to use buffered channels:
notifyConnCloseCh := conn.NotifyClose(make(chan *amqp.Error, 1))
The library sends to notification channels just once. After sending a notification
to all channels, the library closes all registered notification channels. After
receiving a notification, the application should create and register a new channel.
It is strongly recommended to use buffered channels to avoid deadlocks inside
the library.
# Best practises for NotifyPublish notifications:


+ 14
- 0
vendor/github.com/rabbitmq/amqp091-go/gen.ps1 View File

@ -0,0 +1,14 @@
$DebugPreference = 'Continue'
$ErrorActionPreference = 'Stop'
Set-PSDebug -Off
Set-StrictMode -Version 'Latest' -ErrorAction 'Stop' -Verbose
New-Variable -Name curdir -Option Constant -Value $PSScriptRoot
$specDir = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec')
$amqpSpecXml = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'amqp0-9-1.stripped.extended.xml')
$gen = Resolve-Path -LiteralPath (Join-Path -Path $specDir -ChildPath 'gen.go')
$spec091 = Resolve-Path -LiteralPath (Join-Path -Path $curdir -ChildPath 'spec091.go')
Get-Content -LiteralPath $amqpSpecXml | go run $gen | gofmt | Set-Content -Force -Path $spec091

+ 45
- 12
vendor/github.com/rabbitmq/amqp091-go/types.go View File

@ -144,6 +144,19 @@ const (
flagReserved1 = 0x0004
)
// Expiration. These constants can be used to set a messages expiration TTL.
// They should be viewed as a clarification of the expiration functionality in
// messages and their usage is not enforced by this pkg.
//
// The server requires a string value that is interpreted by the server as
// milliseconds. If no value is set, which translates to the nil value of
// string, the message will never expire by itself. This does not influence queue
// configured TTL configurations.
const (
NeverExpire string = "" // empty value means never expire
ImmediatelyExpire string = "0" // 0 means immediately expire
)
// Queue captures the current server state of the queue on the server returned
// from Channel.QueueDeclare or Channel.QueueInspect.
type Queue struct {
@ -162,18 +175,25 @@ type Publishing struct {
Headers Table
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
// Expiration represents the message TTL in milliseconds. A value of "0"
// indicates that the message will immediately expire if the message arrives
// at its destination and the message is not directly handled by a consumer
// that currently has the capacatity to do so. If you wish the message to
// not expire on its own, set this value to any ttl value, empty string or
// use the corresponding constants NeverExpire and ImmediatelyExpire. This
// does not influence queue configured TTL values.
Expiration string
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
// The application specific payload of the message
Body []byte
@ -533,3 +553,16 @@ type bodyFrame struct {
}
func (f *bodyFrame) channel() uint16 { return f.ChannelId }
type heartbeatDuration struct {
value time.Duration
hasValue bool
}
func newHeartbeatDurationFromSeconds(s int) heartbeatDuration {
v := time.Duration(s) * time.Second
return heartbeatDuration{
value: v,
hasValue: true,
}
}

+ 68
- 10
vendor/github.com/rabbitmq/amqp091-go/uri.go View File

@ -7,6 +7,7 @@ package amqp091
import (
"errors"
"fmt"
"net"
"net/url"
"strconv"
@ -32,16 +33,20 @@ var defaultURI = URI{
// URI represents a parsed AMQP URI string.
type URI struct {
Scheme string
Host string
Port int
Username string
Password string
Vhost string
CertFile string // client TLS auth - path to certificate (PEM)
CACertFile string // client TLS auth - path to CA certificate (PEM)
KeyFile string // client TLS auth - path to private key (PEM)
ServerName string // client TLS auth - server name
Scheme string
Host string
Port int
Username string
Password string
Vhost string
CertFile string // client TLS auth - path to certificate (PEM)
CACertFile string // client TLS auth - path to CA certificate (PEM)
KeyFile string // client TLS auth - path to private key (PEM)
ServerName string // client TLS auth - server name
AuthMechanism []string
Heartbeat heartbeatDuration
ConnectionTimeout int
ChannelMax uint16
}
// ParseURI attempts to parse the given AMQP URI according to the spec.
@ -62,6 +67,10 @@ type URI struct {
// keyfile: <path/to/client_key.pem>
// cacertfile: <path/to/ca.pem>
// server_name_indication: <server name>
// auth_mechanism: <one or more: plain, amqplain, external>
// heartbeat: <seconds (integer)>
// connection_timeout: <milliseconds (integer)>
// channel_max: <max number of channels (integer)>
//
// If cacertfile is not provided, system CA certificates will be used.
// Mutual TLS (client auth) will be enabled only in case keyfile AND certfile provided.
@ -134,6 +143,31 @@ func ParseURI(uri string) (URI, error) {
builder.KeyFile = params.Get("keyfile")
builder.CACertFile = params.Get("cacertfile")
builder.ServerName = params.Get("server_name_indication")
builder.AuthMechanism = params["auth_mechanism"]
if params.Has("heartbeat") {
value, err := strconv.Atoi(params.Get("heartbeat"))
if err != nil {
return builder, fmt.Errorf("heartbeat is not an integer: %v", err)
}
builder.Heartbeat = newHeartbeatDurationFromSeconds(value)
}
if params.Has("connection_timeout") {
value, err := strconv.Atoi(params.Get("connection_timeout"))
if err != nil {
return builder, fmt.Errorf("connection_timeout is not an integer: %v", err)
}
builder.ConnectionTimeout = value
}
if params.Has("channel_max") {
value, err := strconv.ParseUint(params.Get("channel_max"), 10, 16)
if err != nil {
return builder, fmt.Errorf("connection_timeout is not an integer: %v", err)
}
builder.ChannelMax = uint16(value)
}
return builder, nil
}
@ -192,5 +226,29 @@ func (uri URI) String() string {
authority.Path = "/"
}
if uri.CertFile != "" || uri.KeyFile != "" || uri.CACertFile != "" || uri.ServerName != "" {
rawQuery := strings.Builder{}
if uri.CertFile != "" {
rawQuery.WriteString("certfile=")
rawQuery.WriteString(uri.CertFile)
rawQuery.WriteRune('&')
}
if uri.KeyFile != "" {
rawQuery.WriteString("keyfile=")
rawQuery.WriteString(uri.KeyFile)
rawQuery.WriteRune('&')
}
if uri.CACertFile != "" {
rawQuery.WriteString("cacertfile=")
rawQuery.WriteString(uri.CACertFile)
rawQuery.WriteRune('&')
}
if uri.ServerName != "" {
rawQuery.WriteString("server_name_indication=")
rawQuery.WriteString(uri.ServerName)
}
authority.RawQuery = rawQuery.String()
}
return authority.String()
}

+ 2
- 2
vendor/modules.txt View File

@ -1,3 +1,3 @@
# github.com/rabbitmq/amqp091-go v1.9.0
## explicit; go 1.16
# github.com/rabbitmq/amqp091-go v1.10.0
## explicit; go 1.20
github.com/rabbitmq/amqp091-go

Loading…
Cancel
Save