|
|
@ -1,22 +1,32 @@ |
|
|
|
|
|
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
|
|
|
|
|
|
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
|
|
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
|
package amqp091 |
|
|
package amqp091 |
|
|
|
|
|
|
|
|
import "sync" |
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
|
"sync" |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
// confirms resequences and notifies one or multiple publisher confirmation listeners
|
|
|
// confirms resequences and notifies one or multiple publisher confirmation listeners
|
|
|
type confirms struct { |
|
|
type confirms struct { |
|
|
m sync.Mutex |
|
|
|
|
|
listeners []chan Confirmation |
|
|
|
|
|
sequencer map[uint64]Confirmation |
|
|
|
|
|
published uint64 |
|
|
|
|
|
expecting uint64 |
|
|
|
|
|
|
|
|
m sync.Mutex |
|
|
|
|
|
listeners []chan Confirmation |
|
|
|
|
|
sequencer map[uint64]Confirmation |
|
|
|
|
|
deferredConfirmations *deferredConfirmations |
|
|
|
|
|
published uint64 |
|
|
|
|
|
publishedMut sync.Mutex |
|
|
|
|
|
expecting uint64 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// newConfirms allocates a confirms
|
|
|
// newConfirms allocates a confirms
|
|
|
func newConfirms() *confirms { |
|
|
func newConfirms() *confirms { |
|
|
return &confirms{ |
|
|
return &confirms{ |
|
|
sequencer: map[uint64]Confirmation{}, |
|
|
|
|
|
published: 0, |
|
|
|
|
|
expecting: 1, |
|
|
|
|
|
|
|
|
sequencer: map[uint64]Confirmation{}, |
|
|
|
|
|
deferredConfirmations: newDeferredConfirmations(), |
|
|
|
|
|
published: 0, |
|
|
|
|
|
expecting: 1, |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -28,12 +38,12 @@ func (c *confirms) Listen(l chan Confirmation) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Publish increments the publishing counter
|
|
|
// Publish increments the publishing counter
|
|
|
func (c *confirms) Publish() uint64 { |
|
|
|
|
|
c.m.Lock() |
|
|
|
|
|
defer c.m.Unlock() |
|
|
|
|
|
|
|
|
func (c *confirms) Publish() *DeferredConfirmation { |
|
|
|
|
|
c.publishedMut.Lock() |
|
|
|
|
|
defer c.publishedMut.Unlock() |
|
|
|
|
|
|
|
|
c.published++ |
|
|
c.published++ |
|
|
return c.published |
|
|
|
|
|
|
|
|
return c.deferredConfirmations.Add(c.published) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// confirm confirms one publishing, increments the expecting delivery tag, and
|
|
|
// confirm confirms one publishing, increments the expecting delivery tag, and
|
|
|
@ -48,6 +58,9 @@ func (c *confirms) confirm(confirmation Confirmation) { |
|
|
|
|
|
|
|
|
// resequence confirms any out of order delivered confirmations
|
|
|
// resequence confirms any out of order delivered confirmations
|
|
|
func (c *confirms) resequence() { |
|
|
func (c *confirms) resequence() { |
|
|
|
|
|
c.publishedMut.Lock() |
|
|
|
|
|
defer c.publishedMut.Unlock() |
|
|
|
|
|
|
|
|
for c.expecting <= c.published { |
|
|
for c.expecting <= c.published { |
|
|
sequenced, found := c.sequencer[c.expecting] |
|
|
sequenced, found := c.sequencer[c.expecting] |
|
|
if !found { |
|
|
if !found { |
|
|
@ -62,6 +75,8 @@ func (c *confirms) One(confirmed Confirmation) { |
|
|
c.m.Lock() |
|
|
c.m.Lock() |
|
|
defer c.m.Unlock() |
|
|
defer c.m.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
c.deferredConfirmations.Confirm(confirmed) |
|
|
|
|
|
|
|
|
if c.expecting == confirmed.DeliveryTag { |
|
|
if c.expecting == confirmed.DeliveryTag { |
|
|
c.confirm(confirmed) |
|
|
c.confirm(confirmed) |
|
|
} else { |
|
|
} else { |
|
|
@ -75,6 +90,8 @@ func (c *confirms) Multiple(confirmed Confirmation) { |
|
|
c.m.Lock() |
|
|
c.m.Lock() |
|
|
defer c.m.Unlock() |
|
|
defer c.m.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
c.deferredConfirmations.ConfirmMultiple(confirmed) |
|
|
|
|
|
|
|
|
for c.expecting <= confirmed.DeliveryTag { |
|
|
for c.expecting <= confirmed.DeliveryTag { |
|
|
c.confirm(Confirmation{c.expecting, confirmed.Ack}) |
|
|
c.confirm(Confirmation{c.expecting, confirmed.Ack}) |
|
|
} |
|
|
} |
|
|
@ -92,3 +109,56 @@ func (c *confirms) Close() error { |
|
|
c.listeners = nil |
|
|
c.listeners = nil |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
type deferredConfirmations struct { |
|
|
|
|
|
m sync.Mutex |
|
|
|
|
|
confirmations map[uint64]*DeferredConfirmation |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func newDeferredConfirmations() *deferredConfirmations { |
|
|
|
|
|
return &deferredConfirmations{ |
|
|
|
|
|
confirmations: map[uint64]*DeferredConfirmation{}, |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { |
|
|
|
|
|
d.m.Lock() |
|
|
|
|
|
defer d.m.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
dc := &DeferredConfirmation{DeliveryTag: tag} |
|
|
|
|
|
dc.wg.Add(1) |
|
|
|
|
|
d.confirmations[tag] = dc |
|
|
|
|
|
return dc |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (d *deferredConfirmations) Confirm(confirmation Confirmation) { |
|
|
|
|
|
d.m.Lock() |
|
|
|
|
|
defer d.m.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
dc, found := d.confirmations[confirmation.DeliveryTag] |
|
|
|
|
|
if !found { |
|
|
|
|
|
// we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen
|
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
dc.confirmation = confirmation |
|
|
|
|
|
dc.wg.Done() |
|
|
|
|
|
delete(d.confirmations, confirmation.DeliveryTag) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { |
|
|
|
|
|
d.m.Lock() |
|
|
|
|
|
defer d.m.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
for k, v := range d.confirmations { |
|
|
|
|
|
if k <= confirmation.DeliveryTag { |
|
|
|
|
|
v.confirmation = Confirmation{DeliveryTag: k, Ack: confirmation.Ack} |
|
|
|
|
|
v.wg.Done() |
|
|
|
|
|
delete(d.confirmations, k) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (d *DeferredConfirmation) Wait() bool { |
|
|
|
|
|
d.wg.Wait() |
|
|
|
|
|
return d.confirmation.Ack |
|
|
|
|
|
} |