diff --git a/server.go b/server.go index cbc13bc6..e41f27e5 100644 --- a/server.go +++ b/server.go @@ -762,8 +762,21 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { return } - hashes, err := sp.server.chain.HeightToHashRange(int32(msg.StartHeight), - &msg.StopHash, wire.MaxGetCFiltersReqRange) + // We'll also ensure that the remote party is requesting a set of + // filters that we actually currently maintain. + switch msg.FilterType { + case wire.GCSFilterRegular: + break + + default: + peerLog.Debug("Filter request for unknown filter: %v", + msg.FilterType) + return + } + + hashes, err := sp.server.chain.HeightToHashRange( + int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange, + ) if err != nil { peerLog.Debugf("Invalid getcfilters request: %v", err) return @@ -776,8 +789,9 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { hashPtrs[i] = &hashes[i] } - filters, err := sp.server.cfIndex.FiltersByBlockHashes(hashPtrs, - msg.FilterType) + filters, err := sp.server.cfIndex.FiltersByBlockHashes( + hashPtrs, msg.FilterType, + ) if err != nil { peerLog.Errorf("Error retrieving cfilters: %v", err) return @@ -785,10 +799,14 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { for i, filterBytes := range filters { if len(filterBytes) == 0 { - peerLog.Warnf("Could not obtain cfilter for %v", hashes[i]) + peerLog.Warnf("Could not obtain cfilter for %v", + hashes[i]) return } - filterMsg := wire.NewMsgCFilter(msg.FilterType, &hashes[i], filterBytes) + + filterMsg := wire.NewMsgCFilter( + msg.FilterType, &hashes[i], filterBytes, + ) sp.QueueMessage(filterMsg, nil) } } @@ -800,19 +818,32 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { return } + // We'll also ensure that the remote party is requesting a set of + // headers for filters that we actually currently maintain. + switch msg.FilterType { + case wire.GCSFilterRegular: + break + + default: + peerLog.Debug("Filter request for unknown headers for "+ + "filter: %v", msg.FilterType) + return + } + startHeight := int32(msg.StartHeight) maxResults := wire.MaxCFHeadersPerMsg - // If StartHeight is positive, fetch the predecessor block hash so we can - // populate the PrevFilterHeader field. + // If StartHeight is positive, fetch the predecessor block hash so we + // can populate the PrevFilterHeader field. if msg.StartHeight > 0 { startHeight-- maxResults++ } // Fetch the hashes from the block index. - hashList, err := sp.server.chain.HeightToHashRange(startHeight, - &msg.StopHash, maxResults) + hashList, err := sp.server.chain.HeightToHashRange( + startHeight, &msg.StopHash, maxResults, + ) if err != nil { peerLog.Debugf("Invalid getcfheaders request: %v", err) } @@ -833,8 +864,9 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { } // Fetch the raw filter hash bytes from the database for all blocks. - filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(hashPtrs, - msg.FilterType) + filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes( + hashPtrs, msg.FilterType, + ) if err != nil { peerLog.Errorf("Error retrieving cfilter hashes: %v", err) return @@ -892,6 +924,7 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { headersMsg.FilterType = msg.FilterType headersMsg.StopHash = msg.StopHash + sp.QueueMessage(headersMsg, nil) } @@ -902,42 +935,78 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { return } - blockHashes, err := sp.server.chain.IntervalBlockHashes(&msg.StopHash, - wire.CFCheckptInterval) + // We'll also ensure that the remote party is requesting a set of + // checkpoints for filters that we actually currently maintain. + switch msg.FilterType { + case wire.GCSFilterRegular: + break + + default: + peerLog.Debug("Filter request for unknown checkpoints for "+ + "filter: %v", msg.FilterType) + return + } + + // Now that we know the client is fetching a filter that we know of, + // we'll fetch the block hashes et each check point interval so we can + // compare against our cache, and create new check points if necessary. + blockHashes, err := sp.server.chain.IntervalBlockHashes( + &msg.StopHash, wire.CFCheckptInterval, + ) if err != nil { peerLog.Debugf("Invalid getcfilters request: %v", err) return } + checkptMsg := wire.NewMsgCFCheckpt( + msg.FilterType, &msg.StopHash, len(blockHashes), + ) + + // Fetch the current existing cache so we can decide if we need to + // extend it or if its adequate as is. + sp.server.cfCheckptCachesMtx.Lock() + checkptCache := sp.server.cfCheckptCaches[msg.FilterType] + + // If the set of block hashes is beyond the current size of the cache, + // then we'll expand the size of the cache and also retain the write + // lock. var updateCache bool - var checkptCache []cfHeaderKV - if len(blockHashes) > len(checkptCache) { - // Update the cache if the checkpoint chain is longer than the cached - // one. This ensures that the cache is relatively stable and mostly - // overlaps with the best chain, since it follows the longest chain - // heuristic. - updateCache = true - - // Take write lock because we are going to update cache. - sp.server.cfCheckptCachesMtx.Lock() + // Now that we know we'll need to modify the size of the cache, + // we'll defer the release of the write lock so we don't + // forget. defer sp.server.cfCheckptCachesMtx.Unlock() - // Grow the checkptCache to be the length of blockHashes. + // We'll mark that we need to update the cache for below and + // also expand the size of the cache in place. + updateCache = true + additionalLength := len(blockHashes) - len(checkptCache) - checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType], - make([]cfHeaderKV, additionalLength)...) + newEntries := make([]cfHeaderKV, additionalLength) + + peerLog.Infof("Growing size of checkpoint cache from %v to %v "+ + "block hashes", len(checkptCache), len(blockHashes)) + + checkptCache = append( + sp.server.cfCheckptCaches[msg.FilterType], + newEntries..., + ) } else { - updateCache = false - - // Take reader lock because we are not going to update cache. + // Otherwise, we'll release the write lock, then grab the read + // lock, as the cache is already properly sized. + sp.server.cfCheckptCachesMtx.Unlock() sp.server.cfCheckptCachesMtx.RLock() - defer sp.server.cfCheckptCachesMtx.RUnlock() - checkptCache = sp.server.cfCheckptCaches[msg.FilterType] + peerLog.Tracef("Serving stale cache of size %v", + len(checkptCache)) + + defer sp.server.cfCheckptCachesMtx.RUnlock() } - // Iterate backwards until the block hash is found in the cache. + // Now that we know the cache is of an appropriate size, we'll iterate + // backwards until the find the block hash. We do this as it's possible + // a re-org has occurred so items in the db are now in the main china + // while the cache has been partially invalidated. var forkIdx int for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- { if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] { @@ -945,29 +1014,33 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { } } - // Populate results with cached checkpoints. - checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, &msg.StopHash, - len(blockHashes)) + // Now that we know the how much of the cache is relevant for this + // query, we'll populate our check point message with the cache as is. + // Shortly below, we'll populate the new elements of the cache. for i := 0; i < forkIdx; i++ { checkptMsg.AddCFHeader(&checkptCache[i].filterHeader) } - // Look up any filter headers that aren't cached. + // We'll now collect the set of hashes that are beyond our cache so we + // can look up the filter headers to populate the final cache. blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx) for i := forkIdx; i < len(blockHashes); i++ { blockHashPtrs = append(blockHashPtrs, &blockHashes[i]) } - - filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(blockHashPtrs, - msg.FilterType) + filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes( + blockHashPtrs, msg.FilterType, + ) if err != nil { peerLog.Errorf("Error retrieving cfilter headers: %v", err) return } + // Now that we have the full set of filter headers, we'll add them to + // the checkpoint message, and also update our cache in line. for i, filterHeaderBytes := range filterHeaders { if len(filterHeaderBytes) == 0 { - peerLog.Warnf("Could not obtain CF header for %v", blockHashPtrs[i]) + peerLog.Warnf("Could not obtain CF header for %v", + blockHashPtrs[i]) return } @@ -979,6 +1052,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { } checkptMsg.AddCFHeader(filterHeader) + + // If the new main chain is longer than what's in the cache, + // then we'll override it beyond the fork point. if updateCache { checkptCache[forkIdx+i] = cfHeaderKV{ blockHash: blockHashes[forkIdx+i], @@ -987,6 +1063,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { } } + // Finally, we'll update the cache if we need to, and send the final + // message back to the requesting peer. if updateCache { sp.server.cfCheckptCaches[msg.FilterType] = checkptCache }