Browse Source

rename to github.com/DizoftTeam/go-rabbitmq

pull/127/head
WiRight 3 years ago
parent
commit
df42cff33a
No known key found for this signature in database GPG Key ID: 427DBE0B77ED3FBD
25 changed files with 175 additions and 60 deletions
  1. +2
    -2
      README.md
  2. +1
    -1
      connection.go
  3. +1
    -1
      consume.go
  4. +1
    -1
      consumer_options.go
  5. +1
    -1
      declare.go
  6. +1
    -1
      examples/consumer/main.go
  7. +1
    -1
      examples/logger/main.go
  8. +1
    -1
      examples/multiconsumer/main.go
  9. +1
    -1
      examples/multipublisher/main.go
  10. +1
    -1
      examples/publisher/main.go
  11. +2
    -2
      go.mod
  12. +4
    -30
      go.sum
  13. +3
    -3
      internal/channelmanager/channel_manager.go
  14. +2
    -2
      internal/connectionmanager/connection_manager.go
  15. +1
    -1
      logger.go
  16. +2
    -2
      publish.go
  17. +18
    -0
      vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md
  18. +16
    -2
      vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md
  19. +5
    -0
      vendor/github.com/rabbitmq/amqp091-go/Makefile
  20. +9
    -5
      vendor/github.com/rabbitmq/amqp091-go/channel.go
  21. +22
    -1
      vendor/github.com/rabbitmq/amqp091-go/confirms.go
  22. +47
    -0
      vendor/github.com/rabbitmq/amqp091-go/connection.go
  23. +27
    -0
      vendor/github.com/rabbitmq/amqp091-go/consumers.go
  24. +5
    -0
      vendor/github.com/rabbitmq/amqp091-go/types.go
  25. +1
    -1
      vendor/modules.txt

+ 2
- 2
README.md View File

