Browse Source

feat: Add PublishWithContext, deprecate Publish

- 'Publish' has been deprecared by rabbitmq/ampq091-go.
 - 'PublishWithContext' has been introduced to pass context
    while publishing for cancellation.
 - Update examples to use PublishWithContext.

Signed-off-by: Aaqa Ishtyaq <aaqa@hackerrank.com>
pull/93/head
Aaqa Ishtyaq 3 years ago
committed by Aaqa Ishtyaq
parent
commit
e85746aa14
5 changed files with 55 additions and 8 deletions
  1. +3
    -1
      examples/logger/main.go
  2. +5
    -2
      examples/multipublisher/main.go
  3. +3
    -1
      examples/publisher/main.go
  4. +25
    -1
      internal/channelmanager/safe_wraps.go
  5. +19
    -3
      publish.go

+ 3
- 1
examples/logger/main.go View File

@ -1,6 +1,7 @@
package main
import (
"context"
"log"
rabbitmq "github.com/wagslane/go-rabbitmq"
@ -48,7 +49,8 @@ func main() {
if err != nil {
log.Fatal(err)
}
err = publisher.Publish(
err = publisher.PublishWithContext(
context.Background(),
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),


+ 5
- 2
examples/multipublisher/main.go View File

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"log"
"os"
@ -78,7 +79,8 @@ func main() {
for {
select {
case <-ticker.C:
err = publisher.Publish(
err = publisher.PublishWithContext(
context.Background(),
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
@ -89,7 +91,8 @@ func main() {
if err != nil {
log.Println(err)
}
err = publisher2.Publish(
err = publisher2.PublishWithContext(
context.Background(),
[]byte("hello, world 2"),
[]string{"my_routing_key_2"},
rabbitmq.WithPublishOptionsContentType("application/json"),


+ 3
- 1
examples/publisher/main.go View File

@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"log"
"os"
@ -59,7 +60,8 @@ func main() {
for {
select {
case <-ticker.C:
err = publisher.Publish(
err = publisher.PublishWithContext(
context.Background(),
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),


+ 25
- 1
internal/channelmanager/safe_wraps.go View File

@ -1,6 +1,8 @@
package channelmanager
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
)
@ -133,7 +135,11 @@ func (chanManager *ChannelManager) QosSafe(
)
}
// PublishSafe safely wraps the (*amqp.Channel).Publish method
/*
PublishSafe safely wraps the (*amqp.Channel).Publish method.
Deprecated: Use PublishWithContextSafe instead.
*/
func (chanManager *ChannelManager) PublishSafe(
exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) error {
@ -149,6 +155,24 @@ func (chanManager *ChannelManager) PublishSafe(
)
}
// PublishWithContextSafe safely wraps the (*amqp.Channel).PublishWithContext method.
func (chanManager *ChannelManager) PublishWithContextSafe(
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
return chanManager.channel.PublishWithContext(
ctx,
exchange,
key,
mandatory,
immediate,
msg,
)
}
// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method
func (chanManager *ChannelManager) NotifyReturnSafe(
c chan amqp.Return,


+ 19
- 3
publish.go View File

@ -1,6 +1,7 @@
package rabbitmq
import (
"context"
"errors"
"fmt"
"sync"
@ -129,11 +130,25 @@ func (publisher *Publisher) handleRestarts() {
}
}
// Publish publishes the provided data to the given routing keys over the connection
/*
Publish publishes the provided data to the given routing keys over the connection.
Deprecated: Use PublishWithContext instead.
*/
func (publisher *Publisher) Publish(
data []byte,
routingKeys []string,
optionFuncs ...func(*PublishOptions),
) error {
return publisher.PublishWithContext(context.Background(), data, routingKeys, optionFuncs...)
}
// PublishWithContext publishes the provided data to the given routing keys over the connection.
func (publisher *Publisher) PublishWithContext(
ctx context.Context,
data []byte,
routingKeys []string,
optionFuncs ...func(*PublishOptions),
) error {
publisher.disablePublishDueToFlowMux.RLock()
defer publisher.disablePublishDueToFlowMux.RUnlock()
@ -156,7 +171,7 @@ func (publisher *Publisher) Publish(
}
for _, routingKey := range routingKeys {
var message = amqp.Publishing{}
message := amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
message.Body = data
@ -173,7 +188,8 @@ func (publisher *Publisher) Publish(
message.AppId = options.AppID
// Actual publish.
err := publisher.chanManager.PublishSafe(
err := publisher.chanManager.PublishWithContextSafe(
ctx,
options.Exchange,
routingKey,
options.Mandatory,


Loading…
Cancel
Save