diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 7e62e5057..0d7ca2b4f 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -121,3 +121,15 @@ type Peer interface { // properly handle. Disconnect(reason error) } + +// ForwardingLog is an interface that represents a time series database which +// keep track of all successfully completed payment circuits. Every few +// seconds, the switch will collate and flush out all the successful payment +// circuits during the last interval. +type ForwardingLog interface { + // AddForwardingEvents is a method that should write out the set of + // forwarding events in a batch to persistent storage. Outside + // sub-systems can then query the contents of the log for analysis, + // visualizations, etc. + AddForwardingEvents([]channeldb.ForwardingEvent) error +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 55df1e495..b748c668e 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -80,6 +80,22 @@ func (m *mockFeeEstimator) Stop() error { var _ lnwallet.FeeEstimator = (*mockFeeEstimator)(nil) +type mockForwardingLog struct { + sync.Mutex + events map[time.Time]channeldb.ForwardingEvent +} + +func (m *mockForwardingLog) AddForwardingEvents(events []channeldb.ForwardingEvent) error { + m.Lock() + defer m.Unlock() + + for _, event := range events { + m.events[event.Timestamp] = event + } + + return nil +} + type mockServer struct { started int32 shutdown int32 diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index f1656f12e..029c1515e 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -13,6 +13,7 @@ import ( "github.com/roasbeef/btcd/btcec" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -99,6 +100,12 @@ type Config struct { // properly route around link./vertex failures. SelfKey *btcec.PublicKey + // FwdingLog is an interface that will be used by the switch to log + // forwarding events. A forwarding event happens each time a payment + // circuit is successfully completed. So when we forward an HTLC, and a + // settle is eventually received. + FwdingLog ForwardingLog + // LocalChannelClose kicks-off the workflow to execute a cooperative or // forced unilateral closure of the channel initiated by a local // subsystem. @@ -169,6 +176,12 @@ type Switch struct { // linkControl is a channel used to propagate add/remove/get htlc // switch handler commands. linkControl chan interface{} + + // pendingFwdingEvents is the set of forwarding events which have been + // collected during the current interval, but hasn't yet been written + // to the forwarding log. + fwdEventMtx sync.Mutex + pendingFwdingEvents []channeldb.ForwardingEvent } // New creates the new instance of htlc switch. @@ -553,8 +566,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { // the ultimate settle message back latter. case *lnwire.UpdateAddHTLC: if packet.incomingChanID == (lnwire.ShortChannelID{}) { - // A blank incomingChanID indicates that this is a pending - // user-initiated payment. + // A blank incomingChanID indicates that this is a + // pending user-initiated payment. return s.handleLocalDispatch(packet) } @@ -606,8 +619,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } if link.Bandwidth() >= htlc.Amount { - destination = link + break } } @@ -665,9 +678,12 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return err } - // Remove circuit since we are about to complete the HTLC. - err := s.circuits.Remove(packet.outgoingChanID, - packet.outgoingHTLCID) + // Remove the circuit since we are about to complete + // the HTLC. + err := s.circuits.Remove( + packet.outgoingChanID, + packet.outgoingHTLCID, + ) if err != nil { log.Warnf("Failed to close completed onion circuit for %x: "+ "(%s, %d) <-> (%s, %d)", circuit.PaymentHash, @@ -683,6 +699,29 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { packet.incomingChanID = circuit.IncomingChanID packet.incomingHTLCID = circuit.IncomingHTLCID + // If this is an HTLC settle, and it wasn't from a + // locally initiated HTLC, then we'll log a forwarding + // event so we can flush it to disk later. + // + // TODO(roasbeef): only do this once link actually + // fully settles? + _, isSettle := packet.htlc.(*lnwire.UpdateFulfillHTLC) + localHTLC := packet.incomingChanID == (lnwire.ShortChannelID{}) + if isSettle && !localHTLC { + s.fwdEventMtx.Lock() + s.pendingFwdingEvents = append( + s.pendingFwdingEvents, + channeldb.ForwardingEvent{ + Timestamp: time.Now(), + IncomingChanID: circuit.IncomingChanID, + OutgoingChanID: circuit.OutgoingChanID, + AmtIn: circuit.IncomingAmt, + AmtOut: circuit.OutgoingAmt, + }, + ) + s.fwdEventMtx.Unlock() + } + // Obfuscate the error message for fail updates before // sending back through the circuit unless the payment // was generated locally. @@ -715,9 +754,11 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { } } - // A blank IncomingChanID in a circuit indicates that it is a - // pending user-initiated payment. - if packet.incomingChanID == (lnwire.ShortChannelID{}) { + // For local HTLC's we'll dispatch the settle event back to the + // caller, rather than to the peer that sent us the HTLC + // originally. + localHTLC := packet.incomingChanID == (lnwire.ShortChannelID{}) + if localHTLC { return s.handleLocalDispatch(packet) } @@ -790,6 +831,13 @@ func (s *Switch) htlcForwarder() { "channel link on stop: %v", err) } } + + // Before we exit fully, we'll attempt to flush out any + // forwarding events that may still be lingering since the last + // batch flush. + if err := s.FlushForwardingEvents(); err != nil { + log.Errorf("unable to flush forwarding events: %v", err) + } }() // TODO(roasbeef): cleared vs settled distinction @@ -801,6 +849,11 @@ func (s *Switch) htlcForwarder() { logTicker := time.NewTicker(10 * time.Second) defer logTicker.Stop() + // Every 15 seconds, we'll flush out the forwarding events that + // occurred during that period. + fwdEventTicker := time.NewTicker(15 * time.Second) + defer fwdEventTicker.Stop() + for { select { // A local close request has arrived, we'll forward this to the @@ -861,6 +914,17 @@ func (s *Switch) htlcForwarder() { case cmd := <-s.htlcPlex: cmd.err <- s.handlePacketForward(cmd.pkt) + // When this time ticks, then it indicates that we should + // collect all the forwarding events since the last internal, + // and write them out to our log. + case <-fwdEventTicker.C: + go func() { + if err := s.FlushForwardingEvents(); err != nil { + log.Errorf("unable to flush "+ + "forwarding events: %v", err) + } + }() + // The log ticker has fired, so we'll calculate some forwarding // stats for the last 10 seconds to display within the logs to // users. @@ -1307,3 +1371,32 @@ func (s *Switch) numPendingPayments() int { func (s *Switch) addCircuit(circuit *PaymentCircuit) { s.circuits.Add(circuit) } + +// FlushForwardingEvents flushes out the set of pending forwarding events to +// the persistent log. This will be used by the switch to periodically flush +// out the set of forwarding events to disk. External callers can also use this +// method to ensure all data is flushed to dis before querying the log. +func (s *Switch) FlushForwardingEvents() error { + // First, we'll obtain a copy of the current set of pending forwarding + // events. + s.fwdEventMtx.Lock() + + // If we won't have any forwarding events, then we can exit early. + if len(s.pendingFwdingEvents) == 0 { + s.fwdEventMtx.Unlock() + return nil + } + + events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents)) + copy(events[:], s.pendingFwdingEvents[:]) + + // With the copy obtained, we can now clear out the header pointer of + // the current slice. This way, we can re-use the underlying storage + // allocated for the slice. + s.pendingFwdingEvents = s.pendingFwdingEvents[:0] + s.fwdEventMtx.Unlock() + + // Finally, we'll write out the copied events to the persistent + // forwarding log. + return s.cfg.FwdingLog.AddForwardingEvents(events) +}