@ -4,7 +4,7 @@ Wrapper of [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) that pr
Supported by [Boot.dev](https://boot.dev) Supported by [Boot.dev](https://boot.dev)
[![](https://godoc.org/github.com/wagslane/go-rabbitmq?status.svg)](https://godoc.org/github.com/wagslane/go-rabbitmq)![Deploy](https://github.com/wagslane/go-rabbitmq/workflows/Tests/badge.svg)
[![](https://godoc.org/github.com/DizoftTeam/go-rabbitmq?status.svg)](https://godoc.org/github.com/DizoftTeam/go-rabbitmq)![Deploy](https://github.com/DizoftTeam/go-rabbitmq/workflows/Tests/badge.svg)
## Motivation ## Motivation
@ -25,7 +25,7 @@ The goal with `go-rabbitmq` is to provide *most* (but not all) of the nitty-grit
Inside a Go module: Inside a Go module:
```bash ```bash
go get github.com/wagslane/go-rabbitmq
go get github.com/DizoftTeam/go-rabbitmq
``` ```
## 🚀 Quick Start Consumer ## 🚀 Quick Start Consumer


+ 1
- 1
connection.go View File

@ -1,8 +1,8 @@
package rabbitmq package rabbitmq
import ( import (
"github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
) )
// Conn manages the connection to a rabbit cluster // Conn manages the connection to a rabbit cluster


+ 1
- 1
consume.go View File

@ -5,8 +5,8 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
) )
// Action is an action that occurs after processed this delivery // Action is an action that occurs after processed this delivery


+ 1
- 1
consumer_options.go View File

@ -1,8 +1,8 @@
package rabbitmq package rabbitmq
import ( import (
"github.com/DizoftTeam/go-rabbitmq/internal/logger"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// getDefaultConsumerOptions describes the options that will be used when a value isn't provided // getDefaultConsumerOptions describes the options that will be used when a value isn't provided


+ 1
- 1
declare.go View File

@ -1,7 +1,7 @@
package rabbitmq package rabbitmq
import ( import (
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
) )
func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error {


+ 1
- 1
examples/consumer/main.go View File

@ -7,7 +7,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
) )
func main() { func main() {


+ 1
- 1
examples/logger/main.go View File

@ -4,7 +4,7 @@ import (
"context" "context"
"log" "log"
rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
) )
// errorLogger is used in WithPublisherOptionsLogger to create a custom logger // errorLogger is used in WithPublisherOptionsLogger to create a custom logger


+ 1
- 1
examples/multiconsumer/main.go View File

@ -7,7 +7,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
) )
func main() { func main() {


+ 1
- 1
examples/multipublisher/main.go View File

@ -9,7 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
) )
func main() { func main() {


+ 1
- 1
examples/publisher/main.go View File

@ -9,7 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
rabbitmq "github.com/wagslane/go-rabbitmq"
rabbitmq "github.com/DizoftTeam/go-rabbitmq"
) )
func main() { func main() {


+ 2
- 2
go.mod View File

@ -1,5 +1,5 @@
module github.com/wagslane/go-rabbitmq
module github.com/DizoftTeam/go-rabbitmq
go 1.20 go 1.20
require github.com/rabbitmq/amqp091-go v1.7.0
require github.com/rabbitmq/amqp091-go v1.8.0

+ 4
- 30
go.sum View File

@ -4,40 +4,14 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM=
github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=


+ 3
- 3
internal/channelmanager/channel_manager.go View File

@ -5,10 +5,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager"
"github.com/DizoftTeam/go-rabbitmq/internal/dispatcher"
"github.com/DizoftTeam/go-rabbitmq/internal/logger"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
"github.com/wagslane/go-rabbitmq/internal/dispatcher"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// ChannelManager - // ChannelManager -


+ 2
- 2
internal/connectionmanager/connection_manager.go View File

@ -4,9 +4,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/DizoftTeam/go-rabbitmq/internal/dispatcher"
"github.com/DizoftTeam/go-rabbitmq/internal/logger"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/dispatcher"
"github.com/wagslane/go-rabbitmq/internal/logger"
) )
// ConnectionManager - // ConnectionManager -


+ 1
- 1
logger.go View File

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/wagslane/go-rabbitmq/internal/logger"
"github.com/DizoftTeam/go-rabbitmq/internal/logger"
) )
// Logger is describes a logging structure. It can be set using // Logger is describes a logging structure. It can be set using


+ 2
- 2
publish.go View File

@ -6,9 +6,9 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
"github.com/DizoftTeam/go-rabbitmq/internal/connectionmanager"
amqp "github.com/rabbitmq/amqp091-go" amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/channelmanager"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
) )
// DeliveryMode. Transient means higher throughput but messages will not be // DeliveryMode. Transient means higher throughput but messages will not be


+ 18
- 0
vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md View File

@ -1,5 +1,23 @@
# Changelog # Changelog
## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0)
**Closed issues:**
- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170)
- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167)
- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160)
**Merged pull requests:**
- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken))
- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet))
- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm))
- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken))
- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt))
## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01)
[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1)


+ 16
- 2
vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md View File

