Remove isInitialized() check in sendMsg, which was causing a deadlock (#763)

* Remove isInitialized() check in sendMsg, which was causing a deadlock

* Enable akka logging to help debug NeutrinoNodeTest

* Revert to bypassing the isInitialized() check in sendMsg()

* Run scalafmt again
This commit is contained in:
Chris Stewart 2019-10-01 06:25:03 -05:00 committed by GitHub
parent 7e19a706de
commit 9b1caa6561
2 changed files with 23 additions and 49 deletions

View file

@ -12,7 +12,6 @@ import org.scalatest.{DoNotDiscover, FutureOutcome}
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
@DoNotDiscover
class NeutrinoNodeTest extends NodeUnitTest {
/** Wallet config with data directory set to user temp directory */

View file

@ -58,33 +58,31 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
}
/** Sends a [[org.bitcoins.core.p2p.VersionMessage VersionMessage]] to our peer */
def sendVersionMessage()(implicit ec: ExecutionContext): Future[Unit] = {
def sendVersionMessage(): Future[Unit] = {
val versionMsg = VersionMessage(client.peer.socket, conf.network)
logger.trace(s"Sending versionMsg=$versionMsg to peer=${client.peer}")
sendMsg(versionMsg)
}
def sendVerackMessage()(implicit ec: ExecutionContext): Future[Unit] = {
def sendVerackMessage(): Future[Unit] = {
val verackMsg = VerAckMessage
sendMsg(verackMsg)
}
/** Responds to a ping message */
def sendPong(ping: PingMessage)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendPong(ping: PingMessage): Future[Unit] = {
val pong = PongMessage(ping.nonce)
logger.trace(s"Sending pong=$pong to peer=${client.peer}")
sendMsg(pong)
}
def sendGetHeadersMessage(lastHash: DoubleSha256Digest)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendGetHeadersMessage(lastHash: DoubleSha256Digest): Future[Unit] = {
val headersMsg = GetHeadersMessage(lastHash)
logger.trace(s"Sending getheaders=$headersMsg to peer=${client.peer}")
sendMsg(headersMsg)
}
def sendHeadersMessage()(implicit ec: ExecutionContext): Future[Unit] = {
def sendHeadersMessage(): Future[Unit] = {
val sendHeadersMsg = SendHeadersMessage
sendMsg(sendHeadersMsg)
}
@ -92,8 +90,7 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
/**
* Sends a inventory message with the given transactions
*/
def sendInventoryMessage(transactions: Transaction*)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendInventoryMessage(transactions: Transaction*): Future[Unit] = {
val inventories =
transactions.map(tx => Inventory(TypeIdentifier.MsgTx, tx.txId))
val message = InventoryMessage(inventories)
@ -101,34 +98,30 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(message)
}
def sendFilterClearMessage()(implicit ec: ExecutionContext): Future[Unit] = {
def sendFilterClearMessage(): Future[Unit] = {
sendMsg(FilterClearMessage)
}
def sendFilterAddMessage(hash: HashDigest)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendFilterAddMessage(hash: HashDigest): Future[Unit] = {
val message = FilterAddMessage.fromHash(hash)
logger.trace(s"Sending filteradd=$message to peer=${client.peer}")
sendMsg(message)
}
def sendFilterLoadMessage(bloom: BloomFilter)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendFilterLoadMessage(bloom: BloomFilter): Future[Unit] = {
val message = FilterLoadMessage(bloom)
logger.trace(s"Sending filterload=$message to peer=${client.peer}")
sendMsg(message)
}
def sendTransactionMessage(transaction: Transaction)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendTransactionMessage(transaction: Transaction): Future[Unit] = {
val message = TransactionMessage(transaction)
logger.trace(s"Sending txmessage=$message to peer=${client.peer}")
sendMsg(message)
}
/** Sends a request for filtered blocks matching the given headers */
def sendGetDataMessage(headers: BlockHeader*)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendGetDataMessage(headers: BlockHeader*): Future[Unit] = {
val inventories =
headers.map(header =>
Inventory(TypeIdentifier.MsgFilteredBlock, header.hash))
@ -139,8 +132,7 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
def sendGetCompactFiltersMessage(
startHeight: Int,
stopHash: DoubleSha256Digest)(
implicit ec: ExecutionContext): Future[Unit] = {
stopHash: DoubleSha256Digest): Future[Unit] = {
val message =
GetCompactFiltersMessage(if (startHeight < 0) 0 else startHeight,
stopHash)
@ -150,8 +142,7 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
def sendGetCompactFilterHeadersMessage(
startHeight: Int,
stopHash: DoubleSha256Digest)(
implicit ec: ExecutionContext): Future[Unit] = {
stopHash: DoubleSha256Digest): Future[Unit] = {
val message =
GetCompactFilterHeadersMessage(if (startHeight < 0) 0 else startHeight,
stopHash)
@ -159,37 +150,21 @@ case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
sendMsg(message)
}
def sendGetCompactFilterCheckPointMessage(stopHash: DoubleSha256Digest)(
implicit ec: ExecutionContext): Future[Unit] = {
def sendGetCompactFilterCheckPointMessage(
stopHash: DoubleSha256Digest): Future[Unit] = {
val message = GetCompactFilterCheckPointMessage(stopHash)
logger.debug(s"Sending getcfcheckpt=$message to peer ${client.peer}")
sendMsg(message)
}
private[node] def sendMsg(msg: NetworkPayload)(
implicit ec: ExecutionContext): Future[Unit] = {
msg match {
case _: VersionMessage | VerAckMessage =>
//version or verack messages are the only messages that
//can be sent before we are fully initialized
//as they are needed to complete our handshake with our peer
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
client.actor ! newtworkMsg
FutureUtil.unit
case _: NetworkPayload =>
isInitialized().map { isInit =>
if (isInit) {
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
client.actor ! newtworkMsg
} else {
logger.warn(
s"Cannot send msg=${msg.commandName} to peer=${socket} because we aren't initialized!")
()
}
}
}
private[node] def sendMsg(msg: NetworkPayload): Future[Unit] = {
//version or verack messages are the only messages that
//can be sent before we are fully initialized
//as they are needed to complete our handshake with our peer
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
val newtworkMsg = NetworkMessage(conf.network, msg)
client.actor ! newtworkMsg
FutureUtil.unit
}
}