From 3e1558b61686ab97729c5449dc6c33c5e7a6ef96 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 11 Aug 2021 18:04:00 -0700 Subject: [PATCH] peer: re-use buf from bufpool when decoding messages In this commit, which builds on top of the prior commit, rather than using the returned buffer outside of the closure (which means it'll be copied), we instead use it within the `Submit` closure instead. With the recent changes to the `brontide` package, we won't allocate any new buffer when we decrypt, as a result, the `rawMsg` byte slice actually just slices into the passed `buf` slice (obtained from the pool)`. With this change, we ensure that the buffer pool can release the slice back to the pool and eliminate any extra allocations along the way. --- peer/brontide.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 8d71fffe6..695a09a87 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -951,7 +951,10 @@ func (p *Brontide) readNextMessage() (lnwire.Message, error) { // reading incrementally from the stream as the Lightning wire protocol // is message oriented and allows nodes to pad on additional data to // the message stream. - var rawMsg []byte + var ( + nextMsg lnwire.Message + msgLen uint64 + ) err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error { // Before reading the body of the message, set the read timeout // accordingly to ensure we don't block other readers using the @@ -964,18 +967,29 @@ func (p *Brontide) readNextMessage() (lnwire.Message, error) { return readErr } - rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen]) - return readErr - }) - atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg))) - if err != nil { - return nil, err - } + // The ReadNextBody method will actually end up re-using the + // buffer, so within this closure, we can continue to use + // rawMsg as it's just a slice into the buf from the buffer + // pool. + rawMsg, readErr := noiseConn.ReadNextBody(buf[:pktLen]) + if readErr != nil { + return readErr + } + msgLen = uint64(len(rawMsg)) - // Next, create a new io.Reader implementation from the raw message, - // and use this to decode the message directly from. - msgReader := bytes.NewReader(rawMsg) - nextMsg, err := lnwire.ReadMessage(msgReader, 0) + // Next, create a new io.Reader implementation from the raw + // message, and use this to decode the message directly from. + msgReader := bytes.NewReader(rawMsg) + nextMsg, err = lnwire.ReadMessage(msgReader, 0) + if err != nil { + return err + } + + // At this point, rawMsg and buf will be returned back to the + // buffer pool for re-use. + return nil + }) + atomic.AddUint64(&p.bytesReceived, msgLen) if err != nil { return nil, err }