mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2024-11-19 09:52:09 +01:00
Fix P2PClient parsing unknown messages (#2315)
* Fix P2PClient parsing unknown messages * Use Vector, calculate correct checksum
This commit is contained in:
parent
8cdddfecde
commit
9291d6ec3a
@ -27,7 +27,8 @@ case class NetworkHeader(
|
||||
payloadSize: UInt32,
|
||||
checksum: ByteVector
|
||||
) extends NetworkElement {
|
||||
require(bytes.length == 24, "NetworkHeaders must be 24 bytes")
|
||||
require(bytes.length == NetworkHeader.bytesSize,
|
||||
s"NetworkHeaders must be ${NetworkHeader.bytesSize} bytes")
|
||||
|
||||
override def bytes: ByteVector = RawNetworkHeaderSerializer.write(this)
|
||||
|
||||
@ -35,6 +36,8 @@ case class NetworkHeader(
|
||||
|
||||
object NetworkHeader extends Factory[NetworkHeader] {
|
||||
|
||||
val bytesSize = 24
|
||||
|
||||
override def fromBytes(bytes: ByteVector): NetworkHeader =
|
||||
RawNetworkHeaderSerializer.read(bytes)
|
||||
|
||||
|
@ -4,10 +4,10 @@ import akka.io.Tcp
|
||||
import akka.testkit.{TestActorRef, TestProbe}
|
||||
import org.bitcoins.core.config.TestNet3
|
||||
import org.bitcoins.core.number.{Int32, UInt32, UInt64}
|
||||
import org.bitcoins.core.p2p.{HeadersMessage, NetworkMessage, VersionMessage}
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.core.protocol.CompactSizeUInt
|
||||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
import org.bitcoins.crypto.DoubleSha256Digest
|
||||
import org.bitcoins.crypto.{CryptoUtil, DoubleSha256Digest}
|
||||
import org.bitcoins.node.NodeCallbacks
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.peer.PeerMessageReceiver
|
||||
@ -105,9 +105,25 @@ class P2PClientTest extends BitcoindRpcTest with CachedBitcoinSAppConfig {
|
||||
hex"fabfb5da6d65726b6c65626c6f636b0097000000b4b6e45d00000020387191f7d488b849b4080fdf105c71269fc841a2f0f2944fc5dc785c830c716e37f36373098aae06a668cc74e388caf50ecdcb5504ce936490b4b72940e08859548c305dffff7f20010000000200000002ecd1c722709bfc241f8b94fc64034dcba2c95409dc4cd1d7b864e1128a04e5b044133327b04ff8ac576e7748a4dae4111f0c765dacbfe0c5a9fddbeb8f60d5af0105fabfb5da747800000000000000000000cc0100004413332702000000065b7f0f3eec398047e921037815aa41709b6243a1897f1423194b7558399ae0300000000017160014008dc9d88d1797305f3fbd30d2b36d6bde984a09feffffffe9145055d671fd705a09f028033da614b619205b9926fe5ebe45e15ae8b3231e0100000017160014d74cfac04bb0e6838c35f1f4a0a60d13655be2fbfeffffff797f8ff9c10fa618b6254343a648be995410e82c03fd8accb0de2271a3fb1abd00000000171600143ee832c09db48eca28a64a358ed7a01dbe52d31bfeffffffc794dba971b9479dfcbc662a3aacd641553bdb2418b15c0221c5dfd4471a7a70000000001716001452c13ba0314f7718c234ed6adfea6422ce03a545feffffffb7c3bf1762b15f3b0e0eaa5beb46fe96a9e2829a7413fd900b9b7e0d192ab64800000000171600143ee832c09db48eca28a64a358ed7a01dbe52d31bfeffffffb6ced6cb8dfc2f7f5b37561938ead3bc5ca4036e2b45d9738cc086a10eed4e010100000017160014aebb17e245fe8c98a75f0b6717fcadca30e491e2feffffff02002a7515000000001976a9148374ff8beb55ea2945039881ca26071b5749fafe88ac485620000000000017a91405d36a2b0bdedf3fc58bed6f9e4026f8934a2716876b050000fabfb5da686561646572730000000000010000001406e05800"
|
||||
val (messages, leftover) = P2PClient.parseIndividualMessages(bytes)
|
||||
assert(messages.length == 3)
|
||||
assert(leftover.isEmpty)
|
||||
|
||||
val commandNames = messages.map(_.header.commandName)
|
||||
assert(commandNames == Vector("merkleblock", "tx", "headers"))
|
||||
|
||||
assert(leftover.isEmpty)
|
||||
}
|
||||
|
||||
it must "parse an unknown command" in {
|
||||
val header: NetworkHeader =
|
||||
NetworkHeader(TestNet3,
|
||||
"madeup",
|
||||
UInt32.zero,
|
||||
CryptoUtil.doubleSHA256(ByteVector.empty).bytes.take(4))
|
||||
|
||||
val (messages, leftover) = P2PClient.parseIndividualMessages(header.bytes)
|
||||
assert(messages.isEmpty)
|
||||
assert(leftover.isEmpty)
|
||||
}
|
||||
|
||||
behavior of "P2PClient"
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
|
@ -5,7 +5,7 @@ import akka.event.LoggingReceive
|
||||
import akka.io.{IO, Tcp}
|
||||
import akka.util.{ByteString, CompactByteString, Timeout}
|
||||
import org.bitcoins.core.config.NetworkParameters
|
||||
import org.bitcoins.core.p2p.{NetworkMessage, NetworkPayload}
|
||||
import org.bitcoins.core.p2p.{NetworkHeader, NetworkMessage, NetworkPayload}
|
||||
import org.bitcoins.core.util.FutureUtil
|
||||
import org.bitcoins.node.P2PLogger
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
@ -355,34 +355,58 @@ object P2PClient extends P2PLogger {
|
||||
* @return the parsed [[NetworkMessage]]'s and the unaligned bytes that did not parse to a message
|
||||
*/
|
||||
private[bitcoins] def parseIndividualMessages(
|
||||
bytes: ByteVector): (List[NetworkMessage], ByteVector) = {
|
||||
bytes: ByteVector): (Vector[NetworkMessage], ByteVector) = {
|
||||
@tailrec
|
||||
def loop(
|
||||
remainingBytes: ByteVector,
|
||||
accum: List[NetworkMessage]): (List[NetworkMessage], ByteVector) = {
|
||||
accum: Vector[NetworkMessage]): (Vector[NetworkMessage], ByteVector) = {
|
||||
if (remainingBytes.length <= 0) {
|
||||
(accum.reverse, remainingBytes)
|
||||
(accum, remainingBytes)
|
||||
} else {
|
||||
val messageTry = Try(NetworkMessage(remainingBytes))
|
||||
messageTry match {
|
||||
case Success(message) =>
|
||||
val headerTry = Try(
|
||||
NetworkHeader.fromBytes(remainingBytes.take(NetworkHeader.bytesSize)))
|
||||
headerTry match {
|
||||
case Success(header) =>
|
||||
val payloadBytes = remainingBytes
|
||||
.drop(NetworkHeader.bytesSize)
|
||||
.take(header.payloadSize.toInt)
|
||||
|
||||
val newRemainingBytes =
|
||||
remainingBytes.slice(message.bytes.length, remainingBytes.length)
|
||||
logger.trace(
|
||||
s"Parsed a message=${message.header.commandName} from bytes, continuing with remainingBytes=${newRemainingBytes.length}")
|
||||
loop(newRemainingBytes, message :: accum)
|
||||
remainingBytes.drop(NetworkHeader.bytesSize + payloadBytes.size)
|
||||
|
||||
// If it's a message type we know, try to parse it
|
||||
if (NetworkPayload.commandNames.contains(header.commandName)) {
|
||||
Try(NetworkMessage(header.bytes ++ payloadBytes)) match {
|
||||
case Success(message) =>
|
||||
logger.trace(
|
||||
s"Parsed a message=${message.header.commandName} from bytes, continuing with remainingBytes=${newRemainingBytes.length}")
|
||||
|
||||
loop(newRemainingBytes, accum :+ message)
|
||||
case Failure(_) =>
|
||||
// Can't parse message yet, we need to wait for more bytes
|
||||
(accum, remainingBytes)
|
||||
}
|
||||
} else if (payloadBytes.size == header.payloadSize.toInt) { // If we've received the entire unknown message
|
||||
logger.info(
|
||||
s"Received unknown network message ${header.commandName}")
|
||||
loop(newRemainingBytes, accum)
|
||||
} else {
|
||||
// If we can't parse the entire unknown message, continue on until we can
|
||||
// so we properly skip it
|
||||
(accum, remainingBytes)
|
||||
}
|
||||
case Failure(exc) =>
|
||||
logger.trace(
|
||||
s"Failed to parse network message, could be because TCP frame isn't aligned: $exc")
|
||||
s"Failed to parse network message $remainingBytes, could be because TCP frame isn't aligned: $exc")
|
||||
|
||||
//this case means that our TCP frame was not aligned with bitcoin protocol
|
||||
//return the unaligned bytes so we can apply them to the next tcp frame of bytes we receive
|
||||
//http://stackoverflow.com/a/37979529/967713
|
||||
(accum.reverse, remainingBytes)
|
||||
(accum, remainingBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
val (messages, remainingBytes) = loop(bytes, Nil)
|
||||
val (messages, remainingBytes) = loop(bytes, Vector.empty)
|
||||
(messages, remainingBytes)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user