Merge pull request #2105 from yyforyongyu/fix-batch-mem-leak

rpcclient: make sure batch requests are GCed
This commit is contained in:
Olaoluwa Osuntokun 2024-01-22 16:01:08 -08:00 committed by GitHub
commit 62e6af035e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 53 additions and 21 deletions

View file

@ -270,14 +270,21 @@ func (c *Client) removeRequest(id uint64) *jsonRequest {
c.requestLock.Lock()
defer c.requestLock.Unlock()
element := c.requestMap[id]
if element != nil {
delete(c.requestMap, id)
request := c.requestList.Remove(element).(*jsonRequest)
return request
element, ok := c.requestMap[id]
if !ok {
return nil
}
return nil
delete(c.requestMap, id)
var request *jsonRequest
if c.batch {
request = c.batchList.Remove(element).(*jsonRequest)
} else {
request = c.requestList.Remove(element).(*jsonRequest)
}
return request
}
// removeAllRequests removes all the jsonRequests which contain the response
@ -1733,28 +1740,38 @@ func (c *Client) Send() error {
return nil
}
// clear batchlist in case of an error
defer func() {
c.batchList = list.New()
}()
result, err := c.sendAsync().Receive()
batchResp, err := c.sendAsync().Receive()
if err != nil {
// Clear batchlist in case of an error.
//
// TODO(yy): need to double check to make sure there's no
// concurrent access to this batch list, otherwise we may miss
// some batched requests.
c.batchList = list.New()
return err
}
for iter := c.batchList.Front(); iter != nil; iter = iter.Next() {
var requestError error
request := iter.Value.(*jsonRequest)
individualResult := result[request.id]
fullResult, err := json.Marshal(individualResult.Result)
// Iterate each response and send it to the corresponding request.
for id, resp := range batchResp {
// Perform a GC on batchList and requestMap before moving
// forward.
request := c.removeRequest(id)
// If there's an error, we log it and continue to the next
// request.
fullResult, err := json.Marshal(resp.Result)
if err != nil {
return err
log.Errorf("Unable to marshal result: %v for req=%v",
err, request.id)
continue
}
if individualResult.Error != nil {
requestError = individualResult.Error
// If there's a response error, we send it back the request.
var requestError error
if resp.Error != nil {
requestError = resp.Error
}
result := Response{
@ -1763,5 +1780,6 @@ func (c *Client) Send() error {
}
request.responseChan <- &result
}
return nil
}

View file

@ -45,6 +45,20 @@ type MsgBlock struct {
Transactions []*MsgTx
}
// Copy creates a deep copy of MsgBlock.
func (msg *MsgBlock) Copy() *MsgBlock {
block := &MsgBlock{
Header: msg.Header,
Transactions: make([]*MsgTx, len(msg.Transactions)),
}
for i, tx := range msg.Transactions {
block.Transactions[i] = tx.Copy()
}
return block
}
// AddTransaction adds a transaction to the message.
func (msg *MsgBlock) AddTransaction(tx *MsgTx) error {
msg.Transactions = append(msg.Transactions, tx)