From 22a1e16e0e8896176155b5e5743e186faf3e68a0 Mon Sep 17 00:00:00 2001 From: AJ McKee Date: Tue, 4 May 2021 17:23:46 +0200 Subject: [PATCH 1/3] Adding support for publishing messages with an expiration. - Added WithPublishOptionsExpiration which permits message expiry when the TTL has been reached - Introduced new message var to permit the confidiontal injection of amp.Publishing options in the future. --- publish.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/publish.go b/publish.go index 1733b6c..e7b5518 100644 --- a/publish.go +++ b/publish.go @@ -40,6 +40,8 @@ type PublishOptions struct { ContentType string // Transient or Persistent DeliveryMode uint8 + // Expiration time in ms a message will expire from a queue. + Expiration string } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -76,6 +78,14 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) { options.DeliveryMode = Persistent } +// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a +// string value in milliseconds. +func WithPublishOptionsExpiration (expiration string) func(options *PublishOptions) { + return func(options *PublishOptions) { + options.Expiration = expiration + } +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager @@ -176,16 +186,22 @@ func (publisher *Publisher) Publish( } for _, routingKey := range routingKeys { + var message = amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + // If we have a TTL use it. + if options.Expiration != "" { + message.Expiration = options.Expiration + } + // Actual publish. err := publisher.chManager.channel.Publish( options.Exchange, routingKey, options.Mandatory, options.Immediate, - amqp.Publishing{ - ContentType: options.ContentType, - Body: data, - DeliveryMode: options.DeliveryMode, - }) + message, + ) if err != nil { return err } From ebe1fc2391af8085e36970707a1bd8ff1badb68b Mon Sep 17 00:00:00 2001 From: ajmckee Date: Fri, 7 May 2021 00:00:21 +0200 Subject: [PATCH 2/3] =?UTF-8?q?Update=20PR=20to=20remove=20redundant=20if?= =?UTF-8?q?=E2=80=99s=20and=20comments.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- publish.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/publish.go b/publish.go index e321e5e..65f15b1 100644 --- a/publish.go +++ b/publish.go @@ -40,9 +40,10 @@ type PublishOptions struct { ContentType string // Transient or Persistent DeliveryMode uint8 - // Expiration time in ms a message will expire from a queue. - Expiration string - Headers Table + // Expiration time in ms that a message will expire from a queue. + // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers + Expiration string + Headers Table } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -197,18 +198,9 @@ func (publisher *Publisher) Publish( var message = amqp.Publishing{} message.ContentType = options.ContentType message.DeliveryMode = options.DeliveryMode - // Message Body message.Body = data - - // If no header options, don't add. - if len(options.Headers) > 0 { - message.Headers = tableToAMQPTable(options.Headers) - } - - // If we have a TTL use it. - if options.Expiration != "" { - message.Expiration = options.Expiration - } + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration // Actual publish. err := publisher.chManager.channel.Publish( From ba5e59c8028d338d8ba9980800c40cbc2a18b14f Mon Sep 17 00:00:00 2001 From: ajmckee Date: Tue, 11 May 2021 07:56:09 +0200 Subject: [PATCH 3/3] Correct go fmt issue --- publish.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/publish.go b/publish.go index 65f15b1..1d71fa8 100644 --- a/publish.go +++ b/publish.go @@ -43,7 +43,7 @@ type PublishOptions struct { // Expiration time in ms that a message will expire from a queue. // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers Expiration string - Headers Table + Headers Table } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -82,7 +82,7 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) { // WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a // string value in milliseconds. -func WithPublishOptionsExpiration (expiration string) func(options *PublishOptions) { +func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions) { return func(options *PublishOptions) { options.Expiration = expiration } @@ -209,7 +209,7 @@ func (publisher *Publisher) Publish( options.Mandatory, options.Immediate, message, - ) + ) if err != nil { return err }