mirror of
https://github.com/bitcoin-s/bitcoin-s.git
synced 2025-03-13 19:37:30 +01:00
Node cleanup (#591)
* Cleanup In this commit we cleanup a few Scaladocs and change some trait/impl pairs into just a case class. * Node cleanup In this commit we 1) Rename Client to P2PClient. Client is a very generic name. 2) Clean up some Scaladocs 3) Remove some unecessary objects/traits/classes and replace them with case classes 4) Add trace logging of bytes received in P2PClient
This commit is contained in:
parent
31ddb89fa3
commit
ae134f9742
17 changed files with 132 additions and 235 deletions
|
@ -10,68 +10,35 @@ import scodec.bits.ByteVector
|
|||
/**
|
||||
* Represents a message header on the peer-to-peer network
|
||||
* @see https://bitcoin.org/en/developer-reference#message-headers
|
||||
*
|
||||
* @param network Each network has magic bytes indicating the originating network;
|
||||
used to seek to next message when stream state is unknown.
|
||||
* @param commandName ASCII string which identifies what message type is contained in the payload.
|
||||
Followed by nulls (0x00) to pad out byte count; for example: version\0\0\0\0\0.
|
||||
* @param payloadSize Number of bytes in payload. The current maximum number of bytes (MAX_SIZE) allowed in the payload
|
||||
* by Bitcoin Core is 32 MiB—messages with a payload size larger than this will be dropped or rejected.
|
||||
* @param checksum Added in protocol version 209.
|
||||
* First 4 bytes of SHA256(SHA256(payload)) in internal byte order.
|
||||
* If payload is empty, as in verack and getaddr messages,
|
||||
* the checksum is always 0x5df6e0e2 (SHA256(SHA256(""))).
|
||||
*/
|
||||
sealed trait NetworkHeader extends NetworkElement {
|
||||
case class NetworkHeader(
|
||||
network: NetworkParameters,
|
||||
commandName: String,
|
||||
payloadSize: UInt32,
|
||||
checksum: ByteVector
|
||||
) extends NetworkElement {
|
||||
require(bytes.length == 24, "NetworkHeaders must be 24 bytes")
|
||||
|
||||
override def bytes: ByteVector = RawNetworkHeaderSerializer.write(this)
|
||||
|
||||
/**
|
||||
* Each network has magic bytes indicating the originating network;
|
||||
* used to seek to next message when stream state is unknown.
|
||||
*/
|
||||
def network: NetworkParameters
|
||||
|
||||
/**
|
||||
* ASCII string which identifies what message type is contained in the payload.
|
||||
* Followed by nulls (0x00) to pad out byte count; for example: version\0\0\0\0\0.
|
||||
*/
|
||||
def commandName: String
|
||||
|
||||
/**
|
||||
* Number of bytes in payload. The current maximum number of bytes (MAX_SIZE) allowed in the payload
|
||||
* by Bitcoin Core is 32 MiB—messages with a payload size larger than this will be dropped or rejected.
|
||||
*/
|
||||
def payloadSize: UInt32
|
||||
|
||||
/**
|
||||
* Added in protocol version 209.
|
||||
* First 4 bytes of SHA256(SHA256(payload)) in internal byte order.
|
||||
* If payload is empty, as in verack and getaddr messages,
|
||||
* the checksum is always 0x5df6e0e2 (SHA256(SHA256(""))).
|
||||
*/
|
||||
def checksum: ByteVector
|
||||
|
||||
}
|
||||
|
||||
object NetworkHeader extends Factory[NetworkHeader] {
|
||||
|
||||
private case class NetworkHeaderImpl(
|
||||
network: NetworkParameters,
|
||||
commandName: String,
|
||||
payloadSize: UInt32,
|
||||
checksum: ByteVector)
|
||||
extends NetworkHeader {
|
||||
require(bytes.length == 24, "NetworkHeaders must be 24 bytes")
|
||||
}
|
||||
|
||||
override def fromBytes(bytes: ByteVector): NetworkHeader =
|
||||
RawNetworkHeaderSerializer.read(bytes)
|
||||
|
||||
/**
|
||||
* Creates a [[NetworkHeader]] from all of its individual components
|
||||
* @param network the [[NetworkParameters]] object indicating what network this header is sent on
|
||||
* @param commandName the name of the command being sent in the header
|
||||
* @param payloadSize the size of the payload being sent by this header
|
||||
* @param checksum the checksum of the payload to ensure that the entire payload was sent
|
||||
*/
|
||||
def apply(
|
||||
network: NetworkParameters,
|
||||
commandName: String,
|
||||
payloadSize: UInt32,
|
||||
checksum: ByteVector): NetworkHeader = {
|
||||
NetworkHeaderImpl(network, commandName, payloadSize, checksum)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [[NetworkHeader]] from it's [[NetworkParameters]] and [[NetworkPayload]]
|
||||
* @param network the [[NetworkParameters]] object that indicates what network the payload needs to be sent on
|
||||
|
|
|
@ -7,13 +7,14 @@ import org.bitcoins.core.util.Factory
|
|||
import scodec.bits.ByteVector
|
||||
|
||||
/**
|
||||
* Created by chris on 6/10/16.
|
||||
* Represents an entire p2p network message in bitcoins
|
||||
* Represents a P2P network message
|
||||
*/
|
||||
sealed abstract class NetworkMessage extends NetworkElement {
|
||||
def header: NetworkHeader
|
||||
def payload: NetworkPayload
|
||||
override def bytes: ByteVector = RawNetworkMessageSerializer.write(this)
|
||||
|
||||
override def toString(): String = s"NetworkMessage($header, $payload)"
|
||||
}
|
||||
|
||||
object NetworkMessage extends Factory[NetworkMessage] {
|
||||
|
|
|
@ -274,23 +274,18 @@ object GetHeadersMessage extends Factory[GetHeadersMessage] {
|
|||
* The headers message sends one or more block headers to a node
|
||||
* which previously requested certain headers with a getheaders message.
|
||||
* @see [[https://bitcoin.org/en/developer-reference#headers]]
|
||||
*
|
||||
* @param count Number of block headers up to a maximum of 2,000.
|
||||
* Note: headers-first sync assumes the sending node
|
||||
* will send the maximum number of headers whenever possible.
|
||||
*
|
||||
* @param headers Block headers: each 80-byte block header is in the format described in the
|
||||
* block headers section with an additional 0x00 suffixed.
|
||||
* This 0x00 is called the transaction count, but because the headers message
|
||||
* doesn’t include any transactions, the transaction count is always zero.
|
||||
*/
|
||||
trait HeadersMessage extends DataPayload {
|
||||
|
||||
/**
|
||||
* Number of block headers up to a maximum of 2,000.
|
||||
* Note: headers-first sync assumes the sending node
|
||||
* will send the maximum number of headers whenever possible.
|
||||
*/
|
||||
def count: CompactSizeUInt
|
||||
|
||||
/**
|
||||
* Block headers: each 80-byte block header is in the format described in the
|
||||
* block headers section with an additional 0x00 suffixed.
|
||||
* This 0x00 is called the transaction count, but because the headers message
|
||||
* doesn’t include any transactions, the transaction count is always zero.
|
||||
*/
|
||||
def headers: Vector[BlockHeader]
|
||||
case class HeadersMessage(count: CompactSizeUInt, headers: Vector[BlockHeader])
|
||||
extends DataPayload {
|
||||
|
||||
override def commandName = NetworkPayload.headersCommandName
|
||||
|
||||
|
@ -298,22 +293,13 @@ trait HeadersMessage extends DataPayload {
|
|||
}
|
||||
|
||||
object HeadersMessage extends Factory[HeadersMessage] {
|
||||
private case class HeadersMessageImpl(
|
||||
count: CompactSizeUInt,
|
||||
headers: Vector[BlockHeader])
|
||||
extends HeadersMessage
|
||||
|
||||
def fromBytes(bytes: ByteVector): HeadersMessage =
|
||||
RawHeadersMessageSerializer.read(bytes)
|
||||
|
||||
def apply(
|
||||
count: CompactSizeUInt,
|
||||
headers: Vector[BlockHeader]): HeadersMessage =
|
||||
HeadersMessageImpl(count, headers)
|
||||
|
||||
def apply(headers: Vector[BlockHeader]): HeadersMessage = {
|
||||
val count = CompactSizeUInt(UInt64(headers.length))
|
||||
HeadersMessageImpl(count, headers)
|
||||
HeadersMessage(count, headers)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,11 +383,10 @@ case object MemPoolMessage extends DataPayload {
|
|||
* they will be sent separately as tx messages.
|
||||
*
|
||||
* @see [[https://bitcoin.org/en/developer-reference#merkleblock]]
|
||||
*
|
||||
* @param merkleBlock The actual [[MerkleBlock]] that this message represents
|
||||
*/
|
||||
trait MerkleBlockMessage extends DataPayload {
|
||||
|
||||
/** The actual [[MerkleBlock]] that this message represents */
|
||||
def merkleBlock: MerkleBlock
|
||||
case class MerkleBlockMessage(merkleBlock: MerkleBlock) extends DataPayload {
|
||||
|
||||
override def commandName = NetworkPayload.merkleBlockCommandName
|
||||
|
||||
|
@ -414,15 +399,9 @@ trait MerkleBlockMessage extends DataPayload {
|
|||
*/
|
||||
object MerkleBlockMessage extends Factory[MerkleBlockMessage] {
|
||||
|
||||
private case class MerkleBlockMessageImpl(merkleBlock: MerkleBlock)
|
||||
extends MerkleBlockMessage
|
||||
|
||||
def fromBytes(bytes: ByteVector): MerkleBlockMessage =
|
||||
RawMerkleBlockMessageSerializer.read(bytes)
|
||||
|
||||
def apply(merkleBlock: MerkleBlock): MerkleBlockMessage = {
|
||||
MerkleBlockMessageImpl(merkleBlock)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -6,11 +6,9 @@ import org.bitcoins.core.util.{BitcoinSUtil, Factory}
|
|||
import scodec.bits.ByteVector
|
||||
|
||||
/**
|
||||
* Created by chris on 7/14/15.
|
||||
*/
|
||||
/**
|
||||
* Compact sized unsigned integer as described in:
|
||||
* https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers
|
||||
* Compact sized unsigned integer, a Bitcoin-native data structure
|
||||
*
|
||||
* @see https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers
|
||||
*/
|
||||
sealed abstract class CompactSizeUInt extends NetworkElement {
|
||||
|
||||
|
@ -34,6 +32,8 @@ sealed abstract class CompactSizeUInt extends NetworkElement {
|
|||
"Cannot convert CompactSizeUInt toInt, got: " + this)
|
||||
l.toInt
|
||||
}
|
||||
|
||||
override def toString(): String = s"CompactSizeUInt(${num.toLong})"
|
||||
}
|
||||
|
||||
object CompactSizeUInt extends Factory[CompactSizeUInt] {
|
||||
|
|
|
@ -57,7 +57,7 @@
|
|||
<logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="INFO"/>
|
||||
|
||||
<!-- inspect TCP details -->
|
||||
<logger name="org.bitcoins.node.networking.Client" level="INFO"/>
|
||||
<logger name="org.bitcoins.node.networking.P2PClientActor" level="INFO"/>
|
||||
|
||||
<!-- ╔════════════════════╗ -->
|
||||
<!-- ║ Chain module ║ -->
|
||||
|
|
|
@ -77,7 +77,7 @@ class ClientTest
|
|||
val peerMessageReceiver =
|
||||
PeerMessageReceiver(state = Preconnection)
|
||||
val client =
|
||||
TestActorRef(Client.props(peer, peerMessageReceiver), probe.ref)
|
||||
TestActorRef(P2PClient.props(peer, peerMessageReceiver), probe.ref)
|
||||
|
||||
client ! Tcp.Connect(remote)
|
||||
|
|
@ -3,11 +3,10 @@ package org.bitcoins.node
|
|||
import akka.actor.ActorSystem
|
||||
import org.bitcoins.chain.api.ChainApi
|
||||
import org.bitcoins.chain.config.ChainAppConfig
|
||||
import org.bitcoins.core.crypto.DoubleSha256DigestBE
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.Client
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
import org.bitcoins.node.networking.peer.{
|
||||
PeerMessageReceiver,
|
||||
PeerMessageSender
|
||||
|
@ -34,11 +33,11 @@ case class SpvNode(
|
|||
private val peerMsgRecv =
|
||||
PeerMessageReceiver.newReceiver(callbacks)
|
||||
|
||||
private val client: Client =
|
||||
Client(context = system, peer = peer, peerMessageReceiver = peerMsgRecv)
|
||||
private val client: P2PClient =
|
||||
P2PClient(context = system, peer = peer, peerMessageReceiver = peerMsgRecv)
|
||||
|
||||
private val peerMsgSender: PeerMessageSender = {
|
||||
PeerMessageSender(client, nodeAppConfig.network)
|
||||
PeerMessageSender(client)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -6,7 +6,6 @@ import org.bitcoins.core.crypto.DoubleSha256Digest
|
|||
import org.bitcoins.core.p2p.NetworkMessage
|
||||
import org.bitcoins.core.protocol.blockchain.BlockHeader
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.node.constant.Constants
|
||||
import org.bitcoins.core.p2p._
|
||||
|
||||
/**
|
||||
|
|
|
@ -13,43 +13,44 @@ import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageRecei
|
|||
import org.bitcoins.node.util.BitcoinSpvNodeUtil
|
||||
import scodec.bits.ByteVector
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import akka.util.CompactByteString
|
||||
|
||||
/**
|
||||
* Created by chris on 6/6/16.
|
||||
* This actor is responsible for creating a connection,
|
||||
* relaying messages and closing a connection to our peer on
|
||||
* the p2p network. This is the actor that directly interacts
|
||||
* the P2P network. This is the actor that directly interacts
|
||||
* with the p2p network. It's responsibly is to deal with low
|
||||
* level [[Tcp.Message]].
|
||||
* level .TCP messages.
|
||||
*
|
||||
* If the [[Client]] receives a [[NetworkMessage]], from a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
|
||||
* it serializes the message to it to a [[akka.util.ByteString]] and then sends it to the [[manager]]
|
||||
* which streams the data to our peer on the bitcoin network.
|
||||
* If the client receives a [[NetworkMessage]], from a
|
||||
* [[org.bitcoins.node.networking.peer.PeerMessageSender PeerMessageSender]]
|
||||
* it serializes the message to it to a [[akka.util.ByteString]] and then
|
||||
* sends it to the internal `manager` which streams the data to our peer
|
||||
* on the Bitcoin network.
|
||||
*
|
||||
* If the [[Client]] receives a [[Tcp.Received]] message, it means we have received
|
||||
* a message from our peer on the bitcoin p2p network. This means we try to parse
|
||||
* the [[ByteString]] into a [[NetworkMessage]]. If we successfully parse the message
|
||||
* we relay that message to the [[org.bitcoins.node.networking.peer.PeerMessageSender]]
|
||||
* If the client receives a [[Tcp.Received]] message, it means we have received
|
||||
* a message from our peer on the Bitcoin P2P network. This means we try to parse
|
||||
* the bytes into a [[org.bitcoins.core.p2p.NetworkMessage NetworkMessage]].
|
||||
* If we successfully parse the message we relay that message to the
|
||||
* [[org.bitcoins.node.networking.peer.PeerMessageSender PeerMessageSender]]
|
||||
* that created the Client Actor.
|
||||
*
|
||||
* In this class you will see a 'unalignedBytes' value passed around in a lot of methods
|
||||
* This is because we cannot assume that a Bitcoin [[NetworkMessage]] aligns with a tcp packet.
|
||||
* For instance, a large [[org.bitcoins.node.messages.BlockMessage]] (up to 4MB in size)
|
||||
* CANNOT fit in a single tcp packet. This means we must cache
|
||||
* In this class you will see a 'unalignedBytes' value passed around in a lot of methods.
|
||||
* This is because we cannot assume that a Bitcoin P2P message aligns with a TCP packet.
|
||||
* For instance, a large block message (up to 4MB in size)
|
||||
* CANNOT fit in a single TCP packet. This means we must cache
|
||||
* the bytes and wait for the rest of them to be sent.
|
||||
*
|
||||
* @param peerMsgHandlerReceiver The place we send messages that we successfully parsed
|
||||
* from our peer on the P2P network. This is mostly likely
|
||||
* a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
|
||||
*/
|
||||
sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
||||
|
||||
val config: NodeAppConfig
|
||||
|
||||
def peer: Peer
|
||||
|
||||
/** The place we send messages that we successfully parsed from our
|
||||
* peer on the p2p network. This is mostly likely a [[org.bitcoins.node.networking.peer.PeerMessageSender]]
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
def peerMsgHandlerReceiver: PeerMessageReceiver
|
||||
case class P2PClientActor(
|
||||
peer: Peer,
|
||||
peerMsgHandlerReceiver: PeerMessageReceiver
|
||||
)(implicit config: NodeAppConfig)
|
||||
extends Actor
|
||||
with BitcoinSLogger {
|
||||
|
||||
/**
|
||||
* The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
|
||||
|
@ -59,12 +60,12 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
|
||||
/**
|
||||
* The parameters for the network we are connected to
|
||||
* i.e. [[org.bitcoins.core.config.MainNet]] or [[org.bitcoins.core.config.TestNet3]]
|
||||
* @return
|
||||
*/
|
||||
def network: NetworkParameters = config.network
|
||||
val network: NetworkParameters = config.network
|
||||
|
||||
/**
|
||||
* TODO: this comment seems wrong?
|
||||
*
|
||||
* This actor signifies the node we are connected to on the p2p network
|
||||
* This is the context we are in after we received a [[Tcp.Connected]] message
|
||||
*/
|
||||
|
@ -89,10 +90,10 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
//after receiving Tcp.Connected we switch to the
|
||||
//'awaitNetworkRequest' context. This is the main
|
||||
//execution loop for the Client actor
|
||||
val _ = handleCommand(cmd, None)
|
||||
val _ = handleCommand(cmd, peer = None)
|
||||
|
||||
case connected: Tcp.Connected =>
|
||||
val _ = handleEvent(connected, ByteVector.empty)
|
||||
val _ = handleEvent(connected, unalignedBytes = ByteVector.empty)
|
||||
|
||||
case msg: NetworkMessage =>
|
||||
self.forward(msg.payload)
|
||||
|
@ -103,8 +104,8 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
|
||||
/**
|
||||
* Handles boiler plate [[Tcp.Message]] types.
|
||||
* @param message
|
||||
* @return the unaligned bytes if we haven't received a full bitcoin p2p message yet
|
||||
*
|
||||
* @return the unaligned bytes if we haven't received a full Bitcoin P2P message yet
|
||||
*/
|
||||
private def handleTcpMessage(
|
||||
message: Tcp.Message,
|
||||
|
@ -122,7 +123,6 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
|
||||
/**
|
||||
* This function is responsible for handling a [[Tcp.Event]] algebraic data type
|
||||
* @param event
|
||||
*/
|
||||
private def handleEvent(
|
||||
event: Tcp.Event,
|
||||
|
@ -147,7 +147,7 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
//our bitcoin peer will send all messages to this actor.
|
||||
sender ! Tcp.Register(self)
|
||||
|
||||
val _ = peerMsgHandlerReceiver.connect(Client(self, peer))
|
||||
val _ = peerMsgHandlerReceiver.connect(P2PClient(self, peer))
|
||||
|
||||
context.become(awaitNetworkRequest(sender, ByteVector.empty))
|
||||
|
||||
|
@ -164,22 +164,36 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
context.stop(self)
|
||||
unalignedBytes
|
||||
case Tcp.Received(byteString: ByteString) =>
|
||||
//logger.debug("Received byte string in peerMessageHandler " + BitcoinSUtil.encodeHex(byteString.toArray))
|
||||
//logger.debug("Unaligned bytes: " + BitcoinSUtil.encodeHex(unalignedBytes))
|
||||
val byteVec = ByteVector(byteString.toArray)
|
||||
logger.debug(s"Received ${byteVec.length} TCP bytes")
|
||||
logger.trace(s"Received TCP bytes: ${byteVec.toHex}")
|
||||
logger.trace({
|
||||
val post =
|
||||
if (unalignedBytes.isEmpty) "None"
|
||||
else unalignedBytes.toHex
|
||||
s"Unaligned bytes: $post"
|
||||
})
|
||||
|
||||
//we need to aggregate our previous 'unalignedBytes' with the new message
|
||||
//we just received from our peer to hopefully be able to parse full messages
|
||||
val bytes: ByteVector = unalignedBytes ++ ByteVector(byteString.toArray)
|
||||
//logger.debug("Bytes for message parsing: " + BitcoinSUtil.encodeHex(bytes))
|
||||
val bytes: ByteVector = unalignedBytes ++ byteVec
|
||||
logger.trace(s"Bytes for message parsing: ${bytes.toHex}")
|
||||
val (messages, newUnalignedBytes) =
|
||||
BitcoinSpvNodeUtil.parseIndividualMessages(bytes)
|
||||
|
||||
logger.debug({
|
||||
val length = messages.length
|
||||
val suffix = if (length == 0) "" else s": ${messages.mkString(", ")}"
|
||||
|
||||
s"Parsed $length messages from bytes$suffix"
|
||||
})
|
||||
|
||||
//for the messages we successfully parsed above
|
||||
//send them to 'context.parent' -- this is the
|
||||
//PeerMessageHandler that is responsible for
|
||||
//creating this Client Actor
|
||||
messages.foreach { m =>
|
||||
val msg = NetworkMessageReceived(m, Client(self, peer))
|
||||
val msg = NetworkMessageReceived(m, P2PClient(self, peer))
|
||||
peerMsgHandlerReceiver.handleNetworkMessageReceived(msg)
|
||||
|
||||
}
|
||||
|
@ -190,7 +204,6 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
|
||||
/**
|
||||
* This function is responsible for handling a [[Tcp.Command]] algebraic data type
|
||||
* @param command
|
||||
*/
|
||||
private def handleCommand(
|
||||
command: Tcp.Command,
|
||||
|
@ -207,42 +220,36 @@ sealed abstract class ClientActor extends Actor with BitcoinSLogger {
|
|||
|
||||
/**
|
||||
* Sends a network request to our peer on the network
|
||||
* @param message
|
||||
* @return
|
||||
*/
|
||||
private def sendNetworkMessage(
|
||||
message: NetworkMessage,
|
||||
peer: ActorRef): Unit = {
|
||||
val byteMessage = BitcoinSpvNodeUtil.buildByteString(message.bytes)
|
||||
|
||||
val byteMessage = CompactByteString(message.bytes.toArray)
|
||||
peer ! Tcp.Write(byteMessage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
case class Client(actor: ActorRef, peer: Peer)
|
||||
case class P2PClient(actor: ActorRef, peer: Peer)
|
||||
|
||||
object Client {
|
||||
private case class ClientActorImpl(
|
||||
peer: Peer,
|
||||
peerMsgHandlerReceiver: PeerMessageReceiver)(
|
||||
implicit override val config: NodeAppConfig
|
||||
) extends ClientActor
|
||||
object P2PClient {
|
||||
|
||||
def props(peer: Peer, peerMsgHandlerReceiver: PeerMessageReceiver)(
|
||||
implicit config: NodeAppConfig
|
||||
): Props =
|
||||
Props(classOf[ClientActorImpl], peer, peerMsgHandlerReceiver, config)
|
||||
Props(classOf[P2PClientActor], peer, peerMsgHandlerReceiver, config)
|
||||
|
||||
def apply(
|
||||
context: ActorRefFactory,
|
||||
peer: Peer,
|
||||
peerMessageReceiver: PeerMessageReceiver)(
|
||||
implicit config: NodeAppConfig): Client = {
|
||||
implicit config: NodeAppConfig): P2PClient = {
|
||||
val actorRef = context.actorOf(
|
||||
props(peer = peer, peerMsgHandlerReceiver = peerMessageReceiver),
|
||||
BitcoinSpvNodeUtil.createActorName(this.getClass))
|
||||
|
||||
Client(actorRef, peer)
|
||||
P2PClient(actorRef, peer)
|
||||
}
|
||||
|
||||
}
|
|
@ -10,7 +10,6 @@ import org.bitcoins.core.p2p.NetworkMessage
|
|||
import org.bitcoins.core.protocol.Address
|
||||
import org.bitcoins.core.protocol.blockchain.MerkleBlock
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.node.constant.Constants
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.node.util.BitcoinSpvNodeUtil
|
||||
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
package org.bitcoins.node.networking.peer
|
||||
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.core.p2p._
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
class ControlMessageHandler()(implicit ec: ExecutionContext)
|
||||
extends BitcoinSLogger {
|
||||
|
||||
def handleControlPayload(
|
||||
controlMsg: ControlPayload,
|
||||
peerMsgSender: PeerMessageSender): Unit = {
|
||||
controlMsg match {
|
||||
case _: PingMessage =>
|
||||
()
|
||||
|
||||
case SendHeadersMessage =>
|
||||
//not implemented as of now
|
||||
()
|
||||
case _: AddrMessage =>
|
||||
()
|
||||
case _ @(_: FilterAddMessage | _: FilterLoadMessage |
|
||||
FilterClearMessage) =>
|
||||
()
|
||||
case _ @(GetAddrMessage | VerAckMessage | _: VersionMessage |
|
||||
_: PongMessage) =>
|
||||
()
|
||||
case _: RejectMessage =>
|
||||
()
|
||||
|
||||
case _: FeeFilterMessage =>
|
||||
()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,7 +7,7 @@ import org.bitcoins.core.util.BitcoinSLogger
|
|||
import org.bitcoins.node.config.NodeAppConfig
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.Client
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
|
||||
Disconnected,
|
||||
Initializing,
|
||||
|
@ -49,7 +49,7 @@ class PeerMessageReceiver(
|
|||
* but have NOT started the handshake
|
||||
* This method will initiate the handshake
|
||||
*/
|
||||
protected[networking] def connect(client: Client): Try[Unit] = {
|
||||
protected[networking] def connect(client: P2PClient): Try[Unit] = {
|
||||
|
||||
internalState match {
|
||||
case bad @ (_: Initializing | _: Normal | _: Disconnected) =>
|
||||
|
@ -65,7 +65,7 @@ class PeerMessageReceiver(
|
|||
|
||||
val _ = toState(newState)
|
||||
|
||||
val peerMsgSender = PeerMessageSender(client, chainAppConfig.network)
|
||||
val peerMsgSender = PeerMessageSender(client)
|
||||
|
||||
peerMsgSender.sendVersionMessage()
|
||||
|
||||
|
@ -114,7 +114,7 @@ class PeerMessageReceiver(
|
|||
val client = networkMsgRecv.client
|
||||
|
||||
//create a way to send a response if we need too
|
||||
val peerMsgSender = PeerMessageSender(client, chainAppConfig.network)
|
||||
val peerMsgSender = PeerMessageSender(client)
|
||||
|
||||
logger.debug(
|
||||
s"Received message=${networkMsgRecv.msg.header.commandName} from peer=${client.peer} ")
|
||||
|
@ -142,6 +142,7 @@ class PeerMessageReceiver(
|
|||
//else it means we are receiving this data payload from a peer,
|
||||
//we need to handle it
|
||||
dataMsgHandler.handleDataPayload(payload, sender)
|
||||
()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -224,10 +225,10 @@ object PeerMessageReceiver {
|
|||
/** Who we need to use to send a reply to our peer
|
||||
* if a response is needed for this message
|
||||
*/
|
||||
def client: Client
|
||||
def client: P2PClient
|
||||
}
|
||||
|
||||
case class NetworkMessageReceived(msg: NetworkMessage, client: Client)
|
||||
case class NetworkMessageReceived(msg: NetworkMessage, client: P2PClient)
|
||||
extends PeerMessageReceiverMsg
|
||||
|
||||
def apply(
|
||||
|
|
|
@ -2,7 +2,7 @@ package org.bitcoins.node.networking.peer
|
|||
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.core.p2p.{VerAckMessage, VersionMessage}
|
||||
import org.bitcoins.node.networking.Client
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
|
||||
import scala.concurrent.{Future, Promise}
|
||||
|
||||
|
@ -11,13 +11,13 @@ sealed abstract class PeerMessageReceiverState extends BitcoinSLogger {
|
|||
/** This promise gets completed when we receive a
|
||||
* [[akka.io.Tcp.Connected]] message from [[Client]]
|
||||
*/
|
||||
def clientConnectP: Promise[Client]
|
||||
def clientConnectP: Promise[P2PClient]
|
||||
|
||||
/** The [[org.bitcoins.node.networking.Client]] we are
|
||||
* connected to. This isn't initiated until the client
|
||||
* has called [[PeerMessageReceiver.connect()]]
|
||||
*/
|
||||
private val clientConnectF: Future[Client] = clientConnectP.future
|
||||
private val clientConnectF: Future[P2PClient] = clientConnectP.future
|
||||
|
||||
/** This promise is completed in the [[PeerMessageReceiver.disconnect()]]
|
||||
* when a [[Client]] initiates a disconnections from
|
||||
|
@ -81,7 +81,7 @@ object PeerMessageReceiverState {
|
|||
* where the peer is not connected to the p2p network
|
||||
*/
|
||||
final case object Preconnection extends PeerMessageReceiverState {
|
||||
def clientConnectP: Promise[Client] = Promise[Client]()
|
||||
def clientConnectP: Promise[P2PClient] = Promise[P2PClient]()
|
||||
|
||||
//should this be completed since the client is disconnected???
|
||||
def clientDisconnectP: Promise[Unit] = Promise[Unit]()
|
||||
|
@ -89,7 +89,7 @@ object PeerMessageReceiverState {
|
|||
def verackMsgP: Promise[VerAckMessage.type] = Promise[VerAckMessage.type]()
|
||||
|
||||
/** Converts [[org.bitcoins.node.networking.peer.PeerMessageReceiverState.Preconnection]] to [[Initializing]] */
|
||||
def toInitializing(client: Client): Initializing = {
|
||||
def toInitializing(client: P2PClient): Initializing = {
|
||||
val p = clientConnectP
|
||||
p.success(client)
|
||||
Initializing(
|
||||
|
@ -107,7 +107,7 @@ object PeerMessageReceiverState {
|
|||
* from our peer on the p2p network
|
||||
*/
|
||||
case class Initializing(
|
||||
clientConnectP: Promise[Client],
|
||||
clientConnectP: Promise[P2PClient],
|
||||
clientDisconnectP: Promise[Unit],
|
||||
versionMsgP: Promise[VersionMessage],
|
||||
verackMsgP: Promise[VerAckMessage.type]
|
||||
|
@ -149,7 +149,7 @@ object PeerMessageReceiverState {
|
|||
* the peer on the network
|
||||
*/
|
||||
case class Normal(
|
||||
clientConnectP: Promise[Client],
|
||||
clientConnectP: Promise[P2PClient],
|
||||
clientDisconnectP: Promise[Unit],
|
||||
versionMsgP: Promise[VersionMessage],
|
||||
verackMsgP: Promise[VerAckMessage.type]
|
||||
|
@ -165,7 +165,7 @@ object PeerMessageReceiverState {
|
|||
}
|
||||
|
||||
case class Disconnected(
|
||||
clientConnectP: Promise[Client],
|
||||
clientConnectP: Promise[P2PClient],
|
||||
clientDisconnectP: Promise[Unit],
|
||||
versionMsgP: Promise[VersionMessage],
|
||||
verackMsgP: Promise[VerAckMessage.type])
|
||||
|
@ -183,5 +183,3 @@ object PeerMessageReceiverState {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
object Initializing {}
|
||||
|
|
|
@ -2,14 +2,14 @@ package org.bitcoins.node.networking.peer
|
|||
|
||||
import akka.actor.ActorRef
|
||||
import akka.io.Tcp
|
||||
import org.bitcoins.core.config.NetworkParameters
|
||||
import org.bitcoins.core.crypto.DoubleSha256Digest
|
||||
import org.bitcoins.core.p2p.NetworkMessage
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import org.bitcoins.core.p2p._
|
||||
import org.bitcoins.node.networking.Client
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
import org.bitcoins.node.config.NodeAppConfig
|
||||
|
||||
class PeerMessageSender(client: Client)(implicit np: NetworkParameters)
|
||||
case class PeerMessageSender(client: P2PClient)(implicit conf: NodeAppConfig)
|
||||
extends BitcoinSLogger {
|
||||
private val socket = client.peer.socket
|
||||
|
||||
|
@ -27,7 +27,7 @@ class PeerMessageSender(client: Client)(implicit np: NetworkParameters)
|
|||
|
||||
/** Sends a [[org.bitcoins.node.messages.VersionMessage VersionMessage]] to our peer */
|
||||
def sendVersionMessage(): Unit = {
|
||||
val versionMsg = VersionMessage(client.peer.socket, np)
|
||||
val versionMsg = VersionMessage(client.peer.socket, conf.network)
|
||||
logger.trace(s"Sending versionMsg=$versionMsg to peer=${client.peer}")
|
||||
sendMsg(versionMsg)
|
||||
}
|
||||
|
@ -57,17 +57,13 @@ class PeerMessageSender(client: Client)(implicit np: NetworkParameters)
|
|||
|
||||
private[node] def sendMsg(msg: NetworkPayload): Unit = {
|
||||
logger.debug(s"Sending msg=${msg.commandName} to peer=${socket}")
|
||||
val newtworkMsg = NetworkMessage(np, msg)
|
||||
val newtworkMsg = NetworkMessage(conf.network, msg)
|
||||
client.actor ! newtworkMsg
|
||||
}
|
||||
}
|
||||
|
||||
object PeerMessageSender {
|
||||
|
||||
private case class PeerMessageSenderImpl(client: Client)(
|
||||
implicit np: NetworkParameters)
|
||||
extends PeerMessageSender(client)(np)
|
||||
|
||||
sealed abstract class PeerMessageHandlerMsg
|
||||
|
||||
/**
|
||||
|
@ -86,7 +82,4 @@ object PeerMessageSender {
|
|||
networkMsgs: Vector[(ActorRef, NetworkMessage)],
|
||||
peerHandler: ActorRef)
|
||||
|
||||
def apply(client: Client, np: NetworkParameters): PeerMessageSender = {
|
||||
PeerMessageSenderImpl(client)(np)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.bitcoins.node.util
|
||||
|
||||
import akka.util.{ByteString, CompactByteString}
|
||||
import org.bitcoins.core.p2p.NetworkMessage
|
||||
import org.bitcoins.core.util.BitcoinSLogger
|
||||
import scodec.bits.ByteVector
|
||||
|
@ -53,15 +52,6 @@ trait BitcoinSpvNodeUtil extends BitcoinSLogger {
|
|||
(messages, remainingBytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps our ByteVector into an akka [[ByteString]] object
|
||||
* @param bytes
|
||||
* @return
|
||||
*/
|
||||
def buildByteString(bytes: ByteVector): ByteString = {
|
||||
CompactByteString(bytes.toArray)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a unique actor name for a actor
|
||||
* @param className
|
||||
|
|
|
@ -10,7 +10,7 @@ import org.bitcoins.node.config.NodeAppConfig
|
|||
import org.bitcoins.core.p2p.VersionMessage
|
||||
import org.bitcoins.core.p2p.GetHeadersMessage
|
||||
import org.bitcoins.node.models.Peer
|
||||
import org.bitcoins.node.networking.Client
|
||||
import org.bitcoins.node.networking.P2PClient
|
||||
import org.bitcoins.node.networking.peer.PeerMessageReceiver
|
||||
import org.bitcoins.rpc.client.common.BitcoindRpcClient
|
||||
import org.bitcoins.node.SpvNode
|
||||
|
@ -81,8 +81,8 @@ abstract class NodeTestUtil extends BitcoinSLogger {
|
|||
|
||||
def client(peer: Peer, peerMsgReceiver: PeerMessageReceiver)(
|
||||
implicit ref: ActorRefFactory,
|
||||
conf: NodeAppConfig): Client = {
|
||||
Client.apply(ref, peer, peerMsgReceiver)
|
||||
conf: NodeAppConfig): P2PClient = {
|
||||
P2PClient.apply(ref, peer, peerMsgReceiver)
|
||||
}
|
||||
|
||||
/** Helper method to get the [[java.net.InetSocketAddress]]
|
||||
|
|
|
@ -79,7 +79,7 @@ trait NodeUnitTest
|
|||
//that can handle the handshake
|
||||
val peerMsgSender: PeerMessageSender = {
|
||||
val client = NodeTestUtil.client(peer, peerMsgReceiver)
|
||||
PeerMessageSender(client, np)
|
||||
PeerMessageSender(client)
|
||||
}
|
||||
|
||||
PeerHandler(peerMsgReceiver, peerMsgSender)
|
||||
|
|
Loading…
Add table
Reference in a new issue