diff --git a/consumer_options.go b/consumer_options.go index 42d4b96..904a561 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -282,3 +282,14 @@ func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) { func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { options.QOSGlobal = true } + +// WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means +// multiple nodes in the cluster will have the messages distributed amongst them +// for higher reliability +func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) { + if options.QueueOptions.Args == nil { + options.QueueOptions.Args = Table{} + } + + options.QueueOptions.Args["x-queue-type"] = "quorum" +}