Browse Source

feat: expose function for publishing with deferred confirmation

pull/105/head
Vishal Dubey 3 years ago
parent
commit
3deb2b46c2
2 changed files with 80 additions and 0 deletions
  1. +16
    -0
      internal/channelmanager/safe_wraps.go
  2. +64
    -0
      publish.go

+ 16
- 0
internal/channelmanager/safe_wraps.go View File

@ -171,6 +171,22 @@ func (chanManager *ChannelManager) PublishWithContextSafe(
)
}
func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) (*amqp.DeferredConfirmation, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
return chanManager.channel.PublishWithDeferredConfirmWithContext(
ctx,
exchange,
key,
mandatory,
immediate,
msg,
)
}
// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method
func (chanManager *ChannelManager) NotifyReturnSafe(
c chan amqp.Return,


+ 64
- 0
publish.go View File

@ -60,6 +60,8 @@ type Publisher struct {
options PublisherOptions
}
type PublisherConfirmation []*amqp.DeferredConfirmation
// NewPublisher returns a new publisher with an open channel to the cluster.
// If you plan to enforce mandatory or immediate publishing, those failures will be reported
// on the channel of Returns that you should setup a listener on.
@ -201,6 +203,68 @@ func (publisher *Publisher) PublishWithContext(
return nil
}
func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
ctx context.Context,
data []byte,
routingKeys []string,
optionFuncs ...func(*PublishOptions),
) (PublisherConfirmation, error) {
publisher.disablePublishDueToFlowMux.RLock()
defer publisher.disablePublishDueToFlowMux.RUnlock()
if publisher.disablePublishDueToFlow {
return nil, fmt.Errorf("publishing blocked due to high flow on the server")
}
publisher.disablePublishDueToBlockedMux.RLock()
defer publisher.disablePublishDueToBlockedMux.RUnlock()
if publisher.disablePublishDueToBlocked {
return nil, fmt.Errorf("publishing blocked due to TCP block on the server")
}
options := &PublishOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.DeliveryMode == 0 {
options.DeliveryMode = Transient
}
var deferredConfirmations []*amqp.DeferredConfirmation
for _, routingKey := range routingKeys {
message := amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
message.Body = data
message.Headers = tableToAMQPTable(options.Headers)
message.Expiration = options.Expiration
message.ContentEncoding = options.ContentEncoding
message.Priority = options.Priority
message.CorrelationId = options.CorrelationID
message.ReplyTo = options.ReplyTo
message.MessageId = options.MessageID
message.Timestamp = options.Timestamp
message.Type = options.Type
message.UserId = options.UserID
message.AppId = options.AppID
// Actual publish.
conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe(
ctx,
options.Exchange,
routingKey,
options.Mandatory,
options.Immediate,
message,
)
if err != nil {
return nil, err
}
deferredConfirmations = append(deferredConfirmations, conf)
}
return deferredConfirmations, nil
}
// Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use
// Only call Close() once


Loading…
Cancel
Save