peer: re-use buf from bufpool when decoding messages

In this commit, which builds on top of the prior commit, rather than
using the returned buffer outside of the closure (which means it'll be
copied), we instead use it within the `Submit` closure instead. With the
recent changes to the `brontide` package, we won't allocate any new
buffer when we decrypt, as a result, the `rawMsg` byte slice actually
just slices into the passed `buf` slice (obtained from the pool)`.

With this change, we ensure that the buffer pool can release the slice
back to the pool and eliminate any extra allocations along the way.
This commit is contained in:
Olaoluwa Osuntokun 2021-08-11 18:04:00 -07:00
parent 8c6dbc9ffa
commit 3e1558b616
No known key found for this signature in database
GPG key ID: 3BBD59E99B280306

View file

@ -951,7 +951,10 @@ func (p *Brontide) readNextMessage() (lnwire.Message, error) {
// reading incrementally from the stream as the Lightning wire protocol
// is message oriented and allows nodes to pad on additional data to
// the message stream.
var rawMsg []byte
var (
nextMsg lnwire.Message
msgLen uint64
)
err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error {
// Before reading the body of the message, set the read timeout
// accordingly to ensure we don't block other readers using the
@ -964,18 +967,29 @@ func (p *Brontide) readNextMessage() (lnwire.Message, error) {
return readErr
}
rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen])
return readErr
})
atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
if err != nil {
return nil, err
}
// The ReadNextBody method will actually end up re-using the
// buffer, so within this closure, we can continue to use
// rawMsg as it's just a slice into the buf from the buffer
// pool.
rawMsg, readErr := noiseConn.ReadNextBody(buf[:pktLen])
if readErr != nil {
return readErr
}
msgLen = uint64(len(rawMsg))
// Next, create a new io.Reader implementation from the raw message,
// and use this to decode the message directly from.
msgReader := bytes.NewReader(rawMsg)
nextMsg, err := lnwire.ReadMessage(msgReader, 0)
// Next, create a new io.Reader implementation from the raw
// message, and use this to decode the message directly from.
msgReader := bytes.NewReader(rawMsg)
nextMsg, err = lnwire.ReadMessage(msgReader, 0)
if err != nil {
return err
}
// At this point, rawMsg and buf will be returned back to the
// buffer pool for re-use.
return nil
})
atomic.AddUint64(&p.bytesReceived, msgLen)
if err != nil {
return nil, err
}