Merge pull request #1905 from halseth/fundinglocked-htlc-mismatch

Defer channel restoration until after channel active check.
This commit is contained in:
Olaoluwa Osuntokun 2018-09-28 16:34:18 -07:00 committed by GitHub
commit c508365bcc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 55 deletions

View file

@ -26,7 +26,6 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
) )
@ -2315,7 +2314,7 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
return nil return nil
} }
func (p *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { func (p *mockPeer) AddNewChannel(_ *channeldb.OpenChannel, _ <-chan struct{}) error {
return nil return nil
} }
func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil }

View file

@ -267,7 +267,7 @@ type fundingConfig struct {
// FindChannel queries the database for the channel with the given // FindChannel queries the database for the channel with the given
// channel ID. // channel ID.
FindChannel func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) FindChannel func(chanID lnwire.ChannelID) (*channeldb.OpenChannel, error)
// TempChanIDSeed is a cryptographically random string of bytes that's // TempChanIDSeed is a cryptographically random string of bytes that's
// used as a seed to generate pending channel ID's. // used as a seed to generate pending channel ID's.
@ -2323,10 +2323,9 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// If the RemoteNextRevocation is non-nil, it means that we have // If the RemoteNextRevocation is non-nil, it means that we have
// already processed fundingLocked for this channel, so ignore. // already processed fundingLocked for this channel, so ignore.
if channel.RemoteNextRevocation() != nil { if channel.RemoteNextRevocation != nil {
fndgLog.Infof("Received duplicate fundingLocked for "+ fndgLog.Infof("Received duplicate fundingLocked for "+
"ChannelID(%v), ignoring.", chanID) "ChannelID(%v), ignoring.", chanID)
channel.Stop()
return return
} }
@ -2334,10 +2333,9 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// need to create the next commitment state for the remote party. So // need to create the next commitment state for the remote party. So
// we'll insert that into the channel now before passing it along to // we'll insert that into the channel now before passing it along to
// other sub-systems. // other sub-systems.
err = channel.InitNextRevocation(fmsg.msg.NextPerCommitmentPoint) err = channel.InsertNextRevocation(fmsg.msg.NextPerCommitmentPoint)
if err != nil { if err != nil {
fndgLog.Errorf("unable to insert next commitment point: %v", err) fndgLog.Errorf("unable to insert next commitment point: %v", err)
channel.Stop()
return return
} }
@ -2361,8 +2359,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil {
fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", fndgLog.Errorf("Unable to add new channel %v with peer %x: %v",
fmsg.peer.IdentityKey().SerializeCompressed(), fmsg.peer.IdentityKey().SerializeCompressed(),
*channel.ChanPoint, err) channel.FundingOutpoint, err)
channel.Stop()
} }
} }

View file

