diff --git a/publish.go b/publish.go index 1733b6c..e7b5518 100644 --- a/publish.go +++ b/publish.go @@ -40,6 +40,8 @@ type PublishOptions struct { ContentType string // Transient or Persistent DeliveryMode uint8 + // Expiration time in ms a message will expire from a queue. + Expiration string } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -76,6 +78,14 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) { options.DeliveryMode = Persistent } +// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a +// string value in milliseconds. +func WithPublishOptionsExpiration (expiration string) func(options *PublishOptions) { + return func(options *PublishOptions) { + options.Expiration = expiration + } +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager @@ -176,16 +186,22 @@ func (publisher *Publisher) Publish( } for _, routingKey := range routingKeys { + var message = amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + // If we have a TTL use it. + if options.Expiration != "" { + message.Expiration = options.Expiration + } + // Actual publish. err := publisher.chManager.channel.Publish( options.Exchange, routingKey, options.Mandatory, options.Immediate, - amqp.Publishing{ - ContentType: options.ContentType, - Body: data, - DeliveryMode: options.DeliveryMode, - }) + message, + ) if err != nil { return err }