Browse Source

Merge pull request #7 from ajmckee/main

Adding support for publishing messages with an expiration.
pull/18/head
Lane Wagner 5 years ago
committed by GitHub
parent
commit
7e07dbfd2a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 7 deletions
  1. +22
    -7
      publish.go

+ 22
- 7
publish.go View File

@ -40,7 +40,10 @@ type PublishOptions struct {
ContentType string
// Transient or Persistent
DeliveryMode uint8
Headers Table
// Expiration time in ms that a message will expire from a queue.
// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
Expiration string
Headers Table
}
// WithPublishOptionsExchange returns a function that sets the exchange to publish to
@ -77,6 +80,14 @@ func WithPublishOptionsPersistentDelivery(options *PublishOptions) {
options.DeliveryMode = Persistent
}
// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a
// string value in milliseconds.
func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions) {
return func(options *PublishOptions) {
options.Expiration = expiration
}
}
// WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id"
func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
return func(options *PublishOptions) {
@ -184,17 +195,21 @@ func (publisher *Publisher) Publish(
}
for _, routingKey := range routingKeys {
var message = amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
message.Body = data
message.Headers = tableToAMQPTable(options.Headers)
message.Expiration = options.Expiration
// Actual publish.
err := publisher.chManager.channel.Publish(
options.Exchange,
routingKey,
options.Mandatory,
options.Immediate,
amqp.Publishing{
Headers: tableToAMQPTable(options.Headers),
ContentType: options.ContentType,
Body: data,
DeliveryMode: options.DeliveryMode,
})
message,
)
if err != nil {
return err
}


Loading…
Cancel
Save