@ -178,13 +178,13 @@ func (n *testNode) QuitSignal() <-chan struct{} {
return n.shutdownChannel return n.shutdownChannel
} }
func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel, func (n *testNode) AddNewChannel(channel *channeldb.OpenChannel,
quit <-chan struct{}) error { quit <-chan struct{}) error {
done := make(chan struct{}) errChan := make(chan error)
msg := &newChannelMsg{ msg := &newChannelMsg{
channel: channel, channel: channel,
done: done, err: errChan,
} }
select { select {
@ -194,12 +194,11 @@ func (n *testNode) AddNewChannel(channel *lnwallet.LightningChannel,
} }
select { select {
case <-done: case err := <-errChan:
return err
case <-quit: case <-quit:
return ErrFundingManagerShuttingDown return ErrFundingManagerShuttingDown
} }
return nil
} }
func init() { func init() {
@ -304,7 +303,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
return lnwire.NodeAnnouncement{}, nil return lnwire.NodeAnnouncement{}, nil
}, },
TempChanIDSeed: chanIDSeed, TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) {
dbChannels, err := cdb.FetchAllChannels() dbChannels, err := cdb.FetchAllChannels()
if err != nil { if err != nil {
return nil, err return nil, err
@ -312,10 +312,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
for _, channel := range dbChannels { for _, channel := range dbChannels {
if chanID.IsChanPoint(&channel.FundingOutpoint) { if chanID.IsChanPoint(&channel.FundingOutpoint) {
return lnwallet.NewLightningChannel( return channel, nil
signer,
nil,
channel)
} }
} }
@ -992,14 +989,14 @@ func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) {
// They should both send the new channel state to their peer. // They should both send the new channel state to their peer.
select { select {
case c := <-alice.newChannels: case c := <-alice.newChannels:
close(c.done) close(c.err)
case <-time.After(time.Second * 15): case <-time.After(time.Second * 15):
t.Fatalf("alice did not send new channel to peer") t.Fatalf("alice did not send new channel to peer")
} }
select { select {
case c := <-bob.newChannels: case c := <-bob.newChannels:
close(c.done) close(c.err)
case <-time.After(time.Second * 15): case <-time.After(time.Second * 15):
t.Fatalf("bob did not send new channel to peer") t.Fatalf("bob did not send new channel to peer")
} }

View file

@ -1477,7 +1477,8 @@ func (m *mockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
} }
return nil return nil
} }
func (m *mockPeer) AddNewChannel(_ *lnwallet.LightningChannel, _ <-chan struct{}) error { func (m *mockPeer) AddNewChannel(_ *channeldb.OpenChannel,
_ <-chan struct{}) error {
return nil return nil
} }
func (m *mockPeer) WipeChannel(*wire.OutPoint) error { func (m *mockPeer) WipeChannel(*wire.OutPoint) error {

View file

@ -543,7 +543,7 @@ func (s *mockServer) Address() net.Addr {
return nil return nil
} }
func (s *mockServer) AddNewChannel(channel *lnwallet.LightningChannel, func (s *mockServer) AddNewChannel(channel *channeldb.OpenChannel,
cancel <-chan struct{}) error { cancel <-chan struct{}) error {
return nil return nil

View file

@ -5,7 +5,7 @@ import (
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -19,7 +19,7 @@ type Peer interface {
// AddNewChannel adds a new channel to the peer. The channel should fail // AddNewChannel adds a new channel to the peer. The channel should fail
// to be added if the cancel channel is closed. // to be added if the cancel channel is closed.
AddNewChannel(channel *lnwallet.LightningChannel, cancel <-chan struct{}) error AddNewChannel(channel *channeldb.OpenChannel, cancel <-chan struct{}) error
// WipeChannel removes the channel uniquely identified by its channel // WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer. // point from all indexes associated with the peer.

74
peer.go
View file

@ -59,12 +59,12 @@ type outgoingMsg struct {
errChan chan error // MUST be buffered. errChan chan error // MUST be buffered.
} }
// newChannelMsg packages an lnwallet.LightningChannel with a channel that // newChannelMsg packages a channeldb.OpenChannel with a channel that allows
// allows the receiver of the request to report when the funding transaction // the receiver of the request to report when the funding transaction has been
// has been confirmed and the channel creation process completed. // confirmed and the channel creation process completed.
type newChannelMsg struct { type newChannelMsg struct {
channel *lnwallet.LightningChannel channel *channeldb.OpenChannel
done chan struct{} err chan error
} }
// closeMsgs is a wrapper struct around any wire messages that deal with the // closeMsgs is a wrapper struct around any wire messages that deal with the
@ -1522,9 +1522,9 @@ out:
// funding workflow. We'll initialize the necessary local // funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link. // state, and notify the htlc switch of a new link.
case newChanReq := <-p.newChannels: case newChanReq := <-p.newChannels:
chanPoint := newChanReq.channel.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
newChan := newChanReq.channel newChan := newChanReq.channel
chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Make sure this channel is not already active. // Make sure this channel is not already active.
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
@ -1533,8 +1533,7 @@ out:
"ignoring.", chanPoint) "ignoring.", chanPoint)
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
close(newChanReq.done) close(newChanReq.err)
newChanReq.channel.Stop()
// If we're being sent a new channel, and our // If we're being sent a new channel, and our
// existing channel doesn't have the next // existing channel doesn't have the next
@ -1548,7 +1547,7 @@ out:
"FundingLocked for ChannelPoint(%v)", "FundingLocked for ChannelPoint(%v)",
chanPoint) chanPoint)
nextRevoke := newChan.RemoteNextRevocation() nextRevoke := newChan.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke) err := currentChan.InitNextRevocation(nextRevoke)
if err != nil { if err != nil {
peerLog.Errorf("unable to init chan "+ peerLog.Errorf("unable to init chan "+
@ -1562,7 +1561,21 @@ out:
// If not already active, we'll add this channel to the // If not already active, we'll add this channel to the
// set of active channels, so we can look it up later // set of active channels, so we can look it up later
// easily according to its channel ID. // easily according to its channel ID.
p.activeChannels[chanID] = newChan lnChan, err := lnwallet.NewLightningChannel(
p.server.cc.signer, p.server.witnessBeacon,
newChan,
)
if err != nil {
p.activeChanMtx.Unlock()
err := fmt.Errorf("unable to create "+
"LightningChannel: %v", err)
peerLog.Errorf(err.Error())
newChanReq.err <- err
continue
}
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
@ -1574,15 +1587,24 @@ out:
// TODO(roasbeef): panic on below? // TODO(roasbeef): panic on below?
_, currentHeight, err := p.server.cc.chainIO.GetBestBlock() _, currentHeight, err := p.server.cc.chainIO.GetBestBlock()
if err != nil { if err != nil {
peerLog.Errorf("unable to get best block: %v", err) err := fmt.Errorf("unable to get best "+
"block: %v", err)
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue continue
} }
chainEvents, err := p.server.chainArb.SubscribeChannelEvents( chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
*chanPoint, *chanPoint,
) )
if err != nil { if err != nil {
peerLog.Errorf("unable to subscribe to chain "+ err := fmt.Errorf("unable to subscribe to "+
"events: %v", err) "chain events: %v", err)
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue continue
} }
@ -1591,7 +1613,7 @@ out:
// forwarded. For fees we'll use the default values, as // forwarded. For fees we'll use the default values, as
// they currently are always set to the default values // they currently are always set to the default values
// at initial channel creation. // at initial channel creation.
fwdMinHtlc := newChan.FwdMinHtlc() fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.server.cc.routingPolicy defaultPolicy := p.server.cc.routingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{ forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLC: fwdMinHtlc, MinHTLC: fwdMinHtlc,
@ -1602,16 +1624,21 @@ out:
// Create the link and add it to the switch. // Create the link and add it to the switch.
err = p.addLink( err = p.addLink(
chanPoint, newChan, forwardingPolicy, chanPoint, lnChan, forwardingPolicy,
chainEvents, currentHeight, false, chainEvents, currentHeight, false,
) )
if err != nil { if err != nil {
peerLog.Errorf("can't register new channel "+ err := fmt.Errorf("can't register new channel "+
"link(%v) with NodeKey(%x)", chanPoint, "link(%v) with NodeKey(%x)", chanPoint,
p.PubKey()) p.PubKey())
peerLog.Errorf(err.Error())
lnChan.Stop()
newChanReq.err <- err
continue
} }
close(newChanReq.done) close(newChanReq.err)
// We've just received a local request to close an active // We've just received a local request to close an active
// channel. If will either kick of a cooperative channel // channel. If will either kick of a cooperative channel
@ -2171,13 +2198,13 @@ func (p *peer) Address() net.Addr {
// added if the cancel channel is closed. // added if the cancel channel is closed.
// //
// NOTE: Part of the lnpeer.Peer interface. // NOTE: Part of the lnpeer.Peer interface.
func (p *peer) AddNewChannel(channel *lnwallet.LightningChannel, func (p *peer) AddNewChannel(channel *channeldb.OpenChannel,
cancel <-chan struct{}) error { cancel <-chan struct{}) error {
newChanDone := make(chan struct{}) errChan := make(chan error, 1)
newChanMsg := &newChannelMsg{ newChanMsg := &newChannelMsg{
channel: channel, channel: channel,
done: newChanDone, err: errChan,
} }
select { select {
@ -2191,12 +2218,11 @@ func (p *peer) AddNewChannel(channel *lnwallet.LightningChannel,
// We pause here to wait for the peer to recognize the new channel // We pause here to wait for the peer to recognize the new channel
// before we close the channel barrier corresponding to the channel. // before we close the channel barrier corresponding to the channel.
select { select {
case <-newChanDone: case err := <-errChan:
return err
case <-p.quit: case <-p.quit:
return ErrPeerExiting return ErrPeerExiting
} }
return nil
} }
// StartTime returns the time at which the connection was established if the // StartTime returns the time at which the connection was established if the

View file

@ -743,7 +743,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
}, },
NotifyWhenOnline: s.NotifyWhenOnline, NotifyWhenOnline: s.NotifyWhenOnline,
TempChanIDSeed: chanIDSeed, TempChanIDSeed: chanIDSeed,
FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) {
dbChannels, err := chanDB.FetchAllChannels() dbChannels, err := chanDB.FetchAllChannels()
if err != nil { if err != nil {
return nil, err return nil, err
@ -751,10 +753,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
for _, channel := range dbChannels { for _, channel := range dbChannels {
if chanID.IsChanPoint(&channel.FundingOutpoint) { if chanID.IsChanPoint(&channel.FundingOutpoint) {
return lnwallet.NewLightningChannel( return channel, nil
cc.signer, s.witnessBeacon,
channel,
)
} }
} }