| @ -0,0 +1,29 @@ | |||
| name: Tests | |||
| on: | |||
| pull_request: | |||
| branches: [main] | |||
| jobs: | |||
| test: | |||
| name: Test | |||
| runs-on: ubuntu-latest | |||
| env: | |||
| GOFLAGS: -mod=vendor | |||
| GOPROXY: "off" | |||
| steps: | |||
| - name: Set up Go 1.16 | |||
| uses: actions/setup-go@v2 | |||
| with: | |||
| go-version: 1.16 | |||
| id: go | |||
| - name: Check out code into the Go module directory | |||
| uses: actions/checkout@v1 | |||
| - name: Make tests | |||
| run: | | |||
| make install-lint | |||
| make install-staticcheck | |||
| make | |||
| @ -0,0 +1,24 @@ | |||
| all: test fmt vet lint staticcheck | |||
| test: | |||
| go test ./... | |||
| fmt: | |||
| go list -f '{{.Dir}}' ./... | grep -v /vendor/ | xargs -L1 gofmt -l | |||
| test -z $$(go list -f '{{.Dir}}' ./... | grep -v /vendor/ | xargs -L1 gofmt -l) | |||
| vet: | |||
| go vet ./... | |||
| install-lint: | |||
| GO111MODULE=off go get -u golang.org/x/lint/golint | |||
| GO111MODULE=off go list -f {{.Target}} golang.org/x/lint/golint | |||
| lint: | |||
| go list ./... | grep -v /vendor/ | xargs -L1 golint -set_exit_status | |||
| install-staticcheck: | |||
| cd /tmp && go get honnef.co/go/tools/cmd/staticcheck | |||
| staticcheck: | |||
| staticcheck -f stylish ./... | |||
| @ -1,2 +1,106 @@ | |||
| # go-rabbitmq | |||
| Wrapper of streadway/amqp that provides reconnection logic and sane defaults | |||
| Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ | |||
| Supported by [Qvault](https://qvault.io) | |||
| ## Motivation | |||
| Streadway's [AMQP](https://github.com/streadway/amqp) library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol. As such, no reconnection logic and few ease-of-use abstractions are provided. | |||
| The goal with `go-rabbit` is to still provide basically all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly: | |||
| * Automatic reconnection | |||
| * Multithreaded consumers via a handler function | |||
| * Reasonable defaults | |||
| * Flow control handling | |||
| ## ⚙️ Installation | |||
| Outside of a Go module: | |||
| ```bash | |||
| go get github.com/wagslane/go-rabbitmq | |||
| ``` | |||
| ## 🚀 Quick Start Consumer | |||
| ```go | |||
| consumer, err := rabbitmq.GetConsumer("amqp://user:pass@localhost", true) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| err = consumer.StartConsumers( | |||
| func(d rabbitmq.Delivery) bool { | |||
| log.Printf("consumed: %v", string(d.Body)) | |||
| // true to ACK, false to NACK | |||
| return true | |||
| }, | |||
| // can pass nil here for defaults | |||
| &rabbitmq.ConsumeOptions{ | |||
| QueueOptions: rabbitmq.QueueOptions{ | |||
| Durable: true, | |||
| }, | |||
| QosOptions: rabbitmq.QosOptions{ | |||
| Concurrency: 10, | |||
| Prefetch: 100, | |||
| }, | |||
| }, | |||
| "my_queue", | |||
| "routing_key1", "routing_key2", | |||
| ) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| ``` | |||
| ## 🚀 Quick Start Publisher | |||
| ```go | |||
| publisher, returns, err := rabbitmq.GetPublisher("amqp://user:pass@localhost", true) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| err = publisher.Publish( | |||
| []byte("hello, world"), | |||
| // leave nil for defaults | |||
| &rabbitmq.PublishOptions{ | |||
| Exchange: "events", | |||
| Mandatory: true, | |||
| }, | |||
| "routing_key", | |||
| ) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| go func() { | |||
| for r := range returns { | |||
| log.Printf("message returned from server: %s", string(r.Body)) | |||
| } | |||
| }() | |||
| ``` | |||
| ## 💬 Contact | |||
| [](https://twitter.com/intent/follow?screen_name=wagslane) | |||
| Submit an issue (above in the issues tab) | |||
| ## Transient Dependencies | |||
| * [github.com/streadway/amqp](https://github.com/streadway/amqp) - and that's it. | |||
| ## 👏 Contributing | |||
| I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable. | |||
| All pull requests should be submitted to the `main` branch. | |||
| ```bash | |||
| make test | |||
| make fmt | |||
| make vet | |||
| make lint | |||
| ``` | |||
| @ -0,0 +1,102 @@ | |||
| package rabbitmq | |||
| import ( | |||
| "errors" | |||
| "sync" | |||
| "time" | |||
| "github.com/streadway/amqp" | |||
| ) | |||
| type channelManager struct { | |||
| logger logger | |||
| url string | |||
| channel *amqp.Channel | |||
| channelMux *sync.RWMutex | |||
| notifyCancelOrClose chan error | |||
| } | |||
| func newChannelManager(url string, logging bool) (*channelManager, error) { | |||
| ch, err := getNewChannel(url) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| chManager := channelManager{ | |||
| logger: logger{logging: logging}, | |||
| url: url, | |||
| channel: ch, | |||
| channelMux: &sync.RWMutex{}, | |||
| notifyCancelOrClose: make(chan error), | |||
| } | |||
| go chManager.startNotifyCancelOrClosed() | |||
| return &chManager, nil | |||
| } | |||
| func getNewChannel(url string) (*amqp.Channel, error) { | |||
| amqpConn, err := amqp.Dial(url) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| ch, err := amqpConn.Channel() | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| return ch, err | |||
| } | |||
| // startNotifyCancelOrClosed listens on the channel's cancelled and closed | |||
| // notifiers. When it detects a problem, it attempts to reconnect with an exponential | |||
| // backoff. Once reconnected, it sends an error back on the manager's notifyCancelOrClose | |||
| // channel | |||
| func (chManager *channelManager) startNotifyCancelOrClosed() { | |||
| notifyCloseChan := make(chan *amqp.Error) | |||
| notifyCloseChan = chManager.channel.NotifyClose(notifyCloseChan) | |||
| notifyCancelChan := make(chan string) | |||
| notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) | |||
| select { | |||
| case err := <-notifyCloseChan: | |||
| chManager.logger.Println("attempting to reconnect to amqp server after close") | |||
| chManager.reconnectWithBackoff() | |||
| chManager.logger.Println("successfully reconnected to amqp server after close") | |||
| chManager.notifyCancelOrClose <- err | |||
| case err := <-notifyCancelChan: | |||
| chManager.logger.Println("attempting to reconnect to amqp server after cancel") | |||
| chManager.reconnectWithBackoff() | |||
| chManager.logger.Println("successfully reconnected to amqp server after cancel") | |||
| chManager.notifyCancelOrClose <- errors.New(err) | |||
| } | |||
| close(notifyCancelChan) | |||
| close(notifyCloseChan) | |||
| } | |||
| // reconnectWithBackoff continuously attempts to reconnect with an | |||
| // exponential backoff strategy | |||
| func (chManager *channelManager) reconnectWithBackoff() { | |||
| backoffTime := time.Second | |||
| for { | |||
| chManager.logger.Printf("waiting %s seconds to attempt to reconnect to amqp server", backoffTime) | |||
| time.Sleep(backoffTime) | |||
| backoffTime *= 2 | |||
| err := chManager.reconnect() | |||
| if err != nil { | |||
| chManager.logger.Printf("error reconnecting to amqp server: %v", err) | |||
| } else { | |||
| return | |||
| } | |||
| } | |||
| } | |||
| // reconnect safely closes the current channel and obtains a new one | |||
| func (chManager *channelManager) reconnect() error { | |||
| chManager.channelMux.Lock() | |||
| defer chManager.channelMux.Unlock() | |||
| newChannel, err := getNewChannel(chManager.url) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| chManager.channel.Close() | |||
| chManager.channel = newChannel | |||
| go chManager.startNotifyCancelOrClosed() | |||
| return nil | |||
| } | |||
| @ -0,0 +1,264 @@ | |||
| package rabbitmq | |||
| import ( | |||
| "log" | |||
| "time" | |||
| "github.com/streadway/amqp" | |||
| ) | |||
| // Consumer allows you to create and connect to queues for data consumption. | |||
| type Consumer struct { | |||
| chManager *channelManager | |||
| logger logger | |||
| } | |||
| // Delivery captures the fields for a previously delivered message resident in | |||
| // a queue to be delivered by the server to a consumer from Channel.Consume or | |||
| // Channel.Get. | |||
| type Delivery struct { | |||
| amqp.Delivery | |||
| } | |||
| // ConsumeOptions are used to describe how a new consumer will be created. | |||
| type ConsumeOptions struct { | |||
| QueueOptions QueueOptions | |||
| BindingOptions BindingOptions | |||
| QosOptions QosOptions | |||
| ConsumerOptions ConsumerOptions | |||
| Logging bool | |||
| } | |||
| // QueueOptions - | |||
| type QueueOptions struct { | |||
| Durable bool | |||
| AutoDelete bool | |||
| Exclusive bool | |||
| NoWait bool | |||
| Args Table | |||
| } | |||
| // BindingOptions - | |||
| type BindingOptions struct { | |||
| Exchange string | |||
| NoWait bool | |||
| Args Table | |||
| } | |||
| // QosOptions - | |||
| type QosOptions struct { | |||
| Concurrency int | |||
| Prefetch int | |||
| Global bool | |||
| } | |||
| // ConsumerOptions - | |||
| type ConsumerOptions struct { | |||
| Name string | |||
| AutoAck bool | |||
| Exclusive bool | |||
| NoWait bool | |||
| NoLocal bool | |||
| Args Table | |||
| } | |||
| // GetConsumer returns a new Consumer connected to the given rabbitmq server | |||
| func GetConsumer(url string, logging bool) (Consumer, error) { | |||
| chManager, err := newChannelManager(url, logging) | |||
| if err != nil { | |||
| return Consumer{}, err | |||
| } | |||
| consumer := Consumer{ | |||
| chManager: chManager, | |||
| logger: logger{logging: logging}, | |||
| } | |||
| return consumer, nil | |||
| } | |||
| // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided | |||
| func getDefaultConsumeOptions() ConsumeOptions { | |||
| return ConsumeOptions{ | |||
| QueueOptions: QueueOptions{ | |||
| Durable: false, | |||
| AutoDelete: false, | |||
| Exclusive: false, | |||
| NoWait: false, | |||
| Args: nil, | |||
| }, | |||
| BindingOptions: BindingOptions{ | |||
| Exchange: "", | |||
| NoWait: false, | |||
| Args: nil, | |||
| }, | |||
| QosOptions: QosOptions{ | |||
| Concurrency: 1, | |||
| Prefetch: 10, | |||
| Global: false, | |||
| }, | |||
| ConsumerOptions: ConsumerOptions{ | |||
| Name: "", | |||
| AutoAck: false, | |||
| Exclusive: false, | |||
| NoWait: false, | |||
| NoLocal: false, | |||
| Args: nil, | |||
| }, | |||
| } | |||
| } | |||
| // fillInConsumeDefaults - | |||
| func fillInConsumeDefaults(consumeOptions ConsumeOptions) ConsumeOptions { | |||
| defaults := getDefaultConsumeOptions() | |||
| if consumeOptions.QosOptions.Concurrency < 1 { | |||
| consumeOptions.QosOptions.Concurrency = defaults.QosOptions.Concurrency | |||
| } | |||
| return consumeOptions | |||
| } | |||
| // StartConsumers starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". | |||
| // Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). | |||
| // The provided handler is called once for each message. If the provided queue doesn't exist, it | |||
| // will be created on the cluster | |||
| func (consumer Consumer) StartConsumers( | |||
| handler func(d Delivery) bool, | |||
| consumeOptions *ConsumeOptions, | |||
| queue string, | |||
| routingKeys ...string, | |||
| ) error { | |||
| defaults := getDefaultConsumeOptions() | |||
| finalOptions := ConsumeOptions{} | |||
| if consumeOptions == nil { | |||
| finalOptions = defaults | |||
| } else { | |||
| finalOptions = fillInConsumeDefaults(*consumeOptions) | |||
| } | |||
| err := consumer.startGoroutines( | |||
| handler, | |||
| finalOptions, | |||
| queue, | |||
| routingKeys..., | |||
| ) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| go func() { | |||
| for err := range consumer.chManager.notifyCancelOrClose { | |||
| consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) | |||
| consumer.startGoroutinesWithRetries( | |||
| handler, | |||
| finalOptions, | |||
| queue, | |||
| routingKeys..., | |||
| ) | |||
| } | |||
| }() | |||
| return nil | |||
| } | |||
| // startGoroutinesWithRetries attempts to start consuming on a channel | |||
| // with an exponential backoff | |||
| func (consumer Consumer) startGoroutinesWithRetries( | |||
| handler func(d Delivery) bool, | |||
| consumeOptions ConsumeOptions, | |||
| queue string, | |||
| routingKeys ...string, | |||
| ) { | |||
| backoffTime := time.Second | |||
| for { | |||
| consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) | |||
| time.Sleep(backoffTime) | |||
| backoffTime *= 2 | |||
| err := consumer.startGoroutines( | |||
| handler, | |||
| consumeOptions, | |||
| queue, | |||
| routingKeys..., | |||
| ) | |||
| if err != nil { | |||
| consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) | |||
| continue | |||
| } | |||
| break | |||
| } | |||
| } | |||
| // startGoroutines declares the queue if it doesn't exist, | |||
| // binds the queue to the routing key(s), and starts the goroutines | |||
| // that will consume from the queue | |||
| func (consumer Consumer) startGoroutines( | |||
| handler func(d Delivery) bool, | |||
| consumeOptions ConsumeOptions, | |||
| queue string, | |||
| routingKeys ...string, | |||
| ) error { | |||
| consumer.chManager.channelMux.RLock() | |||
| defer consumer.chManager.channelMux.RUnlock() | |||
| _, err := consumer.chManager.channel.QueueDeclare( | |||
| queue, | |||
| consumeOptions.QueueOptions.Durable, | |||
| consumeOptions.QueueOptions.AutoDelete, | |||
| consumeOptions.QueueOptions.Exclusive, | |||
| consumeOptions.QueueOptions.NoWait, | |||
| tableToAMQPTable(consumeOptions.QueueOptions.Args), | |||
| ) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| for _, routingKey := range routingKeys { | |||
| err = consumer.chManager.channel.QueueBind( | |||
| queue, | |||
| routingKey, | |||
| consumeOptions.BindingOptions.Exchange, | |||
| consumeOptions.BindingOptions.NoWait, | |||
| tableToAMQPTable(consumeOptions.BindingOptions.Args), | |||
| ) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| } | |||
| err = consumer.chManager.channel.Qos( | |||
| consumeOptions.QosOptions.Prefetch, | |||
| 0, | |||
| consumeOptions.QosOptions.Global, | |||
| ) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| msgs, err := consumer.chManager.channel.Consume( | |||
| queue, | |||
| consumeOptions.ConsumerOptions.Name, | |||
| consumeOptions.ConsumerOptions.AutoAck, | |||
| consumeOptions.ConsumerOptions.Exclusive, | |||
| consumeOptions.ConsumerOptions.NoLocal, // no-local is not supported by RabbitMQ | |||
| consumeOptions.ConsumerOptions.NoWait, | |||
| tableToAMQPTable(consumeOptions.ConsumerOptions.Args), | |||
| ) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| for i := 0; i < consumeOptions.QosOptions.Concurrency; i++ { | |||
| go func() { | |||
| for msg := range msgs { | |||
| if consumeOptions.ConsumerOptions.AutoAck { | |||
| handler(Delivery{msg}) | |||
| continue | |||
| } | |||
| if handler(Delivery{msg}) { | |||
| msg.Ack(false) | |||
| } else { | |||
| msg.Nack(false, true) | |||
| } | |||
| } | |||
| log.Println("rabbit consumer goroutine closed") | |||
| }() | |||
| } | |||
| log.Printf("Processing messages on %v goroutines", consumeOptions.QosOptions.Concurrency) | |||
| return nil | |||
| } | |||
| @ -0,0 +1,41 @@ | |||
| package main | |||
| import ( | |||
| "log" | |||
| rabbitmq "github.com/wagslane/go-rabbitmq" | |||
| ) | |||
| func main() { | |||
| consumer, err := rabbitmq.GetConsumer("amqp://user:pass@localhost", true) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| err = consumer.StartConsumers( | |||
| func(d rabbitmq.Delivery) bool { | |||
| log.Printf("consumed: %v", string(d.Body)) | |||
| // true to ACK, false to NACK | |||
| return true | |||
| }, | |||
| // can pass nil here for defaults | |||
| &rabbitmq.ConsumeOptions{ | |||
| QueueOptions: rabbitmq.QueueOptions{ | |||
| Durable: true, | |||
| }, | |||
| QosOptions: rabbitmq.QosOptions{ | |||
| Concurrency: 10, | |||
| Prefetch: 100, | |||
| }, | |||
| }, | |||
| "my_queue", | |||
| "routing_key1", "routing_key2", | |||
| ) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| // block main thread so consumers run forever | |||
| forever := make(chan struct{}) | |||
| <-forever | |||
| } | |||
| @ -0,0 +1,32 @@ | |||
| package main | |||
| import ( | |||
| "log" | |||
| rabbitmq "github.com/wagslane/go-rabbitmq" | |||
| ) | |||
| func main() { | |||
| publisher, returns, err := rabbitmq.GetPublisher("amqp://user:pass@localhost", true) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| err = publisher.Publish( | |||
| []byte("hello, world"), | |||
| // leave nil for defaults | |||
| &rabbitmq.PublishOptions{ | |||
| Exchange: "events", | |||
| Mandatory: true, | |||
| }, | |||
| "routing_key", | |||
| ) | |||
| if err != nil { | |||
| log.Fatal(err) | |||
| } | |||
| go func() { | |||
| for r := range returns { | |||
| log.Printf("message returned from server: %s", string(r.Body)) | |||
| } | |||
| }() | |||
| } | |||
| @ -0,0 +1,5 @@ | |||
| module github.com/wagslane/go-rabbitmq | |||
| go 1.16 | |||
| require github.com/streadway/amqp v1.0.0 | |||
| @ -0,0 +1,2 @@ | |||
| github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= | |||
| github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= | |||
| @ -0,0 +1,24 @@ | |||
| package rabbitmq | |||
| import ( | |||
| "fmt" | |||
| "log" | |||
| ) | |||
| type logger struct { | |||
| logging bool | |||
| } | |||
| const loggingPrefix = "gorabbit" | |||
| func (l logger) Printf(format string, v ...interface{}) { | |||
| if l.logging { | |||
| log.Printf(fmt.Sprintf("%s: %s", loggingPrefix, format), v...) | |||
| } | |||
| } | |||
| func (l logger) Println(v ...interface{}) { | |||
| if l.logging { | |||
| log.Println(loggingPrefix, fmt.Sprintf("%v", v...)) | |||
| } | |||
| } | |||
| @ -0,0 +1,159 @@ | |||
| package rabbitmq | |||
| import ( | |||
| "fmt" | |||
| "sync" | |||
| "github.com/streadway/amqp" | |||
| ) | |||
| // DeliveryMode. Transient means higher throughput but messages will not be | |||
| // restored on broker restart. The delivery mode of publishings is unrelated | |||
| // to the durability of the queues they reside on. Transient messages will | |||
| // not be restored to durable queues, persistent messages will be restored to | |||
| // durable queues and lost on non-durable queues during server restart. | |||
| // | |||
| // This remains typed as uint8 to match Publishing.DeliveryMode. Other | |||
| // delivery modes specific to custom queue implementations are not enumerated | |||
| // here. | |||
| const ( | |||
| Transient uint8 = amqp.Transient | |||
| Persistent uint8 = amqp.Persistent | |||
| ) | |||
| // Return captures a flattened struct of fields returned by the server when a | |||
| // Publishing is unable to be delivered either due to the `mandatory` flag set | |||
| // and no route found, or `immediate` flag set and no free consumer. | |||
| type Return struct { | |||
| amqp.Return | |||
| } | |||
| // PublishOptions are used to control how data is published | |||
| type PublishOptions struct { | |||
| Exchange string | |||
| // Mandatory fails to publish if there are no queues | |||
| // bound to the routing key | |||
| Mandatory bool | |||
| // Immediate fails to publish if there are no consumers | |||
| // that can ack bound to the queue on the routing key | |||
| Immediate bool | |||
| ContentType string | |||
| // Transient or Persistent | |||
| DeliveryMode uint8 | |||
| } | |||
| // Publisher allows you to publish messages safely across an open connection | |||
| type Publisher struct { | |||
| chManager *channelManager | |||
| notifyFlowChan chan bool | |||
| disablePublishDueToFlow bool | |||
| disablePublishDueToFlowMux *sync.RWMutex | |||
| logger logger | |||
| } | |||
| // GetPublisher returns a new publisher with an open channel to the cluster. | |||
| // If you plan to enforce mandatory or immediate publishing, those failures will be reported | |||
| // on the channel of Returns that you should setup a listener on. | |||
| // Flow controls are automatically handled as they are sent from the server, and publishing | |||
| // will fail with an error when the server is requesting a slowdown | |||
| func GetPublisher(url string, logging bool) (Publisher, <-chan Return, error) { | |||
| chManager, err := newChannelManager(url, logging) | |||
| if err != nil { | |||
| return Publisher{}, nil, err | |||
| } | |||
| publisher := Publisher{ | |||
| chManager: chManager, | |||
| notifyFlowChan: make(chan bool), | |||
| disablePublishDueToFlow: false, | |||
| disablePublishDueToFlowMux: &sync.RWMutex{}, | |||
| } | |||
| returnAMQPChan := make(chan amqp.Return) | |||
| returnChan := make(chan Return) | |||
| returnAMQPChan = publisher.chManager.channel.NotifyReturn(returnAMQPChan) | |||
| go func() { | |||
| for ret := range returnAMQPChan { | |||
| returnChan <- Return{ | |||
| ret, | |||
| } | |||
| } | |||
| }() | |||
| publisher.notifyFlowChan = publisher.chManager.channel.NotifyFlow(publisher.notifyFlowChan) | |||
| go publisher.startNotifyFlowHandler() | |||
| return publisher, returnChan, nil | |||
| } | |||
| // Publish publishes the provided data to the given routing keys over the connection | |||
| func (publisher *Publisher) Publish(data []byte, publishOptions *PublishOptions, routingKeys ...string) error { | |||
| publisher.disablePublishDueToFlowMux.RLock() | |||
| if publisher.disablePublishDueToFlow { | |||
| return fmt.Errorf("publishing blocked due to high flow on the server") | |||
| } | |||
| publisher.disablePublishDueToFlowMux.RUnlock() | |||
| defaults := getDefaultPublishOptions() | |||
| finalOptions := PublishOptions{} | |||
| if publishOptions == nil { | |||
| finalOptions = defaults | |||
| } else { | |||
| finalOptions = fillInPublishDefaults(*publishOptions) | |||
| } | |||
| for _, routingKey := range routingKeys { | |||
| err := publisher.chManager.channel.Publish( | |||
| finalOptions.Exchange, | |||
| routingKey, | |||
| finalOptions.Mandatory, | |||
| finalOptions.Immediate, | |||
| amqp.Publishing{ | |||
| ContentType: finalOptions.ContentType, | |||
| Body: data, | |||
| DeliveryMode: finalOptions.DeliveryMode, | |||
| }) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| } | |||
| return nil | |||
| } | |||
| // getDefaultPublishOptions - | |||
| func getDefaultPublishOptions() PublishOptions { | |||
| return PublishOptions{ | |||
| Exchange: "", | |||
| Mandatory: false, | |||
| Immediate: false, | |||
| ContentType: "", | |||
| DeliveryMode: Transient, | |||
| } | |||
| } | |||
| func (publisher *Publisher) startNotifyFlowHandler() { | |||
| for ok := range publisher.notifyFlowChan { | |||
| publisher.disablePublishDueToFlowMux.Lock() | |||
| publisher.logger.Println("pausing publishing due to flow request from server") | |||
| if ok { | |||
| publisher.disablePublishDueToFlow = false | |||
| } else { | |||
| publisher.disablePublishDueToFlow = true | |||
| } | |||
| publisher.disablePublishDueToFlowMux.Unlock() | |||
| publisher.logger.Println("resuming publishing due to flow request from server") | |||
| } | |||
| } | |||
| // fillInPublishDefaults completes in any fields we're sure weren't set with their defaults | |||
| func fillInPublishDefaults(publishOptions PublishOptions) PublishOptions { | |||
| defaults := getDefaultPublishOptions() | |||
| if publishOptions.DeliveryMode == 0 { | |||
| publishOptions.DeliveryMode = defaults.DeliveryMode | |||
| } | |||
| return publishOptions | |||
| } | |||
| @ -0,0 +1,41 @@ | |||
| package rabbitmq | |||
| import "github.com/streadway/amqp" | |||
| // Table stores user supplied fields of the following types: | |||
| // | |||
| // bool | |||
| // byte | |||
| // float32 | |||
| // float64 | |||
| // int | |||
| // int16 | |||
| // int32 | |||
| // int64 | |||
| // nil | |||
| // string | |||
| // time.Time | |||
| // amqp.Decimal | |||
| // amqp.Table | |||
| // []byte | |||
| // []interface{} - containing above types | |||
| // | |||
| // Functions taking a table will immediately fail when the table contains a | |||
| // value of an unsupported type. | |||
| // | |||
| // The caller must be specific in which precision of integer it wishes to | |||
| // encode. | |||
| // | |||
| // Use a type assertion when reading values from a table for type conversion. | |||
| // | |||
| // RabbitMQ expects int32 for integer values. | |||
| // | |||
| type Table map[string]interface{} | |||
| func tableToAMQPTable(table Table) amqp.Table { | |||
| new := amqp.Table{} | |||
| for k, v := range table { | |||
| new[k] = v | |||
| } | |||
| return new | |||
| } | |||
| @ -0,0 +1,12 @@ | |||
| certs/* | |||
| spec/spec | |||
| examples/simple-consumer/simple-consumer | |||
| examples/simple-producer/simple-producer | |||
| .idea/**/workspace.xml | |||
| .idea/**/tasks.xml | |||
| .idea/**/usage.statistics.xml | |||
| .idea/**/dictionaries | |||
| .idea/**/shelf | |||
| .idea/**/contentModel.xml | |||
| @ -0,0 +1,25 @@ | |||
| language: go | |||
| go: | |||
| - 1.10.x | |||
| - 1.11.x | |||
| - 1.12.x | |||
| - 1.13.x | |||
| addons: | |||
| apt: | |||
| packages: | |||
| - rabbitmq-server | |||
| services: | |||
| - rabbitmq | |||
| env: | |||
| - GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ | |||
| before_install: | |||
| - go get -v golang.org/x/lint/golint | |||
| script: | |||
| - ./pre-commit | |||
| - go test -cpu=1,2 -v -tags integration ./... | |||
| @ -0,0 +1,35 @@ | |||
| ## Prequisites | |||
| 1. Go: [https://golang.org/dl/](https://golang.org/dl/) | |||
| 1. Golint `go get -u -v github.com/golang/lint/golint` | |||
| ## Contributing | |||
| The workflow is pretty standard: | |||
| 1. Fork github.com/streadway/amqp | |||
| 1. Add the pre-commit hook: `ln -s ../../pre-commit .git/hooks/pre-commit` | |||
| 1. Create your feature branch (`git checkout -b my-new-feature`) | |||
| 1. Run integration tests (see below) | |||
| 1. **Implement tests** | |||
| 1. Implement fixs | |||
| 1. Commit your changes (`git commit -am 'Add some feature'`) | |||
| 1. Push to a branch (`git push -u origin my-new-feature`) | |||
| 1. Submit a pull request | |||
| ## Running Tests | |||
| The test suite assumes that: | |||
| * A RabbitMQ node is running on localhost with all defaults: [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) | |||
| * `AMQP_URL` is exported to `amqp://guest:guest@127.0.0.1:5672/` | |||
| ### Integration Tests | |||
| After starting a local RabbitMQ, run integration tests with the following: | |||
| env AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -cpu 2 -tags integration -race | |||
| All integration tests should use the `integrationConnection(...)` test | |||
| helpers defined in `integration_test.go` to setup the integration environment | |||
| and logging. | |||
| @ -0,0 +1,23 @@ | |||
| Copyright (c) 2012-2019, Sean Treadway, SoundCloud Ltd. | |||
| All rights reserved. | |||
| Redistribution and use in source and binary forms, with or without | |||
| modification, are permitted provided that the following conditions are met: | |||
| Redistributions of source code must retain the above copyright notice, this | |||
| list of conditions and the following disclaimer. | |||
| Redistributions in binary form must reproduce the above copyright notice, this | |||
| list of conditions and the following disclaimer in the documentation and/or | |||
| other materials provided with the distribution. | |||
| THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |||
| ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |||
| WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |||
| DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |||
| FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |||
| DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |||
| SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |||
| CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | |||
| OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |||
| OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |||
| @ -0,0 +1,93 @@ | |||
| [](http://travis-ci.org/streadway/amqp) [](http://godoc.org/github.com/streadway/amqp) | |||
| # Go RabbitMQ Client Library | |||
| This is an AMQP 0.9.1 client with RabbitMQ extensions in Go. | |||
| ## Project Maturity | |||
| This project has been used in production systems for many years. It is reasonably mature | |||
| and feature complete, and as of November 2016 has [a team of maintainers](https://github.com/streadway/amqp/issues/215). | |||
| Future API changes are unlikely but possible. They will be discussed on [Github | |||
| issues](https://github.com/streadway/amqp/issues) along with any bugs or | |||
| enhancements. | |||
| ## Supported Go Versions | |||
| This library supports two most recent Go release series, currently 1.10 and 1.11. | |||
| ## Supported RabbitMQ Versions | |||
| This project supports RabbitMQ versions starting with `2.0` but primarily tested | |||
| against reasonably recent `3.x` releases. Some features and behaviours may be | |||
| server version-specific. | |||
| ## Goals | |||
| Provide a functional interface that closely represents the AMQP 0.9.1 model | |||
| targeted to RabbitMQ as a server. This includes the minimum necessary to | |||
| interact the semantics of the protocol. | |||
| ## Non-goals | |||
| Things not intended to be supported. | |||
| * Auto reconnect and re-synchronization of client and server topologies. | |||
| * Reconnection would require understanding the error paths when the | |||
| topology cannot be declared on reconnect. This would require a new set | |||
| of types and code paths that are best suited at the call-site of this | |||
| package. AMQP has a dynamic topology that needs all peers to agree. If | |||
| this doesn't happen, the behavior is undefined. Instead of producing a | |||
| possible interface with undefined behavior, this package is designed to | |||
| be simple for the caller to implement the necessary connection-time | |||
| topology declaration so that reconnection is trivial and encapsulated in | |||
| the caller's application code. | |||
| * AMQP Protocol negotiation for forward or backward compatibility. | |||
| * 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent | |||
| specifications that change the semantics and wire format of the protocol. | |||
| We will accept patches for other protocol support but have no plans for | |||
| implementation ourselves. | |||
| * Anything other than PLAIN and EXTERNAL authentication mechanisms. | |||
| * Keeping the mechanisms interface modular makes it possible to extend | |||
| outside of this package. If other mechanisms prove to be popular, then | |||
| we would accept patches to include them in this package. | |||
| ## Usage | |||
| See the 'examples' subdirectory for simple producers and consumers executables. | |||
| If you have a use-case in mind which isn't well-represented by the examples, | |||
| please file an issue. | |||
| ## Documentation | |||
| Use [Godoc documentation](http://godoc.org/github.com/streadway/amqp) for | |||
| reference and usage. | |||
| [RabbitMQ tutorials in | |||
| Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) are also | |||
| available. | |||
| ## Contributing | |||
| Pull requests are very much welcomed. Create your pull request on a non-master | |||
| branch, make sure a test or example is included that covers your change and | |||
| your commits represent coherent changes that include a reason for the change. | |||
| To run the integration tests, make sure you have RabbitMQ running on any host, | |||
| export the environment variable `AMQP_URL=amqp://host/` and run `go test -tags | |||
| integration`. TravisCI will also run the integration tests. | |||
| Thanks to the [community of contributors](https://github.com/streadway/amqp/graphs/contributors). | |||
| ## External packages | |||
| * [Google App Engine Dialer support](https://github.com/soundtrackyourbrand/gaeamqp) | |||
| * [RabbitMQ examples in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) | |||
| ## License | |||
| BSD 2 clause - see LICENSE for more details. | |||
| @ -0,0 +1,106 @@ | |||
| package amqp | |||
| import ( | |||
| "bytes" | |||
| "fmt" | |||
| "math/big" | |||
| ) | |||
| const ( | |||
| free = 0 | |||
| allocated = 1 | |||
| ) | |||
| // allocator maintains a bitset of allocated numbers. | |||
| type allocator struct { | |||
| pool *big.Int | |||
| last int | |||
| low int | |||
| high int | |||
| } | |||
| // NewAllocator reserves and frees integers out of a range between low and | |||
| // high. | |||
| // | |||
| // O(N) worst case space used, where N is maximum allocated, divided by | |||
| // sizeof(big.Word) | |||
| func newAllocator(low, high int) *allocator { | |||
| return &allocator{ | |||
| pool: big.NewInt(0), | |||
| last: low, | |||
| low: low, | |||
| high: high, | |||
| } | |||
| } | |||
| // String returns a string describing the contents of the allocator like | |||
| // "allocator[low..high] reserved..until" | |||
| // | |||
| // O(N) where N is high-low | |||
| func (a allocator) String() string { | |||
| b := &bytes.Buffer{} | |||
| fmt.Fprintf(b, "allocator[%d..%d]", a.low, a.high) | |||
| for low := a.low; low <= a.high; low++ { | |||
| high := low | |||
| for a.reserved(high) && high <= a.high { | |||
| high++ | |||
| } | |||
| if high > low+1 { | |||
| fmt.Fprintf(b, " %d..%d", low, high-1) | |||
| } else if high > low { | |||
| fmt.Fprintf(b, " %d", high-1) | |||
| } | |||
| low = high | |||
| } | |||
| return b.String() | |||
| } | |||
| // Next reserves and returns the next available number out of the range between | |||
| // low and high. If no number is available, false is returned. | |||
| // | |||
| // O(N) worst case runtime where N is allocated, but usually O(1) due to a | |||
| // rolling index into the oldest allocation. | |||
| func (a *allocator) next() (int, bool) { | |||
| wrapped := a.last | |||
| // Find trailing bit | |||
| for ; a.last <= a.high; a.last++ { | |||
| if a.reserve(a.last) { | |||
| return a.last, true | |||
| } | |||
| } | |||
| // Find preceding free'd pool | |||
| a.last = a.low | |||
| for ; a.last < wrapped; a.last++ { | |||
| if a.reserve(a.last) { | |||
| return a.last, true | |||
| } | |||
| } | |||
| return 0, false | |||
| } | |||
| // reserve claims the bit if it is not already claimed, returning true if | |||
| // successfully claimed. | |||
| func (a *allocator) reserve(n int) bool { | |||
| if a.reserved(n) { | |||
| return false | |||
| } | |||
| a.pool.SetBit(a.pool, n-a.low, allocated) | |||
| return true | |||
| } | |||
| // reserved returns true if the integer has been allocated | |||
| func (a *allocator) reserved(n int) bool { | |||
| return a.pool.Bit(n-a.low) == allocated | |||
| } | |||
| // release frees the use of the number for another allocation | |||
| func (a *allocator) release(n int) { | |||
| a.pool.SetBit(a.pool, n-a.low, free) | |||
| } | |||
| @ -0,0 +1,62 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "fmt" | |||
| ) | |||
| // Authentication interface provides a means for different SASL authentication | |||
| // mechanisms to be used during connection tuning. | |||
| type Authentication interface { | |||
| Mechanism() string | |||
| Response() string | |||
| } | |||
| // PlainAuth is a similar to Basic Auth in HTTP. | |||
| type PlainAuth struct { | |||
| Username string | |||
| Password string | |||
| } | |||
| // Mechanism returns "PLAIN" | |||
| func (auth *PlainAuth) Mechanism() string { | |||
| return "PLAIN" | |||
| } | |||
| // Response returns the null character delimited encoding for the SASL PLAIN Mechanism. | |||
| func (auth *PlainAuth) Response() string { | |||
| return fmt.Sprintf("\000%s\000%s", auth.Username, auth.Password) | |||
| } | |||
| // AMQPlainAuth is similar to PlainAuth | |||
| type AMQPlainAuth struct { | |||
| Username string | |||
| Password string | |||
| } | |||
| // Mechanism returns "AMQPLAIN" | |||
| func (auth *AMQPlainAuth) Mechanism() string { | |||
| return "AMQPLAIN" | |||
| } | |||
| // Response returns the null character delimited encoding for the SASL PLAIN Mechanism. | |||
| func (auth *AMQPlainAuth) Response() string { | |||
| return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password) | |||
| } | |||
| // Finds the first mechanism preferred by the client that the server supports. | |||
| func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { | |||
| for _, auth = range client { | |||
| for _, mech := range serverMechanisms { | |||
| if auth.Mechanism() == mech { | |||
| return auth, true | |||
| } | |||
| } | |||
| } | |||
| return | |||
| } | |||
| @ -0,0 +1,159 @@ | |||
| #!/bin/sh | |||
| # | |||
| # Creates the CA, server and client certs to be used by tls_test.go | |||
| # http://www.rabbitmq.com/ssl.html | |||
| # | |||
| # Copy stdout into the const section of tls_test.go or use for RabbitMQ | |||
| # | |||
| root=$PWD/certs | |||
| if [ -f $root/ca/serial ]; then | |||
| echo >&2 "Previous installation found" | |||
| echo >&2 "Remove $root/ca and rerun to overwrite" | |||
| exit 1 | |||
| fi | |||
| mkdir -p $root/ca/private | |||
| mkdir -p $root/ca/certs | |||
| mkdir -p $root/server | |||
| mkdir -p $root/client | |||
| cd $root/ca | |||
| chmod 700 private | |||
| touch index.txt | |||
| echo 'unique_subject = no' > index.txt.attr | |||
| echo '01' > serial | |||
| echo >openssl.cnf ' | |||
| [ ca ] | |||
| default_ca = testca | |||
| [ testca ] | |||
| dir = . | |||
| certificate = $dir/cacert.pem | |||
| database = $dir/index.txt | |||
| new_certs_dir = $dir/certs | |||
| private_key = $dir/private/cakey.pem | |||
| serial = $dir/serial | |||
| default_crl_days = 7 | |||
| default_days = 3650 | |||
| default_md = sha1 | |||
| policy = testca_policy | |||
| x509_extensions = certificate_extensions | |||
| [ testca_policy ] | |||
| commonName = supplied | |||
| stateOrProvinceName = optional | |||
| countryName = optional | |||
| emailAddress = optional | |||
| organizationName = optional | |||
| organizationalUnitName = optional | |||
| [ certificate_extensions ] | |||
| basicConstraints = CA:false | |||
| [ req ] | |||
| default_bits = 2048 | |||
| default_keyfile = ./private/cakey.pem | |||
| default_md = sha1 | |||
| prompt = yes | |||
| distinguished_name = root_ca_distinguished_name | |||
| x509_extensions = root_ca_extensions | |||
| [ root_ca_distinguished_name ] | |||
| commonName = hostname | |||
| [ root_ca_extensions ] | |||
| basicConstraints = CA:true | |||
| keyUsage = keyCertSign, cRLSign | |||
| [ client_ca_extensions ] | |||
| basicConstraints = CA:false | |||
| keyUsage = digitalSignature | |||
| extendedKeyUsage = 1.3.6.1.5.5.7.3.2 | |||
| [ server_ca_extensions ] | |||
| basicConstraints = CA:false | |||
| keyUsage = keyEncipherment | |||
| extendedKeyUsage = 1.3.6.1.5.5.7.3.1 | |||
| subjectAltName = @alt_names | |||
| [ alt_names ] | |||
| IP.1 = 127.0.0.1 | |||
| ' | |||
| openssl req \ | |||
| -x509 \ | |||
| -nodes \ | |||
| -config openssl.cnf \ | |||
| -newkey rsa:2048 \ | |||
| -days 3650 \ | |||
| -subj "/CN=MyTestCA/" \ | |||
| -out cacert.pem \ | |||
| -outform PEM | |||
| openssl x509 \ | |||
| -in cacert.pem \ | |||
| -out cacert.cer \ | |||
| -outform DER | |||
| openssl genrsa -out $root/server/key.pem 2048 | |||
| openssl genrsa -out $root/client/key.pem 2048 | |||
| openssl req \ | |||
| -new \ | |||
| -nodes \ | |||
| -config openssl.cnf \ | |||
| -subj "/CN=127.0.0.1/O=server/" \ | |||
| -key $root/server/key.pem \ | |||
| -out $root/server/req.pem \ | |||
| -outform PEM | |||
| openssl req \ | |||
| -new \ | |||
| -nodes \ | |||
| -config openssl.cnf \ | |||
| -subj "/CN=127.0.0.1/O=client/" \ | |||
| -key $root/client/key.pem \ | |||
| -out $root/client/req.pem \ | |||
| -outform PEM | |||
| openssl ca \ | |||
| -config openssl.cnf \ | |||
| -in $root/server/req.pem \ | |||
| -out $root/server/cert.pem \ | |||
| -notext \ | |||
| -batch \ | |||
| -extensions server_ca_extensions | |||
| openssl ca \ | |||
| -config openssl.cnf \ | |||
| -in $root/client/req.pem \ | |||
| -out $root/client/cert.pem \ | |||
| -notext \ | |||
| -batch \ | |||
| -extensions client_ca_extensions | |||
| cat <<-END | |||
| const caCert = \` | |||
| `cat $root/ca/cacert.pem` | |||
| \` | |||
| const serverCert = \` | |||
| `cat $root/server/cert.pem` | |||
| \` | |||
| const serverKey = \` | |||
| `cat $root/server/key.pem` | |||
| \` | |||
| const clientCert = \` | |||
| `cat $root/client/cert.pem` | |||
| \` | |||
| const clientKey = \` | |||
| `cat $root/client/key.pem` | |||
| \` | |||
| END | |||
| @ -0,0 +1,94 @@ | |||
| package amqp | |||
| import "sync" | |||
| // confirms resequences and notifies one or multiple publisher confirmation listeners | |||
| type confirms struct { | |||
| m sync.Mutex | |||
| listeners []chan Confirmation | |||
| sequencer map[uint64]Confirmation | |||
| published uint64 | |||
| expecting uint64 | |||
| } | |||
| // newConfirms allocates a confirms | |||
| func newConfirms() *confirms { | |||
| return &confirms{ | |||
| sequencer: map[uint64]Confirmation{}, | |||
| published: 0, | |||
| expecting: 1, | |||
| } | |||
| } | |||
| func (c *confirms) Listen(l chan Confirmation) { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| c.listeners = append(c.listeners, l) | |||
| } | |||
| // publish increments the publishing counter | |||
| func (c *confirms) Publish() uint64 { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| c.published++ | |||
| return c.published | |||
| } | |||
| // confirm confirms one publishing, increments the expecting delivery tag, and | |||
| // removes bookkeeping for that delivery tag. | |||
| func (c *confirms) confirm(confirmation Confirmation) { | |||
| delete(c.sequencer, c.expecting) | |||
| c.expecting++ | |||
| for _, l := range c.listeners { | |||
| l <- confirmation | |||
| } | |||
| } | |||
| // resequence confirms any out of order delivered confirmations | |||
| func (c *confirms) resequence() { | |||
| for c.expecting <= c.published { | |||
| sequenced, found := c.sequencer[c.expecting] | |||
| if !found { | |||
| return | |||
| } | |||
| c.confirm(sequenced) | |||
| } | |||
| } | |||
| // one confirms one publishing and all following in the publishing sequence | |||
| func (c *confirms) One(confirmed Confirmation) { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| if c.expecting == confirmed.DeliveryTag { | |||
| c.confirm(confirmed) | |||
| } else { | |||
| c.sequencer[confirmed.DeliveryTag] = confirmed | |||
| } | |||
| c.resequence() | |||
| } | |||
| // multiple confirms all publishings up until the delivery tag | |||
| func (c *confirms) Multiple(confirmed Confirmation) { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| for c.expecting <= confirmed.DeliveryTag { | |||
| c.confirm(Confirmation{c.expecting, confirmed.Ack}) | |||
| } | |||
| c.resequence() | |||
| } | |||
| // Close closes all listeners, discarding any out of sequence confirmations | |||
| func (c *confirms) Close() error { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| for _, l := range c.listeners { | |||
| close(l) | |||
| } | |||
| c.listeners = nil | |||
| return nil | |||
| } | |||
| @ -0,0 +1,847 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "bufio" | |||
| "crypto/tls" | |||
| "io" | |||
| "net" | |||
| "reflect" | |||
| "strconv" | |||
| "strings" | |||
| "sync" | |||
| "sync/atomic" | |||
| "time" | |||
| ) | |||
| const ( | |||
| maxChannelMax = (2 << 15) - 1 | |||
| defaultHeartbeat = 10 * time.Second | |||
| defaultConnectionTimeout = 30 * time.Second | |||
| defaultProduct = "https://github.com/streadway/amqp" | |||
| defaultVersion = "β" | |||
| // Safer default that makes channel leaks a lot easier to spot | |||
| // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. | |||
| defaultChannelMax = (2 << 10) - 1 | |||
| defaultLocale = "en_US" | |||
| ) | |||
| // Config is used in DialConfig and Open to specify the desired tuning | |||
| // parameters used during a connection open handshake. The negotiated tuning | |||
| // will be stored in the returned connection's Config field. | |||
| type Config struct { | |||
| // The SASL mechanisms to try in the client request, and the successful | |||
| // mechanism used on the Connection object. | |||
| // If SASL is nil, PlainAuth from the URL is used. | |||
| SASL []Authentication | |||
| // Vhost specifies the namespace of permissions, exchanges, queues and | |||
| // bindings on the server. Dial sets this to the path parsed from the URL. | |||
| Vhost string | |||
| ChannelMax int // 0 max channels means 2^16 - 1 | |||
| FrameSize int // 0 max bytes means unlimited | |||
| Heartbeat time.Duration // less than 1s uses the server's interval | |||
| // TLSClientConfig specifies the client configuration of the TLS connection | |||
| // when establishing a tls transport. | |||
| // If the URL uses an amqps scheme, then an empty tls.Config with the | |||
| // ServerName from the URL is used. | |||
| TLSClientConfig *tls.Config | |||
| // Properties is table of properties that the client advertises to the server. | |||
| // This is an optional setting - if the application does not set this, | |||
| // the underlying library will use a generic set of client properties. | |||
| Properties Table | |||
| // Connection locale that we expect to always be en_US | |||
| // Even though servers must return it as per the AMQP 0-9-1 spec, | |||
| // we are not aware of it being used other than to satisfy the spec requirements | |||
| Locale string | |||
| // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig, | |||
| // then an AMQP connection handshake. | |||
| // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is | |||
| // used during TLS and AMQP handshaking. | |||
| Dial func(network, addr string) (net.Conn, error) | |||
| } | |||
| // Connection manages the serialization and deserialization of frames from IO | |||
| // and dispatches the frames to the appropriate channel. All RPC methods and | |||
| // asynchronous Publishing, Delivery, Ack, Nack and Return messages are | |||
| // multiplexed on this channel. There must always be active receivers for | |||
| // every asynchronous message on this connection. | |||
| type Connection struct { | |||
| destructor sync.Once // shutdown once | |||
| sendM sync.Mutex // conn writer mutex | |||
| m sync.Mutex // struct field mutex | |||
| conn io.ReadWriteCloser | |||
| rpc chan message | |||
| writer *writer | |||
| sends chan time.Time // timestamps of each frame sent | |||
| deadlines chan readDeadliner // heartbeater updates read deadlines | |||
| allocator *allocator // id generator valid after openTune | |||
| channels map[uint16]*Channel | |||
| noNotify bool // true when we will never notify again | |||
| closes []chan *Error | |||
| blocks []chan Blocking | |||
| errors chan *Error | |||
| Config Config // The negotiated Config after connection.open | |||
| Major int // Server's major version | |||
| Minor int // Server's minor version | |||
| Properties Table // Server properties | |||
| Locales []string // Server locales | |||
| closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic | |||
| } | |||
| type readDeadliner interface { | |||
| SetReadDeadline(time.Time) error | |||
| } | |||
| // DefaultDial establishes a connection when config.Dial is not provided | |||
| func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) { | |||
| return func(network, addr string) (net.Conn, error) { | |||
| conn, err := net.DialTimeout(network, addr, connectionTimeout) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| // Heartbeating hasn't started yet, don't stall forever on a dead server. | |||
| // A deadline is set for TLS and AMQP handshaking. After AMQP is established, | |||
| // the deadline is cleared in openComplete. | |||
| if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil { | |||
| return nil, err | |||
| } | |||
| return conn, nil | |||
| } | |||
| } | |||
| // Dial accepts a string in the AMQP URI format and returns a new Connection | |||
| // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 | |||
| // seconds and sets the handshake deadline to 30 seconds. After handshake, | |||
| // deadlines are cleared. | |||
| // | |||
| // Dial uses the zero value of tls.Config when it encounters an amqps:// | |||
| // scheme. It is equivalent to calling DialTLS(amqp, nil). | |||
| func Dial(url string) (*Connection, error) { | |||
| return DialConfig(url, Config{ | |||
| Heartbeat: defaultHeartbeat, | |||
| Locale: defaultLocale, | |||
| }) | |||
| } | |||
| // DialTLS accepts a string in the AMQP URI format and returns a new Connection | |||
| // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 | |||
| // seconds and sets the initial read deadline to 30 seconds. | |||
| // | |||
| // DialTLS uses the provided tls.Config when encountering an amqps:// scheme. | |||
| func DialTLS(url string, amqps *tls.Config) (*Connection, error) { | |||
| return DialConfig(url, Config{ | |||
| Heartbeat: defaultHeartbeat, | |||
| TLSClientConfig: amqps, | |||
| Locale: defaultLocale, | |||
| }) | |||
| } | |||
| // DialConfig accepts a string in the AMQP URI format and a configuration for | |||
| // the transport and connection setup, returning a new Connection. Defaults to | |||
| // a server heartbeat interval of 10 seconds and sets the initial read deadline | |||
| // to 30 seconds. | |||
| func DialConfig(url string, config Config) (*Connection, error) { | |||
| var err error | |||
| var conn net.Conn | |||
| uri, err := ParseURI(url) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if config.SASL == nil { | |||
| config.SASL = []Authentication{uri.PlainAuth()} | |||
| } | |||
| if config.Vhost == "" { | |||
| config.Vhost = uri.Vhost | |||
| } | |||
| addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10)) | |||
| dialer := config.Dial | |||
| if dialer == nil { | |||
| dialer = DefaultDial(defaultConnectionTimeout) | |||
| } | |||
| conn, err = dialer("tcp", addr) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if uri.Scheme == "amqps" { | |||
| if config.TLSClientConfig == nil { | |||
| config.TLSClientConfig = new(tls.Config) | |||
| } | |||
| // If ServerName has not been specified in TLSClientConfig, | |||
| // set it to the URI host used for this connection. | |||
| if config.TLSClientConfig.ServerName == "" { | |||
| config.TLSClientConfig.ServerName = uri.Host | |||
| } | |||
| client := tls.Client(conn, config.TLSClientConfig) | |||
| if err := client.Handshake(); err != nil { | |||
| conn.Close() | |||
| return nil, err | |||
| } | |||
| conn = client | |||
| } | |||
| return Open(conn, config) | |||
| } | |||
| /* | |||
| Open accepts an already established connection, or other io.ReadWriteCloser as | |||
| a transport. Use this method if you have established a TLS connection or wish | |||
| to use your own custom transport. | |||
| */ | |||
| func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { | |||
| c := &Connection{ | |||
| conn: conn, | |||
| writer: &writer{bufio.NewWriter(conn)}, | |||
| channels: make(map[uint16]*Channel), | |||
| rpc: make(chan message), | |||
| sends: make(chan time.Time), | |||
| errors: make(chan *Error, 1), | |||
| deadlines: make(chan readDeadliner, 1), | |||
| } | |||
| go c.reader(conn) | |||
| return c, c.open(config) | |||
| } | |||
| /* | |||
| LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr) | |||
| as a fallback default value if the underlying transport does not support LocalAddr(). | |||
| */ | |||
| func (c *Connection) LocalAddr() net.Addr { | |||
| if conn, ok := c.conn.(interface { | |||
| LocalAddr() net.Addr | |||
| }); ok { | |||
| return conn.LocalAddr() | |||
| } | |||
| return &net.TCPAddr{} | |||
| } | |||
| // ConnectionState returns basic TLS details of the underlying transport. | |||
| // Returns a zero value when the underlying connection does not implement | |||
| // ConnectionState() tls.ConnectionState. | |||
| func (c *Connection) ConnectionState() tls.ConnectionState { | |||
| if conn, ok := c.conn.(interface { | |||
| ConnectionState() tls.ConnectionState | |||
| }); ok { | |||
| return conn.ConnectionState() | |||
| } | |||
| return tls.ConnectionState{} | |||
| } | |||
| /* | |||
| NotifyClose registers a listener for close events either initiated by an error | |||
| accompanying a connection.close method or by a normal shutdown. | |||
| On normal shutdowns, the chan will be closed. | |||
| To reconnect after a transport or protocol error, register a listener here and | |||
| re-run your setup process. | |||
| */ | |||
| func (c *Connection) NotifyClose(receiver chan *Error) chan *Error { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| if c.noNotify { | |||
| close(receiver) | |||
| } else { | |||
| c.closes = append(c.closes, receiver) | |||
| } | |||
| return receiver | |||
| } | |||
| /* | |||
| NotifyBlocked registers a listener for RabbitMQ specific TCP flow control | |||
| method extensions connection.blocked and connection.unblocked. Flow control is | |||
| active with a reason when Blocking.Blocked is true. When a Connection is | |||
| blocked, all methods will block across all connections until server resources | |||
| become free again. | |||
| This optional extension is supported by the server when the | |||
| "connection.blocked" server capability key is true. | |||
| */ | |||
| func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| if c.noNotify { | |||
| close(receiver) | |||
| } else { | |||
| c.blocks = append(c.blocks, receiver) | |||
| } | |||
| return receiver | |||
| } | |||
| /* | |||
| Close requests and waits for the response to close the AMQP connection. | |||
| It's advisable to use this message when publishing to ensure all kernel buffers | |||
| have been flushed on the server and client before exiting. | |||
| An error indicates that server may not have received this request to close but | |||
| the connection should be treated as closed regardless. | |||
| After returning from this call, all resources associated with this connection, | |||
| including the underlying io, Channels, Notify listeners and Channel consumers | |||
| will also be closed. | |||
| */ | |||
| func (c *Connection) Close() error { | |||
| if c.IsClosed() { | |||
| return ErrClosed | |||
| } | |||
| defer c.shutdown(nil) | |||
| return c.call( | |||
| &connectionClose{ | |||
| ReplyCode: replySuccess, | |||
| ReplyText: "kthxbai", | |||
| }, | |||
| &connectionCloseOk{}, | |||
| ) | |||
| } | |||
| func (c *Connection) closeWith(err *Error) error { | |||
| if c.IsClosed() { | |||
| return ErrClosed | |||
| } | |||
| defer c.shutdown(err) | |||
| return c.call( | |||
| &connectionClose{ | |||
| ReplyCode: uint16(err.Code), | |||
| ReplyText: err.Reason, | |||
| }, | |||
| &connectionCloseOk{}, | |||
| ) | |||
| } | |||
| // IsClosed returns true if the connection is marked as closed, otherwise false | |||
| // is returned. | |||
| func (c *Connection) IsClosed() bool { | |||
| return (atomic.LoadInt32(&c.closed) == 1) | |||
| } | |||
| func (c *Connection) send(f frame) error { | |||
| if c.IsClosed() { | |||
| return ErrClosed | |||
| } | |||
| c.sendM.Lock() | |||
| err := c.writer.WriteFrame(f) | |||
| c.sendM.Unlock() | |||
| if err != nil { | |||
| // shutdown could be re-entrant from signaling notify chans | |||
| go c.shutdown(&Error{ | |||
| Code: FrameError, | |||
| Reason: err.Error(), | |||
| }) | |||
| } else { | |||
| // Broadcast we sent a frame, reducing heartbeats, only | |||
| // if there is something that can receive - like a non-reentrant | |||
| // call or if the heartbeater isn't running | |||
| select { | |||
| case c.sends <- time.Now(): | |||
| default: | |||
| } | |||
| } | |||
| return err | |||
| } | |||
| func (c *Connection) shutdown(err *Error) { | |||
| atomic.StoreInt32(&c.closed, 1) | |||
| c.destructor.Do(func() { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| if err != nil { | |||
| for _, c := range c.closes { | |||
| c <- err | |||
| } | |||
| } | |||
| if err != nil { | |||
| c.errors <- err | |||
| } | |||
| // Shutdown handler goroutine can still receive the result. | |||
| close(c.errors) | |||
| for _, c := range c.closes { | |||
| close(c) | |||
| } | |||
| for _, c := range c.blocks { | |||
| close(c) | |||
| } | |||
| // Shutdown the channel, but do not use closeChannel() as it calls | |||
| // releaseChannel() which requires the connection lock. | |||
| // | |||
| // Ranging over c.channels and calling releaseChannel() that mutates | |||
| // c.channels is racy - see commit 6063341 for an example. | |||
| for _, ch := range c.channels { | |||
| ch.shutdown(err) | |||
| } | |||
| c.conn.Close() | |||
| c.channels = map[uint16]*Channel{} | |||
| c.allocator = newAllocator(1, c.Config.ChannelMax) | |||
| c.noNotify = true | |||
| }) | |||
| } | |||
| // All methods sent to the connection channel should be synchronous so we | |||
| // can handle them directly without a framing component | |||
| func (c *Connection) demux(f frame) { | |||
| if f.channel() == 0 { | |||
| c.dispatch0(f) | |||
| } else { | |||
| c.dispatchN(f) | |||
| } | |||
| } | |||
| func (c *Connection) dispatch0(f frame) { | |||
| switch mf := f.(type) { | |||
| case *methodFrame: | |||
| switch m := mf.Method.(type) { | |||
| case *connectionClose: | |||
| // Send immediately as shutdown will close our side of the writer. | |||
| c.send(&methodFrame{ | |||
| ChannelId: 0, | |||
| Method: &connectionCloseOk{}, | |||
| }) | |||
| c.shutdown(newError(m.ReplyCode, m.ReplyText)) | |||
| case *connectionBlocked: | |||
| for _, c := range c.blocks { | |||
| c <- Blocking{Active: true, Reason: m.Reason} | |||
| } | |||
| case *connectionUnblocked: | |||
| for _, c := range c.blocks { | |||
| c <- Blocking{Active: false} | |||
| } | |||
| default: | |||
| c.rpc <- m | |||
| } | |||
| case *heartbeatFrame: | |||
| // kthx - all reads reset our deadline. so we can drop this | |||
| default: | |||
| // lolwat - channel0 only responds to methods and heartbeats | |||
| c.closeWith(ErrUnexpectedFrame) | |||
| } | |||
| } | |||
| func (c *Connection) dispatchN(f frame) { | |||
| c.m.Lock() | |||
| channel := c.channels[f.channel()] | |||
| c.m.Unlock() | |||
| if channel != nil { | |||
| channel.recv(channel, f) | |||
| } else { | |||
| c.dispatchClosed(f) | |||
| } | |||
| } | |||
| // section 2.3.7: "When a peer decides to close a channel or connection, it | |||
| // sends a Close method. The receiving peer MUST respond to a Close with a | |||
| // Close-Ok, and then both parties can close their channel or connection. Note | |||
| // that if peers ignore Close, deadlock can happen when both peers send Close | |||
| // at the same time." | |||
| // | |||
| // When we don't have a channel, so we must respond with close-ok on a close | |||
| // method. This can happen between a channel exception on an asynchronous | |||
| // method like basic.publish and a synchronous close with channel.close. | |||
| // In that case, we'll get both a channel.close and channel.close-ok in any | |||
| // order. | |||
| func (c *Connection) dispatchClosed(f frame) { | |||
| // Only consider method frames, drop content/header frames | |||
| if mf, ok := f.(*methodFrame); ok { | |||
| switch mf.Method.(type) { | |||
| case *channelClose: | |||
| c.send(&methodFrame{ | |||
| ChannelId: f.channel(), | |||
| Method: &channelCloseOk{}, | |||
| }) | |||
| case *channelCloseOk: | |||
| // we are already closed, so do nothing | |||
| default: | |||
| // unexpected method on closed channel | |||
| c.closeWith(ErrClosed) | |||
| } | |||
| } | |||
| } | |||
| // Reads each frame off the IO and hand off to the connection object that | |||
| // will demux the streams and dispatch to one of the opened channels or | |||
| // handle on channel 0 (the connection channel). | |||
| func (c *Connection) reader(r io.Reader) { | |||
| buf := bufio.NewReader(r) | |||
| frames := &reader{buf} | |||
| conn, haveDeadliner := r.(readDeadliner) | |||
| for { | |||
| frame, err := frames.ReadFrame() | |||
| if err != nil { | |||
| c.shutdown(&Error{Code: FrameError, Reason: err.Error()}) | |||
| return | |||
| } | |||
| c.demux(frame) | |||
| if haveDeadliner { | |||
| c.deadlines <- conn | |||
| } | |||
| } | |||
| } | |||
| // Ensures that at least one frame is being sent at the tuned interval with a | |||
| // jitter tolerance of 1s | |||
| func (c *Connection) heartbeater(interval time.Duration, done chan *Error) { | |||
| const maxServerHeartbeatsInFlight = 3 | |||
| var sendTicks <-chan time.Time | |||
| if interval > 0 { | |||
| ticker := time.NewTicker(interval) | |||
| defer ticker.Stop() | |||
| sendTicks = ticker.C | |||
| } | |||
| lastSent := time.Now() | |||
| for { | |||
| select { | |||
| case at, stillSending := <-c.sends: | |||
| // When actively sending, depend on sent frames to reset server timer | |||
| if stillSending { | |||
| lastSent = at | |||
| } else { | |||
| return | |||
| } | |||
| case at := <-sendTicks: | |||
| // When idle, fill the space with a heartbeat frame | |||
| if at.Sub(lastSent) > interval-time.Second { | |||
| if err := c.send(&heartbeatFrame{}); err != nil { | |||
| // send heartbeats even after close/closeOk so we | |||
| // tick until the connection starts erroring | |||
| return | |||
| } | |||
| } | |||
| case conn := <-c.deadlines: | |||
| // When reading, reset our side of the deadline, if we've negotiated one with | |||
| // a deadline that covers at least 2 server heartbeats | |||
| if interval > 0 { | |||
| conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)) | |||
| } | |||
| case <-done: | |||
| return | |||
| } | |||
| } | |||
| } | |||
| // Convenience method to inspect the Connection.Properties["capabilities"] | |||
| // Table for server identified capabilities like "basic.ack" or | |||
| // "confirm.select". | |||
| func (c *Connection) isCapable(featureName string) bool { | |||
| capabilities, _ := c.Properties["capabilities"].(Table) | |||
| hasFeature, _ := capabilities[featureName].(bool) | |||
| return hasFeature | |||
| } | |||
| // allocateChannel records but does not open a new channel with a unique id. | |||
| // This method is the initial part of the channel lifecycle and paired with | |||
| // releaseChannel | |||
| func (c *Connection) allocateChannel() (*Channel, error) { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| if c.IsClosed() { | |||
| return nil, ErrClosed | |||
| } | |||
| id, ok := c.allocator.next() | |||
| if !ok { | |||
| return nil, ErrChannelMax | |||
| } | |||
| ch := newChannel(c, uint16(id)) | |||
| c.channels[uint16(id)] = ch | |||
| return ch, nil | |||
| } | |||
| // releaseChannel removes a channel from the registry as the final part of the | |||
| // channel lifecycle | |||
| func (c *Connection) releaseChannel(id uint16) { | |||
| c.m.Lock() | |||
| defer c.m.Unlock() | |||
| delete(c.channels, id) | |||
| c.allocator.release(int(id)) | |||
| } | |||
| // openChannel allocates and opens a channel, must be paired with closeChannel | |||
| func (c *Connection) openChannel() (*Channel, error) { | |||
| ch, err := c.allocateChannel() | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if err := ch.open(); err != nil { | |||
| c.releaseChannel(ch.id) | |||
| return nil, err | |||
| } | |||
| return ch, nil | |||
| } | |||
| // closeChannel releases and initiates a shutdown of the channel. All channel | |||
| // closures should be initiated here for proper channel lifecycle management on | |||
| // this connection. | |||
| func (c *Connection) closeChannel(ch *Channel, e *Error) { | |||
| ch.shutdown(e) | |||
| c.releaseChannel(ch.id) | |||
| } | |||
| /* | |||
| Channel opens a unique, concurrent server channel to process the bulk of AMQP | |||
| messages. Any error from methods on this receiver will render the receiver | |||
| invalid and a new Channel should be opened. | |||
| */ | |||
| func (c *Connection) Channel() (*Channel, error) { | |||
| return c.openChannel() | |||
| } | |||
| func (c *Connection) call(req message, res ...message) error { | |||
| // Special case for when the protocol header frame is sent insted of a | |||
| // request method | |||
| if req != nil { | |||
| if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil { | |||
| return err | |||
| } | |||
| } | |||
| select { | |||
| case err, ok := <-c.errors: | |||
| if !ok { | |||
| return ErrClosed | |||
| } | |||
| return err | |||
| case msg := <-c.rpc: | |||
| // Try to match one of the result types | |||
| for _, try := range res { | |||
| if reflect.TypeOf(msg) == reflect.TypeOf(try) { | |||
| // *res = *msg | |||
| vres := reflect.ValueOf(try).Elem() | |||
| vmsg := reflect.ValueOf(msg).Elem() | |||
| vres.Set(vmsg) | |||
| return nil | |||
| } | |||
| } | |||
| return ErrCommandInvalid | |||
| } | |||
| // unreachable | |||
| } | |||
| // Connection = open-Connection *use-Connection close-Connection | |||
| // open-Connection = C:protocol-header | |||
| // S:START C:START-OK | |||
| // *challenge | |||
| // S:TUNE C:TUNE-OK | |||
| // C:OPEN S:OPEN-OK | |||
| // challenge = S:SECURE C:SECURE-OK | |||
| // use-Connection = *channel | |||
| // close-Connection = C:CLOSE S:CLOSE-OK | |||
| // / S:CLOSE C:CLOSE-OK | |||
| func (c *Connection) open(config Config) error { | |||
| if err := c.send(&protocolHeader{}); err != nil { | |||
| return err | |||
| } | |||
| return c.openStart(config) | |||
| } | |||
| func (c *Connection) openStart(config Config) error { | |||
| start := &connectionStart{} | |||
| if err := c.call(nil, start); err != nil { | |||
| return err | |||
| } | |||
| c.Major = int(start.VersionMajor) | |||
| c.Minor = int(start.VersionMinor) | |||
| c.Properties = Table(start.ServerProperties) | |||
| c.Locales = strings.Split(start.Locales, " ") | |||
| // eventually support challenge/response here by also responding to | |||
| // connectionSecure. | |||
| auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " ")) | |||
| if !ok { | |||
| return ErrSASL | |||
| } | |||
| // Save this mechanism off as the one we chose | |||
| c.Config.SASL = []Authentication{auth} | |||
| // Set the connection locale to client locale | |||
| c.Config.Locale = config.Locale | |||
| return c.openTune(config, auth) | |||
| } | |||
| func (c *Connection) openTune(config Config, auth Authentication) error { | |||
| if len(config.Properties) == 0 { | |||
| config.Properties = Table{ | |||
| "product": defaultProduct, | |||
| "version": defaultVersion, | |||
| } | |||
| } | |||
| config.Properties["capabilities"] = Table{ | |||
| "connection.blocked": true, | |||
| "consumer_cancel_notify": true, | |||
| } | |||
| ok := &connectionStartOk{ | |||
| ClientProperties: config.Properties, | |||
| Mechanism: auth.Mechanism(), | |||
| Response: auth.Response(), | |||
| Locale: config.Locale, | |||
| } | |||
| tune := &connectionTune{} | |||
| if err := c.call(ok, tune); err != nil { | |||
| // per spec, a connection can only be closed when it has been opened | |||
| // so at this point, we know it's an auth error, but the socket | |||
| // was closed instead. Return a meaningful error. | |||
| return ErrCredentials | |||
| } | |||
| // When the server and client both use default 0, then the max channel is | |||
| // only limited by uint16. | |||
| c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax)) | |||
| if c.Config.ChannelMax == 0 { | |||
| c.Config.ChannelMax = defaultChannelMax | |||
| } | |||
| c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax) | |||
| // Frame size includes headers and end byte (len(payload)+8), even if | |||
| // this is less than FrameMinSize, use what the server sends because the | |||
| // alternative is to stop the handshake here. | |||
| c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax)) | |||
| // Save this off for resetDeadline() | |||
| c.Config.Heartbeat = time.Second * time.Duration(pick( | |||
| int(config.Heartbeat/time.Second), | |||
| int(tune.Heartbeat))) | |||
| // "The client should start sending heartbeats after receiving a | |||
| // Connection.Tune method" | |||
| go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1))) | |||
| if err := c.send(&methodFrame{ | |||
| ChannelId: 0, | |||
| Method: &connectionTuneOk{ | |||
| ChannelMax: uint16(c.Config.ChannelMax), | |||
| FrameMax: uint32(c.Config.FrameSize), | |||
| Heartbeat: uint16(c.Config.Heartbeat / time.Second), | |||
| }, | |||
| }); err != nil { | |||
| return err | |||
| } | |||
| return c.openVhost(config) | |||
| } | |||
| func (c *Connection) openVhost(config Config) error { | |||
| req := &connectionOpen{VirtualHost: config.Vhost} | |||
| res := &connectionOpenOk{} | |||
| if err := c.call(req, res); err != nil { | |||
| // Cannot be closed yet, but we know it's a vhost problem | |||
| return ErrVhost | |||
| } | |||
| c.Config.Vhost = config.Vhost | |||
| return c.openComplete() | |||
| } | |||
| // openComplete performs any final Connection initialization dependent on the | |||
| // connection handshake and clears any state needed for TLS and AMQP handshaking. | |||
| func (c *Connection) openComplete() error { | |||
| // We clear the deadlines and let the heartbeater reset the read deadline if requested. | |||
| // RabbitMQ uses TCP flow control at this point for pushback so Writes can | |||
| // intentionally block. | |||
| if deadliner, ok := c.conn.(interface { | |||
| SetDeadline(time.Time) error | |||
| }); ok { | |||
| _ = deadliner.SetDeadline(time.Time{}) | |||
| } | |||
| c.allocator = newAllocator(1, c.Config.ChannelMax) | |||
| return nil | |||
| } | |||
| func max(a, b int) int { | |||
| if a > b { | |||
| return a | |||
| } | |||
| return b | |||
| } | |||
| func min(a, b int) int { | |||
| if a < b { | |||
| return a | |||
| } | |||
| return b | |||
| } | |||
| func pick(client, server int) int { | |||
| if client == 0 || server == 0 { | |||
| return max(client, server) | |||
| } | |||
| return min(client, server) | |||
| } | |||
| @ -0,0 +1,142 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "os" | |||
| "strconv" | |||
| "sync" | |||
| "sync/atomic" | |||
| ) | |||
| var consumerSeq uint64 | |||
| const consumerTagLengthMax = 0xFF // see writeShortstr | |||
| func uniqueConsumerTag() string { | |||
| return commandNameBasedUniqueConsumerTag(os.Args[0]) | |||
| } | |||
| func commandNameBasedUniqueConsumerTag(commandName string) string { | |||
| tagPrefix := "ctag-" | |||
| tagInfix := commandName | |||
| tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10) | |||
| if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax { | |||
| tagInfix = "streadway/amqp" | |||
| } | |||
| return tagPrefix + tagInfix + tagSuffix | |||
| } | |||
| type consumerBuffers map[string]chan *Delivery | |||
| // Concurrent type that manages the consumerTag -> | |||
| // ingress consumerBuffer mapping | |||
| type consumers struct { | |||
| sync.WaitGroup // one for buffer | |||
| closed chan struct{} // signal buffer | |||
| sync.Mutex // protects below | |||
| chans consumerBuffers | |||
| } | |||
| func makeConsumers() *consumers { | |||
| return &consumers{ | |||
| closed: make(chan struct{}), | |||
| chans: make(consumerBuffers), | |||
| } | |||
| } | |||
| func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { | |||
| defer close(out) | |||
| defer subs.Done() | |||
| var inflight = in | |||
| var queue []*Delivery | |||
| for delivery := range in { | |||
| queue = append(queue, delivery) | |||
| for len(queue) > 0 { | |||
| select { | |||
| case <-subs.closed: | |||
| // closed before drained, drop in-flight | |||
| return | |||
| case delivery, consuming := <-inflight: | |||
| if consuming { | |||
| queue = append(queue, delivery) | |||
| } else { | |||
| inflight = nil | |||
| } | |||
| case out <- *queue[0]: | |||
| queue = queue[1:] | |||
| } | |||
| } | |||
| } | |||
| } | |||
| // On key conflict, close the previous channel. | |||
| func (subs *consumers) add(tag string, consumer chan Delivery) { | |||
| subs.Lock() | |||
| defer subs.Unlock() | |||
| if prev, found := subs.chans[tag]; found { | |||
| close(prev) | |||
| } | |||
| in := make(chan *Delivery) | |||
| subs.chans[tag] = in | |||
| subs.Add(1) | |||
| go subs.buffer(in, consumer) | |||
| } | |||
| func (subs *consumers) cancel(tag string) (found bool) { | |||
| subs.Lock() | |||
| defer subs.Unlock() | |||
| ch, found := subs.chans[tag] | |||
| if found { | |||
| delete(subs.chans, tag) | |||
| close(ch) | |||
| } | |||
| return found | |||
| } | |||
| func (subs *consumers) close() { | |||
| subs.Lock() | |||
| defer subs.Unlock() | |||
| close(subs.closed) | |||
| for tag, ch := range subs.chans { | |||
| delete(subs.chans, tag) | |||
| close(ch) | |||
| } | |||
| subs.Wait() | |||
| } | |||
| // Sends a delivery to a the consumer identified by `tag`. | |||
| // If unbuffered channels are used for Consume this method | |||
| // could block all deliveries until the consumer | |||
| // receives on the other end of the channel. | |||
| func (subs *consumers) send(tag string, msg *Delivery) bool { | |||
| subs.Lock() | |||
| defer subs.Unlock() | |||
| buffer, found := subs.chans[tag] | |||
| if found { | |||
| buffer <- msg | |||
| } | |||
| return found | |||
| } | |||
| @ -0,0 +1,173 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "errors" | |||
| "time" | |||
| ) | |||
| var errDeliveryNotInitialized = errors.New("delivery not initialized") | |||
| // Acknowledger notifies the server of successful or failed consumption of | |||
| // delivieries via identifier found in the Delivery.DeliveryTag field. | |||
| // | |||
| // Applications can provide mock implementations in tests of Delivery handlers. | |||
| type Acknowledger interface { | |||
| Ack(tag uint64, multiple bool) error | |||
| Nack(tag uint64, multiple bool, requeue bool) error | |||
| Reject(tag uint64, requeue bool) error | |||
| } | |||
| // Delivery captures the fields for a previously delivered message resident in | |||
| // a queue to be delivered by the server to a consumer from Channel.Consume or | |||
| // Channel.Get. | |||
| type Delivery struct { | |||
| Acknowledger Acknowledger // the channel from which this delivery arrived | |||
| Headers Table // Application or header exchange table | |||
| // Properties | |||
| ContentType string // MIME content type | |||
| ContentEncoding string // MIME content encoding | |||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | |||
| Priority uint8 // queue implementation use - 0 to 9 | |||
| CorrelationId string // application use - correlation identifier | |||
| ReplyTo string // application use - address to reply to (ex: RPC) | |||
| Expiration string // implementation use - message expiration spec | |||
| MessageId string // application use - message identifier | |||
| Timestamp time.Time // application use - message timestamp | |||
| Type string // application use - message type name | |||
| UserId string // application use - creating user - should be authenticated user | |||
| AppId string // application use - creating application id | |||
| // Valid only with Channel.Consume | |||
| ConsumerTag string | |||
| // Valid only with Channel.Get | |||
| MessageCount uint32 | |||
| DeliveryTag uint64 | |||
| Redelivered bool | |||
| Exchange string // basic.publish exchange | |||
| RoutingKey string // basic.publish routing key | |||
| Body []byte | |||
| } | |||
| func newDelivery(channel *Channel, msg messageWithContent) *Delivery { | |||
| props, body := msg.getContent() | |||
| delivery := Delivery{ | |||
| Acknowledger: channel, | |||
| Headers: props.Headers, | |||
| ContentType: props.ContentType, | |||
| ContentEncoding: props.ContentEncoding, | |||
| DeliveryMode: props.DeliveryMode, | |||
| Priority: props.Priority, | |||
| CorrelationId: props.CorrelationId, | |||
| ReplyTo: props.ReplyTo, | |||
| Expiration: props.Expiration, | |||
| MessageId: props.MessageId, | |||
| Timestamp: props.Timestamp, | |||
| Type: props.Type, | |||
| UserId: props.UserId, | |||
| AppId: props.AppId, | |||
| Body: body, | |||
| } | |||
| // Properties for the delivery types | |||
| switch m := msg.(type) { | |||
| case *basicDeliver: | |||
| delivery.ConsumerTag = m.ConsumerTag | |||
| delivery.DeliveryTag = m.DeliveryTag | |||
| delivery.Redelivered = m.Redelivered | |||
| delivery.Exchange = m.Exchange | |||
| delivery.RoutingKey = m.RoutingKey | |||
| case *basicGetOk: | |||
| delivery.MessageCount = m.MessageCount | |||
| delivery.DeliveryTag = m.DeliveryTag | |||
| delivery.Redelivered = m.Redelivered | |||
| delivery.Exchange = m.Exchange | |||
| delivery.RoutingKey = m.RoutingKey | |||
| } | |||
| return &delivery | |||
| } | |||
| /* | |||
| Ack delegates an acknowledgement through the Acknowledger interface that the | |||
| client or server has finished work on a delivery. | |||
| All deliveries in AMQP must be acknowledged. If you called Channel.Consume | |||
| with autoAck true then the server will be automatically ack each message and | |||
| this method should not be called. Otherwise, you must call Delivery.Ack after | |||
| you have successfully processed this delivery. | |||
| When multiple is true, this delivery and all prior unacknowledged deliveries | |||
| on the same channel will be acknowledged. This is useful for batch processing | |||
| of deliveries. | |||
| An error will indicate that the acknowledge could not be delivered to the | |||
| channel it was sent from. | |||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||
| delivery that is not automatically acknowledged. | |||
| */ | |||
| func (d Delivery) Ack(multiple bool) error { | |||
| if d.Acknowledger == nil { | |||
| return errDeliveryNotInitialized | |||
| } | |||
| return d.Acknowledger.Ack(d.DeliveryTag, multiple) | |||
| } | |||
| /* | |||
| Reject delegates a negatively acknowledgement through the Acknowledger interface. | |||
| When requeue is true, queue this message to be delivered to a consumer on a | |||
| different channel. When requeue is false or the server is unable to queue this | |||
| message, it will be dropped. | |||
| If you are batch processing deliveries, and your server supports it, prefer | |||
| Delivery.Nack. | |||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||
| delivery that is not automatically acknowledged. | |||
| */ | |||
| func (d Delivery) Reject(requeue bool) error { | |||
| if d.Acknowledger == nil { | |||
| return errDeliveryNotInitialized | |||
| } | |||
| return d.Acknowledger.Reject(d.DeliveryTag, requeue) | |||
| } | |||
| /* | |||
| Nack negatively acknowledge the delivery of message(s) identified by the | |||
| delivery tag from either the client or server. | |||
| When multiple is true, nack messages up to and including delivered messages up | |||
| until the delivery tag delivered on the same channel. | |||
| When requeue is true, request the server to deliver this message to a different | |||
| consumer. If it is not possible or requeue is false, the message will be | |||
| dropped or delivered to a server configured dead-letter queue. | |||
| This method must not be used to select or requeue messages the client wishes | |||
| not to handle, rather it is to inform the server that the client is incapable | |||
| of handling this message at this time. | |||
| Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |||
| delivery that is not automatically acknowledged. | |||
| */ | |||
| func (d Delivery) Nack(multiple, requeue bool) error { | |||
| if d.Acknowledger == nil { | |||
| return errDeliveryNotInitialized | |||
| } | |||
| return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | |||
| } | |||
| @ -0,0 +1,108 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| /* | |||
| Package amqp is an AMQP 0.9.1 client with RabbitMQ extensions | |||
| Understand the AMQP 0.9.1 messaging model by reviewing these links first. Much | |||
| of the terminology in this library directly relates to AMQP concepts. | |||
| Resources | |||
| http://www.rabbitmq.com/tutorials/amqp-concepts.html | |||
| http://www.rabbitmq.com/getstarted.html | |||
| http://www.rabbitmq.com/amqp-0-9-1-reference.html | |||
| Design | |||
| Most other broker clients publish to queues, but in AMQP, clients publish | |||
| Exchanges instead. AMQP is programmable, meaning that both the producers and | |||
| consumers agree on the configuration of the broker, instead of requiring an | |||
| operator or system configuration that declares the logical topology in the | |||
| broker. The routing between producers and consumer queues is via Bindings. | |||
| These bindings form the logical topology of the broker. | |||
| In this library, a message sent from publisher is called a "Publishing" and a | |||
| message received to a consumer is called a "Delivery". The fields of | |||
| Publishings and Deliveries are close but not exact mappings to the underlying | |||
| wire format to maintain stronger types. Many other libraries will combine | |||
| message properties with message headers. In this library, the message well | |||
| known properties are strongly typed fields on the Publishings and Deliveries, | |||
| whereas the user defined headers are in the Headers field. | |||
| The method naming closely matches the protocol's method name with positional | |||
| parameters mapping to named protocol message fields. The motivation here is to | |||
| present a comprehensive view over all possible interactions with the server. | |||
| Generally, methods that map to protocol methods of the "basic" class will be | |||
| elided in this interface, and "select" methods of various channel mode selectors | |||
| will be elided for example Channel.Confirm and Channel.Tx. | |||
| The library is intentionally designed to be synchronous, where responses for | |||
| each protocol message are required to be received in an RPC manner. Some | |||
| methods have a noWait parameter like Channel.QueueDeclare, and some methods are | |||
| asynchronous like Channel.Publish. The error values should still be checked for | |||
| these methods as they will indicate IO failures like when the underlying | |||
| connection closes. | |||
| Asynchronous Events | |||
| Clients of this library may be interested in receiving some of the protocol | |||
| messages other than Deliveries like basic.ack methods while a channel is in | |||
| confirm mode. | |||
| The Notify* methods with Connection and Channel receivers model the pattern of | |||
| asynchronous events like closes due to exceptions, or messages that are sent out | |||
| of band from an RPC call like basic.ack or basic.flow. | |||
| Any asynchronous events, including Deliveries and Publishings must always have | |||
| a receiver until the corresponding chans are closed. Without asynchronous | |||
| receivers, the sychronous methods will block. | |||
| Use Case | |||
| It's important as a client to an AMQP topology to ensure the state of the | |||
| broker matches your expectations. For both publish and consume use cases, | |||
| make sure you declare the queues, exchanges and bindings you expect to exist | |||
| prior to calling Channel.Publish or Channel.Consume. | |||
| // Connections start with amqp.Dial() typically from a command line argument | |||
| // or environment variable. | |||
| connection, err := amqp.Dial(os.Getenv("AMQP_URL")) | |||
| // To cleanly shutdown by flushing kernel buffers, make sure to close and | |||
| // wait for the response. | |||
| defer connection.Close() | |||
| // Most operations happen on a channel. If any error is returned on a | |||
| // channel, the channel will no longer be valid, throw it away and try with | |||
| // a different channel. If you use many channels, it's useful for the | |||
| // server to | |||
| channel, err := connection.Channel() | |||
| // Declare your topology here, if it doesn't exist, it will be created, if | |||
| // it existed already and is not what you expect, then that's considered an | |||
| // error. | |||
| // Use your connection on this topology with either Publish or Consume, or | |||
| // inspect your queues with QueueInspect. It's unwise to mix Publish and | |||
| // Consume to let TCP do its job well. | |||
| SSL/TLS - Secure connections | |||
| When Dial encounters an amqps:// scheme, it will use the zero value of a | |||
| tls.Config. This will only perform server certificate and host verification. | |||
| Use DialTLS when you wish to provide a client certificate (recommended), | |||
| include a private certificate authority's certificate in the cert chain for | |||
| server validity, or run insecure by not verifying the server certificate dial | |||
| your own connection. DialTLS will use the provided tls.Config when it | |||
| encounters an amqps:// scheme and will dial a plain connection when it | |||
| encounters an amqp:// scheme. | |||
| SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html | |||
| */ | |||
| package amqp | |||
| @ -0,0 +1,17 @@ | |||
| // +build gofuzz | |||
| package amqp | |||
| import "bytes" | |||
| func Fuzz(data []byte) int { | |||
| r := reader{bytes.NewReader(data)} | |||
| frame, err := r.ReadFrame() | |||
| if err != nil { | |||
| if frame != nil { | |||
| panic("frame is not nil") | |||
| } | |||
| return 0 | |||
| } | |||
| return 1 | |||
| } | |||
| @ -0,0 +1,2 @@ | |||
| #!/bin/sh | |||
| go run spec/gen.go < spec/amqp0-9-1.stripped.extended.xml | gofmt > spec091.go | |||
| @ -0,0 +1,3 @@ | |||
| module github.com/streadway/amqp | |||
| go 1.10 | |||
| @ -0,0 +1,67 @@ | |||
| #!/bin/sh | |||
| LATEST_STABLE_SUPPORTED_GO_VERSION="1.11" | |||
| main() { | |||
| if local_go_version_is_latest_stable | |||
| then | |||
| run_gofmt | |||
| run_golint | |||
| run_govet | |||
| fi | |||
| run_unit_tests | |||
| } | |||
| local_go_version_is_latest_stable() { | |||
| go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION | |||
| } | |||
| log_error() { | |||
| echo "$*" 1>&2 | |||
| } | |||
| run_gofmt() { | |||
| GOFMT_FILES=$(gofmt -l .) | |||
| if [ -n "$GOFMT_FILES" ] | |||
| then | |||
| log_error "gofmt failed for the following files: | |||
| $GOFMT_FILES | |||
| please run 'gofmt -w .' on your changes before committing." | |||
| exit 1 | |||
| fi | |||
| } | |||
| run_golint() { | |||
| GOLINT_ERRORS=$(golint ./... | grep -v "Id should be") | |||
| if [ -n "$GOLINT_ERRORS" ] | |||
| then | |||
| log_error "golint failed for the following reasons: | |||
| $GOLINT_ERRORS | |||
| please run 'golint ./...' on your changes before committing." | |||
| exit 1 | |||
| fi | |||
| } | |||
| run_govet() { | |||
| GOVET_ERRORS=$(go tool vet ./*.go 2>&1) | |||
| if [ -n "$GOVET_ERRORS" ] | |||
| then | |||
| log_error "go vet failed for the following reasons: | |||
| $GOVET_ERRORS | |||
| please run 'go tool vet ./*.go' on your changes before committing." | |||
| exit 1 | |||
| fi | |||
| } | |||
| run_unit_tests() { | |||
| if [ -z "$NOTEST" ] | |||
| then | |||
| log_error 'Running short tests...' | |||
| env AMQP_URL= go test -short | |||
| fi | |||
| } | |||
| main | |||
| @ -0,0 +1,456 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "bytes" | |||
| "encoding/binary" | |||
| "errors" | |||
| "io" | |||
| "time" | |||
| ) | |||
| /* | |||
| Reads a frame from an input stream and returns an interface that can be cast into | |||
| one of the following: | |||
| methodFrame | |||
| PropertiesFrame | |||
| bodyFrame | |||
| heartbeatFrame | |||
| 2.3.5 frame Details | |||
| All frames consist of a header (7 octets), a payload of arbitrary size, and a | |||
| 'frame-end' octet that detects malformed frames: | |||
| 0 1 3 7 size+7 size+8 | |||
| +------+---------+-------------+ +------------+ +-----------+ | |||
| | type | channel | size | | payload | | frame-end | | |||
| +------+---------+-------------+ +------------+ +-----------+ | |||
| octet short long size octets octet | |||
| To read a frame, we: | |||
| 1. Read the header and check the frame type and channel. | |||
| 2. Depending on the frame type, we read the payload and process it. | |||
| 3. Read the frame end octet. | |||
| In realistic implementations where performance is a concern, we would use | |||
| “read-ahead buffering” or | |||
| “gathering reads” to avoid doing three separate system calls to read a frame. | |||
| */ | |||
| func (r *reader) ReadFrame() (frame frame, err error) { | |||
| var scratch [7]byte | |||
| if _, err = io.ReadFull(r.r, scratch[:7]); err != nil { | |||
| return | |||
| } | |||
| typ := uint8(scratch[0]) | |||
| channel := binary.BigEndian.Uint16(scratch[1:3]) | |||
| size := binary.BigEndian.Uint32(scratch[3:7]) | |||
| switch typ { | |||
| case frameMethod: | |||
| if frame, err = r.parseMethodFrame(channel, size); err != nil { | |||
| return | |||
| } | |||
| case frameHeader: | |||
| if frame, err = r.parseHeaderFrame(channel, size); err != nil { | |||
| return | |||
| } | |||
| case frameBody: | |||
| if frame, err = r.parseBodyFrame(channel, size); err != nil { | |||
| return nil, err | |||
| } | |||
| case frameHeartbeat: | |||
| if frame, err = r.parseHeartbeatFrame(channel, size); err != nil { | |||
| return | |||
| } | |||
| default: | |||
| return nil, ErrFrame | |||
| } | |||
| if _, err = io.ReadFull(r.r, scratch[:1]); err != nil { | |||
| return nil, err | |||
| } | |||
| if scratch[0] != frameEnd { | |||
| return nil, ErrFrame | |||
| } | |||
| return | |||
| } | |||
| func readShortstr(r io.Reader) (v string, err error) { | |||
| var length uint8 | |||
| if err = binary.Read(r, binary.BigEndian, &length); err != nil { | |||
| return | |||
| } | |||
| bytes := make([]byte, length) | |||
| if _, err = io.ReadFull(r, bytes); err != nil { | |||
| return | |||
| } | |||
| return string(bytes), nil | |||
| } | |||
| func readLongstr(r io.Reader) (v string, err error) { | |||
| var length uint32 | |||
| if err = binary.Read(r, binary.BigEndian, &length); err != nil { | |||
| return | |||
| } | |||
| // slices can't be longer than max int32 value | |||
| if length > (^uint32(0) >> 1) { | |||
| return | |||
| } | |||
| bytes := make([]byte, length) | |||
| if _, err = io.ReadFull(r, bytes); err != nil { | |||
| return | |||
| } | |||
| return string(bytes), nil | |||
| } | |||
| func readDecimal(r io.Reader) (v Decimal, err error) { | |||
| if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| func readFloat32(r io.Reader) (v float32, err error) { | |||
| if err = binary.Read(r, binary.BigEndian, &v); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| func readFloat64(r io.Reader) (v float64, err error) { | |||
| if err = binary.Read(r, binary.BigEndian, &v); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| func readTimestamp(r io.Reader) (v time.Time, err error) { | |||
| var sec int64 | |||
| if err = binary.Read(r, binary.BigEndian, &sec); err != nil { | |||
| return | |||
| } | |||
| return time.Unix(sec, 0), nil | |||
| } | |||
| /* | |||
| 'A': []interface{} | |||
| 'D': Decimal | |||
| 'F': Table | |||
| 'I': int32 | |||
| 'S': string | |||
| 'T': time.Time | |||
| 'V': nil | |||
| 'b': byte | |||
| 'd': float64 | |||
| 'f': float32 | |||
| 'l': int64 | |||
| 's': int16 | |||
| 't': bool | |||
| 'x': []byte | |||
| */ | |||
| func readField(r io.Reader) (v interface{}, err error) { | |||
| var typ byte | |||
| if err = binary.Read(r, binary.BigEndian, &typ); err != nil { | |||
| return | |||
| } | |||
| switch typ { | |||
| case 't': | |||
| var value uint8 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return (value != 0), nil | |||
| case 'b': | |||
| var value [1]byte | |||
| if _, err = io.ReadFull(r, value[0:1]); err != nil { | |||
| return | |||
| } | |||
| return value[0], nil | |||
| case 's': | |||
| var value int16 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return value, nil | |||
| case 'I': | |||
| var value int32 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return value, nil | |||
| case 'l': | |||
| var value int64 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return value, nil | |||
| case 'f': | |||
| var value float32 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return value, nil | |||
| case 'd': | |||
| var value float64 | |||
| if err = binary.Read(r, binary.BigEndian, &value); err != nil { | |||
| return | |||
| } | |||
| return value, nil | |||
| case 'D': | |||
| return readDecimal(r) | |||
| case 'S': | |||
| return readLongstr(r) | |||
| case 'A': | |||
| return readArray(r) | |||
| case 'T': | |||
| return readTimestamp(r) | |||
| case 'F': | |||
| return readTable(r) | |||
| case 'x': | |||
| var len int32 | |||
| if err = binary.Read(r, binary.BigEndian, &len); err != nil { | |||
| return nil, err | |||
| } | |||
| value := make([]byte, len) | |||
| if _, err = io.ReadFull(r, value); err != nil { | |||
| return nil, err | |||
| } | |||
| return value, err | |||
| case 'V': | |||
| return nil, nil | |||
| } | |||
| return nil, ErrSyntax | |||
| } | |||
| /* | |||
| Field tables are long strings that contain packed name-value pairs. The | |||
| name-value pairs are encoded as short string defining the name, and octet | |||
| defining the values type and then the value itself. The valid field types for | |||
| tables are an extension of the native integer, bit, string, and timestamp | |||
| types, and are shown in the grammar. Multi-octet integer fields are always | |||
| held in network byte order. | |||
| */ | |||
| func readTable(r io.Reader) (table Table, err error) { | |||
| var nested bytes.Buffer | |||
| var str string | |||
| if str, err = readLongstr(r); err != nil { | |||
| return | |||
| } | |||
| nested.Write([]byte(str)) | |||
| table = make(Table) | |||
| for nested.Len() > 0 { | |||
| var key string | |||
| var value interface{} | |||
| if key, err = readShortstr(&nested); err != nil { | |||
| return | |||
| } | |||
| if value, err = readField(&nested); err != nil { | |||
| return | |||
| } | |||
| table[key] = value | |||
| } | |||
| return | |||
| } | |||
| func readArray(r io.Reader) ([]interface{}, error) { | |||
| var ( | |||
| size uint32 | |||
| err error | |||
| ) | |||
| if err = binary.Read(r, binary.BigEndian, &size); err != nil { | |||
| return nil, err | |||
| } | |||
| var ( | |||
| lim = &io.LimitedReader{R: r, N: int64(size)} | |||
| arr = []interface{}{} | |||
| field interface{} | |||
| ) | |||
| for { | |||
| if field, err = readField(lim); err != nil { | |||
| if err == io.EOF { | |||
| break | |||
| } | |||
| return nil, err | |||
| } | |||
| arr = append(arr, field) | |||
| } | |||
| return arr, nil | |||
| } | |||
| // Checks if this bit mask matches the flags bitset | |||
| func hasProperty(mask uint16, prop int) bool { | |||
| return int(mask)&prop > 0 | |||
| } | |||
| func (r *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) { | |||
| hf := &headerFrame{ | |||
| ChannelId: channel, | |||
| } | |||
| if err = binary.Read(r.r, binary.BigEndian, &hf.ClassId); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Read(r.r, binary.BigEndian, &hf.weight); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Size); err != nil { | |||
| return | |||
| } | |||
| var flags uint16 | |||
| if err = binary.Read(r.r, binary.BigEndian, &flags); err != nil { | |||
| return | |||
| } | |||
| if hasProperty(flags, flagContentType) { | |||
| if hf.Properties.ContentType, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagContentEncoding) { | |||
| if hf.Properties.ContentEncoding, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagHeaders) { | |||
| if hf.Properties.Headers, err = readTable(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagDeliveryMode) { | |||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagPriority) { | |||
| if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.Priority); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagCorrelationId) { | |||
| if hf.Properties.CorrelationId, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagReplyTo) { | |||
| if hf.Properties.ReplyTo, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagExpiration) { | |||
| if hf.Properties.Expiration, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagMessageId) { | |||
| if hf.Properties.MessageId, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagTimestamp) { | |||
| if hf.Properties.Timestamp, err = readTimestamp(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagType) { | |||
| if hf.Properties.Type, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagUserId) { | |||
| if hf.Properties.UserId, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagAppId) { | |||
| if hf.Properties.AppId, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(flags, flagReserved1) { | |||
| if hf.Properties.reserved1, err = readShortstr(r.r); err != nil { | |||
| return | |||
| } | |||
| } | |||
| return hf, nil | |||
| } | |||
| func (r *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) { | |||
| bf := &bodyFrame{ | |||
| ChannelId: channel, | |||
| Body: make([]byte, size), | |||
| } | |||
| if _, err = io.ReadFull(r.r, bf.Body); err != nil { | |||
| return nil, err | |||
| } | |||
| return bf, nil | |||
| } | |||
| var errHeartbeatPayload = errors.New("Heartbeats should not have a payload") | |||
| func (r *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) { | |||
| hf := &heartbeatFrame{ | |||
| ChannelId: channel, | |||
| } | |||
| if size > 0 { | |||
| return nil, errHeartbeatPayload | |||
| } | |||
| return hf, nil | |||
| } | |||
| @ -0,0 +1,64 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "time" | |||
| ) | |||
| // Return captures a flattened struct of fields returned by the server when a | |||
| // Publishing is unable to be delivered either due to the `mandatory` flag set | |||
| // and no route found, or `immediate` flag set and no free consumer. | |||
| type Return struct { | |||
| ReplyCode uint16 // reason | |||
| ReplyText string // description | |||
| Exchange string // basic.publish exchange | |||
| RoutingKey string // basic.publish routing key | |||
| // Properties | |||
| ContentType string // MIME content type | |||
| ContentEncoding string // MIME content encoding | |||
| Headers Table // Application or header exchange table | |||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | |||
| Priority uint8 // queue implementation use - 0 to 9 | |||
| CorrelationId string // application use - correlation identifier | |||
| ReplyTo string // application use - address to to reply to (ex: RPC) | |||
| Expiration string // implementation use - message expiration spec | |||
| MessageId string // application use - message identifier | |||
| Timestamp time.Time // application use - message timestamp | |||
| Type string // application use - message type name | |||
| UserId string // application use - creating user id | |||
| AppId string // application use - creating application | |||
| Body []byte | |||
| } | |||
| func newReturn(msg basicReturn) *Return { | |||
| props, body := msg.getContent() | |||
| return &Return{ | |||
| ReplyCode: msg.ReplyCode, | |||
| ReplyText: msg.ReplyText, | |||
| Exchange: msg.Exchange, | |||
| RoutingKey: msg.RoutingKey, | |||
| Headers: props.Headers, | |||
| ContentType: props.ContentType, | |||
| ContentEncoding: props.ContentEncoding, | |||
| DeliveryMode: props.DeliveryMode, | |||
| Priority: props.Priority, | |||
| CorrelationId: props.CorrelationId, | |||
| ReplyTo: props.ReplyTo, | |||
| Expiration: props.Expiration, | |||
| MessageId: props.MessageId, | |||
| Timestamp: props.Timestamp, | |||
| Type: props.Type, | |||
| UserId: props.UserId, | |||
| AppId: props.AppId, | |||
| Body: body, | |||
| } | |||
| } | |||
| @ -0,0 +1,428 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "fmt" | |||
| "io" | |||
| "time" | |||
| ) | |||
| // Constants for standard AMQP 0-9-1 exchange types. | |||
| const ( | |||
| ExchangeDirect = "direct" | |||
| ExchangeFanout = "fanout" | |||
| ExchangeTopic = "topic" | |||
| ExchangeHeaders = "headers" | |||
| ) | |||
| var ( | |||
| // ErrClosed is returned when the channel or connection is not open | |||
| ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"} | |||
| // ErrChannelMax is returned when Connection.Channel has been called enough | |||
| // times that all channel IDs have been exhausted in the client or the | |||
| // server. | |||
| ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"} | |||
| // ErrSASL is returned from Dial when the authentication mechanism could not | |||
| // be negoated. | |||
| ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"} | |||
| // ErrCredentials is returned when the authenticated client is not authorized | |||
| // to any vhost. | |||
| ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"} | |||
| // ErrVhost is returned when the authenticated user is not permitted to | |||
| // access the requested Vhost. | |||
| ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"} | |||
| // ErrSyntax is hard protocol error, indicating an unsupported protocol, | |||
| // implementation or encoding. | |||
| ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"} | |||
| // ErrFrame is returned when the protocol frame cannot be read from the | |||
| // server, indicating an unsupported protocol or unsupported frame type. | |||
| ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"} | |||
| // ErrCommandInvalid is returned when the server sends an unexpected response | |||
| // to this requested message type. This indicates a bug in this client. | |||
| ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"} | |||
| // ErrUnexpectedFrame is returned when something other than a method or | |||
| // heartbeat frame is delivered to the Connection, indicating a bug in the | |||
| // client. | |||
| ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"} | |||
| // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. | |||
| ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} | |||
| ) | |||
| // Error captures the code and reason a channel or connection has been closed | |||
| // by the server. | |||
| type Error struct { | |||
| Code int // constant code from the specification | |||
| Reason string // description of the error | |||
| Server bool // true when initiated from the server, false when from this library | |||
| Recover bool // true when this error can be recovered by retrying later or with different parameters | |||
| } | |||
| func newError(code uint16, text string) *Error { | |||
| return &Error{ | |||
| Code: int(code), | |||
| Reason: text, | |||
| Recover: isSoftExceptionCode(int(code)), | |||
| Server: true, | |||
| } | |||
| } | |||
| func (e Error) Error() string { | |||
| return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason) | |||
| } | |||
| // Used by header frames to capture routing and header information | |||
| type properties struct { | |||
| ContentType string // MIME content type | |||
| ContentEncoding string // MIME content encoding | |||
| Headers Table // Application or header exchange table | |||
| DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2) | |||
| Priority uint8 // queue implementation use - 0 to 9 | |||
| CorrelationId string // application use - correlation identifier | |||
| ReplyTo string // application use - address to to reply to (ex: RPC) | |||
| Expiration string // implementation use - message expiration spec | |||
| MessageId string // application use - message identifier | |||
| Timestamp time.Time // application use - message timestamp | |||
| Type string // application use - message type name | |||
| UserId string // application use - creating user id | |||
| AppId string // application use - creating application | |||
| reserved1 string // was cluster-id - process for buffer consumption | |||
| } | |||
| // DeliveryMode. Transient means higher throughput but messages will not be | |||
| // restored on broker restart. The delivery mode of publishings is unrelated | |||
| // to the durability of the queues they reside on. Transient messages will | |||
| // not be restored to durable queues, persistent messages will be restored to | |||
| // durable queues and lost on non-durable queues during server restart. | |||
| // | |||
| // This remains typed as uint8 to match Publishing.DeliveryMode. Other | |||
| // delivery modes specific to custom queue implementations are not enumerated | |||
| // here. | |||
| const ( | |||
| Transient uint8 = 1 | |||
| Persistent uint8 = 2 | |||
| ) | |||
| // The property flags are an array of bits that indicate the presence or | |||
| // absence of each property value in sequence. The bits are ordered from most | |||
| // high to low - bit 15 indicates the first property. | |||
| const ( | |||
| flagContentType = 0x8000 | |||
| flagContentEncoding = 0x4000 | |||
| flagHeaders = 0x2000 | |||
| flagDeliveryMode = 0x1000 | |||
| flagPriority = 0x0800 | |||
| flagCorrelationId = 0x0400 | |||
| flagReplyTo = 0x0200 | |||
| flagExpiration = 0x0100 | |||
| flagMessageId = 0x0080 | |||
| flagTimestamp = 0x0040 | |||
| flagType = 0x0020 | |||
| flagUserId = 0x0010 | |||
| flagAppId = 0x0008 | |||
| flagReserved1 = 0x0004 | |||
| ) | |||
| // Queue captures the current server state of the queue on the server returned | |||
| // from Channel.QueueDeclare or Channel.QueueInspect. | |||
| type Queue struct { | |||
| Name string // server confirmed or generated name | |||
| Messages int // count of messages not awaiting acknowledgment | |||
| Consumers int // number of consumers receiving deliveries | |||
| } | |||
| // Publishing captures the client message sent to the server. The fields | |||
| // outside of the Headers table included in this struct mirror the underlying | |||
| // fields in the content frame. They use native types for convenience and | |||
| // efficiency. | |||
| type Publishing struct { | |||
| // Application or exchange specific fields, | |||
| // the headers exchange will inspect this field. | |||
| Headers Table | |||
| // Properties | |||
| ContentType string // MIME content type | |||
| ContentEncoding string // MIME content encoding | |||
| DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) | |||
| Priority uint8 // 0 to 9 | |||
| CorrelationId string // correlation identifier | |||
| ReplyTo string // address to to reply to (ex: RPC) | |||
| Expiration string // message expiration spec | |||
| MessageId string // message identifier | |||
| Timestamp time.Time // message timestamp | |||
| Type string // message type name | |||
| UserId string // creating user id - ex: "guest" | |||
| AppId string // creating application id | |||
| // The application specific payload of the message | |||
| Body []byte | |||
| } | |||
| // Blocking notifies the server's TCP flow control of the Connection. When a | |||
| // server hits a memory or disk alarm it will block all connections until the | |||
| // resources are reclaimed. Use NotifyBlock on the Connection to receive these | |||
| // events. | |||
| type Blocking struct { | |||
| Active bool // TCP pushback active/inactive on server | |||
| Reason string // Server reason for activation | |||
| } | |||
| // Confirmation notifies the acknowledgment or negative acknowledgement of a | |||
| // publishing identified by its delivery tag. Use NotifyPublish on the Channel | |||
| // to consume these events. | |||
| type Confirmation struct { | |||
| DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode | |||
| Ack bool // True when the server successfully received the publishing | |||
| } | |||
| // Decimal matches the AMQP decimal type. Scale is the number of decimal | |||
| // digits Scale == 2, Value == 12345, Decimal == 123.45 | |||
| type Decimal struct { | |||
| Scale uint8 | |||
| Value int32 | |||
| } | |||
| // Table stores user supplied fields of the following types: | |||
| // | |||
| // bool | |||
| // byte | |||
| // float32 | |||
| // float64 | |||
| // int | |||
| // int16 | |||
| // int32 | |||
| // int64 | |||
| // nil | |||
| // string | |||
| // time.Time | |||
| // amqp.Decimal | |||
| // amqp.Table | |||
| // []byte | |||
| // []interface{} - containing above types | |||
| // | |||
| // Functions taking a table will immediately fail when the table contains a | |||
| // value of an unsupported type. | |||
| // | |||
| // The caller must be specific in which precision of integer it wishes to | |||
| // encode. | |||
| // | |||
| // Use a type assertion when reading values from a table for type conversion. | |||
| // | |||
| // RabbitMQ expects int32 for integer values. | |||
| // | |||
| type Table map[string]interface{} | |||
| func validateField(f interface{}) error { | |||
| switch fv := f.(type) { | |||
| case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: | |||
| return nil | |||
| case []interface{}: | |||
| for _, v := range fv { | |||
| if err := validateField(v); err != nil { | |||
| return fmt.Errorf("in array %s", err) | |||
| } | |||
| } | |||
| return nil | |||
| case Table: | |||
| for k, v := range fv { | |||
| if err := validateField(v); err != nil { | |||
| return fmt.Errorf("table field %q %s", k, err) | |||
| } | |||
| } | |||
| return nil | |||
| } | |||
| return fmt.Errorf("value %T not supported", f) | |||
| } | |||
| // Validate returns and error if any Go types in the table are incompatible with AMQP types. | |||
| func (t Table) Validate() error { | |||
| return validateField(t) | |||
| } | |||
| // Heap interface for maintaining delivery tags | |||
| type tagSet []uint64 | |||
| func (set tagSet) Len() int { return len(set) } | |||
| func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] } | |||
| func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] } | |||
| func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) } | |||
| func (set *tagSet) Pop() interface{} { | |||
| val := (*set)[len(*set)-1] | |||
| *set = (*set)[:len(*set)-1] | |||
| return val | |||
| } | |||
| type message interface { | |||
| id() (uint16, uint16) | |||
| wait() bool | |||
| read(io.Reader) error | |||
| write(io.Writer) error | |||
| } | |||
| type messageWithContent interface { | |||
| message | |||
| getContent() (properties, []byte) | |||
| setContent(properties, []byte) | |||
| } | |||
| /* | |||
| The base interface implemented as: | |||
| 2.3.5 frame Details | |||
| All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects | |||
| malformed frames: | |||
| 0 1 3 7 size+7 size+8 | |||
| +------+---------+-------------+ +------------+ +-----------+ | |||
| | type | channel | size | | payload | | frame-end | | |||
| +------+---------+-------------+ +------------+ +-----------+ | |||
| octet short long size octets octet | |||
| To read a frame, we: | |||
| 1. Read the header and check the frame type and channel. | |||
| 2. Depending on the frame type, we read the payload and process it. | |||
| 3. Read the frame end octet. | |||
| In realistic implementations where performance is a concern, we would use | |||
| “read-ahead buffering” or “gathering reads” to avoid doing three separate | |||
| system calls to read a frame. | |||
| */ | |||
| type frame interface { | |||
| write(io.Writer) error | |||
| channel() uint16 | |||
| } | |||
| type reader struct { | |||
| r io.Reader | |||
| } | |||
| type writer struct { | |||
| w io.Writer | |||
| } | |||
| // Implements the frame interface for Connection RPC | |||
| type protocolHeader struct{} | |||
| func (protocolHeader) write(w io.Writer) error { | |||
| _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}) | |||
| return err | |||
| } | |||
| func (protocolHeader) channel() uint16 { | |||
| panic("only valid as initial handshake") | |||
| } | |||
| /* | |||
| Method frames carry the high-level protocol commands (which we call "methods"). | |||
| One method frame carries one command. The method frame payload has this format: | |||
| 0 2 4 | |||
| +----------+-----------+-------------- - - | |||
| | class-id | method-id | arguments... | |||
| +----------+-----------+-------------- - - | |||
| short short ... | |||
| To process a method frame, we: | |||
| 1. Read the method frame payload. | |||
| 2. Unpack it into a structure. A given method always has the same structure, | |||
| so we can unpack the method rapidly. 3. Check that the method is allowed in | |||
| the current context. | |||
| 4. Check that the method arguments are valid. | |||
| 5. Execute the method. | |||
| Method frame bodies are constructed as a list of AMQP data fields (bits, | |||
| integers, strings and string tables). The marshalling code is trivially | |||
| generated directly from the protocol specifications, and can be very rapid. | |||
| */ | |||
| type methodFrame struct { | |||
| ChannelId uint16 | |||
| ClassId uint16 | |||
| MethodId uint16 | |||
| Method message | |||
| } | |||
| func (f *methodFrame) channel() uint16 { return f.ChannelId } | |||
| /* | |||
| Heartbeating is a technique designed to undo one of TCP/IP's features, namely | |||
| its ability to recover from a broken physical connection by closing only after | |||
| a quite long time-out. In some scenarios we need to know very rapidly if a | |||
| peer is disconnected or not responding for other reasons (e.g. it is looping). | |||
| Since heartbeating can be done at a low level, we implement this as a special | |||
| type of frame that peers exchange at the transport level, rather than as a | |||
| class method. | |||
| */ | |||
| type heartbeatFrame struct { | |||
| ChannelId uint16 | |||
| } | |||
| func (f *heartbeatFrame) channel() uint16 { return f.ChannelId } | |||
| /* | |||
| Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally | |||
| defined as carrying content. When a peer sends such a method frame, it always | |||
| follows it with a content header and zero or more content body frames. | |||
| A content header frame has this format: | |||
| 0 2 4 12 14 | |||
| +----------+--------+-----------+----------------+------------- - - | |||
| | class-id | weight | body size | property flags | property list... | |||
| +----------+--------+-----------+----------------+------------- - - | |||
| short short long long short remainder... | |||
| We place content body in distinct frames (rather than including it in the | |||
| method) so that AMQP may support "zero copy" techniques in which content is | |||
| never marshalled or encoded. We place the content properties in their own | |||
| frame so that recipients can selectively discard contents they do not want to | |||
| process | |||
| */ | |||
| type headerFrame struct { | |||
| ChannelId uint16 | |||
| ClassId uint16 | |||
| weight uint16 | |||
| Size uint64 | |||
| Properties properties | |||
| } | |||
| func (f *headerFrame) channel() uint16 { return f.ChannelId } | |||
| /* | |||
| Content is the application data we carry from client-to-client via the AMQP | |||
| server. Content is, roughly speaking, a set of properties plus a binary data | |||
| part. The set of allowed properties are defined by the Basic class, and these | |||
| form the "content header frame". The data can be any size, and MAY be broken | |||
| into several (or many) chunks, each forming a "content body frame". | |||
| Looking at the frames for a specific channel, as they pass on the wire, we | |||
| might see something like this: | |||
| [method] | |||
| [method] [header] [body] [body] | |||
| [method] | |||
| ... | |||
| */ | |||
| type bodyFrame struct { | |||
| ChannelId uint16 | |||
| Body []byte | |||
| } | |||
| func (f *bodyFrame) channel() uint16 { return f.ChannelId } | |||
| @ -0,0 +1,176 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "errors" | |||
| "net" | |||
| "net/url" | |||
| "strconv" | |||
| "strings" | |||
| ) | |||
| var errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'") | |||
| var errURIWhitespace = errors.New("URI must not contain whitespace") | |||
| var schemePorts = map[string]int{ | |||
| "amqp": 5672, | |||
| "amqps": 5671, | |||
| } | |||
| var defaultURI = URI{ | |||
| Scheme: "amqp", | |||
| Host: "localhost", | |||
| Port: 5672, | |||
| Username: "guest", | |||
| Password: "guest", | |||
| Vhost: "/", | |||
| } | |||
| // URI represents a parsed AMQP URI string. | |||
| type URI struct { | |||
| Scheme string | |||
| Host string | |||
| Port int | |||
| Username string | |||
| Password string | |||
| Vhost string | |||
| } | |||
| // ParseURI attempts to parse the given AMQP URI according to the spec. | |||
| // See http://www.rabbitmq.com/uri-spec.html. | |||
| // | |||
| // Default values for the fields are: | |||
| // | |||
| // Scheme: amqp | |||
| // Host: localhost | |||
| // Port: 5672 | |||
| // Username: guest | |||
| // Password: guest | |||
| // Vhost: / | |||
| // | |||
| func ParseURI(uri string) (URI, error) { | |||
| builder := defaultURI | |||
| if strings.Contains(uri, " ") == true { | |||
| return builder, errURIWhitespace | |||
| } | |||
| u, err := url.Parse(uri) | |||
| if err != nil { | |||
| return builder, err | |||
| } | |||
| defaultPort, okScheme := schemePorts[u.Scheme] | |||
| if okScheme { | |||
| builder.Scheme = u.Scheme | |||
| } else { | |||
| return builder, errURIScheme | |||
| } | |||
| host := u.Hostname() | |||
| port := u.Port() | |||
| if host != "" { | |||
| builder.Host = host | |||
| } | |||
| if port != "" { | |||
| port32, err := strconv.ParseInt(port, 10, 32) | |||
| if err != nil { | |||
| return builder, err | |||
| } | |||
| builder.Port = int(port32) | |||
| } else { | |||
| builder.Port = defaultPort | |||
| } | |||
| if u.User != nil { | |||
| builder.Username = u.User.Username() | |||
| if password, ok := u.User.Password(); ok { | |||
| builder.Password = password | |||
| } | |||
| } | |||
| if u.Path != "" { | |||
| if strings.HasPrefix(u.Path, "/") { | |||
| if u.Host == "" && strings.HasPrefix(u.Path, "///") { | |||
| // net/url doesn't handle local context authorities and leaves that up | |||
| // to the scheme handler. In our case, we translate amqp:/// into the | |||
| // default host and whatever the vhost should be | |||
| if len(u.Path) > 3 { | |||
| builder.Vhost = u.Path[3:] | |||
| } | |||
| } else if len(u.Path) > 1 { | |||
| builder.Vhost = u.Path[1:] | |||
| } | |||
| } else { | |||
| builder.Vhost = u.Path | |||
| } | |||
| } | |||
| return builder, nil | |||
| } | |||
| // PlainAuth returns a PlainAuth structure based on the parsed URI's | |||
| // Username and Password fields. | |||
| func (uri URI) PlainAuth() *PlainAuth { | |||
| return &PlainAuth{ | |||
| Username: uri.Username, | |||
| Password: uri.Password, | |||
| } | |||
| } | |||
| // AMQPlainAuth returns a PlainAuth structure based on the parsed URI's | |||
| // Username and Password fields. | |||
| func (uri URI) AMQPlainAuth() *AMQPlainAuth { | |||
| return &AMQPlainAuth{ | |||
| Username: uri.Username, | |||
| Password: uri.Password, | |||
| } | |||
| } | |||
| func (uri URI) String() string { | |||
| authority, err := url.Parse("") | |||
| if err != nil { | |||
| return err.Error() | |||
| } | |||
| authority.Scheme = uri.Scheme | |||
| if uri.Username != defaultURI.Username || uri.Password != defaultURI.Password { | |||
| authority.User = url.User(uri.Username) | |||
| if uri.Password != defaultURI.Password { | |||
| authority.User = url.UserPassword(uri.Username, uri.Password) | |||
| } | |||
| } | |||
| authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) | |||
| if defaultPort, found := schemePorts[uri.Scheme]; !found || defaultPort != uri.Port { | |||
| authority.Host = net.JoinHostPort(uri.Host, strconv.Itoa(uri.Port)) | |||
| } else { | |||
| // JoinHostPort() automatically add brackets to the host if it's | |||
| // an IPv6 address. | |||
| // | |||
| // If not port is specified, JoinHostPort() return an IP address in the | |||
| // form of "[::1]:", so we use TrimSuffix() to remove the extra ":". | |||
| authority.Host = strings.TrimSuffix(net.JoinHostPort(uri.Host, ""), ":") | |||
| } | |||
| if uri.Vhost != defaultURI.Vhost { | |||
| // Make sure net/url does not double escape, e.g. | |||
| // "%2F" does not become "%252F". | |||
| authority.Path = uri.Vhost | |||
| authority.RawPath = url.QueryEscape(uri.Vhost) | |||
| } else { | |||
| authority.Path = "/" | |||
| } | |||
| return authority.String() | |||
| } | |||
| @ -0,0 +1,416 @@ | |||
| // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. | |||
| // Use of this source code is governed by a BSD-style | |||
| // license that can be found in the LICENSE file. | |||
| // Source code and contact info at http://github.com/streadway/amqp | |||
| package amqp | |||
| import ( | |||
| "bufio" | |||
| "bytes" | |||
| "encoding/binary" | |||
| "errors" | |||
| "io" | |||
| "math" | |||
| "time" | |||
| ) | |||
| func (w *writer) WriteFrame(frame frame) (err error) { | |||
| if err = frame.write(w.w); err != nil { | |||
| return | |||
| } | |||
| if buf, ok := w.w.(*bufio.Writer); ok { | |||
| err = buf.Flush() | |||
| } | |||
| return | |||
| } | |||
| func (f *methodFrame) write(w io.Writer) (err error) { | |||
| var payload bytes.Buffer | |||
| if f.Method == nil { | |||
| return errors.New("malformed frame: missing method") | |||
| } | |||
| class, method := f.Method.id() | |||
| if err = binary.Write(&payload, binary.BigEndian, class); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Write(&payload, binary.BigEndian, method); err != nil { | |||
| return | |||
| } | |||
| if err = f.Method.write(&payload); err != nil { | |||
| return | |||
| } | |||
| return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes()) | |||
| } | |||
| // Heartbeat | |||
| // | |||
| // Payload is empty | |||
| func (f *heartbeatFrame) write(w io.Writer) (err error) { | |||
| return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{}) | |||
| } | |||
| // CONTENT HEADER | |||
| // 0 2 4 12 14 | |||
| // +----------+--------+-----------+----------------+------------- - - | |||
| // | class-id | weight | body size | property flags | property list... | |||
| // +----------+--------+-----------+----------------+------------- - - | |||
| // short short long long short remainder... | |||
| // | |||
| func (f *headerFrame) write(w io.Writer) (err error) { | |||
| var payload bytes.Buffer | |||
| var zeroTime time.Time | |||
| if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil { | |||
| return | |||
| } | |||
| if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil { | |||
| return | |||
| } | |||
| // First pass will build the mask to be serialized, second pass will serialize | |||
| // each of the fields that appear in the mask. | |||
| var mask uint16 | |||
| if len(f.Properties.ContentType) > 0 { | |||
| mask = mask | flagContentType | |||
| } | |||
| if len(f.Properties.ContentEncoding) > 0 { | |||
| mask = mask | flagContentEncoding | |||
| } | |||
| if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 { | |||
| mask = mask | flagHeaders | |||
| } | |||
| if f.Properties.DeliveryMode > 0 { | |||
| mask = mask | flagDeliveryMode | |||
| } | |||
| if f.Properties.Priority > 0 { | |||
| mask = mask | flagPriority | |||
| } | |||
| if len(f.Properties.CorrelationId) > 0 { | |||
| mask = mask | flagCorrelationId | |||
| } | |||
| if len(f.Properties.ReplyTo) > 0 { | |||
| mask = mask | flagReplyTo | |||
| } | |||
| if len(f.Properties.Expiration) > 0 { | |||
| mask = mask | flagExpiration | |||
| } | |||
| if len(f.Properties.MessageId) > 0 { | |||
| mask = mask | flagMessageId | |||
| } | |||
| if f.Properties.Timestamp != zeroTime { | |||
| mask = mask | flagTimestamp | |||
| } | |||
| if len(f.Properties.Type) > 0 { | |||
| mask = mask | flagType | |||
| } | |||
| if len(f.Properties.UserId) > 0 { | |||
| mask = mask | flagUserId | |||
| } | |||
| if len(f.Properties.AppId) > 0 { | |||
| mask = mask | flagAppId | |||
| } | |||
| if err = binary.Write(&payload, binary.BigEndian, mask); err != nil { | |||
| return | |||
| } | |||
| if hasProperty(mask, flagContentType) { | |||
| if err = writeShortstr(&payload, f.Properties.ContentType); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagContentEncoding) { | |||
| if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagHeaders) { | |||
| if err = writeTable(&payload, f.Properties.Headers); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagDeliveryMode) { | |||
| if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagPriority) { | |||
| if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagCorrelationId) { | |||
| if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagReplyTo) { | |||
| if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagExpiration) { | |||
| if err = writeShortstr(&payload, f.Properties.Expiration); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagMessageId) { | |||
| if err = writeShortstr(&payload, f.Properties.MessageId); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagTimestamp) { | |||
| if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagType) { | |||
| if err = writeShortstr(&payload, f.Properties.Type); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagUserId) { | |||
| if err = writeShortstr(&payload, f.Properties.UserId); err != nil { | |||
| return | |||
| } | |||
| } | |||
| if hasProperty(mask, flagAppId) { | |||
| if err = writeShortstr(&payload, f.Properties.AppId); err != nil { | |||
| return | |||
| } | |||
| } | |||
| return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes()) | |||
| } | |||
| // Body | |||
| // | |||
| // Payload is one byterange from the full body who's size is declared in the | |||
| // Header frame | |||
| func (f *bodyFrame) write(w io.Writer) (err error) { | |||
| return writeFrame(w, frameBody, f.ChannelId, f.Body) | |||
| } | |||
| func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) { | |||
| end := []byte{frameEnd} | |||
| size := uint(len(payload)) | |||
| _, err = w.Write([]byte{ | |||
| byte(typ), | |||
| byte((channel & 0xff00) >> 8), | |||
| byte((channel & 0x00ff) >> 0), | |||
| byte((size & 0xff000000) >> 24), | |||
| byte((size & 0x00ff0000) >> 16), | |||
| byte((size & 0x0000ff00) >> 8), | |||
| byte((size & 0x000000ff) >> 0), | |||
| }) | |||
| if err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(payload); err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(end); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| func writeShortstr(w io.Writer, s string) (err error) { | |||
| b := []byte(s) | |||
| var length = uint8(len(b)) | |||
| if err = binary.Write(w, binary.BigEndian, length); err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(b[:length]); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| func writeLongstr(w io.Writer, s string) (err error) { | |||
| b := []byte(s) | |||
| var length = uint32(len(b)) | |||
| if err = binary.Write(w, binary.BigEndian, length); err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(b[:length]); err != nil { | |||
| return | |||
| } | |||
| return | |||
| } | |||
| /* | |||
| 'A': []interface{} | |||
| 'D': Decimal | |||
| 'F': Table | |||
| 'I': int32 | |||
| 'S': string | |||
| 'T': time.Time | |||
| 'V': nil | |||
| 'b': byte | |||
| 'd': float64 | |||
| 'f': float32 | |||
| 'l': int64 | |||
| 's': int16 | |||
| 't': bool | |||
| 'x': []byte | |||
| */ | |||
| func writeField(w io.Writer, value interface{}) (err error) { | |||
| var buf [9]byte | |||
| var enc []byte | |||
| switch v := value.(type) { | |||
| case bool: | |||
| buf[0] = 't' | |||
| if v { | |||
| buf[1] = byte(1) | |||
| } else { | |||
| buf[1] = byte(0) | |||
| } | |||
| enc = buf[:2] | |||
| case byte: | |||
| buf[0] = 'b' | |||
| buf[1] = byte(v) | |||
| enc = buf[:2] | |||
| case int16: | |||
| buf[0] = 's' | |||
| binary.BigEndian.PutUint16(buf[1:3], uint16(v)) | |||
| enc = buf[:3] | |||
| case int: | |||
| buf[0] = 'I' | |||
| binary.BigEndian.PutUint32(buf[1:5], uint32(v)) | |||
| enc = buf[:5] | |||
| case int32: | |||
| buf[0] = 'I' | |||
| binary.BigEndian.PutUint32(buf[1:5], uint32(v)) | |||
| enc = buf[:5] | |||
| case int64: | |||
| buf[0] = 'l' | |||
| binary.BigEndian.PutUint64(buf[1:9], uint64(v)) | |||
| enc = buf[:9] | |||
| case float32: | |||
| buf[0] = 'f' | |||
| binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v)) | |||
| enc = buf[:5] | |||
| case float64: | |||
| buf[0] = 'd' | |||
| binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v)) | |||
| enc = buf[:9] | |||
| case Decimal: | |||
| buf[0] = 'D' | |||
| buf[1] = byte(v.Scale) | |||
| binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value)) | |||
| enc = buf[:6] | |||
| case string: | |||
| buf[0] = 'S' | |||
| binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) | |||
| enc = append(buf[:5], []byte(v)...) | |||
| case []interface{}: // field-array | |||
| buf[0] = 'A' | |||
| sec := new(bytes.Buffer) | |||
| for _, val := range v { | |||
| if err = writeField(sec, val); err != nil { | |||
| return | |||
| } | |||
| } | |||
| binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len())) | |||
| if _, err = w.Write(buf[:5]); err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(sec.Bytes()); err != nil { | |||
| return | |||
| } | |||
| return | |||
| case time.Time: | |||
| buf[0] = 'T' | |||
| binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix())) | |||
| enc = buf[:9] | |||
| case Table: | |||
| if _, err = w.Write([]byte{'F'}); err != nil { | |||
| return | |||
| } | |||
| return writeTable(w, v) | |||
| case []byte: | |||
| buf[0] = 'x' | |||
| binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) | |||
| if _, err = w.Write(buf[0:5]); err != nil { | |||
| return | |||
| } | |||
| if _, err = w.Write(v); err != nil { | |||
| return | |||
| } | |||
| return | |||
| case nil: | |||
| buf[0] = 'V' | |||
| enc = buf[:1] | |||
| default: | |||
| return ErrFieldType | |||
| } | |||
| _, err = w.Write(enc) | |||
| return | |||
| } | |||
| func writeTable(w io.Writer, table Table) (err error) { | |||
| var buf bytes.Buffer | |||
| for key, val := range table { | |||
| if err = writeShortstr(&buf, key); err != nil { | |||
| return | |||
| } | |||
| if err = writeField(&buf, val); err != nil { | |||
| return | |||
| } | |||
| } | |||
| return writeLongstr(w, string(buf.Bytes())) | |||
| } | |||
| @ -0,0 +1,3 @@ | |||
| # github.com/streadway/amqp v1.0.0 | |||
| ## explicit | |||
| github.com/streadway/amqp | |||