@ -9,11 +9,13 @@ Here is the recommended workflow:
1. Run Static Checks 1. Run Static Checks
1. Run integration tests (see below) 1. Run integration tests (see below)
1. **Implement tests** 1. **Implement tests**
1. Implement fixs
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Implement fixes
1. Commit your changes. Use a [good, descriptive, commit message][good-commit].
1. Push to a branch (`git push -u origin my-new-feature`) 1. Push to a branch (`git push -u origin my-new-feature`)
1. Submit a pull request 1. Submit a pull request
[good-commit]: https://cbea.ms/git-commit/
## Running Static Checks ## Running Static Checks
golangci-lint must be installed to run the static checks. See [installation golangci-lint must be installed to run the static checks. See [installation
@ -43,6 +45,18 @@ The integration tests can be run via:
make tests make tests
``` ```
Some tests require access to `rabbitmqctl` CLI. Use the environment variable
`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests.
If you have Docker available in your machine, you can run:
```shell
make tests-docker
```
This target will start a RabbitMQ container, run the test suite with the environment
variable setup, and stop RabbitMQ container after a successful run.
All integration tests should use the `integrationConnection(...)` test All integration tests should use the `integrationConnection(...)` test
helpers defined in `integration_test.go` to setup the integration environment helpers defined in `integration_test.go` to setup the integration environment
and logging. and logging.

+ 5
- 0
vendor/github.com/rabbitmq/amqp091-go/Makefile View File

@ -19,6 +19,11 @@ fmt: ## Run go fmt against code
tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test
go test -race -v -tags integration $(GO_TEST_FLAGS) go test -race -v -tags integration $(GO_TEST_FLAGS)
.PHONY: tests-docker
tests-docker: rabbitmq-server
RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS)
$(MAKE) stop-rabbitmq-server
.PHONY: check .PHONY: check
check: check:
golangci-lint run ./... golangci-lint run ./...


+ 9
- 5
vendor/github.com/rabbitmq/amqp091-go/channel.go View File

@ -1435,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
ch.m.Lock() ch.m.Lock()
defer ch.m.Unlock() defer ch.m.Unlock()
var dc *DeferredConfirmation
if ch.confirming {
dc = ch.confirms.publish()
}
if err := ch.send(&basicPublish{ if err := ch.send(&basicPublish{
Exchange: exchange, Exchange: exchange,
RoutingKey: key, RoutingKey: key,
@ -1457,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
AppId: msg.AppId, AppId: msg.AppId,
}, },
}); err != nil { }); err != nil {
if ch.confirming {
ch.confirms.unpublish()
}
return nil, err return nil, err
} }
if ch.confirming {
return ch.confirms.Publish(), nil
}
return nil, nil
return dc, nil
} }
/* /*


+ 22
- 1
vendor/github.com/rabbitmq/amqp091-go/confirms.go View File

@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) {
} }
// Publish increments the publishing counter // Publish increments the publishing counter
func (c *confirms) Publish() *DeferredConfirmation {
func (c *confirms) publish() *DeferredConfirmation {
c.publishedMut.Lock() c.publishedMut.Lock()
defer c.publishedMut.Unlock() defer c.publishedMut.Unlock()
@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation {
return c.deferredConfirmations.Add(c.published) return c.deferredConfirmations.Add(c.published)
} }
// unpublish decrements the publishing counter and removes the
// DeferredConfirmation. It must be called immediately after a publish fails.
func (c *confirms) unpublish() {
c.publishedMut.Lock()
defer c.publishedMut.Unlock()
c.deferredConfirmations.remove(c.published)
c.published--
}
// confirm confirms one publishing, increments the expecting delivery tag, and // confirm confirms one publishing, increments the expecting delivery tag, and
// removes bookkeeping for that delivery tag. // removes bookkeeping for that delivery tag.
func (c *confirms) confirm(confirmation Confirmation) { func (c *confirms) confirm(confirmation Confirmation) {
@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
return dc return dc
} }
// remove is only used to drop a tag whose publish failed
func (d *deferredConfirmations) remove(tag uint64) {
d.m.Lock()
defer d.m.Unlock()
dc, found := d.confirmations[tag]
if !found {
return
}
close(dc.done)
delete(d.confirmations, tag)
}
func (d *deferredConfirmations) Confirm(confirmation Confirmation) { func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
d.m.Lock() d.m.Lock()
defer d.m.Unlock() defer d.m.Unlock()


+ 47
- 0
vendor/github.com/rabbitmq/amqp091-go/connection.go View File

@ -399,12 +399,47 @@ func (c *Connection) Close() error {
) )
} }
// CloseDeadline requests and waits for the response to close this AMQP connection.
//
// Accepts a deadline for waiting the server response. The deadline is passed
// to the low-level connection i.e. network socket.
//
// Regardless of the error returned, the connection is considered closed, and it
// should not be used after calling this function.
//
// In the event of an I/O timeout, connection-closed listeners are NOT informed.
//
// After returning from this call, all resources associated with this connection,
// including the underlying io, Channels, Notify listeners and Channel consumers
// will also be closed.
func (c *Connection) CloseDeadline(deadline time.Time) error {
if c.IsClosed() {
return ErrClosed
}
defer c.shutdown(nil)
err := c.setDeadline(deadline)
if err != nil {
return err
}
return c.call(
&connectionClose{
ReplyCode: replySuccess,
ReplyText: "kthxbai",
},
&connectionCloseOk{},
)
}
func (c *Connection) closeWith(err *Error) error { func (c *Connection) closeWith(err *Error) error {
if c.IsClosed() { if c.IsClosed() {
return ErrClosed return ErrClosed
} }
defer c.shutdown(err) defer c.shutdown(err)
return c.call( return c.call(
&connectionClose{ &connectionClose{
ReplyCode: uint16(err.Code), ReplyCode: uint16(err.Code),
@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool {
return atomic.LoadInt32(&c.closed) == 1 return atomic.LoadInt32(&c.closed) == 1
} }
// setDeadline is a wrapper to type assert Connection.conn and set an I/O
// deadline in the underlying TCP connection socket, by calling
// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails,
// although this should never happen.
func (c *Connection) setDeadline(t time.Time) error {
con, ok := c.conn.(net.Conn)
if !ok {
return errInvalidTypeAssertion
}
return con.SetDeadline(t)
}
func (c *Connection) send(f frame) error { func (c *Connection) send(f frame) error {
if c.IsClosed() { if c.IsClosed() {
return ErrClosed return ErrClosed


+ 27
- 0
vendor/github.com/rabbitmq/amqp091-go/consumers.go View File

@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
} }
case out <- *queue[0]: case out <- *queue[0]:
/*
* https://github.com/rabbitmq/amqp091-go/issues/179
* https://github.com/rabbitmq/amqp091-go/pull/180
*
* Comment from @lars-t-hansen:
*
* Given Go's slice semantics, and barring any information
* available to the compiler that proves that queue is the only
* pointer to the memory it references, the only meaning that
* queue = queue[1:] can have is basically queue += sizeof(queue
* element), ie, it bumps a pointer. Looking at the generated
* code for a simple example (on ARM64 in this case) bears this
* out. So what we're left with is an array that we have a
* pointer into the middle of. When the GC traces this pointer,
* it too does not know whether the array has multiple
* referents, and so its only sensible choice is to find the
* beginning of the array, and if the array is not already
* visited, mark every element in it, including the "dead"
* pointer.
*
* (Depending on the program dynamics, an element may eventually
* be appended to the queue when the queue is at capacity, and
* in this case the live elements are copied into a new array
* and the old array is left to be GC'd eventually, along with
* the dead object. But that can take time.)
*/
queue[0] = nil
queue = queue[1:] queue = queue[1:]
} }
} }


+ 5
- 0
vendor/github.com/rabbitmq/amqp091-go/types.go View File

@ -63,6 +63,11 @@ var (
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
) )
// internal errors used inside the library
var (
errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true}
)
// Error captures the code and reason a channel or connection has been closed // Error captures the code and reason a channel or connection has been closed
// by the server. // by the server.
type Error struct { type Error struct {


+ 1
- 1
vendor/modules.txt View File

@ -1,3 +1,3 @@
# github.com/rabbitmq/amqp091-go v1.7.0
# github.com/rabbitmq/amqp091-go v1.8.0
## explicit; go 1.16 ## explicit; go 1.16
github.com/rabbitmq/amqp091-go github.com/rabbitmq/amqp091-go

Loading…
Cancel
Save