From 3deb2b46c2aecd263fc7e701808a83f093946ee6 Mon Sep 17 00:00:00 2001 From: Vishal Dubey Date: Mon, 20 Feb 2023 14:28:39 +0530 Subject: [PATCH] feat: expose function for publishing with deferred confirmation --- internal/channelmanager/safe_wraps.go | 16 +++++++ publish.go | 64 +++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index 5182dfe..0e96b8d 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -171,6 +171,22 @@ func (chanManager *ChannelManager) PublishWithContextSafe( ) } +func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) (*amqp.DeferredConfirmation, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithDeferredConfirmWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + // NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method func (chanManager *ChannelManager) NotifyReturnSafe( c chan amqp.Return, diff --git a/publish.go b/publish.go index ed0905a..697a0d2 100644 --- a/publish.go +++ b/publish.go @@ -60,6 +60,8 @@ type Publisher struct { options PublisherOptions } +type PublisherConfirmation []*amqp.DeferredConfirmation + // NewPublisher returns a new publisher with an open channel to the cluster. // If you plan to enforce mandatory or immediate publishing, those failures will be reported // on the channel of Returns that you should setup a listener on. @@ -201,6 +203,68 @@ func (publisher *Publisher) PublishWithContext( return nil } +func (publisher *Publisher) PublishWithDeferredConfirmWithContext( + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), +) (PublisherConfirmation, error) { + publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() + if publisher.disablePublishDueToFlow { + return nil, fmt.Errorf("publishing blocked due to high flow on the server") + } + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return nil, fmt.Errorf("publishing blocked due to TCP block on the server") + } + + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient + } + + var deferredConfirmations []*amqp.DeferredConfirmation + + for _, routingKey := range routingKeys { + message := amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID + + // Actual publish. + conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe( + ctx, + options.Exchange, + routingKey, + options.Mandatory, + options.Immediate, + message, + ) + if err != nil { + return nil, err + } + deferredConfirmations = append(deferredConfirmations, conf) + } + return deferredConfirmations, nil +} + // Close closes the publisher and releases resources // The publisher should be discarded as it's not safe for re-use // Only call Close() once