diff --git a/publish.go b/publish.go index dc2c77b..1d71fa8 100644 --- a/publish.go +++ b/publish.go @@ -40,7 +40,10 @@ type PublishOptions struct { ContentType string // Transient or Persistent DeliveryMode uint8 - Headers Table + // Expiration time in ms that a message will expire from a queue. + // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers + Expiration string + Headers Table } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -77,6 +80,14 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) { options.DeliveryMode = Persistent } +// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a +// string value in milliseconds. +func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions) { + return func(options *PublishOptions) { + options.Expiration = expiration + } +} + // WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id" func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { return func(options *PublishOptions) { @@ -184,17 +195,21 @@ func (publisher *Publisher) Publish( } for _, routingKey := range routingKeys { + var message = amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + + // Actual publish. err := publisher.chManager.channel.Publish( options.Exchange, routingKey, options.Mandatory, options.Immediate, - amqp.Publishing{ - Headers: tableToAMQPTable(options.Headers), - ContentType: options.ContentType, - Body: data, - DeliveryMode: options.DeliveryMode, - }) + message, + ) if err != nil { return err }