Browse Source

Option to create exchange in publisher

pull/91/head
NicklasWallgren 4 years ago
parent
commit
5ed5062443
8 changed files with 161 additions and 83 deletions
  1. +1
    -0
      .gitignore
  2. +9
    -18
      consume.go
  3. +10
    -40
      consume_options.go
  4. +1
    -1
      examples/logger/main.go
  5. +1
    -1
      examples/publisher/main.go
  6. +71
    -0
      exchange.go
  7. +68
    -15
      publish.go
  8. +0
    -8
      publish_options.go

+ 1
- 0
.gitignore View File

@ -1 +1,2 @@
.idea/
TODO.md

+ 9
- 18
consume.go View File

@ -160,30 +160,21 @@ func (consumer Consumer) startGoroutines(
}
}
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
if consumeOptions.ExchangeOptions != nil {
exchangeOptions := consumeOptions.ExchangeOptions
if exchangeOptions.Name == "" {
return fmt.Errorf("binding to exchangeOptions but name not specified")
}
if exchange.Declare {
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
if err := declareOrVerifyExchange(consumeOptions.ExchangeOptions, consumer.chManager.channel); err != nil {
return err
}
for _, routingKey := range routingKeys {
err := consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
exchangeOptions.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)


+ 10
- 40
consume_options.go View File

@ -9,9 +9,9 @@ func getDefaultConsumeOptions() ConsumeOptions {
QueueNoWait: false,
QueueDeclare: true,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
ExchangeOptions: nil,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
@ -32,9 +32,9 @@ type ConsumeOptions struct {
QueueNoWait bool
QueueDeclare bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
ExchangeOptions *ExchangeOptions
Concurrency int
QOSPrefetch int
QOSGlobal bool
@ -46,36 +46,6 @@ type ConsumeOptions struct {
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
@ -124,41 +94,41 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) {
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
getConsumerExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
getConsumerExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
getConsumerExchangeOptionsOrSetDefault(options).Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
getConsumerExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
getConsumerExchangeOptionsOrSetDefault(options).Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
getConsumerExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
getConsumerExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
@ -166,7 +136,7 @@ func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Declare = false
getConsumerExchangeOptionsOrSetDefault(options).Declare = false
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound


+ 1
- 1
examples/logger/main.go View File

@ -35,6 +35,7 @@ func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
rabbitmq.WithPublisherOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
@ -45,7 +46,6 @@ func main() {
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)


+ 1
- 1
examples/publisher/main.go View File

@ -15,6 +15,7 @@ func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
)
if err != nil {
log.Fatal(err)
@ -65,7 +66,6 @@ func main() {
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)


+ 71
- 0
exchange.go View File

@ -0,0 +1,71 @@
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// ExchangeOptions are used when configuring or binding to an exchange.
// it will verify the exchange is created before binding to it.
type ExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}
// getConsumerExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values.
func getConsumerExchangeOptionsOrSetDefault(options *ConsumeOptions) *ExchangeOptions {
if options.ExchangeOptions == nil {
options.ExchangeOptions = getDefaultExchangeOptions()
}
return options.ExchangeOptions
}
// getPublisherExchangeOptionsOrSetDefault returns pointer to current Exchange options. if no Exchange options are set yet, it will set it with default values.
func getPublisherExchangeOptionsOrSetDefault(options *PublisherOptions) *ExchangeOptions {
if options.ExchangeOptions == nil {
options.ExchangeOptions = getDefaultExchangeOptions()
}
return options.ExchangeOptions
}
// getDefaultExchangeOptions returns pointer to the default Exchange options.
func getDefaultExchangeOptions() *ExchangeOptions {
return &ExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
// getDefaultExchangeOptions declares or verifies the existence of an exchange.
func declareOrVerifyExchange(exchangeOptions *ExchangeOptions, channel *amqp.Channel) error {
if exchangeOptions.Declare {
return channel.ExchangeDeclare(
exchangeOptions.Name,
exchangeOptions.Kind,
exchangeOptions.Durable,
exchangeOptions.AutoDelete,
exchangeOptions.Internal,
exchangeOptions.NoWait,
tableToAMQPTable(exchangeOptions.ExchangeArgs),
)
}
return channel.ExchangeDeclarePassive(
exchangeOptions.Name,
exchangeOptions.Kind,
exchangeOptions.Durable,
exchangeOptions.AutoDelete,
exchangeOptions.Internal,
exchangeOptions.NoWait,
tableToAMQPTable(exchangeOptions.ExchangeArgs),
)
}

+ 68
- 15
publish.go View File

@ -58,6 +58,54 @@ type Publisher struct {
type PublisherOptions struct {
Logger Logger
ReconnectInterval time.Duration
ExchangeOptions *ExchangeOptions
}
// WithPublisherOptionsExchangeName returns a function that sets the exchange to publish to
func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) {
return func(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithPublisherOptionsExchangeKind returns a function that sets the binding exchange kind/type
func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) {
return func(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithPublisherOptionsExchangeDurable returns a function that sets the binding exchange durable flag
func WithPublisherOptionsExchangeDurable(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).Durable = true
}
// WithPublisherOptionsExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithPublisherOptionsExchangeInternal returns a function that sets the binding exchange internal flag
func WithPublisherOptionsExchangeInternal(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).Internal = true
}
// WithPublisherOptionsExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithPublisherOptionsExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) {
return func(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
// WithPublisherOptionsExchangeDeclare returns a function that declares the binding exchange.
// Use this setting if you want the consumer to create the exchange on start.
func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) {
getPublisherExchangeOptionsOrSetDefault(options).Declare = true
}
// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will
@ -91,6 +139,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
options := &PublisherOptions{
Logger: &stdDebugLogger{},
ReconnectInterval: time.Second * 5,
ExchangeOptions: getDefaultExchangeOptions(),
}
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -112,6 +161,10 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
notifyPublishChan: nil,
}
if err = declareOrVerifyExchange(publisher.options.ExchangeOptions, chManager.channel); err != nil {
return nil, err
}
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
@ -120,20 +173,6 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
return publisher, nil
}
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
if publisher.notifyPublishChan != nil {
publisher.startNotifyPublishHandler()
}
}
}
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
func (publisher *Publisher) NotifyReturn() <-chan Return {
@ -194,7 +233,7 @@ func (publisher *Publisher) Publish(
// Actual publish.
err := publisher.chManager.channel.Publish(
options.Exchange,
publisher.options.ExchangeOptions.Name,
routingKey,
options.Mandatory,
options.Immediate,
@ -214,6 +253,20 @@ func (publisher Publisher) Close() error {
return publisher.chManager.close()
}
func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
if publisher.notifyPublishChan != nil {
publisher.startNotifyPublishHandler()
}
}
}
func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {


+ 0
- 8
publish_options.go View File

@ -6,7 +6,6 @@ import (
// PublishOptions are used to control how data is published
type PublishOptions struct {
Exchange string
// Mandatory fails to publish if there are no queues
// bound to the routing key
Mandatory bool
@ -43,13 +42,6 @@ type PublishOptions struct {
Headers Table
}
// WithPublishOptionsExchange returns a function that sets the exchange to publish to
func WithPublishOptionsExchange(exchange string) func(*PublishOptions) {
return func(options *PublishOptions) {
options.Exchange = exchange
}
}
// WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not
// bound to the routing key a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMandatory(options *PublishOptions) {


Loading…
Cancel
Save