Browse Source

Merge branch 'wagslane:main' into connectionConfig

pull/20/head
Brian 5 years ago
parent
commit
e76ec2a67e
2 changed files with 207 additions and 207 deletions
  1. +1
    -1
      channel.go
  2. +206
    -206
      consume_options.go

+ 1
- 1
channel.go View File

@ -60,7 +60,7 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
select {
case err := <-notifyCloseChan:
// If the connection close is triggered by the Server, a reconnection takes place
if err.Server {
if err != nil && err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Printf("successfully reconnected to amqp server after close")


+ 206
- 206
consume_options.go View File

@ -1,206 +1,206 @@
package rabbitmq
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
QueueDurable: false,
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
ConsumerName: "",
ConsumerAutoAck: false,
ConsumerExclusive: false,
ConsumerNoWait: false,
ConsumerNoLocal: false,
ConsumerArgs: nil,
}
}
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
Concurrency int
QOSPrefetch int
QOSGlobal bool
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueDurable = true
}
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueAutoDelete = true
}
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueExclusive = true
}
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
options.BindingNoWait = true
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QOSPrefetch = prefetchCount
}
}
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.QOSGlobal = true
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerName = consumerName
}
}
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.ConsumerExclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}
package rabbitmq
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
QueueDurable: false,
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
ConsumerName: "",
ConsumerAutoAck: false,
ConsumerExclusive: false,
ConsumerNoWait: false,
ConsumerNoLocal: false,
ConsumerArgs: nil,
}
}
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
Concurrency int
QOSPrefetch int
QOSGlobal bool
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueDurable = true
}
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueAutoDelete = true
}
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueExclusive = true
}
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
options.BindingNoWait = true
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QOSPrefetch = prefetchCount
}
}
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.QOSGlobal = true
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerName = consumerName
}
}
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.ConsumerExclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}

Loading…
Cancel
Save