mirror of
https://github.com/btcsuite/btcd.git
synced 2025-03-12 19:02:12 +01:00
Merge pull request #2273 from ynewmann/fix/batch-data-race
rpcclient: safe read and write to batch
This commit is contained in:
commit
b1b1e9551b
1 changed files with 30 additions and 10 deletions
|
@ -74,6 +74,9 @@ var (
|
|||
// client having already connected to the RPC server.
|
||||
ErrClientAlreadyConnected = errors.New("websocket client has already " +
|
||||
"connected")
|
||||
|
||||
// ErrEmptyBatch is an error to describe that there is nothing to send.
|
||||
ErrEmptyBatch = errors.New("batch is empty")
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -151,6 +154,7 @@ type Client struct {
|
|||
|
||||
// whether or not to batch requests, false unless changed by Batch()
|
||||
batch bool
|
||||
batchLock sync.Mutex
|
||||
batchList *list.List
|
||||
|
||||
// retryCount holds the number of times the client has tried to
|
||||
|
@ -214,7 +218,10 @@ func (c *Client) addRequest(jReq *jsonRequest) error {
|
|||
element := c.requestList.PushBack(jReq)
|
||||
c.requestMap[jReq.id] = element
|
||||
} else {
|
||||
c.batchLock.Lock()
|
||||
element := c.batchList.PushBack(jReq)
|
||||
c.batchLock.Unlock()
|
||||
|
||||
c.requestMap[jReq.id] = element
|
||||
}
|
||||
return nil
|
||||
|
@ -238,7 +245,9 @@ func (c *Client) removeRequest(id uint64) *jsonRequest {
|
|||
|
||||
var request *jsonRequest
|
||||
if c.batch {
|
||||
c.batchLock.Lock()
|
||||
request = c.batchList.Remove(element).(*jsonRequest)
|
||||
c.batchLock.Unlock()
|
||||
} else {
|
||||
request = c.requestList.Remove(element).(*jsonRequest)
|
||||
}
|
||||
|
@ -1672,7 +1681,15 @@ func (c *Client) BackendVersion() (BackendVersion, error) {
|
|||
return c.backendVersion, nil
|
||||
}
|
||||
|
||||
func (c *Client) sendAsync() FutureGetBulkResult {
|
||||
func (c *Client) sendAsync() (FutureGetBulkResult, error) {
|
||||
c.batchLock.Lock()
|
||||
defer c.batchLock.Unlock()
|
||||
|
||||
// If batchList is empty, there's nothing to send.
|
||||
if c.batchList.Len() == 0 {
|
||||
return nil, ErrEmptyBatch
|
||||
}
|
||||
|
||||
// convert the array of marshalled json requests to a single request we can send
|
||||
responseChan := make(chan *Response, 1)
|
||||
marshalledRequest := []byte("[")
|
||||
|
@ -1694,25 +1711,24 @@ func (c *Client) sendAsync() FutureGetBulkResult {
|
|||
responseChan: responseChan,
|
||||
}
|
||||
c.sendPostRequest(&request)
|
||||
return responseChan
|
||||
return responseChan, nil
|
||||
}
|
||||
|
||||
// Marshall's bulk requests and sends to the server
|
||||
// creates a response channel to receive the response
|
||||
func (c *Client) Send() error {
|
||||
// if batchlist is empty, there's nothing to send
|
||||
if c.batchList.Len() == 0 {
|
||||
return nil
|
||||
future, err := c.sendAsync()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batchResp, err := c.sendAsync().Receive()
|
||||
batchResp, err := future.Receive()
|
||||
if err != nil {
|
||||
// Clear batchlist in case of an error.
|
||||
//
|
||||
// TODO(yy): need to double check to make sure there's no
|
||||
// concurrent access to this batch list, otherwise we may miss
|
||||
// some batched requests.
|
||||
|
||||
c.batchLock.Lock()
|
||||
c.batchList = list.New()
|
||||
c.batchLock.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -1722,6 +1738,10 @@ func (c *Client) Send() error {
|
|||
// Perform a GC on batchList and requestMap before moving
|
||||
// forward.
|
||||
request := c.removeRequest(id)
|
||||
if request == nil {
|
||||
// Perhaps another goroutine has already processed this request.
|
||||
continue
|
||||
}
|
||||
|
||||
// If there's an error, we log it and continue to the next
|
||||
// request.
|
||||
|
|
Loading…
Add table
Reference in a new issue