From 5d9514fbe4e440b0cd3e6b46273cafceeb9f9af3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 15 Feb 2019 19:27:16 -0800 Subject: [PATCH] buffer+pool: add buffer.Read and pool.ReadBuffer --- buffer/read.go | 19 ++++++++++++++++++ pool/read_buffer.go | 48 ++++++++++++++++++++++++++++++++++++++++++++ pool/recycle_test.go | 24 ++++++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 buffer/read.go create mode 100644 pool/read_buffer.go diff --git a/buffer/read.go b/buffer/read.go new file mode 100644 index 000000000..57050d58f --- /dev/null +++ b/buffer/read.go @@ -0,0 +1,19 @@ +package buffer + +import ( + "github.com/lightningnetwork/lnd/lnwire" +) + +// ReadSize represents the size of the maximum message that can be read off the +// wire by brontide. The buffer is used to hold the ciphertext while the +// brontide state machine decrypts the message. +const ReadSize = lnwire.MaxMessagePayload + 16 + +// Read is a static byte array sized to the maximum-allowed Lightning message +// size, plus 16 bytes for the MAC. +type Read [ReadSize]byte + +// Recycle zeroes the Read, making it fresh for another use. +func (b *Read) Recycle() { + RecycleSlice(b[:]) +} diff --git a/pool/read_buffer.go b/pool/read_buffer.go new file mode 100644 index 000000000..58f6014ed --- /dev/null +++ b/pool/read_buffer.go @@ -0,0 +1,48 @@ +package pool + +import ( + "time" + + "github.com/lightningnetwork/lnd/buffer" +) + +const ( + // DefaultReadBufferGCInterval is the default interval that a Read will + // perform a sweep to see which expired buffer.Reads can be released to + // the runtime. + DefaultReadBufferGCInterval = 15 * time.Second + + // DefaultReadBufferExpiryInterval is the default, minimum interval that + // must elapse before a Read will release a buffer.Read. The maximum + // time before the buffer can be released is equal to the expiry + // interval plus the gc interval. + DefaultReadBufferExpiryInterval = 30 * time.Second +) + +// ReadBuffer is a pool of buffer.Read items, that dynamically allocates and +// reclaims buffers in response to load. +type ReadBuffer struct { + pool *Recycle +} + +// NewReadBuffer returns a freshly instantiated ReadBuffer, using the given +// gcInterval and expieryInterval. +func NewReadBuffer(gcInterval, expiryInterval time.Duration) *ReadBuffer { + return &ReadBuffer{ + pool: NewRecycle( + func() interface{} { return new(buffer.Read) }, + 100, gcInterval, expiryInterval, + ), + } +} + +// Take returns a fresh buffer.Read to the caller. +func (p *ReadBuffer) Take() *buffer.Read { + return p.pool.Take().(*buffer.Read) +} + +// Return returns the buffer.Read to the pool, so that it can be cycled or +// released. +func (p *ReadBuffer) Return(buf *buffer.Read) { + p.pool.Return(buf) +} diff --git a/pool/recycle_test.go b/pool/recycle_test.go index 58be090c7..2750da004 100644 --- a/pool/recycle_test.go +++ b/pool/recycle_test.go @@ -30,6 +30,10 @@ func TestRecyclers(t *testing.T) { "write_buffer", func() interface{} { return new(buffer.Write) }, }, + { + "read_buffer", + func() interface{} { return new(buffer.Read) }, + }, } for _, test := range tests { @@ -71,6 +75,14 @@ func TestConcreteRecyclePoolTests(t *testing.T) { ) }, }, + { + name: "read buffer pool", + newPool: func() interface{} { + return pool.NewReadBuffer( + gcInterval, expiryInterval, + ) + }, + }, } for _, test := range tests { @@ -124,6 +136,9 @@ func takeGeneric(t *testing.T, p interface{}) pool.Recycler { case *pool.WriteBuffer: return pp.Take() + case *pool.ReadBuffer: + return pp.Take() + default: t.Fatalf("unknown pool type: %T", p) } @@ -138,6 +153,9 @@ func returnGeneric(t *testing.T, p, item interface{}) { case *pool.WriteBuffer: pp.Return(item.(*buffer.Write)) + case *pool.ReadBuffer: + pp.Return(item.(*buffer.Read)) + default: t.Fatalf("unknown pool type: %T", p) } @@ -153,6 +171,9 @@ func dirtyGeneric(t *testing.T, i interface{}) { case *buffer.Write: dirtySlice(item[:]) + case *buffer.Read: + dirtySlice(item[:]) + default: t.Fatalf("unknown item type: %T", i) } @@ -177,6 +198,9 @@ func isCleanGeneric(t *testing.T, i interface{}) { case *buffer.Write: isCleanSlice(t, item[:]) + case *buffer.Read: + isCleanSlice(t, item[:]) + default: t.Fatalf("unknown item type: %T", i) }