contractcourt: add onchain interception

This commit is contained in:
Joost Jager 2022-02-04 14:32:15 +01:00
parent c392e003aa
commit 721fb4ee88
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
11 changed files with 264 additions and 16 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/labels" "github.com/lightningnetwork/lnd/labels"
@ -75,7 +76,9 @@ type WitnessSubscription struct {
type WitnessBeacon interface { type WitnessBeacon interface {
// SubscribeUpdates returns a channel that will be sent upon *each* time // SubscribeUpdates returns a channel that will be sent upon *each* time
// a new preimage is discovered. // a new preimage is discovered.
SubscribeUpdates() *WitnessSubscription SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*WitnessSubscription, error)
// LookupPreImage attempts to lookup a preimage in the global cache. // LookupPreImage attempts to lookup a preimage in the global cache.
// True is returned for the second argument if the preimage is found. // True is returned for the second argument if the preimage is found.

View File

@ -13,6 +13,7 @@ import (
"github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
) )
// htlcIncomingContestResolver is a ContractResolver that's able to resolve an // htlcIncomingContestResolver is a ContractResolver that's able to resolve an
@ -70,7 +71,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// First try to parse the payload. If that fails, we can stop resolution // First try to parse the payload. If that fails, we can stop resolution
// now. // now.
payload, err := h.decodePayload() payload, nextHopOnionBlob, err := h.decodePayload()
if err != nil { if err != nil {
log.Debugf("ChannelArbitrator(%v): cannot decode payload of "+ log.Debugf("ChannelArbitrator(%v): cannot decode payload of "+
"htlc %v", h.ChanPoint, h.HtlcPoint()) "htlc %v", h.ChanPoint, h.HtlcPoint())
@ -152,7 +153,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// Update htlcResolution with the matching preimage. // Update htlcResolution with the matching preimage.
h.htlcResolution.Preimage = preimage h.htlcResolution.Preimage = preimage
log.Infof("%T(%v): extracted preimage=%v from beacon!", h, log.Infof("%T(%v): applied preimage=%v", h,
h.htlcResolution.ClaimOutpoint, preimage) h.htlcResolution.ClaimOutpoint, preimage)
// If this is our commitment transaction, then we'll need to // If this is our commitment transaction, then we'll need to
@ -277,7 +278,13 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// NOTE: This is done BEFORE opportunistically querying the db, // NOTE: This is done BEFORE opportunistically querying the db,
// to ensure the preimage can't be delivered between querying // to ensure the preimage can't be delivered between querying
// and registering for the preimage subscription. // and registering for the preimage subscription.
preimageSubscription := h.PreimageDB.SubscribeUpdates() preimageSubscription, err := h.PreimageDB.SubscribeUpdates(
h.htlcSuccessResolver.ShortChanID, &h.htlc,
payload, nextHopOnionBlob,
)
if err != nil {
return nil, err
}
defer preimageSubscription.CancelSubscription() defer preimageSubscription.CancelSubscription()
// With the epochs and preimage subscriptions initialized, we'll // With the epochs and preimage subscriptions initialized, we'll
@ -440,16 +447,31 @@ func (h *htlcIncomingContestResolver) SupplementState(_ *channeldb.OpenChannel)
} }
// decodePayload (re)decodes the hop payload of a received htlc. // decodePayload (re)decodes the hop payload of a received htlc.
func (h *htlcIncomingContestResolver) decodePayload() (*hop.Payload, error) { func (h *htlcIncomingContestResolver) decodePayload() (*hop.Payload,
[]byte, error) {
onionReader := bytes.NewReader(h.htlc.OnionBlob) onionReader := bytes.NewReader(h.htlc.OnionBlob)
iterator, err := h.OnionProcessor.ReconstructHopIterator( iterator, err := h.OnionProcessor.ReconstructHopIterator(
onionReader, h.htlc.RHash[:], onionReader, h.htlc.RHash[:],
) )
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return iterator.HopPayload() payload, err := iterator.HopPayload()
if err != nil {
return nil, nil, err
}
// Transform onion blob for the next hop.
var onionBlob [lnwire.OnionPacketSize]byte
buf := bytes.NewBuffer(onionBlob[0:0])
err = iterator.EncodeNextHop(buf)
if err != nil {
return nil, nil, err
}
return payload, onionBlob[:], nil
} }
// A compile time assertion to ensure htlcIncomingContestResolver meets the // A compile time assertion to ensure htlcIncomingContestResolver meets the

View File

@ -276,6 +276,10 @@ func (h *mockHopIterator) HopPayload() (*hop.Payload, error) {
}), nil }), nil
} }
func (h *mockHopIterator) EncodeNextHop(w io.Writer) error {
return nil
}
type mockOnionProcessor struct { type mockOnionProcessor struct {
isExit bool isExit bool
offeredOnionBlob []byte offeredOnionBlob []byte

View File

@ -14,11 +14,13 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -36,11 +38,15 @@ func newMockWitnessBeacon() *mockWitnessBeacon {
} }
} }
func (m *mockWitnessBeacon) SubscribeUpdates() *WitnessSubscription { func (m *mockWitnessBeacon) SubscribeUpdates(
chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*WitnessSubscription, error) {
return &WitnessSubscription{ return &WitnessSubscription{
WitnessUpdates: m.preImageUpdates, WitnessUpdates: m.preImageUpdates,
CancelSubscription: func() {}, CancelSubscription: func() {},
} }, nil
} }
func (m *mockWitnessBeacon) LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool) { func (m *mockWitnessBeacon) LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool) {

View File

@ -178,6 +178,9 @@ then watch it on chain. Taproot script spends are also supported through the
* [Support for making routes with the legacy onion payload format via `SendToRoute` has been removed.](https://github.com/lightningnetwork/lnd/pull/6385) * [Support for making routes with the legacy onion payload format via `SendToRoute` has been removed.](https://github.com/lightningnetwork/lnd/pull/6385)
* Close a gap in the HTLC interceptor API by [intercepting htlcs in the on-chain
resolution flow](https://github.com/lightningnetwork/lnd/pull/6219) too.
## Database ## Database
* [Add ForAll implementation for etcd to speed up * [Add ForAll implementation for etcd to speed up

View File

@ -41,6 +41,8 @@ type InterceptableSwitch struct {
// interceptor client. // interceptor client.
resolutionChan chan *fwdResolution resolutionChan chan *fwdResolution
onchainIntercepted chan InterceptedForward
// interceptorRegistration is a channel that we use to synchronize // interceptorRegistration is a channel that we use to synchronize
// client connect and disconnect. // client connect and disconnect.
interceptorRegistration chan ForwardInterceptor interceptorRegistration chan ForwardInterceptor
@ -116,6 +118,7 @@ func NewInterceptableSwitch(s *Switch, cltvRejectDelta uint32,
return &InterceptableSwitch{ return &InterceptableSwitch{
htlcSwitch: s, htlcSwitch: s,
intercepted: make(chan *interceptedPackets), intercepted: make(chan *interceptedPackets),
onchainIntercepted: make(chan InterceptedForward),
interceptorRegistration: make(chan ForwardInterceptor), interceptorRegistration: make(chan ForwardInterceptor),
holdForwards: make(map[channeldb.CircuitKey]InterceptedForward), holdForwards: make(map[channeldb.CircuitKey]InterceptedForward),
resolutionChan: make(chan *fwdResolution), resolutionChan: make(chan *fwdResolution),
@ -181,6 +184,16 @@ func (s *InterceptableSwitch) run() {
log.Errorf("Cannot forward packets: %v", err) log.Errorf("Cannot forward packets: %v", err)
} }
case fwd := <-s.onchainIntercepted:
// For on-chain interceptions, we don't know if it has
// already been offered before. This information is in
// the forwarding package which isn't easily accessible
// from contractcourt. It is likely though that it was
// already intercepted in the off-chain flow. And even
// if not, it is safe to signal replay so that we won't
// unexpectedly skip over this htlc.
s.forward(fwd, true)
case res := <-s.resolutionChan: case res := <-s.resolutionChan:
res.errChan <- s.resolve(res.resolution) res.errChan <- s.resolve(res.resolution)
@ -308,6 +321,20 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bo
return nil return nil
} }
// ForwardPacket forwards a single htlc to the external interceptor.
func (s *InterceptableSwitch) ForwardPacket(
fwd InterceptedForward) error {
select {
case s.onchainIntercepted <- fwd:
case <-s.quit:
return errors.New("interceptable switch quit")
}
return nil
}
// interceptForward forwards the packet to the external interceptor after // interceptForward forwards the packet to the external interceptor after
// checking the interception criteria. // checking the interception criteria.
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket, func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,

View File

@ -65,8 +65,12 @@ func (m *mockPreimageCache) AddPreimages(preimages ...lntypes.Preimage) error {
return nil return nil
} }
func (m *mockPreimageCache) SubscribeUpdates() *contractcourt.WitnessSubscription { func (m *mockPreimageCache) SubscribeUpdates(
return nil chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*contractcourt.WitnessSubscription, error) {
return nil, nil
} }
type mockFeeEstimator struct { type mockFeeEstimator struct {

80
intercepted_forward.go Normal file
View File

@ -0,0 +1,80 @@
package lnd
import (
"errors"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrCannotResume is returned when an intercepted forward cannot be
// resumed. This is the case in the on-chain resolution flow.
ErrCannotResume = errors.New("cannot resume in the on-chain flow")
// ErrCannotFail is returned when an intercepted forward cannot be failed.
// This is the case in the on-chain resolution flow.
ErrCannotFail = errors.New("cannot fail in the on-chain flow")
// ErrPreimageMismatch is returned when the preimage that is specified to
// settle an htlc doesn't match the htlc hash.
ErrPreimageMismatch = errors.New("preimage does not match hash")
)
// interceptedForward implements the on-chain behavior for the resolution of
// a forwarded htlc.
type interceptedForward struct {
packet *htlcswitch.InterceptedPacket
beacon *preimageBeacon
}
func newInterceptedForward(
packet *htlcswitch.InterceptedPacket,
beacon *preimageBeacon) *interceptedForward {
return &interceptedForward{
beacon: beacon,
packet: packet,
}
}
// Packet returns the intercepted htlc packet.
func (f *interceptedForward) Packet() htlcswitch.InterceptedPacket {
return *f.packet
}
// Resume notifies the intention to resume an existing hold forward. This
// basically means the caller wants to resume with the default behavior for this
// htlc which usually means forward it.
func (f *interceptedForward) Resume() error {
return ErrCannotResume
}
// Fail notifies the intention to fail an existing hold forward with an
// encrypted failure reason.
func (f *interceptedForward) Fail(_ []byte) error {
// We can't actively fail an htlc. The best we could do is abandon the
// resolver, but this wouldn't be a safe operation. There may be a race
// with the preimage beacon supplying a preimage. Therefore we don't
// attempt to fail and just return an error here.
return ErrCannotFail
}
// FailWithCode notifies the intention to fail an existing hold forward with the
// specified failure code.
func (f *interceptedForward) FailWithCode(_ lnwire.FailCode) error {
return ErrCannotFail
}
// Settle notifies the intention to settle an existing hold forward with a given
// preimage.
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
if !preimage.Matches(f.packet.Hash) {
return ErrPreimageMismatch
}
// Add preimage to the preimage beacon. The onchain resolver will pick
// up the preimage from the beacon.
return f.beacon.AddPreimages(preimage)
}

View File

@ -595,8 +595,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
s.witnessBeacon = newPreimageBeacon(dbs.ChanStateDB.NewWitnessCache())
currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock() currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
if err != nil { if err != nil {
return nil, err return nil, err
@ -656,6 +654,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
s.cfg.RequireInterceptor, s.cfg.RequireInterceptor,
) )
s.witnessBeacon = newPreimageBeacon(
dbs.ChanStateDB.NewWitnessCache(),
s.interceptableSwitch.ForwardPacket,
)
chanStatusMgrCfg := &netann.ChanStatusConfig{ chanStatusMgrCfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: cfg.ChanStatusSampleInterval, ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
ChanEnableTimeout: cfg.ChanEnableTimeout, ChanEnableTimeout: cfg.ChanEnableTimeout,

View File

@ -3,8 +3,12 @@ package lnd
import ( import (
"sync" "sync"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
) )
// preimageSubscriber reprints an active subscription to be notified once the // preimageSubscriber reprints an active subscription to be notified once the
@ -36,18 +40,27 @@ type preimageBeacon struct {
clientCounter uint64 clientCounter uint64
subscribers map[uint64]*preimageSubscriber subscribers map[uint64]*preimageSubscriber
interceptor func(htlcswitch.InterceptedForward) error
} }
func newPreimageBeacon(wCache witnessCache) *preimageBeacon { func newPreimageBeacon(wCache witnessCache,
interceptor func(htlcswitch.InterceptedForward) error) *preimageBeacon {
return &preimageBeacon{ return &preimageBeacon{
wCache: wCache, wCache: wCache,
interceptor: interceptor,
subscribers: make(map[uint64]*preimageSubscriber), subscribers: make(map[uint64]*preimageSubscriber),
} }
} }
// SubscribeUpdates returns a channel that will be sent upon *each* time a new // SubscribeUpdates returns a channel that will be sent upon *each* time a new
// preimage is discovered. // preimage is discovered.
func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription { func (p *preimageBeacon) SubscribeUpdates(
chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*contractcourt.WitnessSubscription, error) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
@ -64,7 +77,7 @@ func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription {
srvrLog.Debugf("Creating new witness beacon subscriber, id=%v", srvrLog.Debugf("Creating new witness beacon subscriber, id=%v",
p.clientCounter) p.clientCounter)
return &contractcourt.WitnessSubscription{ sub := &contractcourt.WitnessSubscription{
WitnessUpdates: client.updateChan, WitnessUpdates: client.updateChan,
CancelSubscription: func() { CancelSubscription: func() {
p.Lock() p.Lock()
@ -75,6 +88,32 @@ func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription {
close(client.quit) close(client.quit)
}, },
} }
// Notify the htlc interceptor. There may be a client connected
// and willing to supply a preimage.
packet := &htlcswitch.InterceptedPacket{
Hash: htlc.RHash,
IncomingExpiry: htlc.RefundTimeout,
IncomingAmount: htlc.Amt,
IncomingCircuit: channeldb.CircuitKey{
ChanID: chanID,
HtlcID: htlc.HtlcIndex,
},
OutgoingChanID: payload.FwdInfo.NextHop,
OutgoingExpiry: payload.FwdInfo.OutgoingCTLV,
OutgoingAmount: payload.FwdInfo.AmountToForward,
CustomRecords: payload.CustomRecords(),
}
copy(packet.OnionBlob[:], nextHopOnionBlob)
fwd := newInterceptedForward(packet, p)
err := p.interceptor(fwd)
if err != nil {
return nil, err
}
return sub, nil
} }
// LookupPreImage attempts to lookup a preimage in the global cache. True is // LookupPreImage attempts to lookup a preimage in the global cache. True is

57
witness_beacon_test.go Normal file
View File

@ -0,0 +1,57 @@
package lnd
import (
"testing"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// TestWitnessBeaconIntercept tests that the beacon passes on subscriptions to
// the interceptor correctly.
func TestWitnessBeaconIntercept(t *testing.T) {
var interceptedFwd htlcswitch.InterceptedForward
interceptor := func(fwd htlcswitch.InterceptedForward) error {
interceptedFwd = fwd
return nil
}
p := newPreimageBeacon(
&mockWitnessCache{}, interceptor,
)
preimage := lntypes.Preimage{1, 2, 3}
hash := preimage.Hash()
subscription, err := p.SubscribeUpdates(
lnwire.NewShortChanIDFromInt(1),
&channeldb.HTLC{
RHash: hash,
},
&hop.Payload{},
[]byte{2},
)
require.NoError(t, err)
defer subscription.CancelSubscription()
require.NoError(t, interceptedFwd.Settle(preimage))
update := <-subscription.WitnessUpdates
require.Equal(t, preimage, update)
}
type mockWitnessCache struct {
witnessCache
}
func (w *mockWitnessCache) AddSha256Witnesses(
preimages ...lntypes.Preimage) error {
return nil
}