From 0a0d08895cc13d5b36b8ace4011a6329891adae7 Mon Sep 17 00:00:00 2001 From: wagslane Date: Sat, 21 May 2022 09:04:53 -0600 Subject: [PATCH] deps --- go.mod | 2 +- go.sum | 6 +-- .../rabbitmq/amqp091-go/change_version.sh | 4 ++ .../github.com/rabbitmq/amqp091-go/channel.go | 3 ++ .../rabbitmq/amqp091-go/confirms.go | 18 ++++++++- .../rabbitmq/amqp091-go/connection.go | 13 +++++-- vendor/github.com/rabbitmq/amqp091-go/doc.go | 38 +++++++++++++++++++ vendor/github.com/rabbitmq/amqp091-go/fuzz.go | 1 + vendor/github.com/rabbitmq/amqp091-go/go.mod | 3 -- .../github.com/rabbitmq/amqp091-go/spec091.go | 2 +- vendor/modules.txt | 4 +- 11 files changed, 78 insertions(+), 16 deletions(-) create mode 100644 vendor/github.com/rabbitmq/amqp091-go/change_version.sh delete mode 100644 vendor/github.com/rabbitmq/amqp091-go/go.mod diff --git a/go.mod b/go.mod index 51d17a0..62fba85 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq go 1.17 -require github.com/rabbitmq/amqp091-go v1.3.0 +require github.com/rabbitmq/amqp091-go v1.3.4 diff --git a/go.sum b/go.sum index 5f8cc0f..6937c44 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,2 @@ -github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 h1:13nv5f/LNJxNpvpYm/u0NqrlFebon342f9Xu9GpklKc= -github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= -github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM= -github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU= +github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= diff --git a/vendor/github.com/rabbitmq/amqp091-go/change_version.sh b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh new file mode 100644 index 0000000..c6401ad --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh @@ -0,0 +1,4 @@ +#/bin/bash +echo $1 > VERSION +sed -i -e "s/.*buildVersion = \"*.*/buildVersion = \"$1\"/" ./connection.go +go fmt ./... diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index a4afc98..66ce873 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -449,6 +449,9 @@ this channel. The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent. +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks + */ func (ch *Channel) NotifyClose(c chan *Error) chan *Error { ch.notifyM.Lock() diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index 654d755..3eba406 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -98,11 +98,14 @@ func (c *confirms) Multiple(confirmed Confirmation) { c.resequence() } -// Close closes all listeners, discarding any out of sequence confirmations +// Cleans up the confirms struct and its dependencies. +// Closes all listeners, discarding any out of sequence confirmations func (c *confirms) Close() error { c.m.Lock() defer c.m.Unlock() + c.deferredConfirmations.Close() + for _, l := range c.listeners { close(l) } @@ -158,6 +161,19 @@ func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { } } +// Nacks all pending DeferredConfirmations being blocked by dc.Wait() +func (d *deferredConfirmations) Close() { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + v.confirmation = Confirmation{DeliveryTag: k, Ack: false} + v.wg.Done() + delete(d.confirmations, k) + } +} + +// Waits for publisher confirmation. Returns true if server successfully received the publishing. func (d *DeferredConfirmation) Wait() bool { d.wg.Wait() return d.confirmation.Ack diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index 4023ade..83ed165 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -23,8 +23,9 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second - defaultProduct = "https://github.com/streadway/amqp" - defaultVersion = "β" + defaultProduct = "Amqp 0.9.1 Client" + buildVersion = "1.3.4" + platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. defaultChannelMax = (2 << 10) - 1 @@ -283,6 +284,9 @@ accompanying a connection.close method or by a normal shutdown. The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent. +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks + To reconnect after a transport or protocol error, register a listener here and re-run your setup process. @@ -757,8 +761,9 @@ func (c *Connection) openStart(config Config) error { func (c *Connection) openTune(config Config, auth Authentication) error { if len(config.Properties) == 0 { config.Properties = Table{ - "product": defaultProduct, - "version": defaultVersion, + "product": defaultProduct, + "version": buildVersion, + "platform": platform, } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/doc.go b/vendor/github.com/rabbitmq/amqp091-go/doc.go index ba2efb0..a820c6d 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/doc.go +++ b/vendor/github.com/rabbitmq/amqp091-go/doc.go @@ -104,5 +104,43 @@ encounters an amqp:// scheme. SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html +Best practises for Connections and Channels notifications. + +In order to be notified when a connection or channel gets closed both the structures offer the possibility to register channels using the `notifyClose` function like: +notifyConnClose := make(chan *amqp.Error) +conn.NotifyClose(notifyConnClose) +No errors will be sent in case of a graceful connection close. +In case of a non-graceful close, because of a network issue of forced disconnection from the UI, the error will be notified synchronously by the library. +You can see that in the shutdown function of connection and channel (see connection.go and channel.go) + + if err != nil { + for _, c := range c.closes { + c <- err + } + } + +The error is sent synchronously to the channel so that the flow will wait until the channel will be consumed by the caller. +To avoid deadlocks it is necessary to consume the messages from the channels. +This could be done inside a different goroutine with a select listening on the two channels inside a for loop like: + +go func() { + for notifyConnClose != nil || notifyChanClose != nil { + select { + case err, ok := <-notifyConnClose: + if !(ok) { + notifyConnClose = nil + } else { + fmt.Printf("connection closed, error %s", err) + } + case err, ok := <-notifyChanClose: + if !(ok) { + notifyChanClose = nil + } else { + fmt.Printf("channel closed, error %s", err) + } + } + } +}() + */ package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go index 602220f..c9f03ea 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go +++ b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go @@ -3,6 +3,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build gofuzz // +build gofuzz package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/go.mod b/vendor/github.com/rabbitmq/amqp091-go/go.mod deleted file mode 100644 index 978d286..0000000 --- a/vendor/github.com/rabbitmq/amqp091-go/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/rabbitmq/amqp091-go - -go 1.15 diff --git a/vendor/github.com/rabbitmq/amqp091-go/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go index 0261c15..7af6642 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -1174,7 +1174,7 @@ func (msg *queueDeclare) id() (uint16, uint16) { } func (msg *queueDeclare) wait() bool { - return true && !msg.NoWait + return !msg.NoWait } func (msg *queueDeclare) write(w io.Writer) (err error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index b4cc059..8478fc6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v1.3.0 -## explicit +# github.com/rabbitmq/amqp091-go v1.3.4 +## explicit; go 1.15 github.com/rabbitmq/amqp091-go