diff --git a/build.sbt b/build.sbt index 06c9ab8..359b212 100644 --- a/build.sbt +++ b/build.sbt @@ -40,7 +40,9 @@ val loggingDependencies = Seq( libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-stream" % akkaVersion, + "com.typesafe.akka" %% "akka-remote" % akkaVersion, "com.iheart" %% "ficus" % "1.4.2", + "commons-net" % "commons-net" % "3.3" ) ++ apiDependencies ++ loggingDependencies ++ databaseDependencies fork := true diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 1a29e67..ff87471 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -1,9 +1,9 @@ explorer { parseSettings { - nodes = [""] + nodes = [] recoverBatchSize = 15 - infinitePing = true // if set to true, explorer will continue to ping node infinetely, otherwise it stops after #numberOfAttempts attempts - askNode = false // if set false, explorer won't ping the node if it stopped working + infinitePing = true # if set to true, explorer will continue to ping node infinetely, otherwise it stops after #numberOfAttempts attempts + askNode = false # if set false, explorer won't ping the node if it stopped working } blackListSettings { banTime = 60m @@ -16,10 +16,35 @@ explorer { maxPoolSize = 5 connectionTimeout = 60000 } - nodeSettings { - maxRollbackDepth = 100 + networkSettings { + syncPacketLength = 1000 + bindAddressHost = "0.0.0.0" + bindAddressPort = 0 + nodeName = "explorer" + appVersion = 0.9.3 + handshakeTimeout = 1m + peerForConnectionHost = "" + peerForConnectionPort = 9001 + peerForConnectionApiPort = 9051 + declaredAddressHost = "" + declaredAddressPort = 0 + } + multisigSettings { + checkTxMinedPeriod = 30 + numberOfBlocksToCheck = 3 + mnemonicKeys = [] + } + ntpSettings { + server = "pool.ntp.org" + updateEvery = 30m + timeout = 30s + } + frontendSettings { + host = "" + port = 5150 } } + parser-dispatcher { type = Dispatcher executor = "thread-pool-executor" @@ -34,11 +59,23 @@ blocking-dispatcher { } throughput = 1 } + akka { log-dead-letters = 0 log-dead-letters-during-shutdown = off loggers = [ "akka.event.slf4j.Slf4jLogger" ] logger-startup-timeout = 60s actor.warn-about-java-serializer-usage = false + + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } } diff --git a/src/main/resources/data_model.sql b/src/main/resources/data_model.sql index e7369c3..09ba962 100644 --- a/src/main/resources/data_model.sql +++ b/src/main/resources/data_model.sql @@ -58,7 +58,7 @@ CREATE INDEX tx_id_inputs_index ON inputs (txId); CREATE TABLE contracts( hash VARCHAR(64) PRIMARY KEY, contract TEXT -) +); CREATE TABLE accounts( -- idx SERIAL, diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 2df2ebd..aea2ae9 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -7,7 +7,7 @@ 2 - INFO + DEBUG [%d{yyyy-MM-dd HH:mm:ss}] >> [%thread] >> [%-5level] >> %msg%n diff --git a/src/main/scala/encry/ExplorerApp.scala b/src/main/scala/encry/ExplorerApp.scala index e93ce17..a6f4151 100644 --- a/src/main/scala/encry/ExplorerApp.scala +++ b/src/main/scala/encry/ExplorerApp.scala @@ -1,18 +1,19 @@ package encry -import akka.actor.{ActorSystem, Props} +import akka.actor.{ActorRef, ActorSystem, Props} import akka.stream.ActorMaterializer -import cats.effect.{Blocker, IO} +import cats.effect.{Blocker, ContextShift, IO} import cats.implicits._ import doobie.hikari.HikariTransactor import doobie.util.ExecutionContexts import encry.database.{DBActor, DBService} +import encry.network.{NetworkServer, NetworkTimeProvider} +import encry.parser.ParsersController import encry.settings.ExplorerSettings -import doobie.implicits._ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} -object ExplorerApp extends App { +object ExplorerApp extends App { implicit val system: ActorSystem = ActorSystem() implicit val materializer: ActorMaterializer = ActorMaterializer() @@ -20,7 +21,10 @@ object ExplorerApp extends App { val settings = ExplorerSettings.read - implicit val cs = IO.contextShift(ExecutionContext.global) + val frontRemoteActor = + system.actorSelection(s"akka.tcp://application@${settings.frontendSettings.host}:${settings.frontendSettings.port}/user/receiver") + + implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) val pgTransactor = for { ce <- ExecutionContexts.fixedThreadPool[IO](settings.databaseSettings.maxPoolSize) @@ -44,7 +48,12 @@ object ExplorerApp extends App { } *> IO { val dbService = DBService(xa) val dbActor = system.actorOf(Props(new DBActor(dbService)), s"dbActor") - system.actorOf(Props(new ParsersController(settings.parseSettings, settings.blackListSettings, dbActor)), s"parserController") + + val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settings.ntpSettings) + val networkServer: ActorRef = system.actorOf(NetworkServer.props(settings.networkSettings, timeProvider, frontRemoteActor), "networkServer") + + system.actorOf(Props(new ParsersController(settings.parseSettings, settings.blackListSettings, dbActor, networkServer)), + s"parserController") } *> IO.never }.unsafeRunSync() } diff --git a/src/main/scala/encry/blockchain/modifiers/ScriptedAssetDirective.scala b/src/main/scala/encry/blockchain/modifiers/ScriptedAssetDirective.scala index be817ad..bf5cfc3 100644 --- a/src/main/scala/encry/blockchain/modifiers/ScriptedAssetDirective.scala +++ b/src/main/scala/encry/blockchain/modifiers/ScriptedAssetDirective.scala @@ -4,7 +4,6 @@ import com.google.common.primitives.Ints import io.circe.syntax._ import encry.blockchain.modifiers.Directive.DTypeId import encry.blockchain.modifiers.boxes.{AssetBox, EncryBaseBox, EncryProposition} -import encry.utils.CoreTaggedTypes.ModifierId import encry.utils.Utils import io.circe.{Decoder, Encoder, HCursor} import org.encryfoundation.common.utils.Algos diff --git a/src/main/scala/encry/network/NetworkMessagesHandler.scala b/src/main/scala/encry/network/NetworkMessagesHandler.scala new file mode 100644 index 0000000..26a1552 --- /dev/null +++ b/src/main/scala/encry/network/NetworkMessagesHandler.scala @@ -0,0 +1,59 @@ +package encry.network + +import TransactionProto.TransactionProtoMessage +import akka.actor.{Actor, ActorRef, Props} +import com.typesafe.scalalogging.StrictLogging +import encry.network.NetworkMessagesHandler.MessageFromNetwork +import encry.network.PeerHandler.ConnectedPeer +import org.encryfoundation.common.modifiers.mempool.transaction.{Transaction, TransactionProtoSerializer} +import org.encryfoundation.common.network.BasicMessagesRepo._ +import org.encryfoundation.common.utils.Algos + +class NetworkMessagesHandler(networkServer: ActorRef) extends Actor with StrictLogging { + + override def receive: Receive = { + + case MessageFromNetwork(message, peerOpt) => message match { + + case InvNetworkMessage((modifierTypeId, modifierIds)) => + peerOpt.foreach { peer => + if (Transaction.modifierTypeId == modifierTypeId) { + logger.debug(s"Request modifier: $modifierTypeId ${modifierIds.map(Algos.encode).mkString(",")}") + peer.handlerRef ! RequestModifiersNetworkMessage((modifierTypeId, modifierIds)) + } + } + + case ModifiersNetworkMessage((modifierTypeId, modifierMap)) => + logger.debug(s"Response modifiers: $modifierTypeId size ${modifierMap.size}") + modifierMap.foreach { case (modifierId, bytes) => + modifierTypeId match { + case Transaction.modifierTypeId => + val tx = TransactionProtoSerializer.fromProto(TransactionProtoMessage.parseFrom(bytes)) + tx.foreach(networkServer ! _) + + case _ => + } + } + + case _ => + } + case _ => + } +} + +object NetworkMessagesHandler { + + case class Transaction(tx: Transaction) + case class Block(block: Block) + + /** + * @param message - message, received from network + * @param source - sender of received message + * + * This case class transfers network message from PeerConnectionHandler actor to the NetworkController. + * Main duty is to transfer message from network with sender of it message to the NetworkController as an end point. + */ + case class MessageFromNetwork(message: NetworkMessage, source: Option[ConnectedPeer]) + + def props(networkServer: ActorRef) = Props(new NetworkMessagesHandler(networkServer)) +} \ No newline at end of file diff --git a/src/main/scala/encry/network/NetworkServer.scala b/src/main/scala/encry/network/NetworkServer.scala new file mode 100644 index 0000000..5dd5788 --- /dev/null +++ b/src/main/scala/encry/network/NetworkServer.scala @@ -0,0 +1,96 @@ +package encry.network + +import java.net.InetSocketAddress + +import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} +import akka.io.Tcp.SO.KeepAlive +import akka.io.Tcp._ +import akka.io.{IO, Tcp} +import com.typesafe.scalalogging.StrictLogging +import NetworkServer.{CheckConnection, ConnectionSetupSuccessfully} +import PeerHandler._ +import encry.parser.NodeParser.BlockFromNode +import encry.settings.NetworkSettings +import org.encryfoundation.common.modifiers.mempool.transaction.Transaction + +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration._ + +class NetworkServer(settings: NetworkSettings, timeProvider: NetworkTimeProvider, + frontRemoteActor: ActorSelection) extends Actor with StrictLogging { + + implicit val system: ActorSystem = context.system + implicit val ec: ExecutionContextExecutor = context.dispatcher + + var isConnected = false + + val messagesHandler: ActorRef = context.actorOf(NetworkMessagesHandler.props(self)) + + var tmpConnectionHandler: Option[ActorRef] = None + + val selfPeer: InetSocketAddress = + new InetSocketAddress(settings.bindAddressHost, settings.bindAddressPort) + + val connectingPeer: InetSocketAddress = + new InetSocketAddress(settings.peerForConnectionHost, settings.peerForConnectionPort) + + IO(Tcp) ! Bind(self, selfPeer) + + override def receive: Receive = { + + case Bound(localAddress) => + logger.info(s"Local app was successfully bound to $localAddress!") + context.system.scheduler.schedule(5.seconds, 30.seconds, self, CheckConnection) + + case CommandFailed(add: Bind) => + logger.info(s"Failed to bind to ${add.localAddress}.") + context.stop(self) + + case Connected(remote, _) if !isConnected && remote.getAddress == connectingPeer.getAddress => + val handler: ActorRef = context.actorOf( + PeerHandler.props(remote, sender(), settings, timeProvider, messagesHandler) + ) + logger.info(s"Successfully connected to $remote. Creating handler: $handler.") + isConnected = true + tmpConnectionHandler = Some(handler) + sender ! Register(handler) + sender ! ResumeReading + + case Connected(remote, _) => logger.info(s"Remote: $remote try to connect but isConnected: $isConnected.") + + case CommandFailed(c: Connect) => + isConnected = false + tmpConnectionHandler = None + logger.info(s"Failed to connect to: ${c.remoteAddress}") + + case CheckConnection if !isConnected => + IO(Tcp) ! Connect(connectingPeer, options = KeepAlive(true) :: Nil, timeout = Some(5.seconds)) + logger.info(s"Trying to connect to $connectingPeer.") + + case CheckConnection => + logger.info(s"Triggered CheckConnection. Current connection is: $isConnected") + + case RemovePeerFromConnectionList(peer) => + isConnected = false + tmpConnectionHandler = None + logger.info(s"Disconnected from $peer.") + + case tx: Transaction => + frontRemoteActor ! tx + + case BlockFromNode(block, nodeAddr, nodeInfo) => + val txIds = block.payload.txs.map(_.id) + frontRemoteActor ! txIds + + case msg => + logger.info(s"Got strange message on NetworkServer: $msg.") + } +} + +object NetworkServer { + case object CheckConnection + case object ConnectionSetupSuccessfully + + def props(settings: NetworkSettings, timeProvider: NetworkTimeProvider, frontRemoteActor: ActorSelection): Props = + Props(new NetworkServer(settings, timeProvider, frontRemoteActor)) +} \ No newline at end of file diff --git a/src/main/scala/encry/network/NetworkTime.scala b/src/main/scala/encry/network/NetworkTime.scala new file mode 100644 index 0000000..e92d4f0 --- /dev/null +++ b/src/main/scala/encry/network/NetworkTime.scala @@ -0,0 +1,86 @@ +package encry.network + +import java.net.InetAddress + +import com.typesafe.scalalogging.StrictLogging +import NetworkTime.Time +import org.apache.commons.net.ntp.{NTPUDPClient, TimeInfo} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Left +import scala.util.control.NonFatal + +object NetworkTime { + def localWithOffset(offset: Long): Long = System.currentTimeMillis() + offset + + type Offset = Long + type Time = Long +} + +protected case class NetworkTime(offset: NetworkTime.Offset, lastUpdate: NetworkTime.Time) + +case class NetworkTimeProviderSettings(server: String, updateEvery: FiniteDuration, timeout: FiniteDuration) + +case class FrontendSettings(host: String, port: Int) + +class NetworkTimeProvider(ntpSettings: NetworkTimeProviderSettings) extends StrictLogging { + + private var state: State = Right(NetworkTime(0L, 0L)) + private var delta: Time = 0L + + private type State = Either[(NetworkTime, Future[NetworkTime]), NetworkTime] + + private def updateOffSet(): Option[NetworkTime.Offset] = { + val client: NTPUDPClient = new NTPUDPClient() + client.setDefaultTimeout(ntpSettings.timeout.toMillis.toInt) + try { + client.open() + val info: TimeInfo = client.getTime(InetAddress.getByName(ntpSettings.server)) + info.computeDetails() + Option(info.getOffset) + } catch { + case t: Throwable => None + } finally { + client.close() + } + } + + private def timeAndState(currentState: State): Future[(NetworkTime.Time, State)] = + currentState match { + case Right(nt) => + val time: Long = NetworkTime.localWithOffset(nt.offset) + val state: Either[(NetworkTime, Future[NetworkTime]), NetworkTime] = + if (time > nt.lastUpdate + ntpSettings.updateEvery.toMillis) { + Left(nt -> Future(updateOffSet()).map { mbOffset => + logger.info("New offset adjusted: " + mbOffset) + val offset = mbOffset.getOrElse(nt.offset) + NetworkTime(offset, NetworkTime.localWithOffset(offset)) + }) + } else Right(nt) + Future.successful((time, state)) + case Left((nt, networkTimeFuture)) => + networkTimeFuture + .map(networkTime => NetworkTime.localWithOffset(networkTime.offset) -> Right(networkTime)) + .recover { + case NonFatal(th) => + logger.warn(s"Failed to evaluate networkTimeFuture $th") + NetworkTime.localWithOffset(nt.offset) -> Left(nt -> networkTimeFuture) + } + } + + def estimatedTime: Time = state match { + case Right(nt) if NetworkTime.localWithOffset(nt.offset) <= nt.lastUpdate + ntpSettings.updateEvery.toMillis => + NetworkTime.localWithOffset(nt.offset) + case _ => System.currentTimeMillis() + delta + } + + def time(): Future[NetworkTime.Time] = + timeAndState(state) + .map { case (timeFutureResult, stateFutureResult) => + state = stateFutureResult + delta = timeFutureResult - System.currentTimeMillis() + timeFutureResult + } +} \ No newline at end of file diff --git a/src/main/scala/encry/network/PeerHandler.scala b/src/main/scala/encry/network/PeerHandler.scala new file mode 100644 index 0000000..68184e6 --- /dev/null +++ b/src/main/scala/encry/network/PeerHandler.scala @@ -0,0 +1,237 @@ +package encry.network + +import java.net.InetSocketAddress +import java.nio.ByteOrder + +import akka.actor.{Actor, ActorRef, Cancellable, Props} +import akka.io.Tcp +import akka.io.Tcp._ +import akka.util.{ByteString, CompactByteString} +import com.google.common.primitives.Ints +import com.typesafe.scalalogging.StrictLogging +import org.encryfoundation.common.network.BasicMessagesRepo._ +import NetworkServer.ConnectionSetupSuccessfully +import encry.network.NetworkMessagesHandler.MessageFromNetwork +import encry.network.PeerHandler.{Ack, ConnectedPeer, HandshakeDone, HandshakeTimeout, Outgoing, RemovePeerFromConnectionList, StartIteration} +import encry.settings.NetworkSettings + +import scala.annotation.tailrec +import scala.collection.immutable.HashMap +import scala.concurrent.ExecutionContextExecutor +import scala.util.{Failure, Success} + +class PeerHandler(remoteAddress: InetSocketAddress, + listener: ActorRef, + settings: NetworkSettings, + timeProvider: NetworkTimeProvider, + receivedMessagesHandler: ActorRef) extends Actor with StrictLogging { + + context.watch(listener) + + implicit val ec: ExecutionContextExecutor = context.dispatcher + + override def preStart(): Unit = self ! StartIteration + + override def postStop(): Unit = { + logger.info(s"Peer handler $self to $remoteAddress is destroyed.") + context.parent ! RemovePeerFromConnectionList(remoteAddress) + listener ! Close + } + + var chunksBuffer: ByteString = CompactByteString.empty + var outMessagesBuffer: HashMap[Long, ByteString] = HashMap.empty + var outMessagesCounter: Long = 0 + + var isHandshakeSent: Boolean = false + var receivedHandshake: Option[Handshake] = None + + def awaitingConnectionBehaviour(timeout: Option[Cancellable]): Receive = { + + case StartIteration => timeProvider.time() map { time => + val handshake: Handshake = Handshake( + protocolToBytes(settings.appVersion), + settings.nodeName, + Some(new InetSocketAddress(settings.declaredAddressHost, settings.declaredAddressPort)), + time + ) + listener ! Write(ByteString(GeneralizedNetworkMessage.toProto(handshake).toByteArray)) + isHandshakeSent = true + logger.info(s"Sent initial handshake to $remoteAddress.") + if (receivedHandshake.isDefined && isHandshakeSent) { + logger.info(s"Got successfully bounded connection with $remoteAddress. Starting working behaviour.") + timeout.foreach(_.cancel()) + context.parent ! ConnectionSetupSuccessfully + context.become(workingCycleWriting(ConnectedPeer(remoteAddress, self, Outgoing, receivedHandshake.get))) + } else context.become(awaitingConnectionBehaviour( + Some(context.system.scheduler.scheduleOnce(settings.handshakeTimeout, self, HandshakeTimeout))) + ) + } + + case HandshakeTimeout => + logger.info(s"Handshake timeout has expired for $remoteAddress, going to drop the connection.") + self ! Close + + case HandshakeDone => + logger.info(s"Got successfully bounded connection with $remoteAddress. Starting working behaviour.") + listener ! ResumeReading + timeout.foreach(_.cancel()) + context.become(workingCycleWriting(ConnectedPeer(remoteAddress, self, Outgoing, receivedHandshake.get))) + + case Received(data) => GeneralizedNetworkMessage.fromProto(data.toArray[Byte]) match { + case Success(value) => value match { + case handshake: Handshake => + logger.info(s"Got a Handshake from $remoteAddress.") + receivedHandshake = Some(handshake) + listener ! ResumeReading + if (isHandshakeSent && receivedHandshake.isDefined) { + logger.info(s"Got successfully bounded connection with $remoteAddress. Starting working behaviour.") + timeout.foreach(_.cancel()) + context.parent ! ConnectionSetupSuccessfully + context.become(workingCycleWriting(ConnectedPeer(remoteAddress, self, Outgoing, handshake))) + } else context.become(awaitingConnectionBehaviour(timeout)) + + case message => logger.info(s"Expecting handshake, but received ${message.messageName}.") + } + case Failure(exception) => + logger.info(s"Error during parsing a handshake: $exception.") + self ! Close + } + case _ => + } + + override def receive: Receive = awaitingConnectionBehaviour(None) + + def defaultLogic: Receive = { + case cc: ConnectionClosed => + logger.info(s"Connection closed to $remoteAddress cause ${cc.getErrorCause}.") + context.stop(self) + + case fail@CommandFailed(cmd: Command) => + logger.info(s"Failed to execute command : $cmd cause ${fail.cause}.") + listener ! ResumeReading + + case _ => + } + + def workingCycleWriting(cp: ConnectedPeer): Receive = workingCycleLocalInterfaceWritingMode(cp) + .orElse(workingCycleRemoteInterface(cp)) + .orElse(defaultLogic) + + def workingCycleLocalInterfaceWritingMode(cp: ConnectedPeer): Receive = { + case message: NetworkMessage => + def sendMessage(): Unit = { + outMessagesCounter += 1 + val messageToNetwork: Array[Byte] = GeneralizedNetworkMessage.toProto(message).toByteArray + val bytes: ByteString = ByteString(Ints.toByteArray(messageToNetwork.length) ++ messageToNetwork) + listener ! Write(bytes, Ack(outMessagesCounter)) + } + + sendMessage() + + case fail@CommandFailed(Write(msg, Ack(id))) => + logger.debug(s"Failed to write ${msg.length} bytes to $remoteAddress cause ${fail.cause}, switching to buffering mode") + listener ! ResumeReading + toBuffer(id, msg) + context.become(workingCycleBuffering(cp)) + case Ack(_) => // ignore ACKs in stable mode + case WritingResumed => // ignore in stable mode + } + + def workingCycleRemoteInterface(cp: ConnectedPeer): Receive = { + case Received(data) => + val packet: (List[ByteString], ByteString) = getPacket(chunksBuffer ++ data) + chunksBuffer = packet._2 + packet._1.find { packet => + GeneralizedNetworkMessage.fromProto(packet.toArray[Byte]) match { + case Success(message) => + receivedMessagesHandler ! MessageFromNetwork(message, Some(cp)) + logger.debug("Received message " + message.messageName + " from " + remoteAddress) + false + case Failure(e) => + logger.info(s"Corrupted data from: " + remoteAddress + s"$e") + true + } + } + listener ! ResumeReading + } + + def workingCycleBuffering(cp: ConnectedPeer): Receive = workingCycleLocalInterfaceBufferingMode(cp) + .orElse(workingCycleRemoteInterface(cp)) + .orElse(defaultLogic) + + // operate in ACK mode until all buffered messages are transmitted + def workingCycleLocalInterfaceBufferingMode(cp: ConnectedPeer): Receive = { + case message: NetworkMessage => + outMessagesCounter += 1 + val messageToNetwork: Array[Byte] = GeneralizedNetworkMessage.toProto(message).toByteArray + val bytes: ByteString = ByteString(Ints.toByteArray(messageToNetwork.length) ++ messageToNetwork) + toBuffer(outMessagesCounter, bytes) + case fail@CommandFailed(Write(msg, Ack(id))) => + logger.debug(s"Failed to buffer ${msg.length} bytes to $remoteAddress cause ${fail.cause}") + listener ! ResumeWriting + toBuffer(id, msg) + case CommandFailed(ResumeWriting) => // ignore in ACK mode + case WritingResumed => writeFirst() + case Ack(id) => + outMessagesBuffer -= id + if (outMessagesBuffer.nonEmpty) writeFirst() + else { + logger.debug("Buffered messages processed, exiting buffering mode") + context.become(workingCycleWriting(cp)) + } + } + + def getPacket(data: ByteString): (List[ByteString], ByteString) = { + + val headerSize: Int = 4 + + @tailrec + def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = + if (current.length < headerSize) (packets.reverse, current) + else { + val len: Int = current.iterator.getInt(ByteOrder.BIG_ENDIAN) + if (current.length < len + headerSize) (packets.reverse, current) + else { + val rem: ByteString = current drop headerSize + val (front: ByteString, back: ByteString) = rem.splitAt(len) + multiPacket(front :: packets, back) + } + } + + multiPacket(List[ByteString](), data) + } + + def writeFirst(): Unit = outMessagesBuffer.headOption.foreach { case (id, msg) => listener ! Write(msg, Ack(id)) } + + def writeAll(): Unit = outMessagesBuffer.foreach { case (id, msg) => listener ! Write(msg, Ack(id)) } + + def toBuffer(id: Long, message: ByteString): Unit = outMessagesBuffer += id -> message + + private def protocolToBytes(protocol: String): Array[Byte] = protocol.split("\\.").map(elem => elem.toByte) +} + +object PeerHandler { + + sealed trait ConnectionType + case object Incoming extends ConnectionType + case object Outgoing extends ConnectionType + + case class ConnectedPeer(socketAddress: InetSocketAddress, handlerRef: ActorRef, direction: ConnectionType, handshake: Handshake) { + override def toString: String = s"ConnectedPeer($socketAddress)" + } + + sealed trait ConnectionMessages + case object HandshakeTimeout extends ConnectionMessages + case object HandshakeDone extends ConnectionMessages + case object StartIteration + final case class Ack(offset: Long) extends Tcp.Event + + case class RemovePeerFromConnectionList(peer: InetSocketAddress) extends ConnectionMessages + + def props(remoteAddress: InetSocketAddress, + listener: ActorRef, + settings: NetworkSettings, + timeProvider: NetworkTimeProvider, + messagesHandler: ActorRef): Props = + Props(new PeerHandler(remoteAddress, listener, settings, timeProvider, messagesHandler)) +} \ No newline at end of file diff --git a/src/main/scala/encry/parser/NodeParser.scala b/src/main/scala/encry/parser/NodeParser.scala index 348b2c0..afbc590 100644 --- a/src/main/scala/encry/parser/NodeParser.scala +++ b/src/main/scala/encry/parser/NodeParser.scala @@ -3,7 +3,7 @@ package encry.parser import java.net.{InetAddress, InetSocketAddress} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import akka.actor.{Actor, ActorRef} +import akka.actor.{Actor, ActorRef, Props} import com.typesafe.scalalogging.StrictLogging import encry.blockchain.modifiers.{Block, Header} import encry.blockchain.nodeRoutes.InfoRoute @@ -196,6 +196,7 @@ class NodeParser(node: InetSocketAddress, blocksToReask -= height blocksToWrite += blockId -> (System.nanoTime(), height) dbActor ! BlockFromNode(block, node, currentNodeInfo) + parserController ! BlockFromNode(block, node, currentNodeInfo) } } } @@ -238,6 +239,7 @@ class NodeParser(node: InetSocketAddress, currentBestBlockHeight.set(block.header.height) blocksToWrite += blockId -> (System.nanoTime(), height) dbActor ! BlockFromNode(block, node, currentNodeInfo) + parserController ! BlockFromNode(block, node, currentNodeInfo) } }} } @@ -282,7 +284,6 @@ class NodeParser(node: InetSocketAddress, object NodeParser { - case class PeersFromApi(peers: Set[InetAddress]) case object PingNode @@ -300,4 +301,10 @@ object NodeParser { case object Recover case object BecomeAwaitDB + + def props(node: InetSocketAddress, + parserController: ActorRef, + dbActor: ActorRef, + settings: ParseSettings): Props = + Props(new NodeParser(node, parserController, dbActor, settings)).withDispatcher("parser-dispatcher") } \ No newline at end of file diff --git a/src/main/scala/encry/ParsersController.scala b/src/main/scala/encry/parser/ParsersController.scala similarity index 88% rename from src/main/scala/encry/ParsersController.scala rename to src/main/scala/encry/parser/ParsersController.scala index 9ce9642..bb66203 100644 --- a/src/main/scala/encry/ParsersController.scala +++ b/src/main/scala/encry/parser/ParsersController.scala @@ -1,22 +1,23 @@ -package encry +package encry.parser import java.net.{InetAddress, InetSocketAddress} -import scala.concurrent.ExecutionContext.Implicits.global -import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy} -import encry.parser.{NodeParser, SimpleNodeParser} -import encry.settings.{BlackListSettings, ParseSettings} import akka.actor.SupervisorStrategy.Stop +import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy} import com.typesafe.scalalogging.StrictLogging -import encry.ParsersController.{BadPeer, RemoveBadPeer} import encry.database.DBActor.RecoveryMode -import encry.parser.NodeParser.PeersFromApi +import encry.network.NetworkServer +import encry.parser.NodeParser.{BlockFromNode, PeersFromApi} +import encry.parser.ParsersController.{BadPeer, RemoveBadPeer} +import encry.settings.{BlackListSettings, ParseSettings} +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ class ParsersController(settings: ParseSettings, blackListSettings: BlackListSettings, - dbActor: ActorRef) extends Actor with StrictLogging { + dbActor: ActorRef, + networkServer: ActorRef) extends Actor with StrictLogging { var peerReconnects: Map[InetAddress, Int] = Map.empty[InetAddress, Int] @@ -33,7 +34,7 @@ class ParsersController(settings: ParseSettings, context.system.scheduler.scheduleOnce(blackListSettings.cleanupTime, self, RemoveBadPeer) logger.info(s"Starting Parsing controller. Try to create listeners for: ${settings.nodes.mkString(",")}") settings.nodes.foreach(node => - context.actorOf(Props(new NodeParser(node, self, dbActor, settings)).withDispatcher("parser-dispatcher")) + context.actorOf(NodeParser.props(node, self, dbActor, settings)) ) val initialPeers: Set[InetAddress] = settings.nodes.map(_.getAddress).toSet logger.info(s"Initial peers are: ${initialPeers.mkString(",")}. Starting main behaviour...") @@ -77,6 +78,9 @@ class ParsersController(settings: ParseSettings, context.system.scheduler.scheduleOnce(blackListSettings.cleanupTime, self, RemoveBadPeer) context.become(mainBehaviour(knownPeers -- peersForRemove.map(_._1))) + case blockFromNode: BlockFromNode => + networkServer ! blockFromNode + case msg => logger.info(s"Got strange message on ParserController: $msg.") } diff --git a/src/main/scala/encry/parser/SimpleNodeParser.scala b/src/main/scala/encry/parser/SimpleNodeParser.scala index 658d91a..822a0fc 100644 --- a/src/main/scala/encry/parser/SimpleNodeParser.scala +++ b/src/main/scala/encry/parser/SimpleNodeParser.scala @@ -3,7 +3,7 @@ package encry.parser import java.net.{InetAddress, InetSocketAddress} import akka.actor.{Actor, ActorRef, Props} import com.typesafe.scalalogging.StrictLogging -import encry.ParsersController.BadPeer +import encry.parser.ParsersController.BadPeer import encry.blockchain.nodeRoutes.InfoRoute import encry.database.DBActor.UpdatedInfoAboutNode import encry.parser.NodeParser.{PeersFromApi, PingNode} diff --git a/src/main/scala/encry/settings/BlackListSettings.scala b/src/main/scala/encry/settings/BlackListSettings.scala deleted file mode 100644 index a6e8bfb..0000000 --- a/src/main/scala/encry/settings/BlackListSettings.scala +++ /dev/null @@ -1,5 +0,0 @@ -package encry.settings - -import scala.concurrent.duration.FiniteDuration - -case class BlackListSettings(banTime: FiniteDuration, cleanupTime: FiniteDuration) \ No newline at end of file diff --git a/src/main/scala/encry/settings/DatabaseSettings.scala b/src/main/scala/encry/settings/DatabaseSettings.scala deleted file mode 100644 index 5297b8b..0000000 --- a/src/main/scala/encry/settings/DatabaseSettings.scala +++ /dev/null @@ -1,7 +0,0 @@ -package encry.settings - -case class DatabaseSettings(host: String, - user: String, - password: String, - maxPoolSize: Int, - connectionTimeout: Long) diff --git a/src/main/scala/encry/settings/ExplorerSettings.scala b/src/main/scala/encry/settings/ExplorerSettings.scala deleted file mode 100644 index c5cb1eb..0000000 --- a/src/main/scala/encry/settings/ExplorerSettings.scala +++ /dev/null @@ -1,25 +0,0 @@ -package encry.settings - -import java.net.InetSocketAddress -import com.typesafe.config.{Config, ConfigFactory} -import net.ceedubs.ficus.Ficus._ -import net.ceedubs.ficus.readers.ArbitraryTypeReader._ -import net.ceedubs.ficus.readers.ValueReader - -case class ExplorerSettings(parseSettings: ParseSettings, - blackListSettings: BlackListSettings, - databaseSettings: DatabaseSettings, - nodeSettings: NodeSettings) - -object ExplorerSettings { - - implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = { (config: Config, path: String) => - val split = config.getString(path).split(":") - new InetSocketAddress(split(0), split(1).toInt) - } - - val configPath: String = "explorer" - - def read: ExplorerSettings = ConfigFactory.load("local.conf") - .withFallback(ConfigFactory.load()).as[ExplorerSettings](configPath) -} diff --git a/src/main/scala/encry/settings/NodeSettings.scala b/src/main/scala/encry/settings/NodeSettings.scala deleted file mode 100644 index 84f28dd..0000000 --- a/src/main/scala/encry/settings/NodeSettings.scala +++ /dev/null @@ -1,3 +0,0 @@ -package encry.settings - -case class NodeSettings(maxRollbackDepth: Int) diff --git a/src/main/scala/encry/settings/ParseSettings.scala b/src/main/scala/encry/settings/ParseSettings.scala deleted file mode 100644 index d53e327..0000000 --- a/src/main/scala/encry/settings/ParseSettings.scala +++ /dev/null @@ -1,9 +0,0 @@ -package encry.settings - -import java.net.InetSocketAddress - -case class ParseSettings(nodes: List[InetSocketAddress], - recoverBatchSize: Int, - infinitePing: Boolean, - askNode: Boolean, - numberOfAttempts: Option[Int] = None) diff --git a/src/main/scala/encry/settings/Settings.scala b/src/main/scala/encry/settings/Settings.scala new file mode 100644 index 0000000..221cb08 --- /dev/null +++ b/src/main/scala/encry/settings/Settings.scala @@ -0,0 +1,51 @@ +package encry.settings + +import java.net.InetSocketAddress + +import com.typesafe.config.{Config, ConfigFactory} +import encry.network.{FrontendSettings, NetworkTimeProviderSettings} +import net.ceedubs.ficus.Ficus._ +import net.ceedubs.ficus.readers.ArbitraryTypeReader._ +import net.ceedubs.ficus.readers.ValueReader + +import scala.concurrent.duration.FiniteDuration + +case class ExplorerSettings(parseSettings: ParseSettings, + blackListSettings: BlackListSettings, + databaseSettings: DatabaseSettings, + ntpSettings: NetworkTimeProviderSettings, + networkSettings: NetworkSettings, + multisigSettings: MultisigSettings, + frontendSettings: FrontendSettings) + +case class NetworkSettings(syncPacketLength: Int, + bindAddressHost: String, + bindAddressPort: Int, + nodeName: String, + appVersion: String, + handshakeTimeout: FiniteDuration, + peerForConnectionHost: String, + peerForConnectionPort: Int, + peerForConnectionApiPort: Int, + declaredAddressHost: String, + declaredAddressPort: Int) + +case class MultisigSettings(checkTxMinedPeriod: Int, numberOfBlocksToCheck: Int, mnemonicKeys: List[String]) + +case class ParseSettings(nodes: List[InetSocketAddress], recoverBatchSize: Int, infinitePing: Boolean, askNode: Boolean, + numberOfAttempts: Option[Int] = None) + +case class DatabaseSettings(host: String, user: String, password: String, maxPoolSize: Int, connectionTimeout: Long) + +case class BlackListSettings(banTime: FiniteDuration, cleanupTime: FiniteDuration) + +object ExplorerSettings { + + implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = { (config: Config, path: String) => + val split = config.getString(path).split(":") + new InetSocketAddress(split(0), split(1).toInt) + } + + def read: ExplorerSettings = ConfigFactory.load("local.conf") + .withFallback(ConfigFactory.load()).as[ExplorerSettings]("explorer") +} diff --git a/src/main/scala/encry/utils/CoreTaggedTypes.scala b/src/main/scala/encry/utils/CoreTaggedTypes.scala deleted file mode 100644 index 79a7b57..0000000 --- a/src/main/scala/encry/utils/CoreTaggedTypes.scala +++ /dev/null @@ -1,18 +0,0 @@ -package encry.utils - -import supertagged.TaggedType - -object CoreTaggedTypes { - object ModifierTypeId extends TaggedType[Byte] - - object ModifierId extends TaggedType[Array[Byte]] - - object VersionTag extends TaggedType[Array[Byte]] - - type ModifierTypeId = ModifierTypeId.Type - - type ModifierId = ModifierId.Type - - type VersionTag = VersionTag.Type - -}