diff --git a/publish.go b/publish.go index e7b5518..e321e5e 100644 --- a/publish.go +++ b/publish.go @@ -42,6 +42,7 @@ type PublishOptions struct { DeliveryMode uint8 // Expiration time in ms a message will expire from a queue. Expiration string + Headers Table } // WithPublishOptionsExchange returns a function that sets the exchange to publish to @@ -86,6 +87,13 @@ func WithPublishOptionsExpiration (expiration string) func(options *PublishOptio } } +// WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id" +func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Headers = headers + } +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager @@ -189,11 +197,19 @@ func (publisher *Publisher) Publish( var message = amqp.Publishing{} message.ContentType = options.ContentType message.DeliveryMode = options.DeliveryMode + // Message Body message.Body = data + + // If no header options, don't add. + if len(options.Headers) > 0 { + message.Headers = tableToAMQPTable(options.Headers) + } + // If we have a TTL use it. if options.Expiration != "" { message.Expiration = options.Expiration } + // Actual publish. err := publisher.chManager.channel.Publish( options.Exchange,