multi: reliable hand-off from htlcswitch to contractcourt

This is achieved by changing the 1-way handoff to a 2-way handoff
with a done channel.
This commit is contained in:
eugene 2022-01-04 16:21:36 -05:00
parent e7505c3e6b
commit 8607e8c28a
No known key found for this signature in database
GPG Key ID: 118759E83439A9B1
7 changed files with 193 additions and 75 deletions

View File

@ -964,16 +964,9 @@ type ContractUpdate struct {
Htlcs []channeldb.HTLC
}
// ContractSignals wraps the two signals that affect the state of a channel
// being watched by an arbitrator. The two signals we care about are: the
// channel has a new set of HTLC's, and the remote party has just broadcast
// their version of the commitment transaction.
// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
// HtlcUpdates is a channel that the link will use to update the
// designated channel arbitrator when the set of HTLCs on any valid
// commitment changes.
HtlcUpdates chan *ContractUpdate
// ShortChanID is the up to date short channel ID for a contract. This
// can change either if when the contract was added it didn't yet have
// a stable identifier, or in the case of a reorg.
@ -1001,6 +994,25 @@ func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
return nil
}
// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
update *ContractUpdate) error {
c.Lock()
arbitrator, ok := c.activeChannels[chanPoint]
c.Unlock()
if !ok {
return fmt.Errorf("can't find arbitrator for %v", chanPoint)
}
return arbitrator.notifyContractUpdate(update)
}
// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (

View File

@ -28,6 +28,12 @@ var (
// close a channel that's already in the process of doing so.
errAlreadyForceClosed = errors.New("channel is already in the " +
"process of being force closed")
// errChanArbShuttingDown is an error returned when the channel arb is
// shutting down during the hand-off in notifyContractUpdate. This is
// mainly used to be able to notify the original caller (the link) that
// an error occurred.
errChanArbShuttingDown = errors.New("channel arb shutting down")
)
const (
@ -342,7 +348,7 @@ type ChannelArbitrator struct {
// htlcUpdates is a channel that is sent upon with new updates from the
// active channel. Each time a new commitment state is accepted, the
// set of HTLC's on the new state should be sent across this channel.
htlcUpdates <-chan *ContractUpdate
htlcUpdates chan *contractUpdateSignal
// activeResolvers is a slice of any active resolvers. This is used to
// be able to signal them for shutdown in the case that we shutdown.
@ -378,7 +384,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(<-chan *ContractUpdate),
htlcUpdates: make(chan *contractUpdateSignal),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
activeHTLCs: htlcSets,
@ -2387,6 +2393,41 @@ func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
}
}
// contractUpdateSignal is a struct that carries the latest set of
// ContractUpdate for a particular key. It also carries a done chan that should
// be closed by the recipient.
type contractUpdateSignal struct {
// newUpdate contains the latest ContractUpdate for a key.
newUpdate *ContractUpdate
// doneChan is an acknowledgement channel.
doneChan chan struct{}
}
// notifyContractUpdate notifies the ChannelArbitrator that a new
// ContractUpdate is available from the link. The link will be paused until
// this function returns.
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) error {
done := make(chan struct{})
select {
case c.htlcUpdates <- &contractUpdateSignal{
newUpdate: upd,
doneChan: done,
}:
case <-c.quit:
return errChanArbShuttingDown
}
select {
case <-done:
case <-c.quit:
return errChanArbShuttingDown
}
return nil
}
// channelAttendant is the primary goroutine that acts at the judicial
// arbitrator between our channel state, the remote channel peer, and the
// blockchain (Our judge). This goroutine will ensure that we faithfully execute
@ -2448,13 +2489,12 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
log.Tracef("ChannelArbitrator(%v) got new signal "+
"update!", c.cfg.ChanPoint)
// First, we'll update our set of signals.
c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates
// We'll update the ShortChannelID.
c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
// Now that the signals have been updated, we'll now
// Now that the signal has been updated, we'll now
// close the done channel to signal to the caller we've
// registered the new contracts.
// registered the new ShortChannelID.
close(signalUpdate.doneChan)
// A new set of HTLC's has been added or removed from the
@ -2465,14 +2505,19 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// htlcSetKey type included in this update in order to
// only monitor the HTLCs that are still active on this
// target commitment.
c.activeHTLCs[htlcUpdate.HtlcKey] = newHtlcSet(
htlcUpdate.Htlcs,
htlcKey := htlcUpdate.newUpdate.HtlcKey
c.activeHTLCs[htlcKey] = newHtlcSet(
htlcUpdate.newUpdate.Htlcs,
)
// Now that the activeHTLCs have been updated, we'll
// close the done channel.
close(htlcUpdate.doneChan)
log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(htlcUpdate)
return spew.Sdump(htlcUpdate.newUpdate)
}),
)

