From e85746aa140c859a25a3a5e70b8ea5be6743363a Mon Sep 17 00:00:00 2001 From: Aaqa Ishtyaq Date: Fri, 2 Dec 2022 13:15:48 +0530 Subject: [PATCH] feat: Add PublishWithContext, deprecate Publish - 'Publish' has been deprecared by rabbitmq/ampq091-go. - 'PublishWithContext' has been introduced to pass context while publishing for cancellation. - Update examples to use PublishWithContext. Signed-off-by: Aaqa Ishtyaq --- examples/logger/main.go | 4 +++- examples/multipublisher/main.go | 7 +++++-- examples/publisher/main.go | 4 +++- internal/channelmanager/safe_wraps.go | 26 +++++++++++++++++++++++++- publish.go | 22 +++++++++++++++++++--- 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/examples/logger/main.go b/examples/logger/main.go index 8461ddb..8620e4c 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" rabbitmq "github.com/wagslane/go-rabbitmq" @@ -48,7 +49,8 @@ func main() { if err != nil { log.Fatal(err) } - err = publisher.Publish( + err = publisher.PublishWithContext( + context.Background(), []byte("hello, world"), []string{"my_routing_key"}, rabbitmq.WithPublishOptionsContentType("application/json"), diff --git a/examples/multipublisher/main.go b/examples/multipublisher/main.go index 3b9de0d..5121a3f 100644 --- a/examples/multipublisher/main.go +++ b/examples/multipublisher/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "os" @@ -78,7 +79,8 @@ func main() { for { select { case <-ticker.C: - err = publisher.Publish( + err = publisher.PublishWithContext( + context.Background(), []byte("hello, world"), []string{"my_routing_key"}, rabbitmq.WithPublishOptionsContentType("application/json"), @@ -89,7 +91,8 @@ func main() { if err != nil { log.Println(err) } - err = publisher2.Publish( + err = publisher2.PublishWithContext( + context.Background(), []byte("hello, world 2"), []string{"my_routing_key_2"}, rabbitmq.WithPublishOptionsContentType("application/json"), diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 988d781..d07cc27 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "os" @@ -59,7 +60,8 @@ func main() { for { select { case <-ticker.C: - err = publisher.Publish( + err = publisher.PublishWithContext( + context.Background(), []byte("hello, world"), []string{"my_routing_key"}, rabbitmq.WithPublishOptionsContentType("application/json"), diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index 67c2101..ccca4b1 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -1,6 +1,8 @@ package channelmanager import ( + "context" + amqp "github.com/rabbitmq/amqp091-go" ) @@ -133,7 +135,11 @@ func (chanManager *ChannelManager) QosSafe( ) } -// PublishSafe safely wraps the (*amqp.Channel).Publish method +/* +PublishSafe safely wraps the (*amqp.Channel).Publish method. + +Deprecated: Use PublishWithContextSafe instead. +*/ func (chanManager *ChannelManager) PublishSafe( exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) error { @@ -149,6 +155,24 @@ func (chanManager *ChannelManager) PublishSafe( ) } + +// PublishWithContextSafe safely wraps the (*amqp.Channel).PublishWithContext method. +func (chanManager *ChannelManager) PublishWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + // NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method func (chanManager *ChannelManager) NotifyReturnSafe( c chan amqp.Return, diff --git a/publish.go b/publish.go index b361f84..c88ac1a 100644 --- a/publish.go +++ b/publish.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "context" "errors" "fmt" "sync" @@ -129,11 +130,25 @@ func (publisher *Publisher) handleRestarts() { } } -// Publish publishes the provided data to the given routing keys over the connection +/* +Publish publishes the provided data to the given routing keys over the connection. + +Deprecated: Use PublishWithContext instead. +*/ func (publisher *Publisher) Publish( data []byte, routingKeys []string, optionFuncs ...func(*PublishOptions), +) error { + return publisher.PublishWithContext(context.Background(), data, routingKeys, optionFuncs...) +} + +// PublishWithContext publishes the provided data to the given routing keys over the connection. +func (publisher *Publisher) PublishWithContext( + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), ) error { publisher.disablePublishDueToFlowMux.RLock() defer publisher.disablePublishDueToFlowMux.RUnlock() @@ -156,7 +171,7 @@ func (publisher *Publisher) Publish( } for _, routingKey := range routingKeys { - var message = amqp.Publishing{} + message := amqp.Publishing{} message.ContentType = options.ContentType message.DeliveryMode = options.DeliveryMode message.Body = data @@ -173,7 +188,8 @@ func (publisher *Publisher) Publish( message.AppId = options.AppID // Actual publish. - err := publisher.chanManager.PublishSafe( + err := publisher.chanManager.PublishWithContextSafe( + ctx, options.Exchange, routingKey, options.Mandatory,