Browse Source

Merge pull request #105 from vishal-android-freak/main

update: Add manual action to consumer handler & expose function for Publishing with deferred confirmation
pull/110/head
Lane Wagner 3 years ago
committed by GitHub
parent
commit
a946ce1f0d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 0 deletions
  1. +2
    -0
      consume.go
  2. +16
    -0
      internal/channelmanager/safe_wraps.go
  3. +64
    -0
      publish.go

+ 2
- 0
consume.go View File

@ -22,6 +22,8 @@ const (
NackDiscard NackDiscard
// NackRequeue deliver this message to a different consumer. // NackRequeue deliver this message to a different consumer.
NackRequeue NackRequeue
// Message acknowledgement is left to the user using the msg.Ack() method
Manual
) )
// Consumer allows you to create and connect to queues for data consumption. // Consumer allows you to create and connect to queues for data consumption.


+ 16
- 0
internal/channelmanager/safe_wraps.go View File

@ -171,6 +171,22 @@ func (chanManager *ChannelManager) PublishWithContextSafe(
) )
} }
func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe(
ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing,
) (*amqp.DeferredConfirmation, error) {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
return chanManager.channel.PublishWithDeferredConfirmWithContext(
ctx,
exchange,
key,
mandatory,
immediate,
msg,
)
}
// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method // NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method
func (chanManager *ChannelManager) NotifyReturnSafe( func (chanManager *ChannelManager) NotifyReturnSafe(
c chan amqp.Return, c chan amqp.Return,


+ 64
- 0
publish.go View File

@ -60,6 +60,8 @@ type Publisher struct {
options PublisherOptions options PublisherOptions
} }
type PublisherConfirmation []*amqp.DeferredConfirmation
// NewPublisher returns a new publisher with an open channel to the cluster. // NewPublisher returns a new publisher with an open channel to the cluster.
// If you plan to enforce mandatory or immediate publishing, those failures will be reported // If you plan to enforce mandatory or immediate publishing, those failures will be reported
// on the channel of Returns that you should setup a listener on. // on the channel of Returns that you should setup a listener on.
@ -201,6 +203,68 @@ func (publisher *Publisher) PublishWithContext(
return nil return nil
} }
func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
ctx context.Context,
data []byte,
routingKeys []string,
optionFuncs ...func(*PublishOptions),
) (PublisherConfirmation, error) {
publisher.disablePublishDueToFlowMux.RLock()
defer publisher.disablePublishDueToFlowMux.RUnlock()
if publisher.disablePublishDueToFlow {
return nil, fmt.Errorf("publishing blocked due to high flow on the server")
}
publisher.disablePublishDueToBlockedMux.RLock()
defer publisher.disablePublishDueToBlockedMux.RUnlock()
if publisher.disablePublishDueToBlocked {
return nil, fmt.Errorf("publishing blocked due to TCP block on the server")
}
options := &PublishOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.DeliveryMode == 0 {
options.DeliveryMode = Transient
}
var deferredConfirmations []*amqp.DeferredConfirmation
for _, routingKey := range routingKeys {
message := amqp.Publishing{}
message.ContentType = options.ContentType
message.DeliveryMode = options.DeliveryMode
message.Body = data
message.Headers = tableToAMQPTable(options.Headers)
message.Expiration = options.Expiration
message.ContentEncoding = options.ContentEncoding
message.Priority = options.Priority
message.CorrelationId = options.CorrelationID
message.ReplyTo = options.ReplyTo
message.MessageId = options.MessageID
message.Timestamp = options.Timestamp
message.Type = options.Type
message.UserId = options.UserID
message.AppId = options.AppID
// Actual publish.
conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe(
ctx,
options.Exchange,
routingKey,
options.Mandatory,
options.Immediate,
message,
)
if err != nil {
return nil, err
}
deferredConfirmations = append(deferredConfirmations, conf)
}
return deferredConfirmations, nil
}
// Close closes the publisher and releases resources // Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use // The publisher should be discarded as it's not safe for re-use
// Only call Close() once // Only call Close() once


Loading…
Cancel
Save