rpcclient: Add retry with backoffs to HTTP POST requests

Adds behavior similar to the retries of persistent RPC connections
to HTTP request.

* Initial backoff: 500ms
* Linear increase
* Max retries: 10

Room for future improvement:
* Make configurable
* Add jitter
* Tests for retry behavior
This commit is contained in:
3nprob 2021-08-21 22:04:31 +09:00 committed by John C. Vernaleo
parent 65e986844e
commit cc7327c194

View File

@ -86,15 +86,11 @@ const (
// connectionRetryInterval is the amount of time to wait in between // connectionRetryInterval is the amount of time to wait in between
// retries when automatically reconnecting to an RPC server. // retries when automatically reconnecting to an RPC server.
connectionRetryInterval = time.Second * 5 connectionRetryInterval = time.Second * 5
)
// sendPostDetails houses an HTTP POST request to send to an RPC server as well // requestRetryInterval is the initial amount of time to wait in between
// as the original JSON-RPC command and a channel to reply on when the server // retries when sending HTTP POST requests.
// responds with the result. requestRetryInterval = time.Millisecond * 500
type sendPostDetails struct { )
httpRequest *http.Request
jsonRequest *jsonRequest
}
// jsonRequest holds information about a json request that is used to properly // jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it. // detect, interpret, and deliver a reply to it.
@ -183,7 +179,7 @@ type Client struct {
// Networking infrastructure. // Networking infrastructure.
sendChan chan []byte sendChan chan []byte
sendPostChan chan *sendPostDetails sendPostChan chan *jsonRequest
connEstablished chan struct{} connEstablished chan struct{}
disconnect chan struct{} disconnect chan struct{}
shutdown chan struct{} shutdown chan struct{}
@ -765,10 +761,50 @@ out:
// handleSendPostMessage handles performing the passed HTTP request, reading the // handleSendPostMessage handles performing the passed HTTP request, reading the
// result, unmarshalling it, and delivering the unmarshalled result to the // result, unmarshalling it, and delivering the unmarshalled result to the
// provided response channel. // provided response channel.
func (c *Client) handleSendPostMessage(details *sendPostDetails) { func (c *Client) handleSendPostMessage(jReq *jsonRequest) {
jReq := details.jsonRequest protocol := "http"
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) if !c.config.DisableTLS {
httpResponse, err := c.httpClient.Do(details.httpRequest) protocol = "https"
}
url := protocol + "://" + c.config.Host
var err error
var backoff time.Duration
var httpResponse *http.Response
tries := 10
for i := 0; tries == 0 || i < tries; i++ {
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}
// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)
httpResponse, err = c.httpClient.Do(httpReq)
if err != nil {
backoff = requestRetryInterval * time.Duration(i+1)
if backoff > time.Minute {
backoff = time.Minute
}
log.Debugf("Failed command [%s] with id %d attempt %d. Retrying in %v... \n", jReq.method, jReq.id, i, backoff)
time.Sleep(backoff)
continue
}
break
}
if err != nil { if err != nil {
jReq.responseChan <- &Response{err: err} jReq.responseChan <- &Response{err: err}
return return
@ -821,8 +857,8 @@ out:
// Send any messages ready for send until the shutdown channel // Send any messages ready for send until the shutdown channel
// is closed. // is closed.
select { select {
case details := <-c.sendPostChan: case jReq := <-c.sendPostChan:
c.handleSendPostMessage(details) c.handleSendPostMessage(jReq)
case <-c.shutdown: case <-c.shutdown:
break out break out
@ -834,8 +870,8 @@ out:
cleanup: cleanup:
for { for {
select { select {
case details := <-c.sendPostChan: case jReq := <-c.sendPostChan:
details.jsonRequest.responseChan <- &Response{ jReq.responseChan <- &Response{
result: nil, result: nil,
err: ErrClientShutdown, err: ErrClientShutdown,
} }
@ -852,7 +888,7 @@ cleanup:
// sendPostRequest sends the passed HTTP request to the RPC server using the // sendPostRequest sends the passed HTTP request to the RPC server using the
// HTTP client associated with the client. It is backed by a buffered channel, // HTTP client associated with the client. It is backed by a buffered channel,
// so it will not block until the send channel is full. // so it will not block until the send channel is full.
func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) { func (c *Client) sendPostRequest(jReq *jsonRequest) {
// Don't send the message if shutting down. // Don't send the message if shutting down.
select { select {
case <-c.shutdown: case <-c.shutdown:
@ -860,10 +896,9 @@ func (c *Client) sendPostRequest(httpReq *http.Request, jReq *jsonRequest) {
default: default:
} }
c.sendPostChan <- &sendPostDetails{ log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
jsonRequest: jReq,
httpRequest: httpReq, c.sendPostChan <- jReq
}
} }
// newFutureError returns a new future result channel that already has the // newFutureError returns a new future result channel that already has the
@ -885,42 +920,6 @@ func ReceiveFuture(f chan *Response) ([]byte, error) {
return r.result, r.err return r.result, r.err
} }
// sendPost sends the passed request to the server by issuing an HTTP POST
// request using the provided response channel for the reply. Typically a new
// connection is opened and closed for each command when using this method,
// however, the underlying HTTP client might coalesce multiple commands
// depending on several factors including the remote server configuration.
func (c *Client) sendPost(jReq *jsonRequest) {
// Generate a request to the configured RPC server.
protocol := "http"
if !c.config.DisableTLS {
protocol = "https"
}
url := protocol + "://" + c.config.Host
bodyReader := bytes.NewReader(jReq.marshalledJSON)
httpReq, err := http.NewRequest("POST", url, bodyReader)
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.Close = true
httpReq.Header.Set("Content-Type", "application/json")
for key, value := range c.config.ExtraHeaders {
httpReq.Header.Set(key, value)
}
// Configure basic access authorization.
user, pass, err := c.config.getAuth()
if err != nil {
jReq.responseChan <- &Response{result: nil, err: err}
return
}
httpReq.SetBasicAuth(user, pass)
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
c.sendPostRequest(httpReq, jReq)
}
// sendRequest sends the passed json request to the associated server using the // sendRequest sends the passed json request to the associated server using the
// provided response channel for the reply. It handles both websocket and HTTP // provided response channel for the reply. It handles both websocket and HTTP
// POST mode depending on the configuration of the client. // POST mode depending on the configuration of the client.
@ -935,7 +934,7 @@ func (c *Client) sendRequest(jReq *jsonRequest) {
log.Warn(err) log.Warn(err)
} }
} else { } else {
c.sendPost(jReq) c.sendPostRequest(jReq)
} }
return return
} }
@ -1428,7 +1427,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
ntfnHandlers: ntfnHandlers, ntfnHandlers: ntfnHandlers,
ntfnState: newNotificationState(), ntfnState: newNotificationState(),
sendChan: make(chan []byte, sendBufferSize), sendChan: make(chan []byte, sendBufferSize),
sendPostChan: make(chan *sendPostDetails, sendPostBufferSize), sendPostChan: make(chan *jsonRequest, sendPostBufferSize),
connEstablished: connEstablished, connEstablished: connEstablished,
disconnect: make(chan struct{}), disconnect: make(chan struct{}),
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
@ -1642,7 +1641,7 @@ func (c *Client) sendAsync() FutureGetBulkResult {
marshalledJSON: marshalledRequest, marshalledJSON: marshalledRequest,
responseChan: responseChan, responseChan: responseChan,
} }
c.sendPost(&request) c.sendPostRequest(&request)
return responseChan return responseChan
} }