package htlcswitch import ( "bytes" "errors" "fmt" "math/rand" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/btcec/v2/ecdsa" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/ticker" ) const ( // DefaultFwdEventInterval is the duration between attempts to flush // pending forwarding events to disk. DefaultFwdEventInterval = 15 * time.Second // DefaultLogInterval is the duration between attempts to log statistics // about forwarding events. DefaultLogInterval = 10 * time.Second // DefaultAckInterval is the duration between attempts to ack any settle // fails in a forwarding package. DefaultAckInterval = 15 * time.Second // DefaultHTLCExpiry is the duration after which Adds will be cancelled // if they could not get added to an outgoing commitment. DefaultHTLCExpiry = time.Minute ) var ( // ErrChannelLinkNotFound is used when channel link hasn't been found. ErrChannelLinkNotFound = errors.New("channel link not found") // ErrDuplicateAdd signals that the ADD htlc was already forwarded // through the switch and is locked into another commitment txn. ErrDuplicateAdd = errors.New("duplicate add HTLC detected") // ErrUnknownErrorDecryptor signals that we were unable to locate the // error decryptor for this payment. This is likely due to restarting // the daemon. ErrUnknownErrorDecryptor = errors.New("unknown error decryptor") // ErrSwitchExiting signaled when the switch has received a shutdown // request. ErrSwitchExiting = errors.New("htlcswitch shutting down") // ErrNoLinksFound is an error returned when we attempt to retrieve the // active links in the switch for a specific destination. ErrNoLinksFound = errors.New("no channel links found") // ErrUnreadableFailureMessage is returned when the failure message // cannot be decrypted. ErrUnreadableFailureMessage = errors.New("unreadable failure message") // ErrLocalAddFailed signals that the ADD htlc for a local payment // failed to be processed. ErrLocalAddFailed = errors.New("local add HTLC failed") // errDustThresholdExceeded is only surfaced to callers of SendHTLC and // signals that sending the HTLC would exceed the outgoing link's dust // threshold. errDustThresholdExceeded = errors.New("dust threshold exceeded") // DefaultDustThreshold is the default threshold after which we'll fail // payments if they are dust. This is currently set to 500m msats. DefaultDustThreshold = lnwire.MilliSatoshi(500_000_000) ) // plexPacket encapsulates switch packet and adds error channel to receive // error from request handler. type plexPacket struct { pkt *htlcPacket err chan error } // ChanClose represents a request which close a particular channel specified by // its id. type ChanClose struct { // CloseType is a variable which signals the type of channel closure the // peer should execute. CloseType contractcourt.ChannelCloseType // ChanPoint represent the id of the channel which should be closed. ChanPoint *wire.OutPoint // TargetFeePerKw is the ideal fee that was specified by the caller. // This value is only utilized if the closure type is CloseRegular. // This will be the starting offered fee when the fee negotiation // process for the cooperative closure transaction kicks off. TargetFeePerKw chainfee.SatPerKWeight // DeliveryScript is an optional delivery script to pay funds out to. DeliveryScript lnwire.DeliveryAddress // Updates is used by request creator to receive the notifications about // execution of the close channel request. Updates chan interface{} // Err is used by request creator to receive request execution error. Err chan error } // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { // FwdingLog is an interface that will be used by the switch to log // forwarding events. A forwarding event happens each time a payment // circuit is successfully completed. So when we forward an HTLC, and a // settle is eventually received. FwdingLog ForwardingLog // LocalChannelClose kicks-off the workflow to execute a cooperative or // forced unilateral closure of the channel initiated by a local // subsystem. LocalChannelClose func(pubKey []byte, request *ChanClose) // DB is the database backend that will be used to back the switch's // persistent circuit map. DB kvdb.Backend // FetchAllOpenChannels is a function that fetches all currently open // channels from the channel database. FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error) // FetchClosedChannels is a function that fetches all closed channels // from the channel database. FetchClosedChannels func( pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error) // SwitchPackager provides access to the forwarding packages of all // active channels. This gives the switch the ability to read arbitrary // forwarding packages, and ack settles and fails contained within them. SwitchPackager channeldb.FwdOperator // ExtractErrorEncrypter is an interface allowing switch to reextract // error encrypters stored in the circuit map on restarts, since they // are not stored directly within the database. ExtractErrorEncrypter hop.ErrorEncrypterExtracter // FetchLastChannelUpdate retrieves the latest routing policy for a // target channel. This channel will typically be the outgoing channel // specified when we receive an incoming HTLC. This will be used to // provide payment senders our latest policy when sending encrypted // error messages. FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) // Notifier is an instance of a chain notifier that we'll use to signal // the switch when a new block has arrived. Notifier chainntnfs.ChainNotifier // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc // events through. HtlcNotifier htlcNotifier // FwdEventTicker is a signal that instructs the htlcswitch to flush any // pending forwarding events. FwdEventTicker ticker.Ticker // LogEventTicker is a signal instructing the htlcswitch to log // aggregate stats about it's forwarding during the last interval. LogEventTicker ticker.Ticker // AckEventTicker is a signal instructing the htlcswitch to ack any settle // fails in forwarding packages. AckEventTicker ticker.Ticker // AllowCircularRoute is true if the user has configured their node to // allow forwards that arrive and depart our node over the same channel. AllowCircularRoute bool // RejectHTLC is a flag that instructs the htlcswitch to reject any // HTLCs that are not from the source hop. RejectHTLC bool // Clock is a time source for the switch. Clock clock.Clock // HTLCExpiry is the interval after which Adds will be cancelled if they // have not been yet been delivered to a link. The computed deadline // will expiry this long after the Adds are added to a mailbox via // AddPacket. HTLCExpiry time.Duration // DustThreshold is the threshold in milli-satoshis after which we'll // fail incoming or outgoing dust payments for a particular channel. DustThreshold lnwire.MilliSatoshi // SignAliasUpdate is used when sending FailureMessages backwards for // option_scid_alias channels. This avoids a potential privacy leak by // replacing the public, confirmed SCID with the alias in the // ChannelUpdate. SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature, error) // IsAlias returns whether or not a given SCID is an alias. IsAlias func(scid lnwire.ShortChannelID) bool } // Switch is the central messaging bus for all incoming/outgoing HTLCs. // Connected peers with active channels are treated as named interfaces which // refer to active channels as links. A link is the switch's message // communication point with the goroutine that manages an active channel. New // links are registered each time a channel is created, and unregistered once // the channel is closed. The switch manages the hand-off process for multi-hop // HTLCs, forwarding HTLCs initiated from within the daemon, and finally // notifies users local-systems concerning their outstanding payment requests. type Switch struct { started int32 // To be used atomically. shutdown int32 // To be used atomically. // bestHeight is the best known height of the main chain. The links will // be used this information to govern decisions based on HTLC timeouts. // This will be retrieved by the registered links atomically. bestHeight uint32 wg sync.WaitGroup quit chan struct{} // cfg is a copy of the configuration struct that the htlc switch // service was initialized with. cfg *Config // networkResults stores the results of payments initiated by the user. // The store is used to later look up the payments and notify the // user of the result when they are complete. Each payment attempt // should be given a unique integer ID when it is created, otherwise // results might be overwritten. networkResults *networkResultStore // circuits is storage for payment circuits which are used to // forward the settle/fail htlc updates back to the add htlc initiator. circuits CircuitMap // mailOrchestrator manages the lifecycle of mailboxes used throughout // the switch, and facilitates delayed delivery of packets to links that // later come online. mailOrchestrator *mailOrchestrator // indexMtx is a read/write mutex that protects the set of indexes // below. indexMtx sync.RWMutex // pendingLinkIndex holds links that have not had their final, live // short_chan_id assigned. pendingLinkIndex map[lnwire.ChannelID]ChannelLink // links is a map of channel id and channel link which manages // this channel. linkIndex map[lnwire.ChannelID]ChannelLink // forwardingIndex is an index which is consulted by the switch when it // needs to locate the next hop to forward an incoming/outgoing HTLC // update to/from. // // TODO(roasbeef): eventually add a NetworkHop mapping before the // ChannelLink forwardingIndex map[lnwire.ShortChannelID]ChannelLink // interfaceIndex maps the compressed public key of a peer to all the // channels that the switch maintains with that peer. interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink // linkStopIndex stores the currently stopping ChannelLinks, // represented by their ChannelID. The key is the link's ChannelID and // the value is a chan that is closed when the link has fully stopped. // This map is only added to if RemoveLink is called and is not added // to when the Switch is shutting down and calls Stop() on each link. // // MUST be used with the indexMtx. linkStopIndex map[lnwire.ChannelID]chan struct{} // htlcPlex is the channel which all connected links use to coordinate // the setup/teardown of Sphinx (onion routing) payment circuits. // Active links forward any add/settle messages over this channel each // state transition, sending new adds/settles which are fully locked // in. htlcPlex chan *plexPacket // chanCloseRequests is used to transfer the channel close request to // the channel close handler. chanCloseRequests chan *ChanClose // resolutionMsgs is the channel that all external contract resolution // messages will be sent over. resolutionMsgs chan *resolutionMsg // pendingFwdingEvents is the set of forwarding events which have been // collected during the current interval, but hasn't yet been written // to the forwarding log. fwdEventMtx sync.Mutex pendingFwdingEvents []channeldb.ForwardingEvent // blockEpochStream is an active block epoch event stream backed by an // active ChainNotifier instance. This will be used to retrieve the // latest height of the chain. blockEpochStream *chainntnfs.BlockEpochEvent // pendingSettleFails is the set of settle/fail entries that we need to // ack in the forwarding package of the outgoing link. This was added to // make pipelining settles more efficient. pendingSettleFails []channeldb.SettleFailRef // resMsgStore is used to store the set of ResolutionMsg that come from // contractcourt. This is used so the Switch can properly forward them, // even on restarts. resMsgStore *resolutionStore // aliasToReal is a map used for option-scid-alias feature-bit links. // The alias SCID is the key and the real, confirmed SCID is the value. // If the channel is unconfirmed, there will not be a mapping for it. // Since channels can have multiple aliases, this map is essentially a // N->1 mapping for a channel. This MUST be accessed with the indexMtx. aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID // baseIndex is a map used for option-scid-alias feature-bit links. // The value is the SCID of the link's ShortChannelID. This value may // be an alias for zero-conf channels or a confirmed SCID for // non-zero-conf channels with the option-scid-alias feature-bit. The // key includes the value itself and also any other aliases. This MUST // be accessed with the indexMtx. baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID } // New creates the new instance of htlc switch. func New(cfg Config, currentHeight uint32) (*Switch, error) { resStore := newResolutionStore(cfg.DB) circuitMap, err := NewCircuitMap(&CircuitMapConfig{ DB: cfg.DB, FetchAllOpenChannels: cfg.FetchAllOpenChannels, FetchClosedChannels: cfg.FetchClosedChannels, ExtractErrorEncrypter: cfg.ExtractErrorEncrypter, CheckResolutionMsg: resStore.checkResolutionMsg, }) if err != nil { return nil, err } s := &Switch{ bestHeight: currentHeight, cfg: &cfg, circuits: circuitMap, linkIndex: make(map[lnwire.ChannelID]ChannelLink), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink), pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink), linkStopIndex: make(map[lnwire.ChannelID]chan struct{}), networkResults: newNetworkResultStore(cfg.DB), htlcPlex: make(chan *plexPacket), chanCloseRequests: make(chan *ChanClose), resolutionMsgs: make(chan *resolutionMsg), resMsgStore: resStore, quit: make(chan struct{}), } s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID) s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID) s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{ forwardPackets: s.ForwardPackets, clock: s.cfg.Clock, expiry: s.cfg.HTLCExpiry, failMailboxUpdate: s.failMailboxUpdate, }) return s, nil } // resolutionMsg is a struct that wraps an existing ResolutionMsg with a done // channel. We'll use this channel to synchronize delivery of the message with // the caller. type resolutionMsg struct { contractcourt.ResolutionMsg errChan chan error } // ProcessContractResolution is called by active contract resolvers once a // contract they are watching over has been fully resolved. The message carries // an external signal that *would* have been sent if the outgoing channel // didn't need to go to the chain in order to fulfill a contract. We'll process // this message just as if it came from an active outgoing channel. func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error { errChan := make(chan error, 1) select { case s.resolutionMsgs <- &resolutionMsg{ ResolutionMsg: msg, errChan: errChan, }: case <-s.quit: return ErrSwitchExiting } select { case err := <-errChan: return err case <-s.quit: return ErrSwitchExiting } } // GetPaymentResult returns the the result of the payment attempt with the // given attemptID. The paymentHash should be set to the payment's overall // hash, or in case of AMP payments the payment's unique identifier. // // The method returns a channel where the payment result will be sent when // available, or an error is encountered during forwarding. When a result is // received on the channel, the HTLC is guaranteed to no longer be in flight. // The switch shutting down is signaled by closing the channel. If the // attemptID is unknown, ErrPaymentIDNotFound will be returned. func (s *Switch) GetPaymentResult(attemptID uint64, paymentHash lntypes.Hash, deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) { var ( nChan <-chan *networkResult err error inKey = CircuitKey{ ChanID: hop.Source, HtlcID: attemptID, } ) // If the payment is not found in the circuit map, check whether a // result is already available. // Assumption: no one will add this payment ID other than the caller. if s.circuits.LookupCircuit(inKey) == nil { res, err := s.networkResults.getResult(attemptID) if err != nil { return nil, err } c := make(chan *networkResult, 1) c <- res nChan = c } else { // The payment was committed to the circuits, subscribe for a // result. nChan, err = s.networkResults.subscribeResult(attemptID) if err != nil { return nil, err } } resultChan := make(chan *PaymentResult, 1) // Since the payment was known, we can start a goroutine that can // extract the result when it is available, and pass it on to the // caller. s.wg.Add(1) go func() { defer s.wg.Done() var n *networkResult select { case n = <-nChan: case <-s.quit: // We close the result channel to signal a shutdown. We // don't send any result in this case since the HTLC is // still in flight. close(resultChan) return } log.Debugf("Received network result %T for attemptID=%v", n.msg, attemptID) // Extract the result and pass it to the result channel. result, err := s.extractResult( deobfuscator, n, attemptID, paymentHash, ) if err != nil { e := fmt.Errorf("unable to extract result: %v", err) log.Error(e) resultChan <- &PaymentResult{ Error: e, } return } resultChan <- result }() return resultChan, nil } // CleanStore calls the underlying result store, telling it is safe to delete // all entries except the ones in the keepPids map. This should be called // preiodically to let the switch clean up payment results that we have // handled. func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error { return s.networkResults.cleanStore(keepPids) } // SendHTLC is used by other subsystems which aren't belong to htlc switch // package in order to send the htlc update. The attemptID used MUST be unique // for this HTLC, and MUST be used only once, otherwise the switch might reject // it. func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64, htlc *lnwire.UpdateAddHTLC) error { // Generate and send new update packet, if error will be received on // this stage it means that packet haven't left boundaries of our // system and something wrong happened. packet := &htlcPacket{ incomingChanID: hop.Source, incomingHTLCID: attemptID, outgoingChanID: firstHop, htlc: htlc, amount: htlc.Amount, } // Attempt to fetch the target link before creating a circuit so that // we don't leave dangling circuits. The getLocalLink method does not // require the circuit variable to be set on the *htlcPacket. link, linkErr := s.getLocalLink(packet, htlc) if linkErr != nil { // Notify the htlc notifier of a link failure on our outgoing // link. Incoming timelock/amount values are not set because // they are not present for local sends. s.cfg.HtlcNotifier.NotifyLinkFailEvent( newHtlcKey(packet), HtlcInfo{ OutgoingTimeLock: htlc.Expiry, OutgoingAmt: htlc.Amount, }, HtlcEventTypeSend, linkErr, false, ) return linkErr } // Evaluate whether this HTLC would increase our exposure to dust. If // it does, don't send it out and instead return an error. if s.evaluateDustThreshold(link, htlc.Amount, false) { // Notify the htlc notifier of a link failure on our outgoing // link. We use the FailTemporaryChannelFailure in place of a // more descriptive error message. linkErr := NewLinkError( &lnwire.FailTemporaryChannelFailure{}, ) s.cfg.HtlcNotifier.NotifyLinkFailEvent( newHtlcKey(packet), HtlcInfo{ OutgoingTimeLock: htlc.Expiry, OutgoingAmt: htlc.Amount, }, HtlcEventTypeSend, linkErr, false, ) return errDustThresholdExceeded } circuit := newPaymentCircuit(&htlc.PaymentHash, packet) actions, err := s.circuits.CommitCircuits(circuit) if err != nil { log.Errorf("unable to commit circuit in switch: %v", err) return err } // Drop duplicate packet if it has already been seen. switch { case len(actions.Drops) == 1: return ErrDuplicateAdd case len(actions.Fails) == 1: return ErrLocalAddFailed } // Give the packet to the link's mailbox so that HTLC's are properly // canceled back if the mailbox timeout elapses. packet.circuit = circuit return link.handleSwitchPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the // forwarding policies for the set of target channels, keyed in chanPolicies. // // NOTE: This function is synchronous and will block until either the // forwarding policies for all links have been updated, or the switch shuts // down. func (s *Switch) UpdateForwardingPolicies( chanPolicies map[wire.OutPoint]ForwardingPolicy) { log.Tracef("Updating link policies: %v", newLogClosure(func() string { return spew.Sdump(chanPolicies) })) s.indexMtx.RLock() // Update each link in chanPolicies. for targetLink, policy := range chanPolicies { cid := lnwire.NewChanIDFromOutPoint(&targetLink) link, ok := s.linkIndex[cid] if !ok { log.Debugf("Unable to find ChannelPoint(%v) to update "+ "link policy", targetLink) continue } link.UpdateForwardingPolicy(policy) } s.indexMtx.RUnlock() } // IsForwardedHTLC checks for a given channel and htlc index if it is related // to an opened circuit that represents a forwarded payment. func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, htlcIndex uint64) bool { circuit := s.circuits.LookupOpenCircuit(channeldb.CircuitKey{ ChanID: chanID, HtlcID: htlcIndex, }) return circuit != nil && circuit.Incoming.ChanID != hop.Source } // ForwardPackets adds a list of packets to the switch for processing. Fails // and settles are added on a first past, simultaneously constructing circuits // for any adds. After persisting the circuits, another pass of the adds is // given to forward them through the router. The sending link's quit channel is // used to prevent deadlocks when the switch stops a link in the midst of // forwarding. func (s *Switch) ForwardPackets(linkQuit chan struct{}, packets ...*htlcPacket) error { var ( // fwdChan is a buffered channel used to receive err msgs from // the htlcPlex when forwarding this batch. fwdChan = make(chan error, len(packets)) // numSent keeps a running count of how many packets are // forwarded to the switch, which determines how many responses // we will wait for on the fwdChan.. numSent int ) // No packets, nothing to do. if len(packets) == 0 { return nil } // Setup a barrier to prevent the background tasks from processing // responses until this function returns to the user. var wg sync.WaitGroup wg.Add(1) defer wg.Done() // Before spawning the following goroutine to proxy our error responses, // check to see if we have already been issued a shutdown request. If // so, we exit early to avoid incrementing the switch's waitgroup while // it is already in the process of shutting down. select { case <-linkQuit: return nil case <-s.quit: return nil default: // Spawn a goroutine to log the errors returned from failed packets. s.wg.Add(1) go s.logFwdErrs(&numSent, &wg, fwdChan) } // Make a first pass over the packets, forwarding any settles or fails. // As adds are found, we create a circuit and append it to our set of // circuits to be written to disk. var circuits []*PaymentCircuit var addBatch []*htlcPacket for _, packet := range packets { switch htlc := packet.htlc.(type) { case *lnwire.UpdateAddHTLC: circuit := newPaymentCircuit(&htlc.PaymentHash, packet) packet.circuit = circuit circuits = append(circuits, circuit) addBatch = append(addBatch, packet) default: err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { return fmt.Errorf("failed to forward packet %v", err) } numSent++ } } // If this batch did not contain any circuits to commit, we can return // early. if len(circuits) == 0 { return nil } // Write any circuits that we found to disk. actions, err := s.circuits.CommitCircuits(circuits...) if err != nil { log.Errorf("unable to commit circuits in switch: %v", err) } // Split the htlc packets by comparing an in-order seek to the head of // the added, dropped, or failed circuits. // // NOTE: This assumes each list is guaranteed to be a subsequence of the // circuits, and that the union of the sets results in the original set // of circuits. var addedPackets, failedPackets []*htlcPacket for _, packet := range addBatch { switch { case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]: addedPackets = append(addedPackets, packet) actions.Adds = actions.Adds[1:] case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]: actions.Drops = actions.Drops[1:] case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]: failedPackets = append(failedPackets, packet) actions.Fails = actions.Fails[1:] } } // Now, forward any packets for circuits that were successfully added to // the switch's circuit map. for _, packet := range addedPackets { err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { return fmt.Errorf("failed to forward packet %v", err) } numSent++ } // Lastly, for any packets that failed, this implies that they were // left in a half added state, which can happen when recovering from // failures. if len(failedPackets) > 0 { var failure lnwire.FailureMessage incomingID := failedPackets[0].incomingChanID // If the incoming channel is an option_scid_alias channel, // then we'll need to replace the SCID in the ChannelUpdate. update := s.failAliasUpdate(incomingID, true) if update == nil { // Fallback to the original non-option behavior. update, err := s.cfg.FetchLastChannelUpdate( incomingID, ) if err != nil { failure = &lnwire.FailTemporaryNodeFailure{} } else { failure = lnwire.NewTemporaryChannelFailure( update, ) } } else { // This is an option_scid_alias channel. failure = lnwire.NewTemporaryChannelFailure(update) } linkError := NewDetailedLinkError( failure, OutgoingFailureIncompleteForward, ) for _, packet := range failedPackets { // We don't handle the error here since this method // always returns an error. _ = s.failAddPacket(packet, linkError) } } return nil } // logFwdErrs logs any errors received on `fwdChan`. func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { defer s.wg.Done() // Wait here until the outer function has finished persisting // and routing the packets. This guarantees we don't read from num until // the value is accurate. wg.Wait() numSent := *num for i := 0; i < numSent; i++ { select { case err := <-fwdChan: if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) } case <-s.quit: log.Errorf("unable to forward htlc packet " + "htlc switch was stopped") return } } } // routeAsync sends a packet through the htlc switch, using the provided err // chan to propagate errors back to the caller. The link's quit channel is // provided so that the send can be canceled if either the link or the switch // receive a shutdown requuest. This method does not wait for a response from // the htlcForwarder before returning. func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, linkQuit chan struct{}) error { command := &plexPacket{ pkt: packet, err: errChan, } select { case s.htlcPlex <- command: return nil case <-linkQuit: return ErrLinkShuttingDown case <-s.quit: return errors.New("htlc switch was stopped") } } // getLocalLink handles the addition of a htlc for a send that originates from // our node. It returns the link that the htlc should be forwarded outwards on, // and a link error if the htlc cannot be forwarded. func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) ( ChannelLink, *LinkError) { // Try to find links by node destination. s.indexMtx.RLock() link, err := s.getLinkByShortID(pkt.outgoingChanID) defer s.indexMtx.RUnlock() if err != nil { // If the link was not found for the outgoingChanID, an outside // subsystem may be using the confirmed SCID of a zero-conf // channel. In this case, we'll consult the Switch maps to see // if an alias exists and use the alias to lookup the link. // This extra step is a consequence of not updating the Switch // forwardingIndex when a zero-conf channel is confirmed. We // don't need to change the outgoingChanID since the link will // do that upon receiving the packet. baseScid, ok := s.baseIndex[pkt.outgoingChanID] if !ok { log.Errorf("Link %v not found", pkt.outgoingChanID) return nil, NewLinkError(&lnwire.FailUnknownNextPeer{}) } // The base SCID was found, so we'll use that to fetch the // link. link, err = s.getLinkByShortID(baseScid) if err != nil { log.Errorf("Link %v not found", baseScid) return nil, NewLinkError(&lnwire.FailUnknownNextPeer{}) } } if !link.EligibleToForward() { log.Errorf("Link %v is not available to forward", pkt.outgoingChanID) // The update does not need to be populated as the error // will be returned back to the router. return nil, NewDetailedLinkError( lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureLinkNotEligible, ) } // Ensure that the htlc satisfies the outgoing channel policy. currentHeight := atomic.LoadUint32(&s.bestHeight) htlcErr := link.CheckHtlcTransit( htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight, ) if htlcErr != nil { log.Errorf("Link %v policy for local forward not "+ "satisfied", pkt.outgoingChanID) return nil, htlcErr } return link, nil } // handleLocalResponse processes a Settle or Fail responding to a // locally-initiated payment. This is handled asynchronously to avoid blocking // the main event loop within the switch, as these operations can require // multiple db transactions. The guarantees of the circuit map are stringent // enough such that we are able to tolerate reordering of these operations // without side effects. The primary operations handled are: // 1. Save the payment result to the pending payment store. // 2. Notify subscribers about the payment result. // 3. Ack settle/fail references, to avoid resending this response internally // 4. Teardown the closing circuit in the circuit map // // NOTE: This method MUST be spawned as a goroutine. func (s *Switch) handleLocalResponse(pkt *htlcPacket) { defer s.wg.Done() attemptID := pkt.incomingHTLCID // The error reason will be unencypted in case this a local // failure or a converted error. unencrypted := pkt.localFailure || pkt.convertedError n := &networkResult{ msg: pkt.htlc, unencrypted: unencrypted, isResolution: pkt.isResolution, } // Store the result to the db. This will also notify subscribers about // the result. if err := s.networkResults.storeResult(attemptID, n); err != nil { log.Errorf("Unable to complete payment for pid=%v: %v", attemptID, err) return } // First, we'll clean up any fwdpkg references, circuit entries, and // mark in our db that the payment for this payment hash has either // succeeded or failed. // // If this response is contained in a forwarding package, we'll start by // acking the settle/fail so that we don't continue to retransmit the // HTLC internally. if pkt.destRef != nil { if err := s.ackSettleFail(*pkt.destRef); err != nil { log.Warnf("Unable to ack settle/fail reference: %s: %v", *pkt.destRef, err) return } } // Next, we'll remove the circuit since we are about to complete an // fulfill/fail of this HTLC. Since we've already removed the // settle/fail fwdpkg reference, the response from the peer cannot be // replayed internally if this step fails. If this happens, this logic // will be executed when a provided resolution message comes through. // This can only happen if the circuit is still open, which is why this // ordering is chosen. if err := s.teardownCircuit(pkt); err != nil { log.Warnf("Unable to teardown circuit %s: %v", pkt.inKey(), err) return } // Finally, notify on the htlc failure or success that has been handled. key := newHtlcKey(pkt) eventType := getEventType(pkt) switch htlc := pkt.htlc.(type) { case *lnwire.UpdateFulfillHTLC: s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage, eventType) case *lnwire.UpdateFailHTLC: s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType) } } // extractResult uses the given deobfuscator to extract the payment result from // the given network message. func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult, attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) { switch htlc := n.msg.(type) { // We've received a settle update which means we can finalize the user // payment and return successful response. case *lnwire.UpdateFulfillHTLC: return &PaymentResult{ Preimage: htlc.PaymentPreimage, }, nil // We've received a fail update which means we can finalize the // user payment and return fail response. case *lnwire.UpdateFailHTLC: paymentErr := s.parseFailedPayment( deobfuscator, attemptID, paymentHash, n.unencrypted, n.isResolution, htlc, ) return &PaymentResult{ Error: paymentErr, }, nil default: return nil, fmt.Errorf("received unknown response type: %T", htlc) } } // parseFailedPayment determines the appropriate failure message to return to // a user initiated payment. The three cases handled are: // 1) An unencrypted failure, which should already plaintext. // 2) A resolution from the chain arbitrator, which possibly has no failure // reason attached. // 3) A failure from the remote party, which will need to be decrypted using // the payment deobfuscator. func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter, attemptID uint64, paymentHash lntypes.Hash, unencrypted, isResolution bool, htlc *lnwire.UpdateFailHTLC) error { switch { // The payment never cleared the link, so we don't need to // decrypt the error, simply decode it them report back to the // user. case unencrypted: r := bytes.NewReader(htlc.Reason) failureMsg, err := lnwire.DecodeFailure(r, 0) if err != nil { // If we could not decode the failure reason, return a link // error indicating that we failed to decode the onion. linkError := NewDetailedLinkError( // As this didn't even clear the link, we don't // need to apply an update here since it goes // directly to the router. lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureDecodeError, ) log.Errorf("%v: (hash=%v, pid=%d): %v", linkError.FailureDetail.FailureString(), paymentHash, attemptID, err) return linkError } // If we successfully decoded the failure reason, return it. return NewLinkError(failureMsg) // A payment had to be timed out on chain before it got past // the first hop. In this case, we'll report a permanent // channel failure as this means us, or the remote party had to // go on chain. case isResolution && htlc.Reason == nil: linkError := NewDetailedLinkError( &lnwire.FailPermanentChannelFailure{}, OutgoingFailureOnChainTimeout, ) log.Infof("%v: hash=%v, pid=%d", linkError.FailureDetail.FailureString(), paymentHash, attemptID) return linkError // A regular multi-hop payment error that we'll need to // decrypt. default: // We'll attempt to fully decrypt the onion encrypted // error. If we're unable to then we'll bail early. failure, err := deobfuscator.DecryptError(htlc.Reason) if err != nil { log.Errorf("unable to de-obfuscate onion failure "+ "(hash=%v, pid=%d): %v", paymentHash, attemptID, err) return ErrUnreadableFailureMessage } return failure } } // handlePacketForward is used in cases when we need forward the htlc update // from one channel link to another and be able to propagate the settle/fail // updates back. This behaviour is achieved by creation of payment circuits. func (s *Switch) handlePacketForward(packet *htlcPacket) error { switch htlc := packet.htlc.(type) { // Channel link forwarded us a new htlc, therefore we initiate the // payment circuit within our internal state so we can properly forward // the ultimate settle message back latter. case *lnwire.UpdateAddHTLC: // Check if the node is set to reject all onward HTLCs and also make // sure that HTLC is not from the source node. if s.cfg.RejectHTLC { failure := NewDetailedLinkError( &lnwire.FailChannelDisabled{}, OutgoingFailureForwardsDisabled, ) return s.failAddPacket(packet, failure) } // Before we attempt to find a non-strict forwarding path for // this htlc, check whether the htlc is being routed over the // same incoming and outgoing channel. If our node does not // allow forwards of this nature, we fail the htlc early. This // check is in place to disallow inefficiently routed htlcs from // locking up our balance. With channels where the // option-scid-alias feature was negotiated, we also have to be // sure that the IDs aren't the same since one or both could be // an alias. linkErr := s.checkCircularForward( packet.incomingChanID, packet.outgoingChanID, s.cfg.AllowCircularRoute, htlc.PaymentHash, ) if linkErr != nil { return s.failAddPacket(packet, linkErr) } s.indexMtx.RLock() targetLink, err := s.getLinkByMapping(packet) if err != nil { s.indexMtx.RUnlock() log.Debugf("unable to find link with "+ "destination %v", packet.outgoingChanID) // If packet was forwarded from another channel link // than we should notify this link that some error // occurred. linkError := NewLinkError( &lnwire.FailUnknownNextPeer{}, ) return s.failAddPacket(packet, linkError) } targetPeerKey := targetLink.Peer().PubKey() interfaceLinks, _ := s.getLinks(targetPeerKey) s.indexMtx.RUnlock() // We'll keep track of any HTLC failures during the link // selection process. This way we can return the error for // precise link that the sender selected, while optimistically // trying all links to utilize our available bandwidth. linkErrs := make(map[lnwire.ShortChannelID]*LinkError) // Find all destination channel links with appropriate // bandwidth. var destinations []ChannelLink for _, link := range interfaceLinks { var failure *LinkError // We'll skip any links that aren't yet eligible for // forwarding. if !link.EligibleToForward() { failure = NewDetailedLinkError( &lnwire.FailUnknownNextPeer{}, OutgoingFailureLinkNotEligible, ) } else { // We'll ensure that the HTLC satisfies the // current forwarding conditions of this target // link. currentHeight := atomic.LoadUint32(&s.bestHeight) failure = link.CheckHtlcForward( htlc.PaymentHash, packet.incomingAmount, packet.amount, packet.incomingTimeout, packet.outgoingTimeout, currentHeight, packet.originalOutgoingChanID, ) } // If this link can forward the htlc, add it to the set // of destinations. if failure == nil { destinations = append(destinations, link) continue } linkErrs[link.ShortChanID()] = failure } // If we had a forwarding failure due to the HTLC not // satisfying the current policy, then we'll send back an // error, but ensure we send back the error sourced at the // *target* link. if len(destinations) == 0 { // At this point, some or all of the links rejected the // HTLC so we couldn't forward it. So we'll try to look // up the error that came from the source. linkErr, ok := linkErrs[packet.outgoingChanID] if !ok { // If we can't find the error of the source, // then we'll return an unknown next peer, // though this should never happen. linkErr = NewLinkError( &lnwire.FailUnknownNextPeer{}, ) log.Warnf("unable to find err source for "+ "outgoing_link=%v, errors=%v", packet.outgoingChanID, newLogClosure(func() string { return spew.Sdump(linkErrs) })) } log.Tracef("incoming HTLC(%x) violated "+ "target outgoing link (id=%v) policy: %v", htlc.PaymentHash[:], packet.outgoingChanID, linkErr) return s.failAddPacket(packet, linkErr) } // Choose a random link out of the set of links that can forward // this htlc. The reason for randomization is to evenly // distribute the htlc load without making assumptions about // what the best channel is. destination := destinations[rand.Intn(len(destinations))] // nolint:gosec // Retrieve the incoming link by its ShortChannelID. Note that // the incomingChanID is never set to hop.Source here. s.indexMtx.RLock() incomingLink, err := s.getLinkByShortID(packet.incomingChanID) s.indexMtx.RUnlock() if err != nil { // If we couldn't find the incoming link, we can't // evaluate the incoming's exposure to dust, so we just // fail the HTLC back. linkErr := NewLinkError( &lnwire.FailTemporaryChannelFailure{}, ) return s.failAddPacket(packet, linkErr) } // Evaluate whether this HTLC would increase our exposure to // dust on the incoming link. If it does, fail it backwards. if s.evaluateDustThreshold( incomingLink, packet.incomingAmount, true, ) { // The incoming dust exceeds the threshold, so we fail // the add back. linkErr := NewLinkError( &lnwire.FailTemporaryChannelFailure{}, ) return s.failAddPacket(packet, linkErr) } // Also evaluate whether this HTLC would increase our exposure // to dust on the destination link. If it does, fail it back. if s.evaluateDustThreshold( destination, packet.amount, false, ) { // The outgoing dust exceeds the threshold, so we fail // the add back. linkErr := NewLinkError( &lnwire.FailTemporaryChannelFailure{}, ) return s.failAddPacket(packet, linkErr) } // Send the packet to the destination channel link which // manages the channel. packet.outgoingChanID = destination.ShortChanID() return destination.handleSwitchPacket(packet) case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: // If the source of this packet has not been set, use the // circuit map to lookup the origin. circuit, err := s.closeCircuit(packet) if err != nil { return err } // closeCircuit returns a nil circuit when a settle packet returns an // ErrUnknownCircuit error upon the inner call to CloseCircuit. if circuit == nil { return nil } fail, isFail := htlc.(*lnwire.UpdateFailHTLC) if isFail && !packet.hasSource { switch { // No message to encrypt, locally sourced payment. case circuit.ErrorEncrypter == nil: // If this is a resolution message, then we'll need to // encrypt it as it's actually internally sourced. case packet.isResolution: var err error // TODO(roasbeef): don't need to pass actually? failure := &lnwire.FailPermanentChannelFailure{} fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( failure, ) if err != nil { err = fmt.Errorf("unable to obfuscate "+ "error: %v", err) log.Error(err) } // Alternatively, if the remote party send us an // UpdateFailMalformedHTLC, then we'll need to convert // this into a proper well formatted onion error as // there's no HMAC currently. case packet.convertedError: log.Infof("Converting malformed HTLC error "+ "for circuit for Circuit(%x: "+ "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash, packet.incomingChanID, packet.incomingHTLCID, packet.outgoingChanID, packet.outgoingHTLCID) fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError( fail.Reason, ) default: // Otherwise, it's a forwarded error, so we'll perform a // wrapper encryption as normal. fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( fail.Reason, ) } } else if !isFail && circuit.Outgoing != nil { // If this is an HTLC settle, and it wasn't from a // locally initiated HTLC, then we'll log a forwarding // event so we can flush it to disk later. // // TODO(roasbeef): only do this once link actually // fully settles? localHTLC := packet.incomingChanID == hop.Source if !localHTLC { log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ "from IncomingChanID(%v) to OutgoingChanID(%v)", circuit.PaymentHash[:], circuit.OutgoingAmount, circuit.IncomingAmount-circuit.OutgoingAmount, circuit.Incoming.ChanID, circuit.Outgoing.ChanID) s.fwdEventMtx.Lock() s.pendingFwdingEvents = append( s.pendingFwdingEvents, channeldb.ForwardingEvent{ Timestamp: time.Now(), IncomingChanID: circuit.Incoming.ChanID, OutgoingChanID: circuit.Outgoing.ChanID, AmtIn: circuit.IncomingAmount, AmtOut: circuit.OutgoingAmount, }, ) s.fwdEventMtx.Unlock() } } // A blank IncomingChanID in a circuit indicates that it is a pending // user-initiated payment. if packet.incomingChanID == hop.Source { s.wg.Add(1) go s.handleLocalResponse(packet) return nil } // Check to see that the source link is online before removing // the circuit. return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) default: return errors.New("wrong update type") } } // checkCircularForward checks whether a forward is circular (arrives and // departs on the same link) and returns a link error if the switch is // configured to disallow this behaviour. func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID, allowCircular bool, paymentHash lntypes.Hash) *LinkError { // If they are equal, we can skip the alias mapping checks. if incoming == outgoing { // The switch may be configured to allow circular routes, so // just log and return nil. if allowCircular { log.Debugf("allowing circular route over link: %v "+ "(payment hash: %x)", incoming, paymentHash) return nil } // Otherwise, we'll return a temporary channel failure. return NewDetailedLinkError( lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureCircularRoute, ) } // We'll fetch the "base" SCID from the baseIndex for the incoming and // outgoing SCIDs. If either one does not have a base SCID, then the // two channels are not equal since one will be a channel that does not // need a mapping and SCID equality was checked above. If the "base" // SCIDs are equal, then this is a circular route. Otherwise, it isn't. s.indexMtx.RLock() incomingBaseScid, ok := s.baseIndex[incoming] if !ok { // This channel does not use baseIndex, bail out. s.indexMtx.RUnlock() return nil } outgoingBaseScid, ok := s.baseIndex[outgoing] if !ok { // This channel does not use baseIndex, bail out. s.indexMtx.RUnlock() return nil } s.indexMtx.RUnlock() // Check base SCID equality. if incomingBaseScid != outgoingBaseScid { // The base SCIDs are not equal so these are not the same // channel. return nil } // If the incoming and outgoing link are equal, the htlc is part of a // circular route which may be used to lock up our liquidity. If the // switch is configured to allow circular routes, log that we are // allowing the route then return nil. if allowCircular { log.Debugf("allowing circular route over link: %v "+ "(payment hash: %x)", incoming, paymentHash) return nil } // If our node disallows circular routes, return a temporary channel // failure. There is nothing wrong with the policy used by the remote // node, so we do not include a channel update. return NewDetailedLinkError( lnwire.NewTemporaryChannelFailure(nil), OutgoingFailureCircularRoute, ) } // failAddPacket encrypts a fail packet back to an add packet's source. // The ciphertext will be derived from the failure message proivded by context. // This method returns the failErr if all other steps complete successfully. func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error { // Encrypt the failure so that the sender will be able to read the error // message. Since we failed this packet, we use EncryptFirstHop to // obfuscate the failure for their eyes only. reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage()) if err != nil { err := fmt.Errorf("unable to obfuscate "+ "error: %v", err) log.Error(err) return err } log.Error(failure.Error()) // Create a failure packet for this htlc. The the full set of // information about the htlc failure is included so that they can // be included in link failure notifications. failPkt := &htlcPacket{ sourceRef: packet.sourceRef, incomingChanID: packet.incomingChanID, incomingHTLCID: packet.incomingHTLCID, outgoingChanID: packet.outgoingChanID, outgoingHTLCID: packet.outgoingHTLCID, incomingAmount: packet.incomingAmount, amount: packet.amount, incomingTimeout: packet.incomingTimeout, outgoingTimeout: packet.outgoingTimeout, circuit: packet.circuit, linkFailure: failure, htlc: &lnwire.UpdateFailHTLC{ Reason: reason, }, } // Route a fail packet back to the source link. err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt) if err != nil { err = fmt.Errorf("source chanid=%v unable to "+ "handle switch packet: %v", packet.incomingChanID, err) log.Error(err) return err } return failure } // closeCircuit accepts a settle or fail htlc and the associated htlc packet and // attempts to determine the source that forwarded this htlc. This method will // set the incoming chan and htlc ID of the given packet if the source was // found, and will properly [re]encrypt any failure messages. func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { // If the packet has its source, that means it was failed locally by // the outgoing link. We fail it here to make sure only one response // makes it through the switch. if pkt.hasSource { circuit, err := s.circuits.FailCircuit(pkt.inKey()) switch err { // Circuit successfully closed. case nil: return circuit, nil // Circuit was previously closed, but has not been deleted. // We'll just drop this response until the circuit has been // fully removed. case ErrCircuitClosing: return nil, err // Failed to close circuit because it does not exist. This is // likely because the circuit was already successfully closed. // Since this packet failed locally, there is no forwarding // package entry to acknowledge. case ErrUnknownCircuit: return nil, err // Unexpected error. default: return nil, err } } // Otherwise, this is packet was received from the remote party. Use // circuit map to find the incoming link to receive the settle/fail. circuit, err := s.circuits.CloseCircuit(pkt.outKey()) switch err { // Open circuit successfully closed. case nil: pkt.incomingChanID = circuit.Incoming.ChanID pkt.incomingHTLCID = circuit.Incoming.HtlcID pkt.circuit = circuit pkt.sourceRef = &circuit.AddRef pktType := "SETTLE" if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok { pktType = "FAIL" } log.Debugf("Closed completed %s circuit for %x: "+ "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID) return circuit, nil // Circuit was previously closed, but has not been deleted. We'll just // drop this response until the circuit has been removed. case ErrCircuitClosing: return nil, err // Failed to close circuit because it does not exist. This is likely // because the circuit was already successfully closed. case ErrUnknownCircuit: if pkt.destRef != nil { // Add this SettleFailRef to the set of pending settle/fail entries // awaiting acknowledgement. s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef) } // If this is a settle, we will not log an error message as settles // are expected to hit the ErrUnknownCircuit case. The only way fails // can hit this case if the link restarts after having just sent a fail // to the switch. _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC) if !isSettle { err := fmt.Errorf("unable to find target channel "+ "for HTLC fail: channel ID = %s, "+ "HTLC ID = %d", pkt.outgoingChanID, pkt.outgoingHTLCID) log.Error(err) return nil, err } return nil, nil // Unexpected error. default: return nil, err } } // ackSettleFail is used by the switch to ACK any settle/fail entries in the // forwarding package of the outgoing link for a payment circuit. We do this if // we're the originator of the payment, so the link stops attempting to // re-broadcast. func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error { return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error { return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...) }) } // teardownCircuit removes a pending or open circuit from the switch's circuit // map and prints useful logging statements regarding the outcome. func (s *Switch) teardownCircuit(pkt *htlcPacket) error { var pktType string switch htlc := pkt.htlc.(type) { case *lnwire.UpdateFulfillHTLC: pktType = "SETTLE" case *lnwire.UpdateFailHTLC: pktType = "FAIL" default: err := fmt.Errorf("cannot tear down packet of type: %T", htlc) log.Errorf(err.Error()) return err } switch { case pkt.circuit.HasKeystone(): log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+ "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) err := s.circuits.DeleteCircuits(pkt.inKey()) if err != nil { log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+ "with payment_hash-%v using %s pkt", pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType) return err } log.Debugf("Closed completed %s circuit for %x: "+ "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID) default: log.Debugf("Tearing down incomplete circuit with %s for inkey=%v", pktType, pkt.inKey()) err := s.circuits.DeleteCircuits(pkt.inKey()) if err != nil { log.Warnf("Failed to tear down pending %s circuit for %x: "+ "(%s, %d)", pktType, pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID) return err } log.Debugf("Removed pending onion circuit for %x: "+ "(%s, %d)", pkt.circuit.PaymentHash, pkt.incomingChanID, pkt.incomingHTLCID) } return nil } // CloseLink creates and sends the close channel command to the target link // directing the specified closure type. If the closure type is CloseRegular, // targetFeePerKw parameter should be the ideal fee-per-kw that will be used as // a starting point for close negotiation. The deliveryScript parameter is an // optional parameter which sets a user specified script to close out to. func (s *Switch) CloseLink(chanPoint *wire.OutPoint, closeType contractcourt.ChannelCloseType, targetFeePerKw chainfee.SatPerKWeight, deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) { // TODO(roasbeef) abstract out the close updates. updateChan := make(chan interface{}, 2) errChan := make(chan error, 1) command := &ChanClose{ CloseType: closeType, ChanPoint: chanPoint, Updates: updateChan, TargetFeePerKw: targetFeePerKw, DeliveryScript: deliveryScript, Err: errChan, } select { case s.chanCloseRequests <- command: return updateChan, errChan case <-s.quit: errChan <- ErrSwitchExiting close(updateChan) return updateChan, errChan } } // htlcForwarder is responsible for optimally forwarding (and possibly // fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their // links. The duties of the forwarder are similar to that of a network switch, // in that it facilitates multi-hop payments by acting as a central messaging // bus. The switch communicates will active links to create, manage, and tear // down active onion routed payments. Each active channel is modeled as // networked device with metadata such as the available payment bandwidth, and // total link capacity. // // NOTE: This MUST be run as a goroutine. func (s *Switch) htlcForwarder() { defer s.wg.Done() defer func() { s.blockEpochStream.Cancel() // Remove all links once we've been signalled for shutdown. var linksToStop []ChannelLink s.indexMtx.Lock() for _, link := range s.linkIndex { activeLink := s.removeLink(link.ChanID()) if activeLink == nil { log.Errorf("unable to remove ChannelLink(%v) "+ "on stop", link.ChanID()) continue } linksToStop = append(linksToStop, activeLink) } for _, link := range s.pendingLinkIndex { pendingLink := s.removeLink(link.ChanID()) if pendingLink == nil { log.Errorf("unable to remove ChannelLink(%v) "+ "on stop", link.ChanID()) continue } linksToStop = append(linksToStop, pendingLink) } s.indexMtx.Unlock() // Now that all pending and live links have been removed from // the forwarding indexes, stop each one before shutting down. // We'll shut them down in parallel to make exiting as fast as // possible. var wg sync.WaitGroup for _, link := range linksToStop { wg.Add(1) go func(l ChannelLink) { defer wg.Done() l.Stop() }(link) } wg.Wait() // Before we exit fully, we'll attempt to flush out any // forwarding events that may still be lingering since the last // batch flush. if err := s.FlushForwardingEvents(); err != nil { log.Errorf("unable to flush forwarding events: %v", err) } }() // TODO(roasbeef): cleared vs settled distinction var ( totalNumUpdates uint64 totalSatSent btcutil.Amount totalSatRecv btcutil.Amount ) s.cfg.LogEventTicker.Resume() defer s.cfg.LogEventTicker.Stop() // Every 15 seconds, we'll flush out the forwarding events that // occurred during that period. s.cfg.FwdEventTicker.Resume() defer s.cfg.FwdEventTicker.Stop() defer s.cfg.AckEventTicker.Stop() out: for { // If the set of pending settle/fail entries is non-zero, // reinstate the ack ticker so we can batch ack them. if len(s.pendingSettleFails) > 0 { s.cfg.AckEventTicker.Resume() } select { case blockEpoch, ok := <-s.blockEpochStream.Epochs: if !ok { break out } atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height)) // A local close request has arrived, we'll forward this to the // relevant link (if it exists) so the channel can be // cooperatively closed (if possible). case req := <-s.chanCloseRequests: chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) s.indexMtx.RLock() link, ok := s.linkIndex[chanID] if !ok { s.indexMtx.RUnlock() req.Err <- fmt.Errorf("no peer for channel with "+ "chan_id=%x", chanID[:]) continue } s.indexMtx.RUnlock() peerPub := link.Peer().PubKey() log.Debugf("Requesting local channel close: peer=%v, "+ "chan_id=%x", link.Peer(), chanID[:]) go s.cfg.LocalChannelClose(peerPub[:], req) case resolutionMsg := <-s.resolutionMsgs: // We'll persist the resolution message to the Switch's // resolution store. resMsg := resolutionMsg.ResolutionMsg err := s.resMsgStore.addResolutionMsg(&resMsg) if err != nil { // This will only fail if there is a database // error or a serialization error. Sending the // error prevents the contractcourt from being // in a state where it believes the send was // successful, when it wasn't. log.Errorf("unable to add resolution msg: %v", err) resolutionMsg.errChan <- err continue } // At this point, the resolution message has been // persisted. It is safe to signal success by sending // a nil error since the Switch will re-deliver the // resolution message on restart. resolutionMsg.errChan <- nil pkt := &htlcPacket{ outgoingChanID: resolutionMsg.SourceChan, outgoingHTLCID: resolutionMsg.HtlcIndex, isResolution: true, } // Resolution messages will either be cancelling // backwards an existing HTLC, or settling a previously // outgoing HTLC. Based on this, we'll map the message // to the proper htlcPacket. if resolutionMsg.Failure != nil { pkt.htlc = &lnwire.UpdateFailHTLC{} } else { pkt.htlc = &lnwire.UpdateFulfillHTLC{ PaymentPreimage: *resolutionMsg.PreImage, } } log.Infof("Received outside contract resolution, "+ "mapping to: %v", spew.Sdump(pkt)) // We don't check the error, as the only failure we can // encounter is due to the circuit already being // closed. This is fine, as processing this message is // meant to be idempotent. err = s.handlePacketForward(pkt) if err != nil { log.Errorf("Unable to forward resolution msg: %v", err) } // A new packet has arrived for forwarding, we'll interpret the // packet concretely, then either forward it along, or // interpret a return packet to a locally initialized one. case cmd := <-s.htlcPlex: cmd.err <- s.handlePacketForward(cmd.pkt) // When this time ticks, then it indicates that we should // collect all the forwarding events since the last internal, // and write them out to our log. case <-s.cfg.FwdEventTicker.Ticks(): s.wg.Add(1) go func() { defer s.wg.Done() if err := s.FlushForwardingEvents(); err != nil { log.Errorf("unable to flush "+ "forwarding events: %v", err) } }() // The log ticker has fired, so we'll calculate some forwarding // stats for the last 10 seconds to display within the logs to // users. case <-s.cfg.LogEventTicker.Ticks(): // First, we'll collate the current running tally of // our forwarding stats. prevSatSent := totalSatSent prevSatRecv := totalSatRecv prevNumUpdates := totalNumUpdates var ( newNumUpdates uint64 newSatSent btcutil.Amount newSatRecv btcutil.Amount ) // Next, we'll run through all the registered links and // compute their up-to-date forwarding stats. s.indexMtx.RLock() for _, link := range s.linkIndex { // TODO(roasbeef): when links first registered // stats printed. updates, sent, recv := link.Stats() newNumUpdates += updates newSatSent += sent.ToSatoshis() newSatRecv += recv.ToSatoshis() } s.indexMtx.RUnlock() var ( diffNumUpdates uint64 diffSatSent btcutil.Amount diffSatRecv btcutil.Amount ) // If this is the first time we're computing these // stats, then the diff is just the new value. We do // this in order to avoid integer underflow issues. if prevNumUpdates == 0 { diffNumUpdates = newNumUpdates diffSatSent = newSatSent diffSatRecv = newSatRecv } else { diffNumUpdates = newNumUpdates - prevNumUpdates diffSatSent = newSatSent - prevSatSent diffSatRecv = newSatRecv - prevSatRecv } // If the diff of num updates is zero, then we haven't // forwarded anything in the last 10 seconds, so we can // skip this update. if diffNumUpdates == 0 { continue } // If the diff of num updates is negative, then some // links may have been unregistered from the switch, so // we'll update our stats to only include our registered // links. if int64(diffNumUpdates) < 0 { totalNumUpdates = newNumUpdates totalSatSent = newSatSent totalSatRecv = newSatRecv continue } // Otherwise, we'll log this diff, then accumulate the // new stats into the running total. log.Debugf("Sent %d satoshis and received %d satoshis "+ "in the last 10 seconds (%f tx/sec)", diffSatSent, diffSatRecv, float64(diffNumUpdates)/10) totalNumUpdates += diffNumUpdates totalSatSent += diffSatSent totalSatRecv += diffSatRecv // The ack ticker has fired so if we have any settle/fail entries // for a forwarding package to ack, we will do so here in a batch // db call. case <-s.cfg.AckEventTicker.Ticks(): // If the current set is empty, pause the ticker. if len(s.pendingSettleFails) == 0 { s.cfg.AckEventTicker.Pause() continue } // Batch ack the settle/fail entries. if err := s.ackSettleFail(s.pendingSettleFails...); err != nil { log.Errorf("Unable to ack batch of settle/fails: %v", err) continue } log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails), newLogClosure(func() string { return spew.Sdump(s.pendingSettleFails) })) // Reset the pendingSettleFails buffer while keeping acquired // memory. s.pendingSettleFails = s.pendingSettleFails[:0] case <-s.quit: return } } } // Start starts all helper goroutines required for the operation of the switch. func (s *Switch) Start() error { if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { log.Warn("Htlc Switch already started") return errors.New("htlc switch already started") } log.Infof("HTLC Switch starting") blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } s.blockEpochStream = blockEpochStream s.wg.Add(1) go s.htlcForwarder() if err := s.reforwardResponses(); err != nil { s.Stop() log.Errorf("unable to reforward responses: %v", err) return err } if err := s.reforwardResolutions(); err != nil { // We are already stopping so we can ignore the error. _ = s.Stop() log.Errorf("unable to reforward resolutions: %v", err) return err } return nil } // reforwardResolutions fetches the set of resolution messages stored on-disk // and reforwards them if their circuits are still open. If the circuits have // been deleted, then we will delete the resolution message from the database. func (s *Switch) reforwardResolutions() error { // Fetch all stored resolution messages, deleting the ones that are // resolved. resMsgs, err := s.resMsgStore.fetchAllResolutionMsg() if err != nil { return err } switchPackets := make([]*htlcPacket, 0, len(resMsgs)) for _, resMsg := range resMsgs { // If the open circuit no longer exists, then we can remove the // message from the store. outKey := CircuitKey{ ChanID: resMsg.SourceChan, HtlcID: resMsg.HtlcIndex, } if s.circuits.LookupOpenCircuit(outKey) == nil { // The open circuit doesn't exist. err := s.resMsgStore.deleteResolutionMsg(&outKey) if err != nil { return err } continue } // The circuit is still open, so we can assume that the link or // switch (if we are the source) hasn't cleaned it up yet. resPkt := &htlcPacket{ outgoingChanID: resMsg.SourceChan, outgoingHTLCID: resMsg.HtlcIndex, isResolution: true, } if resMsg.Failure != nil { resPkt.htlc = &lnwire.UpdateFailHTLC{} } else { resPkt.htlc = &lnwire.UpdateFulfillHTLC{ PaymentPreimage: *resMsg.PreImage, } } switchPackets = append(switchPackets, resPkt) } // We'll now dispatch the set of resolution messages to the proper // destination. An error is only encountered here if the switch is // shutting down. if err := s.ForwardPackets(nil, switchPackets...); err != nil { return err } return nil } // reforwardResponses for every known, non-pending channel, loads all associated // forwarding packages and reforwards any Settle or Fail HTLCs found. This is // used to resurrect the switch's mailboxes after a restart. func (s *Switch) reforwardResponses() error { openChannels, err := s.cfg.FetchAllOpenChannels() if err != nil { return err } for _, openChannel := range openChannels { shortChanID := openChannel.ShortChanID() // Locally-initiated payments never need reforwarding. if shortChanID == hop.Source { continue } // If the channel is pending, it should have no forwarding // packages, and nothing to reforward. if openChannel.IsPending { continue } // Channels in open or waiting-close may still have responses in // their forwarding packages. We will continue to reattempt // forwarding on startup until the channel is fully-closed. // // Load this channel's forwarding packages, and deliver them to // the switch. fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID) if err != nil { log.Errorf("unable to load forwarding "+ "packages for %v: %v", shortChanID, err) return err } s.reforwardSettleFails(fwdPkgs) } return nil } // loadChannelFwdPkgs loads all forwarding packages owned by the `source` short // channel identifier. func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) { var fwdPkgs []*channeldb.FwdPkg if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error { var err error fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs( tx, source, ) return err }, func() { fwdPkgs = nil }); err != nil { return nil, err } return fwdPkgs, nil } // reforwardSettleFails parses the Settle and Fail HTLCs from the list of // forwarding packages, and reforwards those that have not been acknowledged. // This is intended to occur on startup, in order to recover the switch's // mailboxes, and to ensure that responses can be propagated in case the // outgoing link never comes back online. // // NOTE: This should mimic the behavior processRemoteSettleFails. func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { for _, fwdPkg := range fwdPkgs { settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates( fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails, ) if err != nil { log.Errorf("Unable to process remote log updates: %v", err) continue } switchPackets := make([]*htlcPacket, 0, len(settleFails)) for i, pd := range settleFails { // Skip any settles or fails that have already been // acknowledged by the incoming link that originated the // forwarded Add. if fwdPkg.SettleFailFilter.Contains(uint16(i)) { continue } switch pd.EntryType { // A settle for an HTLC we previously forwarded HTLC has // been received. So we'll forward the HTLC to the // switch which will handle propagating the settle to // the prior hop. case lnwallet.Settle: settlePacket := &htlcPacket{ outgoingChanID: fwdPkg.Source, outgoingHTLCID: pd.ParentIndex, destRef: pd.DestRef, htlc: &lnwire.UpdateFulfillHTLC{ PaymentPreimage: pd.RPreimage, }, } // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. switchPackets = append(switchPackets, settlePacket) // A failureCode message for a previously forwarded HTLC has been // received. As a result a new slot will be freed up in our // commitment state, so we'll forward this to the switch so the // backwards undo can continue. case lnwallet.Fail: // Fetch the reason the HTLC was canceled so // we can continue to propagate it. This // failure originated from another node, so // the linkFailure field is not set on this // packet. failPacket := &htlcPacket{ outgoingChanID: fwdPkg.Source, outgoingHTLCID: pd.ParentIndex, destRef: pd.DestRef, htlc: &lnwire.UpdateFailHTLC{ Reason: lnwire.OpaqueReason(pd.FailReason), }, } // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. switchPackets = append(switchPackets, failPacket) } } // Since this send isn't tied to a specific link, we pass a nil // link quit channel, meaning the send will fail only if the // switch receives a shutdown request. if err := s.ForwardPackets(nil, switchPackets...); err != nil { log.Errorf("Unhandled error while reforwarding packets "+ "settle/fail over htlcswitch: %v", err) } } } // Stop gracefully stops all active helper goroutines, then waits until they've // exited. func (s *Switch) Stop() error { if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { log.Warn("Htlc Switch already stopped") return errors.New("htlc switch already shutdown") } log.Info("HTLC Switch shutting down") close(s.quit) s.wg.Wait() // Wait until all active goroutines have finished exiting before // stopping the mailboxes, otherwise the mailbox map could still be // accessed and modified. s.mailOrchestrator.Stop() return nil } // CreateAndAddLink will create a link and then add it to the internal maps // when given a ChannelLinkConfig and LightningChannel. func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig, lnChan *lnwallet.LightningChannel) error { link := NewChannelLink(linkCfg, lnChan) return s.AddLink(link) } // AddLink is used to initiate the handling of the add link command. The // request will be propagated and handled in the main goroutine. func (s *Switch) AddLink(link ChannelLink) error { s.indexMtx.Lock() defer s.indexMtx.Unlock() chanID := link.ChanID() // First, ensure that this link is not already active in the switch. _, err := s.getLink(chanID) if err == nil { return fmt.Errorf("unable to add ChannelLink(%v), already "+ "active", chanID) } // Get and attach the mailbox for this link, which buffers packets in // case there packets that we tried to deliver while this link was // offline. shortChanID := link.ShortChanID() mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID) link.AttachMailBox(mailbox) // Attach the Switch's failAliasUpdate function to the link. link.attachFailAliasUpdate(s.failAliasUpdate) if err := link.Start(); err != nil { s.removeLink(chanID) return err } if shortChanID == hop.Source { log.Infof("Adding pending link chan_id=%v, short_chan_id=%v", chanID, shortChanID) s.pendingLinkIndex[chanID] = link } else { log.Infof("Adding live link chan_id=%v, short_chan_id=%v", chanID, shortChanID) s.addLiveLink(link) s.mailOrchestrator.BindLiveShortChanID( mailbox, chanID, shortChanID, ) } return nil } // addLiveLink adds a link to all associated forwarding index, this makes it a // candidate for forwarding HTLCs. func (s *Switch) addLiveLink(link ChannelLink) { linkScid := link.ShortChanID() // We'll add the link to the linkIndex which lets us quickly // look up a channel when we need to close or register it, and // the forwarding index which'll be used when forwarding HTLC's // in the multi-hop setting. s.linkIndex[link.ChanID()] = link s.forwardingIndex[linkScid] = link // Next we'll add the link to the interface index so we can // quickly look up all the channels for a particular node. peerPub := link.Peer().PubKey() if _, ok := s.interfaceIndex[peerPub]; !ok { s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink) } s.interfaceIndex[peerPub][link.ChanID()] = link aliases := link.getAliases() if link.isZeroConf() { if link.zeroConfConfirmed() { // Since the zero-conf channel has confirmed, we can // populate the aliasToReal mapping. confirmedScid := link.confirmedScid() for _, alias := range aliases { s.aliasToReal[alias] = confirmedScid } // Add the confirmed SCID as a key in the baseIndex. s.baseIndex[confirmedScid] = linkScid } // Now we populate the baseIndex which will be used to fetch // the link given any of the channel's alias SCIDs or the real // SCID. The link's SCID is an alias, so we don't need to // special-case it like the option-scid-alias feature-bit case // further down. for _, alias := range aliases { s.baseIndex[alias] = linkScid } } else if link.negotiatedAliasFeature() { // The link's SCID is the confirmed SCID for non-zero-conf // option-scid-alias feature bit channels. for _, alias := range aliases { s.aliasToReal[alias] = linkScid s.baseIndex[alias] = linkScid } // Since the link's SCID is confirmed, it was not included in // the baseIndex above as a key. Add it now. s.baseIndex[linkScid] = linkScid } } // GetLink is used to initiate the handling of the get link command. The // request will be propagated/handled to/in the main goroutine. func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler, error) { s.indexMtx.RLock() defer s.indexMtx.RUnlock() return s.getLink(chanID) } // getLink returns the link stored in either the pending index or the live // lindex. func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) { link, ok := s.linkIndex[chanID] if !ok { link, ok = s.pendingLinkIndex[chanID] if !ok { return nil, ErrChannelLinkNotFound } } return link, nil } // GetLinkByShortID attempts to return the link which possesses the target short // channel ID. func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) { s.indexMtx.RLock() defer s.indexMtx.RUnlock() link, err := s.getLinkByShortID(chanID) if err != nil { // If we failed to find the link under the passed-in SCID, we // consult the Switch's baseIndex map to see if the confirmed // SCID was used for a zero-conf channel. aliasID, ok := s.baseIndex[chanID] if !ok { return nil, err } // An alias was found, use it to lookup if a link exists. return s.getLinkByShortID(aliasID) } return link, nil } // getLinkByShortID attempts to return the link which possesses the target // short channel ID. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) { link, ok := s.forwardingIndex[chanID] if !ok { return nil, ErrChannelLinkNotFound } return link, nil } // getLinkByMapping attempts to fetch the link via the htlcPacket's // outgoingChanID, possibly using a mapping. If it finds the link via mapping, // the outgoingChanID will be changed so that an error can be properly // attributed when looping over linkErrs in handlePacketForward. // // * If the outgoingChanID is an alias, we'll fetch the link regardless if it's // public or not. // // * If the outgoingChanID is a confirmed SCID, we'll need to do more checks. // - If there is no entry found in baseIndex, fetch the link. This channel // did not have the option-scid-alias feature negotiated (which includes // zero-conf and option-scid-alias channel-types). // - If there is an entry found, fetch the link from forwardingIndex and // fail if this is a private link. // // NOTE: This MUST be called with the indexMtx read lock held. func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) { // Determine if this ShortChannelID is an alias or a confirmed SCID. chanID := pkt.outgoingChanID aliasID := s.cfg.IsAlias(chanID) // Set the originalOutgoingChanID so the proper channel_update can be // sent back if the option-scid-alias feature bit was negotiated. pkt.originalOutgoingChanID = chanID if aliasID { // Since outgoingChanID is an alias, we'll fetch the link via // baseIndex. baseScid, ok := s.baseIndex[chanID] if !ok { // No mapping exists, bail. return nil, ErrChannelLinkNotFound } // A mapping exists, so use baseScid to find the link in the // forwardingIndex. link, ok := s.forwardingIndex[baseScid] if !ok { // Link not found, bail. return nil, ErrChannelLinkNotFound } // Change the packet's outgoingChanID field so that errors are // properly attributed. pkt.outgoingChanID = baseScid // Return the link without checking if it's private or not. return link, nil } // The outgoingChanID is a confirmed SCID. Attempt to fetch the base // SCID from baseIndex. baseScid, ok := s.baseIndex[chanID] if !ok { // outgoingChanID is not a key in base index meaning this // channel did not have the option-scid-alias feature bit // negotiated. We'll fetch the link and return it. link, ok := s.forwardingIndex[chanID] if !ok { // The link wasn't found, bail out. return nil, ErrChannelLinkNotFound } return link, nil } // Fetch the link whose internal SCID is baseScid. link, ok := s.forwardingIndex[baseScid] if !ok { // Link wasn't found, bail out. return nil, ErrChannelLinkNotFound } // If the link is unadvertised, we fail since the real SCID was used to // forward over it and this is a channel where the option-scid-alias // feature bit was negotiated. if link.IsUnadvertised() { return nil, ErrChannelLinkNotFound } // The link is public so the confirmed SCID can be used to forward over // it. We'll also replace pkt's outgoingChanID field so errors can // properly be attributed in the calling function. pkt.outgoingChanID = baseScid return link, nil } // HasActiveLink returns true if the given channel ID has a link in the link // index AND the link is eligible to forward. func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool { s.indexMtx.RLock() defer s.indexMtx.RUnlock() if link, ok := s.linkIndex[chanID]; ok { return link.EligibleToForward() } return false } // RemoveLink purges the switch of any link associated with chanID. If a pending // or active link is not found, this method does nothing. Otherwise, the method // returns after the link has been completely shutdown. func (s *Switch) RemoveLink(chanID lnwire.ChannelID) { s.indexMtx.Lock() link, err := s.getLink(chanID) if err != nil { // If err is non-nil, this means that link is also nil. The // link variable cannot be nil without err being non-nil. s.indexMtx.Unlock() log.Tracef("Unable to remove link for ChannelID(%v): %v", chanID, err) return } // Check if the link is already stopping and grab the stop chan if it // is. stopChan, ok := s.linkStopIndex[chanID] if !ok { // If the link is non-nil, it is not currently stopping, so // we'll add a stop chan to the linkStopIndex. stopChan = make(chan struct{}) s.linkStopIndex[chanID] = stopChan } s.indexMtx.Unlock() if ok { // If the stop chan exists, we will wait for it to be closed. // Once it is closed, we will exit. select { case <-stopChan: return case <-s.quit: return } } // Stop the link before removing it from the maps. link.Stop() s.indexMtx.Lock() _ = s.removeLink(chanID) // Close stopChan and remove this link from the linkStopIndex. // Deleting from the index and removing from the link must be done // in the same block while the mutex is held. close(stopChan) delete(s.linkStopIndex, chanID) s.indexMtx.Unlock() } // removeLink is used to remove and stop the channel link. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink { log.Infof("Removing channel link with ChannelID(%v)", chanID) link, err := s.getLink(chanID) if err != nil { return nil } // Remove the channel from live link indexes. delete(s.pendingLinkIndex, link.ChanID()) delete(s.linkIndex, link.ChanID()) delete(s.forwardingIndex, link.ShortChanID()) // If the link has been added to the peer index, then we'll move to // delete the entry within the index. peerPub := link.Peer().PubKey() if peerIndex, ok := s.interfaceIndex[peerPub]; ok { delete(peerIndex, link.ChanID()) // If after deletion, there are no longer any links, then we'll // remove the interface map all together. if len(peerIndex) == 0 { delete(s.interfaceIndex, peerPub) } } return link } // UpdateShortChanID locates the link with the passed-in chanID and updates the // underlying channel state. This is only used in zero-conf channels to allow // the confirmed SCID to be updated. func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error { s.indexMtx.Lock() defer s.indexMtx.Unlock() // Locate the target link in the link index. If no such link exists, // then we will ignore the request. link, ok := s.linkIndex[chanID] if !ok { return fmt.Errorf("link %v not found", chanID) } // Try to update the link's underlying channel state, returning early // if this update failed. _, err := link.UpdateShortChanID() if err != nil { return err } // Since the zero-conf channel is confirmed, we should populate the // aliasToReal map and update the baseIndex. aliases := link.getAliases() confirmedScid := link.confirmedScid() for _, alias := range aliases { s.aliasToReal[alias] = confirmedScid } s.baseIndex[confirmedScid] = link.ShortChanID() return nil } // GetLinksByInterface fetches all the links connected to a particular node // identified by the serialized compressed form of its public key. func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler, error) { s.indexMtx.RLock() defer s.indexMtx.RUnlock() var handlers []ChannelUpdateHandler links, err := s.getLinks(hop) if err != nil { return nil, err } // Range over the returned []ChannelLink to convert them into // []ChannelUpdateHandler. for _, link := range links { handlers = append(handlers, link) } return handlers, nil } // getLinks is function which returns the channel links of the peer by hop // destination id. // // NOTE: This MUST be called with the indexMtx held. func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) { links, ok := s.interfaceIndex[destination] if !ok { return nil, ErrNoLinksFound } channelLinks := make([]ChannelLink, 0, len(links)) for _, link := range links { channelLinks = append(channelLinks, link) } return channelLinks, nil } // CircuitModifier returns a reference to subset of the interfaces provided by // the circuit map, to allow links to open and close circuits. func (s *Switch) CircuitModifier() CircuitModifier { return s.circuits } // CircuitLookup returns a reference to subset of the interfaces provided by the // circuit map, to allow looking up circuits. func (s *Switch) CircuitLookup() CircuitLookup { return s.circuits } // commitCircuits persistently adds a circuit to the switch's circuit map. func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) ( *CircuitFwdActions, error) { return s.circuits.CommitCircuits(circuits...) } // FlushForwardingEvents flushes out the set of pending forwarding events to // the persistent log. This will be used by the switch to periodically flush // out the set of forwarding events to disk. External callers can also use this // method to ensure all data is flushed to dis before querying the log. func (s *Switch) FlushForwardingEvents() error { // First, we'll obtain a copy of the current set of pending forwarding // events. s.fwdEventMtx.Lock() // If we won't have any forwarding events, then we can exit early. if len(s.pendingFwdingEvents) == 0 { s.fwdEventMtx.Unlock() return nil } events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents)) copy(events[:], s.pendingFwdingEvents[:]) // With the copy obtained, we can now clear out the header pointer of // the current slice. This way, we can re-use the underlying storage // allocated for the slice. s.pendingFwdingEvents = s.pendingFwdingEvents[:0] s.fwdEventMtx.Unlock() // Finally, we'll write out the copied events to the persistent // forwarding log. return s.cfg.FwdingLog.AddForwardingEvents(events) } // BestHeight returns the best height known to the switch. func (s *Switch) BestHeight() uint32 { return atomic.LoadUint32(&s.bestHeight) } // evaluateDustThreshold takes in a ChannelLink, HTLC amount, and a boolean to // determine whether the default dust threshold has been exceeded. This // heuristic takes into account the trimmed-to-dust mechanism. The sum of the // commitment's dust with the mailbox's dust with the amount is checked against // the default threshold. If incoming is true, then the amount is not included // in the sum as it was already included in the commitment's dust. A boolean is // returned telling the caller whether the HTLC should be failed back. func (s *Switch) evaluateDustThreshold(link ChannelLink, amount lnwire.MilliSatoshi, incoming bool) bool { // Retrieve the link's current commitment feerate and dustClosure. feeRate := link.getFeeRate() isDust := link.getDustClosure() // Evaluate if the HTLC is dust on either sides' commitment. isLocalDust := isDust(feeRate, incoming, true, amount.ToSatoshis()) isRemoteDust := isDust(feeRate, incoming, false, amount.ToSatoshis()) if !(isLocalDust || isRemoteDust) { // If the HTLC is not dust on either commitment, it's fine to // forward. return false } // Fetch the dust sums currently in the mailbox for this link. cid := link.ChanID() sid := link.ShortChanID() mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid) localMailDust, remoteMailDust := mailbox.DustPackets() // If the htlc is dust on the local commitment, we'll obtain the dust // sum for it. if isLocalDust { localSum := link.getDustSum(false) localSum += localMailDust // Optionally include the HTLC amount only for outgoing // HTLCs. if !incoming { localSum += amount } // Finally check against the defined dust threshold. if localSum > s.cfg.DustThreshold { return true } } // Also check if the htlc is dust on the remote commitment, if we've // reached this point. if isRemoteDust { remoteSum := link.getDustSum(true) remoteSum += remoteMailDust // Optionally include the HTLC amount only for outgoing // HTLCs. if !incoming { remoteSum += amount } // Finally check against the defined dust threshold. if remoteSum > s.cfg.DustThreshold { return true } } // If we reached this point, this HTLC is fine to forward. return false } // failMailboxUpdate is passed to the mailbox orchestrator which in turn passes // it to individual mailboxes. It allows the mailboxes to construct a // FailureMessage when failing back HTLC's due to expiry and may include an // alias in the ShortChannelID field. The outgoingScid is the SCID originally // used in the onion. The mailboxScid is the SCID that the mailbox and link // use. The mailboxScid is only used in the non-alias case, so it is always // the confirmed SCID. func (s *Switch) failMailboxUpdate(outgoingScid, mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage { // Try to use the failAliasUpdate function in case this is a channel // that uses aliases. If it returns nil, we'll fallback to the original // pre-alias behavior. update := s.failAliasUpdate(outgoingScid, false) if update == nil { // Execute the fallback behavior. var err error update, err = s.cfg.FetchLastChannelUpdate(mailboxScid) if err != nil { return &lnwire.FailTemporaryNodeFailure{} } } return lnwire.NewTemporaryChannelFailure(update) } // failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing // HTLC on a channel where the option-scid-alias feature bit was negotiated. If // the associated channel is not one of these, this function will return nil // and the caller is expected to handle this properly. In this case, a return // to the original non-alias behavior is expected. func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate { // This function does not defer the unlocking because of the database // lookups for ChannelUpdate. s.indexMtx.RLock() if s.cfg.IsAlias(scid) { // The alias SCID was used. In the incoming case this means // the channel is zero-conf as the link sets the scid. In the // outgoing case, the sender set the scid to use and may be // either the alias or the confirmed one, if it exists. realScid, ok := s.aliasToReal[scid] if !ok { // The real, confirmed SCID does not exist yet. Find // the "base" SCID that the link uses via the // baseIndex. If we can't find it, return nil. This // means the channel is zero-conf. baseScid, ok := s.baseIndex[scid] s.indexMtx.RUnlock() if !ok { return nil } update, err := s.cfg.FetchLastChannelUpdate(baseScid) if err != nil { return nil } // Replace the baseScid with the passed-in alias. update.ShortChannelID = scid sig, err := s.cfg.SignAliasUpdate(update) if err != nil { return nil } update.Signature, err = lnwire.NewSigFromSignature(sig) if err != nil { return nil } return update } s.indexMtx.RUnlock() // Fetch the SCID via the confirmed SCID and replace it with // the alias. update, err := s.cfg.FetchLastChannelUpdate(realScid) if err != nil { return nil } // In the incoming case, we want to ensure that we don't leak // the UTXO in case the channel is private. In the outgoing // case, since the alias was used, we do the same thing. update.ShortChannelID = scid sig, err := s.cfg.SignAliasUpdate(update) if err != nil { return nil } update.Signature, err = lnwire.NewSigFromSignature(sig) if err != nil { return nil } return update } // If the confirmed SCID is not in baseIndex, this is not an // option-scid-alias or zero-conf channel. baseScid, ok := s.baseIndex[scid] if !ok { s.indexMtx.RUnlock() return nil } // Fetch the link so we can get an alias to use in the ShortChannelID // of the ChannelUpdate. link, ok := s.forwardingIndex[baseScid] s.indexMtx.RUnlock() if !ok { // This should never happen, but if it does for some reason, // fallback to the old behavior. return nil } aliases := link.getAliases() if len(aliases) == 0 { // This should never happen, but if it does, fallback. return nil } // Fetch the ChannelUpdate via the real, confirmed SCID. update, err := s.cfg.FetchLastChannelUpdate(scid) if err != nil { return nil } // The incoming case will replace the ShortChannelID in the retrieved // ChannelUpdate with the alias to ensure no privacy leak occurs. This // would happen if a private non-zero-conf option-scid-alias // feature-bit channel leaked its UTXO here rather than supplying an // alias. In the outgoing case, the confirmed SCID was actually used // for forwarding in the onion, so no replacement is necessary as the // sender knows the scid. if incoming { // We will replace and sign the update with the first alias. // Since this happens on the incoming side, it's not actually // possible to know what the sender used in the onion. update.ShortChannelID = aliases[0] sig, err := s.cfg.SignAliasUpdate(update) if err != nil { return nil } update.Signature, err = lnwire.NewSigFromSignature(sig) if err != nil { return nil } } return update } // AddAliasForLink instructs the Switch to update its in-memory maps to reflect // that a link has a new alias. func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID, alias lnwire.ShortChannelID) error { // Fetch the link so that we can update the underlying channel's set of // aliases. s.indexMtx.RLock() link, err := s.getLink(chanID) s.indexMtx.RUnlock() if err != nil { return err } // If the link is a channel where the option-scid-alias feature bit was // not negotiated, we'll return an error. if !link.negotiatedAliasFeature() { return fmt.Errorf("attempted to update non-alias channel") } linkScid := link.ShortChanID() // We'll update the maps so the Switch includes this alias in its // forwarding decisions. if link.isZeroConf() { if link.zeroConfConfirmed() { // If the channel has confirmed on-chain, we'll // add this alias to the aliasToReal map. confirmedScid := link.confirmedScid() s.aliasToReal[alias] = confirmedScid } // Add this alias to the baseIndex mapping. s.baseIndex[alias] = linkScid } else if link.negotiatedAliasFeature() { // The channel is confirmed, so we'll populate the aliasToReal // and baseIndex maps. s.aliasToReal[alias] = linkScid s.baseIndex[alias] = linkScid } return nil }