Browse Source

Resolving conflict between my original PR and one that adds header support

pull/7/head
ajmckee 5 years ago
parent
commit
e4d48dbb63
1 changed files with 16 additions and 0 deletions
  1. +16
    -0
      publish.go

+ 16
- 0
publish.go View File

@ -42,6 +42,7 @@ type PublishOptions struct {
DeliveryMode uint8
// Expiration time in ms a message will expire from a queue.
Expiration string
Headers Table
}
// WithPublishOptionsExchange returns a function that sets the exchange to publish to
@ -86,6 +87,13 @@ func WithPublishOptionsExpiration (expiration string) func(options *PublishOptio
}
}
// WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id"
func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) {
return func(options *PublishOptions) {
options.Headers = headers
}
}
// Publisher allows you to publish messages safely across an open connection
type Publisher struct {
chManager *channelManager
@ -189,11 +197,19 @@ func (publisher *Publisher) Publish(
var message = amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
// Message Body
message.Body = data
// If no header options, don't add.
if len(options.Headers) > 0 {
message.Headers = tableToAMQPTable(options.Headers)
}
// If we have a TTL use it.
if options.Expiration != "" {
message.Expiration = options.Expiration
}
// Actual publish.
err := publisher.chManager.channel.Publish(
options.Exchange,


Loading…
Cancel
Save