| @ -1,206 +1,206 @@ | |||||
| package rabbitmq | |||||
| // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided | |||||
| func getDefaultConsumeOptions() ConsumeOptions { | |||||
| return ConsumeOptions{ | |||||
| QueueDurable: false, | |||||
| QueueAutoDelete: false, | |||||
| QueueExclusive: false, | |||||
| QueueNoWait: false, | |||||
| QueueArgs: nil, | |||||
| BindingExchange: nil, | |||||
| BindingNoWait: false, | |||||
| BindingArgs: nil, | |||||
| Concurrency: 1, | |||||
| QOSPrefetch: 0, | |||||
| QOSGlobal: false, | |||||
| ConsumerName: "", | |||||
| ConsumerAutoAck: false, | |||||
| ConsumerExclusive: false, | |||||
| ConsumerNoWait: false, | |||||
| ConsumerNoLocal: false, | |||||
| ConsumerArgs: nil, | |||||
| } | |||||
| } | |||||
| // ConsumeOptions are used to describe how a new consumer will be created. | |||||
| type ConsumeOptions struct { | |||||
| QueueDurable bool | |||||
| QueueAutoDelete bool | |||||
| QueueExclusive bool | |||||
| QueueNoWait bool | |||||
| QueueArgs Table | |||||
| BindingExchange *BindingExchangeOptions | |||||
| BindingNoWait bool | |||||
| BindingArgs Table | |||||
| Concurrency int | |||||
| QOSPrefetch int | |||||
| QOSGlobal bool | |||||
| ConsumerName string | |||||
| ConsumerAutoAck bool | |||||
| ConsumerExclusive bool | |||||
| ConsumerNoWait bool | |||||
| ConsumerNoLocal bool | |||||
| ConsumerArgs Table | |||||
| } | |||||
| // getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. | |||||
| func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { | |||||
| if options.BindingExchange == nil { | |||||
| options.BindingExchange = &BindingExchangeOptions{ | |||||
| Name: "", | |||||
| Kind: "direct", | |||||
| Durable: false, | |||||
| AutoDelete: false, | |||||
| Internal: false, | |||||
| NoWait: false, | |||||
| ExchangeArgs: nil, | |||||
| } | |||||
| } | |||||
| return options.BindingExchange | |||||
| } | |||||
| // BindingExchangeOptions are used when binding to an exchange. | |||||
| // it will verify the exchange is created before binding to it. | |||||
| type BindingExchangeOptions struct { | |||||
| Name string | |||||
| Kind string | |||||
| Durable bool | |||||
| AutoDelete bool | |||||
| Internal bool | |||||
| NoWait bool | |||||
| ExchangeArgs Table | |||||
| } | |||||
| // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't | |||||
| // be destroyed when the server restarts. It must only be bound to durable exchanges | |||||
| func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { | |||||
| options.QueueDurable = true | |||||
| } | |||||
| // WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will | |||||
| // be deleted when there are no more conusmers on it | |||||
| func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { | |||||
| options.QueueAutoDelete = true | |||||
| } | |||||
| // WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means | |||||
| // it's are only accessible by the connection that declares it and | |||||
| // will be deleted when the connection closes. Channels on other connections | |||||
| // will receive an error when attempting to declare, bind, consume, purge or | |||||
| // delete a queue with the same name. | |||||
| func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { | |||||
| options.QueueExclusive = true | |||||
| } | |||||
| // WithConsumeOptionsQueueNoWait sets the queue to nowait, which means | |||||
| // the queue will assume to be declared on the server. A | |||||
| // channel exception will arrive if the conditions are met for existing queues | |||||
| // or attempting to modify an existing queue from a different connection. | |||||
| func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { | |||||
| options.QueueNoWait = true | |||||
| } | |||||
| // WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes | |||||
| // in the cluster will have the messages distributed amongst them for higher reliability | |||||
| func WithConsumeOptionsQuorum(options *ConsumeOptions) { | |||||
| if options.QueueArgs == nil { | |||||
| options.QueueArgs = Table{} | |||||
| } | |||||
| options.QueueArgs["x-queue-type"] = "quorum" | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to | |||||
| func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Name = name | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type | |||||
| func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Kind = kind | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag | |||||
| func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Durable = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag | |||||
| func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag | |||||
| func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Internal = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag | |||||
| func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).NoWait = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange | |||||
| func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound | |||||
| // the channel will not be closed with an error. | |||||
| func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { | |||||
| options.BindingNoWait = true | |||||
| } | |||||
| // WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that | |||||
| // many goroutines will be spawned to run the provided handler on messages | |||||
| func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.Concurrency = concurrency | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that | |||||
| // many messages will be fetched from the server in advance to help with throughput. | |||||
| // This doesn't affect the handler, messages are still processed one at a time. | |||||
| func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.QOSPrefetch = prefetchCount | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means | |||||
| // these QOS settings apply to ALL existing and future | |||||
| // consumers on all channels on the same connection | |||||
| func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { | |||||
| options.QOSGlobal = true | |||||
| } | |||||
| // WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer | |||||
| // if unset a random name will be given | |||||
| func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.ConsumerName = consumerName | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means | |||||
| // the server will ensure that this is the sole consumer | |||||
| // from this queue. When exclusive is false, the server will fairly distribute | |||||
| // deliveries across multiple consumers. | |||||
| func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { | |||||
| options.ConsumerExclusive = true | |||||
| } | |||||
| // WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means | |||||
| // it does not wait for the server to confirm the request and | |||||
| // immediately begin deliveries. If it is not possible to consume, a channel | |||||
| // exception will be raised and the channel will be closed. | |||||
| func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { | |||||
| options.ConsumerNoWait = true | |||||
| } | |||||
| package rabbitmq | |||||
| // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided | |||||
| func getDefaultConsumeOptions() ConsumeOptions { | |||||
| return ConsumeOptions{ | |||||
| QueueDurable: false, | |||||
| QueueAutoDelete: false, | |||||
| QueueExclusive: false, | |||||
| QueueNoWait: false, | |||||
| QueueArgs: nil, | |||||
| BindingExchange: nil, | |||||
| BindingNoWait: false, | |||||
| BindingArgs: nil, | |||||
| Concurrency: 1, | |||||
| QOSPrefetch: 0, | |||||
| QOSGlobal: false, | |||||
| ConsumerName: "", | |||||
| ConsumerAutoAck: false, | |||||
| ConsumerExclusive: false, | |||||
| ConsumerNoWait: false, | |||||
| ConsumerNoLocal: false, | |||||
| ConsumerArgs: nil, | |||||
| } | |||||
| } | |||||
| // ConsumeOptions are used to describe how a new consumer will be created. | |||||
| type ConsumeOptions struct { | |||||
| QueueDurable bool | |||||
| QueueAutoDelete bool | |||||
| QueueExclusive bool | |||||
| QueueNoWait bool | |||||
| QueueArgs Table | |||||
| BindingExchange *BindingExchangeOptions | |||||
| BindingNoWait bool | |||||
| BindingArgs Table | |||||
| Concurrency int | |||||
| QOSPrefetch int | |||||
| QOSGlobal bool | |||||
| ConsumerName string | |||||
| ConsumerAutoAck bool | |||||
| ConsumerExclusive bool | |||||
| ConsumerNoWait bool | |||||
| ConsumerNoLocal bool | |||||
| ConsumerArgs Table | |||||
| } | |||||
| // getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. | |||||
| func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { | |||||
| if options.BindingExchange == nil { | |||||
| options.BindingExchange = &BindingExchangeOptions{ | |||||
| Name: "", | |||||
| Kind: "direct", | |||||
| Durable: false, | |||||
| AutoDelete: false, | |||||
| Internal: false, | |||||
| NoWait: false, | |||||
| ExchangeArgs: nil, | |||||
| } | |||||
| } | |||||
| return options.BindingExchange | |||||
| } | |||||
| // BindingExchangeOptions are used when binding to an exchange. | |||||
| // it will verify the exchange is created before binding to it. | |||||
| type BindingExchangeOptions struct { | |||||
| Name string | |||||
| Kind string | |||||
| Durable bool | |||||
| AutoDelete bool | |||||
| Internal bool | |||||
| NoWait bool | |||||
| ExchangeArgs Table | |||||
| } | |||||
| // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't | |||||
| // be destroyed when the server restarts. It must only be bound to durable exchanges | |||||
| func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { | |||||
| options.QueueDurable = true | |||||
| } | |||||
| // WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will | |||||
| // be deleted when there are no more conusmers on it | |||||
| func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { | |||||
| options.QueueAutoDelete = true | |||||
| } | |||||
| // WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means | |||||
| // it's are only accessible by the connection that declares it and | |||||
| // will be deleted when the connection closes. Channels on other connections | |||||
| // will receive an error when attempting to declare, bind, consume, purge or | |||||
| // delete a queue with the same name. | |||||
| func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { | |||||
| options.QueueExclusive = true | |||||
| } | |||||
| // WithConsumeOptionsQueueNoWait sets the queue to nowait, which means | |||||
| // the queue will assume to be declared on the server. A | |||||
| // channel exception will arrive if the conditions are met for existing queues | |||||
| // or attempting to modify an existing queue from a different connection. | |||||
| func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { | |||||
| options.QueueNoWait = true | |||||
| } | |||||
| // WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes | |||||
| // in the cluster will have the messages distributed amongst them for higher reliability | |||||
| func WithConsumeOptionsQuorum(options *ConsumeOptions) { | |||||
| if options.QueueArgs == nil { | |||||
| options.QueueArgs = Table{} | |||||
| } | |||||
| options.QueueArgs["x-queue-type"] = "quorum" | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to | |||||
| func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Name = name | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type | |||||
| func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Kind = kind | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag | |||||
| func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Durable = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag | |||||
| func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag | |||||
| func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).Internal = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag | |||||
| func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).NoWait = true | |||||
| } | |||||
| // WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange | |||||
| func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound | |||||
| // the channel will not be closed with an error. | |||||
| func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { | |||||
| options.BindingNoWait = true | |||||
| } | |||||
| // WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that | |||||
| // many goroutines will be spawned to run the provided handler on messages | |||||
| func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.Concurrency = concurrency | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that | |||||
| // many messages will be fetched from the server in advance to help with throughput. | |||||
| // This doesn't affect the handler, messages are still processed one at a time. | |||||
| func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.QOSPrefetch = prefetchCount | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means | |||||
| // these QOS settings apply to ALL existing and future | |||||
| // consumers on all channels on the same connection | |||||
| func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { | |||||
| options.QOSGlobal = true | |||||
| } | |||||
| // WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer | |||||
| // if unset a random name will be given | |||||
| func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { | |||||
| return func(options *ConsumeOptions) { | |||||
| options.ConsumerName = consumerName | |||||
| } | |||||
| } | |||||
| // WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means | |||||
| // the server will ensure that this is the sole consumer | |||||
| // from this queue. When exclusive is false, the server will fairly distribute | |||||
| // deliveries across multiple consumers. | |||||
| func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { | |||||
| options.ConsumerExclusive = true | |||||
| } | |||||
| // WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means | |||||
| // it does not wait for the server to confirm the request and | |||||
| // immediately begin deliveries. If it is not possible to consume, a channel | |||||
| // exception will be raised and the channel will be closed. | |||||
| func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { | |||||
| options.ConsumerNoWait = true | |||||
| } | |||||