Concurrently handle websocket client JSON-RPC requests.

This commit is contained in:
Josh Rickmar 2016-04-25 18:51:42 -04:00
parent da04285e0d
commit d0a9c03844
2 changed files with 285 additions and 372 deletions

View File

@ -40,6 +40,7 @@ const (
defaultBanThreshold = 100 defaultBanThreshold = 100
defaultMaxRPCClients = 10 defaultMaxRPCClients = 10
defaultMaxRPCWebsockets = 25 defaultMaxRPCWebsockets = 25
defaultMaxRPCConcurrentReqs = 20
defaultVerifyEnabled = false defaultVerifyEnabled = false
defaultDbType = "ffldb" defaultDbType = "ffldb"
defaultFreeTxRelayLimit = 15.0 defaultFreeTxRelayLimit = 15.0
@ -103,6 +104,7 @@ type config struct {
RPCKey string `long:"rpckey" description:"File containing the certificate key"` RPCKey string `long:"rpckey" description:"File containing the certificate key"`
RPCMaxClients int `long:"rpcmaxclients" description:"Max number of RPC clients for standard connections"` RPCMaxClients int `long:"rpcmaxclients" description:"Max number of RPC clients for standard connections"`
RPCMaxWebsockets int `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"` RPCMaxWebsockets int `long:"rpcmaxwebsockets" description:"Max number of RPC websocket connections"`
RPCMaxConcurrentReqs int `long:"rpcmaxconcurrentreqs" description:"Max number of concurrent RPC requests that may be processed concurrently"`
DisableRPC bool `long:"norpc" description:"Disable built-in RPC server -- NOTE: The RPC server is disabled by default if no rpcuser/rpcpass or rpclimituser/rpclimitpass is specified"` DisableRPC bool `long:"norpc" description:"Disable built-in RPC server -- NOTE: The RPC server is disabled by default if no rpcuser/rpcpass or rpclimituser/rpclimitpass is specified"`
DisableTLS bool `long:"notls" description:"Disable TLS for the RPC server -- NOTE: This is only allowed if the RPC server is bound to localhost"` DisableTLS bool `long:"notls" description:"Disable TLS for the RPC server -- NOTE: This is only allowed if the RPC server is bound to localhost"`
DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"` DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"`
@ -341,6 +343,7 @@ func loadConfig() (*config, []string, error) {
BanThreshold: defaultBanThreshold, BanThreshold: defaultBanThreshold,
RPCMaxClients: defaultMaxRPCClients, RPCMaxClients: defaultMaxRPCClients,
RPCMaxWebsockets: defaultMaxRPCWebsockets, RPCMaxWebsockets: defaultMaxRPCWebsockets,
RPCMaxConcurrentReqs: defaultMaxRPCConcurrentReqs,
DataDir: defaultDataDir, DataDir: defaultDataDir,
LogDir: defaultLogDir, LogDir: defaultLogDir,
DbType: defaultDbType, DbType: defaultDbType,
@ -634,6 +637,15 @@ func loadConfig() (*config, []string, error) {
} }
} }
if cfg.RPCMaxConcurrentReqs < 0 {
str := "%s: The rpcmaxwebsocketconcurrentrequests option may " +
"not be less than 0 -- parsed [%d]"
err := fmt.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
// Validate the the minrelaytxfee. // Validate the the minrelaytxfee.
cfg.minRelayTxFee, err = btcutil.NewAmount(cfg.MinRelayTxFee) cfg.minRelayTxFee, err = btcutil.NewAmount(cfg.MinRelayTxFee)
if err != nil { if err != nil {

View File

@ -39,6 +39,15 @@ const (
websocketSendBufferSize = 50 websocketSendBufferSize = 50
) )
type semaphore chan struct{}
func makeSemaphore(n int) semaphore {
return make(chan struct{}, n)
}
func (s semaphore) acquire() { s <- struct{}{} }
func (s semaphore) release() { <-s }
// timeZeroVal is simply the zero value for a time.Time and is used to avoid // timeZeroVal is simply the zero value for a time.Time and is used to avoid
// creating multiple instances. // creating multiple instances.
var timeZeroVal time.Time var timeZeroVal time.Time
@ -65,14 +74,6 @@ var wsHandlersBeforeInit = map[string]wsCommandHandler{
"rescan": handleRescan, "rescan": handleRescan,
} }
// wsAsyncHandlers holds the websocket commands which should be run
// asynchronously to the main input handler goroutine. This allows long-running
// operations to run concurrently (and one at a time) while still responding
// to the majority of normal requests which can be answered quickly.
var wsAsyncHandlers = map[string]struct{}{
"rescan": {},
}
// WebsocketHandler handles a new websocket client by creating a new wsClient, // WebsocketHandler handles a new websocket client by creating a new wsClient,
// starting it, and blocking until the connection closes. Since it blocks, it // starting it, and blocking until the connection closes. Since it blocks, it
// must be run in a separate goroutine. It should be invoked from the websocket // must be run in a separate goroutine. It should be invoked from the websocket
@ -898,180 +899,13 @@ type wsClient struct {
spentRequests map[wire.OutPoint]struct{} spentRequests map[wire.OutPoint]struct{}
// Networking infrastructure. // Networking infrastructure.
asyncStarted bool serviceRequestSem semaphore
asyncChan chan *parsedRPCCmd
ntfnChan chan []byte ntfnChan chan []byte
sendChan chan wsResponse sendChan chan wsResponse
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
} }
// handleMessage is the main handler for incoming requests. It enforces
// authentication, parses the incoming json, looks up and executes handlers
// (including pass through for standard RPC commands), and sends the appropriate
// response. It also detects commands which are marked as long-running and
// sends them off to the asyncHander for processing.
func (c *wsClient) handleMessage(msg []byte) {
if !c.authenticated {
// Disconnect immediately if the provided command fails to
// parse when the client is not already authenticated.
var request btcjson.Request
if err := json.Unmarshal(msg, &request); err != nil {
c.Disconnect()
return
}
parsedCmd := parseCmd(&request)
if parsedCmd.err != nil {
c.Disconnect()
return
}
// Disconnect immediately if the first command is not
// authenticate when not already authenticated.
authCmd, ok := parsedCmd.cmd.(*btcjson.AuthenticateCmd)
if !ok {
rpcsLog.Warnf("Unauthenticated websocket message " +
"received")
c.Disconnect()
return
}
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := fastsha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
c.Disconnect()
return
}
c.authenticated = true
c.isAdmin = cmp == 1
// Marshal and send response.
reply, err := createMarshalledReply(parsedCmd.id, nil, nil)
if err != nil {
rpcsLog.Errorf("Failed to marshal authenticate reply: "+
"%v", err.Error())
return
}
c.SendMessage(reply, nil)
return
}
// Attempt to parse the raw message into a JSON-RPC request.
var request btcjson.Request
if err := json.Unmarshal(msg, &request); err != nil {
jsonErr := &btcjson.RPCError{
Code: btcjson.ErrRPCParse.Code,
Message: "Failed to parse request: " + err.Error(),
}
// Marshal and send response.
reply, err := createMarshalledReply(nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
return
}
c.SendMessage(reply, nil)
return
}
// Requests with no ID (notifications) must not have a response per the
// JSON-RPC spec.
if request.ID == nil {
return
}
// Check if the user is limited and disconnect client if unauthorized
if !c.isAdmin {
if _, ok := rpcLimited[request.Method]; !ok {
jsonErr := &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParams.Code,
Message: "limited user not authorized for this method",
}
// Marshal and send response.
reply, err := createMarshalledReply(request.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
return
}
c.SendMessage(reply, nil)
return
}
}
// Attempt to parse the JSON-RPC request into a known concrete command.
cmd := parseCmd(&request)
if cmd.err != nil {
// Marshal and send response.
reply, err := createMarshalledReply(cmd.id, nil, cmd.err)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
return
}
c.SendMessage(reply, nil)
return
}
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)
// Disconnect if already authenticated and another authenticate command
// is received.
if _, ok := cmd.cmd.(*btcjson.AuthenticateCmd); ok {
rpcsLog.Warnf("Websocket client %s is already authenticated",
c.addr)
c.Disconnect()
return
}
// When the command is marked as a long-running command, send it off
// to the asyncHander goroutine for processing.
if _, ok := wsAsyncHandlers[cmd.method]; ok {
// Start up the async goroutine for handling long-running
// requests asynchonrously if needed.
if !c.asyncStarted {
rpcsLog.Tracef("Starting async handler for %s", c.addr)
c.wg.Add(1)
go c.asyncHandler()
c.asyncStarted = true
}
c.asyncChan <- cmd
return
}
// Lookup the websocket extension for the command and if it doesn't
// exist fallback to handling the command as a standard command.
wsHandler, ok := wsHandlers[cmd.method]
if !ok {
// No websocket-specific handler so handle like a legacy
// RPC connection.
result, jsonErr := c.server.standardCmdResult(cmd, nil)
reply, err := createMarshalledReply(cmd.id, result, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+
"command: %v", cmd.method, err)
return
}
c.SendMessage(reply, nil)
return
}
// Invoke the handler and marshal and send response.
result, jsonErr := wsHandler(c, cmd.cmd)
reply, err := createMarshalledReply(cmd.id, result, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> command: %v",
cmd.method, err)
return
}
c.SendMessage(reply, nil)
}
// inHandler handles all incoming messages for the websocket connection. It // inHandler handles all incoming messages for the websocket connection. It
// must be run as a goroutine. // must be run as a goroutine.
func (c *wsClient) inHandler() { func (c *wsClient) inHandler() {
@ -1094,7 +928,138 @@ out:
} }
break out break out
} }
c.handleMessage(msg)
var request btcjson.Request
err = json.Unmarshal(msg, &request)
if err != nil {
if !c.authenticated {
break out
}
jsonErr := &btcjson.RPCError{
Code: btcjson.ErrRPCParse.Code,
Message: "Failed to parse request: " + err.Error(),
}
reply, err := createMarshalledReply(nil, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
// Requests with no ID (notifications) must not have a response per the
// JSON-RPC spec.
if request.ID == nil {
if !c.authenticated {
break out
}
continue
}
cmd := parseCmd(&request)
if cmd.err != nil {
if !c.authenticated {
break out
}
reply, err := createMarshalledReply(cmd.id, nil, cmd.err)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)
// Check auth. The client is immediately disconnected if the
// first request of an unauthentiated websocket client is not
// the authenticate request, an authenticate request is received
// when the client is already authenticated, or incorrect
// authentication credentials are provided in the request.
switch authCmd, ok := cmd.cmd.(*btcjson.AuthenticateCmd); {
case c.authenticated && ok:
rpcsLog.Warnf("Websocket client %s is already authenticated",
c.addr)
break out
case !c.authenticated && !ok:
rpcsLog.Warnf("Unauthenticated websocket message " +
"received")
break out
case !c.authenticated:
// Check credentials.
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := fastsha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
}
c.authenticated = true
c.isAdmin = cmp == 1
// Marshal and send response.
reply, err := createMarshalledReply(cmd.id, nil, nil)
if err != nil {
rpcsLog.Errorf("Failed to marshal authenticate reply: "+
"%v", err.Error())
continue
}
c.SendMessage(reply, nil)
continue
}
// Check if the client is using limited RPC credentials and
// error when not authorized to call this RPC.
if !c.isAdmin {
if _, ok := rpcLimited[request.Method]; !ok {
jsonErr := &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParams.Code,
Message: "limited user not authorized for this method",
}
// Marshal and send response.
reply, err := createMarshalledReply(request.ID, nil, jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal parse failure "+
"reply: %v", err)
continue
}
c.SendMessage(reply, nil)
continue
}
}
// Asynchronously handle the request. A semaphore is used to
// limit the number of concurrent requests currently being
// serviced. If the semaphore can not be acquired, simply wait
// until a request finished before reading the next RPC request
// from the websocket client.
//
// This could be a little fancier by timing out and erroring
// when it takes too long to service the request, but if that is
// done, the read of the next request should not be blocked by
// this semaphore, otherwise the next request will be read and
// will probably sit here for another few seconds before timing
// out as well. This will cause the total timeout duration for
// later requests to be much longer than the check here would
// imply.
//
// If a timeout is added, the semaphore acquiring should be
// moved inside of the new goroutine with a select statement
// that also reads a time.After channel. This will unblock the
// read of the next request from the websocket client and allow
// many requests to be waited on concurrently.
c.serviceRequestSem.acquire()
go func() {
c.serviceRequest(cmd)
c.serviceRequestSem.release()
}()
} }
// Ensure the connection is closed. // Ensure the connection is closed.
@ -1103,6 +1068,32 @@ out:
rpcsLog.Tracef("Websocket client input handler done for %s", c.addr) rpcsLog.Tracef("Websocket client input handler done for %s", c.addr)
} }
// serviceRequest services a parsed RPC request by looking up and executing the
// appropiate RPC handler. The response is marshalled and sent to the websocket
// client.
func (c *wsClient) serviceRequest(r *parsedRPCCmd) {
var (
result interface{}
err error
)
// Lookup the websocket extension for the command and if it doesn't
// exist fallback to handling the command as a standard command.
wsHandler, ok := wsHandlers[r.method]
if ok {
result, err = wsHandler(c, r.cmd)
} else {
result, err = c.server.standardCmdResult(r, nil)
}
reply, err := createMarshalledReply(r.id, result, err)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+
"command: %v", r.method, err)
return
}
c.SendMessage(reply, nil)
}
// notificationQueueHandler handles the queuing of outgoing notifications for // notificationQueueHandler handles the queuing of outgoing notifications for
// the websocket client. This runs as a muxer for various sources of input to // the websocket client. This runs as a muxer for various sources of input to
// ensure that queuing up notifications to be sent will not block. Otherwise, // ensure that queuing up notifications to be sent will not block. Otherwise,
@ -1218,96 +1209,6 @@ cleanup:
rpcsLog.Tracef("Websocket client output handler done for %s", c.addr) rpcsLog.Tracef("Websocket client output handler done for %s", c.addr)
} }
// asyncHandler handles all long-running requests such as rescans which are
// not run directly in the inHandler routine unlike most requests. This allows
// normal quick requests to continue to be processed and responded to even while
// lengthy operations are underway. Only one long-running operation is
// permitted at a time, so multiple long-running requests are queued and
// serialized. It must be run as a goroutine. Also, this goroutine is not
// started until/if the first long-running request is made.
func (c *wsClient) asyncHandler() {
asyncHandlerDoneChan := make(chan struct{}, 1) // nonblocking sync
pendingCmds := list.New()
waiting := false
// runHandler runs the handler for the passed command and sends the
// reply.
runHandler := func(parsedCmd *parsedRPCCmd) {
wsHandler, ok := wsHandlers[parsedCmd.method]
if !ok {
rpcsLog.Warnf("No handler for command <%s>",
parsedCmd.method)
return
}
// Invoke the handler and marshal and send response.
result, jsonErr := wsHandler(c, parsedCmd.cmd)
reply, err := createMarshalledReply(parsedCmd.id, result,
jsonErr)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+
"command: %v", parsedCmd.method, err)
return
}
c.SendMessage(reply, nil)
}
out:
for {
select {
case cmd := <-c.asyncChan:
if !waiting {
c.wg.Add(1)
go func(cmd *parsedRPCCmd) {
runHandler(cmd)
asyncHandlerDoneChan <- struct{}{}
c.wg.Done()
}(cmd)
} else {
pendingCmds.PushBack(cmd)
}
waiting = true
case <-asyncHandlerDoneChan:
// No longer waiting if there are no more messages in
// the pending messages queue.
next := pendingCmds.Front()
if next == nil {
waiting = false
continue
}
// Notify the outHandler about the next item to
// asynchronously send.
element := pendingCmds.Remove(next)
c.wg.Add(1)
go func(cmd *parsedRPCCmd) {
runHandler(cmd)
asyncHandlerDoneChan <- struct{}{}
c.wg.Done()
}(element.(*parsedRPCCmd))
case <-c.quit:
break out
}
}
// Drain any wait channels before exiting so nothing is left waiting
// around to send.
cleanup:
for {
select {
case <-c.asyncChan:
case <-asyncHandlerDoneChan:
default:
break cleanup
}
}
c.wg.Done()
rpcsLog.Tracef("Websocket client async handler done for %s", c.addr)
}
// SendMessage sends the passed json to the websocket client. It is backed // SendMessage sends the passed json to the websocket client. It is backed
// by a buffered channel, so it will not block until the send channel is full. // by a buffered channel, so it will not block until the send channel is full.
// Note however that QueueNotification must be used for sending async // Note however that QueueNotification must be used for sending async
@ -1414,8 +1315,8 @@ func newWebsocketClient(server *rpcServer, conn *websocket.Conn,
server: server, server: server,
addrRequests: make(map[string]struct{}), addrRequests: make(map[string]struct{}),
spentRequests: make(map[wire.OutPoint]struct{}), spentRequests: make(map[wire.OutPoint]struct{}),
serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs),
ntfnChan: make(chan []byte, 1), // nonblocking sync ntfnChan: make(chan []byte, 1), // nonblocking sync
asyncChan: make(chan *parsedRPCCmd, 1), // nonblocking sync
sendChan: make(chan wsResponse, websocketSendBufferSize), sendChan: make(chan wsResponse, websocketSendBufferSize),
quit: make(chan struct{}), quit: make(chan struct{}),
} }