Browse Source

Adding support for publishing messages with an expiration.

- Added WithPublishOptionsExpiration which permits message expiry when the TTL has been reached
- Introduced new message var to permit the confidiontal injection of amp.Publishing options in the future.
pull/7/head
AJ McKee 5 years ago
parent
commit
22a1e16e0e
1 changed files with 21 additions and 5 deletions
  1. +21
    -5
      publish.go

+ 21
- 5
publish.go View File

@ -40,6 +40,8 @@ type PublishOptions struct {
ContentType string
// Transient or Persistent
DeliveryMode uint8
// Expiration time in ms a message will expire from a queue.
Expiration string
}
// WithPublishOptionsExchange returns a function that sets the exchange to publish to
@ -76,6 +78,14 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) {
options.DeliveryMode = Persistent
}
// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a
// string value in milliseconds.
func WithPublishOptionsExpiration (expiration string) func(options *PublishOptions) {
return func(options *PublishOptions) {
options.Expiration = expiration
}
}
// Publisher allows you to publish messages safely across an open connection
type Publisher struct {
chManager *channelManager
@ -176,16 +186,22 @@ func (publisher *Publisher) Publish(
}
for _, routingKey := range routingKeys {
var message = amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
message.Body = data
// If we have a TTL use it.
if options.Expiration != "" {
message.Expiration = options.Expiration
}
// Actual publish.
err := publisher.chManager.channel.Publish(
options.Exchange,
routingKey,
options.Mandatory,
options.Immediate,
amqp.Publishing{
ContentType: options.ContentType,
Body: data,
DeliveryMode: options.DeliveryMode,
})
message,
)
if err != nil {
return err
}


Loading…
Cancel
Save