View File

@ -829,11 +829,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
}
defer chanArb.Stop()
// Create htlcUpdates channel.
htlcUpdates := make(chan *ContractUpdate)
signals := &ContractSignals{
HtlcUpdates: htlcUpdates,
ShortChanID: lnwire.ShortChannelID{},
}
chanArb.UpdateContractSignals(signals)
@ -864,10 +860,12 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
htlc, outgoingDustHtlc, incomingDustHtlc,
}
htlcUpdates <- &ContractUpdate{
newUpdate := &ContractUpdate{
HtlcKey: LocalHtlcSet,
Htlcs: htlcSet,
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
@ -1849,9 +1847,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// Now that our channel arb has started, we'll set up
// its contract signals channel so we can send it
// various HTLC updates for this test.
htlcUpdates := make(chan *ContractUpdate)
signals := &ContractSignals{
HtlcUpdates: htlcUpdates,
ShortChanID: lnwire.ShortChannelID{},
}
chanArb.UpdateContractSignals(signals)
@ -1872,10 +1868,12 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
HtlcIndex: htlcIndex,
RefundTimeout: htlcExpiry,
}
htlcUpdates <- &ContractUpdate{
newUpdate := &ContractUpdate{
HtlcKey: htlcKey,
Htlcs: []channeldb.HTLC{danglingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
// At this point, we now have a split commitment state
// from the PoV of the channel arb. There's now an HTLC
@ -2044,9 +2042,7 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
// Now that our channel arb has started, we'll set up
// its contract signals channel so we can send it
// various HTLC updates for this test.
htlcUpdates := make(chan *ContractUpdate)
signals := &ContractSignals{
HtlcUpdates: htlcUpdates,
ShortChanID: lnwire.ShortChannelID{},
}
chanArb.UpdateContractSignals(signals)
@ -2061,10 +2057,12 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
HtlcIndex: htlcIndex,
RefundTimeout: htlcExpiry,
}
htlcUpdates <- &ContractUpdate{
newUpdate := &ContractUpdate{
HtlcKey: RemoteHtlcSet,
Htlcs: []channeldb.HTLC{pendingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
// We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain.
@ -2531,11 +2529,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
}
}()
// Create htlcUpdates channel.
htlcUpdates := make(chan *ContractUpdate)
signals := &ContractSignals{
HtlcUpdates: htlcUpdates,
ShortChanID: lnwire.ShortChannelID{},
}
chanArb.UpdateContractSignals(signals)
@ -2559,7 +2553,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// We now send two HTLC updates, one for local HTLC set and the other
// for remote HTLC set.
htlcUpdates <- &ContractUpdate{
newUpdate := &ContractUpdate{
HtlcKey: LocalHtlcSet,
// This will make the deadline of the local anchor resolution
// to be htlcWithPreimage's CLTV minus heightHint since the
@ -2567,13 +2561,18 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// preimage available.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
htlcUpdates <- &ContractUpdate{
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
newUpdate = &ContractUpdate{
HtlcKey: RemoteHtlcSet,
// This will make the deadline of the remote anchor resolution
// to be htlcWithPreimage's CLTV minus heightHint because the
// incoming HTLC (toRemoteHTLCs) has a lower CLTV.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)

View File

@ -187,11 +187,14 @@ type ChannelLinkConfig struct {
LinkFailureError)
// UpdateContractSignals is a function closure that we'll use to update
// outside sub-systems with the latest signals for our inner Lightning
// channel. These signals will notify the caller when the channel has
// been closed, or when the set of active HTLC's is updated.
// outside sub-systems with this channel's latest ShortChannelID.
UpdateContractSignals func(*contractcourt.ContractSignals) error
// NotifyContractUpdate is a function closure that we'll use to update
// the contractcourt and more specifically the ChannelArbitrator of the
// latest channel state.
NotifyContractUpdate func(*contractcourt.ContractUpdate) error
// ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this
// channel.
@ -372,10 +375,6 @@ type channelLink struct {
// sent across.
localUpdateAdd chan *localUpdateAddMsg
// htlcUpdates is a channel that we'll use to update outside
// sub-systems with the latest set of active HTLC's on our channel.
htlcUpdates chan *contractcourt.ContractUpdate
// shutdownRequest is a channel that the channelLink will listen on to
// service shutdown requests from ShutdownIfChannelClean calls.
shutdownRequest chan *shutdownReq
@ -421,11 +420,9 @@ func NewChannelLink(cfg ChannelLinkConfig,
logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
return &channelLink{
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
// TODO(roasbeef): just do reserve here?
htlcUpdates: make(chan *contractcourt.ContractUpdate),
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
shutdownRequest: make(chan *shutdownReq),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
@ -496,7 +493,6 @@ func (l *channelLink) Start() error {
// TODO(roasbeef): split goroutines within channel arb to avoid
go func() {
signals := &contractcourt.ContractSignals{
HtlcUpdates: l.htlcUpdates,
ShortChanID: l.channel.ShortChanID(),
}
@ -1837,15 +1833,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
l.cfg.Peer.SendMessage(false, nextRevocation)
// Since we just revoked our commitment, we may have a new set
// of HTLC's on our commitment, so we'll send them over our
// HTLC update channel so any callers can be notified.
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
// of HTLC's on our commitment, so we'll send them using our
// function closure NotifyContractUpdate.
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.LocalHtlcSet,
Htlcs: currentHtlcs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v",
err)
return
}
select {
case <-l.quit:
return
default:
}
// If both commitment chains are fully synced from our PoV,
@ -1879,13 +1883,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// The remote party now has a new primary commitment, so we'll
// update the contract court to be aware of this new set (the
// prior old remote pending).
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.RemoteHtlcSet,
Htlcs: remoteHTLCs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v",
err)
return
}
select {
case <-l.quit:
return
default:
}
// If we have a tower client for this channel type, we'll
@ -2093,13 +2105,20 @@ func (l *channelLink) updateCommitTx() error {
// The remote party now has a new pending commitment, so we'll update
// the contract court to be aware of this new set (the prior old remote
// pending).
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.RemotePendingHtlcSet,
Htlcs: pendingHTLCs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v", err)
return err
}
select {
case <-l.quit:
return ErrLinkShuttingDown
default:
}
commitSig := &lnwire.CommitSig{
@ -2167,7 +2186,6 @@ func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
go func() {
err := l.cfg.UpdateContractSignals(&contractcourt.ContractSignals{
HtlcUpdates: l.htlcUpdates,
ShortChanID: sid,
})
if err != nil {

View File

@ -1944,6 +1944,17 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
return nil, nil, nil, nil, nil, nil, err
}
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
// Instantiate with a long interval, so that we can precisely control
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
@ -1967,12 +1978,13 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
Registry: invoiceRegistry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
NotifyContractUpdate: notifyContractUpdate,
Registry: invoiceRegistry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
@ -1993,8 +2005,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-aliceLink.(*channelLink).quit:
close(doneChan)
return
}
}
@ -4482,6 +4495,17 @@ func (h *persistentLinkHarness) restartLink(
}
}
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
// Instantiate with a long interval, so that we can precisely control
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
@ -4505,12 +4529,13 @@ func (h *persistentLinkHarness) restartLink(
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
Registry: h.coreLink.cfg.Registry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
NotifyContractUpdate: notifyContractUpdate,
Registry: h.coreLink.cfg.Registry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
@ -4534,8 +4559,9 @@ func (h *persistentLinkHarness) restartLink(
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-aliceLink.(*channelLink).quit:
close(doneChan)
return
}
}

View File

@ -1122,6 +1122,17 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
maxFeeUpdateTimeout = 40 * time.Minute
)
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
link := NewChannelLink(
ChannelLinkConfig{
Switch: server.htlcSwitch,
@ -1142,6 +1153,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
NotifyContractUpdate: notifyContractUpdate,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
@ -1169,8 +1181,9 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
go func() {
for {
select {
case <-link.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-link.(*channelLink).quit:
close(doneChan)
return
}
}

View File

@ -815,6 +815,10 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals)
}
notifyContractUpdate := func(update *contractcourt.ContractUpdate) error {
return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
}
chanType := lnChan.State().ChanType
// Select the appropriate tower client based on the channel type. It's
@ -842,6 +846,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
PreimageCache: p.cfg.WitnessBeacon,
ChainEvents: chainEvents,
UpdateContractSignals: updateContractSignals,
NotifyContractUpdate: notifyContractUpdate,
OnChannelFailure: onChannelFailure,
SyncStates: syncStates,
BatchTicker: ticker.New(p.cfg.ChannelCommitInterval),