From 8eef0b4785df18c2d33875188b0af16eed56dcf8 Mon Sep 17 00:00:00 2001 From: wagslane Date: Sun, 27 Feb 2022 17:39:58 -0700 Subject: [PATCH] small refactor, update amqp --- Makefile | 4 - consume.go | 54 ++++++----- examples/consumer/main.go | 11 ++- go.mod | 2 +- go.sum | 2 + .../github.com/rabbitmq/amqp091-go/.gitignore | 8 +- .../github.com/rabbitmq/amqp091-go/README.md | 2 + .../rabbitmq/amqp091-go/allocator.go | 5 + vendor/github.com/rabbitmq/amqp091-go/auth.go | 26 ++++- .../github.com/rabbitmq/amqp091-go/channel.go | 29 +++++- .../rabbitmq/amqp091-go/confirms.go | 96 ++++++++++++++++--- .../rabbitmq/amqp091-go/connection.go | 21 +++- .../rabbitmq/amqp091-go/consumers.go | 1 + .../rabbitmq/amqp091-go/delivery.go | 1 + vendor/github.com/rabbitmq/amqp091-go/doc.go | 1 + vendor/github.com/rabbitmq/amqp091-go/fuzz.go | 5 + vendor/github.com/rabbitmq/amqp091-go/read.go | 13 ++- .../github.com/rabbitmq/amqp091-go/return.go | 1 + .../github.com/rabbitmq/amqp091-go/spec091.go | 1 + .../github.com/rabbitmq/amqp091-go/types.go | 14 ++- vendor/github.com/rabbitmq/amqp091-go/uri.go | 1 + .../github.com/rabbitmq/amqp091-go/write.go | 11 ++- vendor/modules.txt | 2 +- 23 files changed, 242 insertions(+), 69 deletions(-) diff --git a/Makefile b/Makefile index e81f29d..517467f 100644 --- a/Makefile +++ b/Makefile @@ -3,10 +3,6 @@ all: test fmt vet lint staticcheck test: go test ./... -fmt: - go list -f '{{.Dir}}' ./... | grep -v /vendor/ | xargs -L1 gofmt -l - test -z $$(go list -f '{{.Dir}}' ./... | grep -v /vendor/ | xargs -L1 gofmt -l) - vet: go vet ./... diff --git a/consume.go b/consume.go index 77bca61..e470bc3 100644 --- a/consume.go +++ b/consume.go @@ -254,33 +254,35 @@ func (consumer Consumer) startGoroutines( } for i := 0; i < consumeOptions.Concurrency; i++ { - go func() { - for msg := range msgs { - if consumeOptions.ConsumerAutoAck { - handler(Delivery{msg}) - continue - } - switch handler(Delivery{msg}) { - case Ack: - err := msg.Ack(false) - if err != nil { - consumer.logger.Printf("can't ack message: %v", err) - } - case NackDiscard: - err := msg.Nack(false, false) - if err != nil { - consumer.logger.Printf("can't nack message: %v", err) - } - case NackRequeue: - err := msg.Nack(false, true) - if err != nil { - consumer.logger.Printf("can't nack message: %v", err) - } - } - } - consumer.logger.Printf("rabbit consumer goroutine closed") - }() + go handlerGoroutine(consumer, msgs, consumeOptions, handler) } consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) return nil } + +func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumeOptions, handler Handler) { + for msg := range msgs { + if consumeOptions.ConsumerAutoAck { + handler(Delivery{msg}) + continue + } + switch handler(Delivery{msg}) { + case Ack: + err := msg.Ack(false) + if err != nil { + consumer.logger.Printf("can't ack message: %v", err) + } + case NackDiscard: + err := msg.Nack(false, false) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + case NackRequeue: + err := msg.Nack(false, true) + if err != nil { + consumer.logger.Printf("can't nack message: %v", err) + } + } + } + consumer.logger.Printf("rabbit consumer goroutine closed") +} diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 9923de0..f2fbeef 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -21,6 +21,12 @@ func main() { if err != nil { log.Fatal(err) } + + // wait for server to acknowledge the cancel + noWait := false + defer consumer.Disconnect() + defer consumer.StopConsuming(consumerName, noWait) + err = consumer.StartConsuming( func(d rabbitmq.Delivery) rabbitmq.Action { log.Printf("consumed: %v", string(d.Body)) @@ -57,9 +63,4 @@ func main() { fmt.Println("awaiting signal") <-done fmt.Println("stopping consumer") - - // wait for server to acknowledge the cancel - noWait := false - consumer.StopConsuming(consumerName, noWait) - consumer.Disconnect() } diff --git a/go.mod b/go.mod index ab9741a..ebf3dba 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq go 1.16 -require github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 +require github.com/rabbitmq/amqp091-go v1.3.0 diff --git a/go.sum b/go.sum index b073bc2..5f8cc0f 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 h1:13nv5f/LNJxNpvpYm/u0NqrlFebon342f9Xu9GpklKc= github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM= +github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= diff --git a/vendor/github.com/rabbitmq/amqp091-go/.gitignore b/vendor/github.com/rabbitmq/amqp091-go/.gitignore index 667fb50..a93cced 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/.gitignore +++ b/vendor/github.com/rabbitmq/amqp091-go/.gitignore @@ -3,10 +3,4 @@ spec/spec examples/simple-consumer/simple-consumer examples/simple-producer/simple-producer -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf - -.idea/**/contentModel.xml +.idea/ diff --git a/vendor/github.com/rabbitmq/amqp091-go/README.md b/vendor/github.com/rabbitmq/amqp091-go/README.md index 85e0491..daeb7aa 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/README.md +++ b/vendor/github.com/rabbitmq/amqp091-go/README.md @@ -1,5 +1,7 @@ # Go RabbitMQ Client Library +[![Go Reference](https://pkg.go.dev/badge/github.com/rabbitmq/amqp091-go.svg)](https://pkg.go.dev/github.com/rabbitmq/amqp091-go) + This is a Go AMQP 0.9.1 client maintained by the [RabbitMQ core team](https://github.com/rabbitmq). It was [originally developed by Sean Treadway](https://github.com/streadway/amqp). diff --git a/vendor/github.com/rabbitmq/amqp091-go/allocator.go b/vendor/github.com/rabbitmq/amqp091-go/allocator.go index d85614a..0688e4b 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/allocator.go +++ b/vendor/github.com/rabbitmq/amqp091-go/allocator.go @@ -1,3 +1,8 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package amqp091 import ( diff --git a/vendor/github.com/rabbitmq/amqp091-go/auth.go b/vendor/github.com/rabbitmq/amqp091-go/auth.go index f283d34..0c07bb3 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/auth.go +++ b/vendor/github.com/rabbitmq/amqp091-go/auth.go @@ -1,10 +1,12 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package amqp091 import ( + "bytes" "fmt" ) @@ -42,13 +44,33 @@ func (auth *AMQPlainAuth) Mechanism() string { return "AMQPLAIN" } -// Response returns the null character delimited encoding for the SASL PLAIN Mechanism. +// Response returns an AMQP encoded credentials table, without the field table size. func (auth *AMQPlainAuth) Response() string { - return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password) + var buf bytes.Buffer + table := Table{"LOGIN": auth.Username, "PASSWORD": auth.Password} + if err := writeTable(&buf, table); err != nil { + return "" + } + return buf.String()[4:] +} + +// ExternalAuth for RabbitMQ-auth-mechanism-ssl. +type ExternalAuth struct { +} + +// Mechanism returns "EXTERNAL" +func (*ExternalAuth) Mechanism() string { + return "EXTERNAL" +} + +// Response returns an AMQP encoded credentials table, without the field table size. +func (*ExternalAuth) Response() string { + return "\000*\000*" } // Finds the first mechanism preferred by the client that the server supports. func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { + for _, auth = range client { for _, mech := range serverMechanisms { if auth.Mechanism() == mech { diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index c54d5a0..a4afc98 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -1329,8 +1330,19 @@ internal counter for DeliveryTags with the first confirmation starts at 1. */ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) + return err +} + +/* +PublishWithDeferredConfirm behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. +*/ +func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { if err := msg.Headers.Validate(); err != nil { - return err + return nil, err } ch.m.Lock() @@ -1358,14 +1370,14 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg AppId: msg.AppId, }, }); err != nil { - return err + return nil, err } if ch.confirming { - ch.confirms.Publish() + return ch.confirms.Publish(), nil } - return nil + return nil, nil } /* @@ -1596,3 +1608,12 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error { Requeue: requeue, }) } + +// GetNextPublishSeqNo returns the sequence number of the next message to be +// published, when in confirm mode. +func (ch *Channel) GetNextPublishSeqNo() uint64 { + ch.confirms.m.Lock() + defer ch.confirms.m.Unlock() + + return ch.confirms.published + 1 +} diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index 42450f4..654d755 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -1,22 +1,32 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package amqp091 -import "sync" +import ( + "sync" +) // confirms resequences and notifies one or multiple publisher confirmation listeners type confirms struct { - m sync.Mutex - listeners []chan Confirmation - sequencer map[uint64]Confirmation - published uint64 - expecting uint64 + m sync.Mutex + listeners []chan Confirmation + sequencer map[uint64]Confirmation + deferredConfirmations *deferredConfirmations + published uint64 + publishedMut sync.Mutex + expecting uint64 } // newConfirms allocates a confirms func newConfirms() *confirms { return &confirms{ - sequencer: map[uint64]Confirmation{}, - published: 0, - expecting: 1, + sequencer: map[uint64]Confirmation{}, + deferredConfirmations: newDeferredConfirmations(), + published: 0, + expecting: 1, } } @@ -28,12 +38,12 @@ func (c *confirms) Listen(l chan Confirmation) { } // Publish increments the publishing counter -func (c *confirms) Publish() uint64 { - c.m.Lock() - defer c.m.Unlock() +func (c *confirms) Publish() *DeferredConfirmation { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() c.published++ - return c.published + return c.deferredConfirmations.Add(c.published) } // confirm confirms one publishing, increments the expecting delivery tag, and @@ -48,6 +58,9 @@ func (c *confirms) confirm(confirmation Confirmation) { // resequence confirms any out of order delivered confirmations func (c *confirms) resequence() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + for c.expecting <= c.published { sequenced, found := c.sequencer[c.expecting] if !found { @@ -62,6 +75,8 @@ func (c *confirms) One(confirmed Confirmation) { c.m.Lock() defer c.m.Unlock() + c.deferredConfirmations.Confirm(confirmed) + if c.expecting == confirmed.DeliveryTag { c.confirm(confirmed) } else { @@ -75,6 +90,8 @@ func (c *confirms) Multiple(confirmed Confirmation) { c.m.Lock() defer c.m.Unlock() + c.deferredConfirmations.ConfirmMultiple(confirmed) + for c.expecting <= confirmed.DeliveryTag { c.confirm(Confirmation{c.expecting, confirmed.Ack}) } @@ -92,3 +109,56 @@ func (c *confirms) Close() error { c.listeners = nil return nil } + +type deferredConfirmations struct { + m sync.Mutex + confirmations map[uint64]*DeferredConfirmation +} + +func newDeferredConfirmations() *deferredConfirmations { + return &deferredConfirmations{ + confirmations: map[uint64]*DeferredConfirmation{}, + } +} + +func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { + d.m.Lock() + defer d.m.Unlock() + + dc := &DeferredConfirmation{DeliveryTag: tag} + dc.wg.Add(1) + d.confirmations[tag] = dc + return dc +} + +func (d *deferredConfirmations) Confirm(confirmation Confirmation) { + d.m.Lock() + defer d.m.Unlock() + + dc, found := d.confirmations[confirmation.DeliveryTag] + if !found { + // we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen + return + } + dc.confirmation = confirmation + dc.wg.Done() + delete(d.confirmations, confirmation.DeliveryTag) +} + +func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + if k <= confirmation.DeliveryTag { + v.confirmation = Confirmation{DeliveryTag: k, Ack: confirmation.Ack} + v.wg.Done() + delete(d.confirmations, k) + } + } +} + +func (d *DeferredConfirmation) Wait() bool { + d.wg.Wait() + return d.confirmation.Ack +} diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index a967ec4..4023ade 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -156,6 +157,23 @@ func DialTLS(url string, amqps *tls.Config) (*Connection, error) { }) } +// DialTLS_ExternalAuth accepts a string in the AMQP URI format and returns a +// new Connection over TCP using EXTERNAL auth. Defaults to a server heartbeat +// interval of 10 seconds and sets the initial read deadline to 30 seconds. +// +// This mechanism is used, when RabbitMQ is configured for EXTERNAL auth with +// ssl_cert_login plugin for userless/passwordless logons +// +// DialTLS_ExternalAuth uses the provided tls.Config when encountering an +// amqps:// scheme. +func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) { + return DialConfig(url, Config{ + Heartbeat: defaultHeartbeat, + TLSClientConfig: amqps, + SASL: []Authentication{&ExternalAuth{}}, + }) +} + // DialConfig accepts a string in the AMQP URI format and a configuration for // the transport and connection setup, returning a new Connection. Defaults to // a server heartbeat interval of 10 seconds and sets the initial read deadline @@ -262,7 +280,8 @@ func (c *Connection) ConnectionState() tls.ConnectionState { NotifyClose registers a listener for close events either initiated by an error accompanying a connection.close method or by a normal shutdown. -On normal shutdowns, the chan will be closed. +The chan provided will be closed when the Channel is closed and on a +graceful close, no error will be sent. To reconnect after a transport or protocol error, register a listener here and re-run your setup process. diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index dfd0380..8c23fad 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/delivery.go b/vendor/github.com/rabbitmq/amqp091-go/delivery.go index 84d9da5..e94cf34 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/delivery.go +++ b/vendor/github.com/rabbitmq/amqp091-go/delivery.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/doc.go b/vendor/github.com/rabbitmq/amqp091-go/doc.go index b9ecbb6..ba2efb0 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/doc.go +++ b/vendor/github.com/rabbitmq/amqp091-go/doc.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go index ed55ad4..602220f 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go +++ b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go @@ -1,3 +1,8 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + // +build gofuzz package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/read.go b/vendor/github.com/rabbitmq/amqp091-go/read.go index e9f7253..57444b0 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/read.go +++ b/vendor/github.com/rabbitmq/amqp091-go/read.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -160,7 +161,8 @@ func readTimestamp(r io.Reader) (v time.Time, err error) { 'S': string 'T': time.Time 'V': nil -'b': byte +'b': int8 +'B': byte 'd': float64 'f': float32 'l': int64 @@ -182,13 +184,20 @@ func readField(r io.Reader) (v interface{}, err error) { } return (value != 0), nil - case 'b': + case 'B': var value [1]byte if _, err = io.ReadFull(r, value[0:1]); err != nil { return } return value[0], nil + case 'b': + var value int8 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + case 's': var value int16 if err = binary.Read(r, binary.BigEndian, &value); err != nil { diff --git a/vendor/github.com/rabbitmq/amqp091-go/return.go b/vendor/github.com/rabbitmq/amqp091-go/return.go index 5816f3a..cdc3875 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/return.go +++ b/vendor/github.com/rabbitmq/amqp091-go/return.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go index cfcdf1c..0261c15 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 3875f7e..3319990 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -7,6 +8,7 @@ package amqp091 import ( "fmt" "io" + "sync" "time" ) @@ -178,6 +180,15 @@ type Blocking struct { Reason string // Server reason for activation } +// DeferredConfirmation represents a future publisher confirm for a message. It +// allows users to directly correlate a publishing to a confirmation. These are +// returned from PublishWithDeferredConfirm on Channels. +type DeferredConfirmation struct { + wg sync.WaitGroup + DeliveryTag uint64 + confirmation Confirmation +} + // Confirmation notifies the acknowledgment or negative acknowledgement of a // publishing identified by its delivery tag. Use NotifyPublish on the Channel // to consume these events. @@ -197,6 +208,7 @@ type Decimal struct { // // bool // byte +// int8 // float32 // float64 // int @@ -225,7 +237,7 @@ type Table map[string]interface{} func validateField(f interface{}) error { switch fv := f.(type) { - case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: + case nil, bool, byte, int8, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: return nil case []interface{}: diff --git a/vendor/github.com/rabbitmq/amqp091-go/uri.go b/vendor/github.com/rabbitmq/amqp091-go/uri.go index 73e29bc..f50abe0 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/uri.go +++ b/vendor/github.com/rabbitmq/amqp091-go/uri.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/vendor/github.com/rabbitmq/amqp091-go/write.go b/vendor/github.com/rabbitmq/amqp091-go/write.go index 936a021..e7307d2 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/write.go +++ b/vendor/github.com/rabbitmq/amqp091-go/write.go @@ -1,4 +1,5 @@ // Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -275,7 +276,8 @@ func writeLongstr(w io.Writer, s string) (err error) { 'S': string 'T': time.Time 'V': nil -'b': byte +'b': int8 +'B': byte 'd': float64 'f': float32 'l': int64 @@ -298,10 +300,15 @@ func writeField(w io.Writer, value interface{}) (err error) { enc = buf[:2] case byte: - buf[0] = 'b' + buf[0] = 'B' buf[1] = v enc = buf[:2] + case int8: + buf[0] = 'b' + buf[1] = uint8(v) + enc = buf[:2] + case int16: buf[0] = 's' binary.BigEndian.PutUint16(buf[1:3], uint16(v)) diff --git a/vendor/modules.txt b/vendor/modules.txt index 802a23c..b4cc059 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v0.0.0-20210823000215-c428a6150891 +# github.com/rabbitmq/amqp091-go v1.3.0 ## explicit github.com/rabbitmq/amqp091-go