Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion genesis.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"chain_id": "minichain_testnet_1",
"chain_id": "minichain-default",
"timestamp": 1716880000000,
"difficulty": 4,
"alloc": {
Expand Down
184 changes: 132 additions & 52 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

Usage:
python main.py --port 9000
python main.py --port 9001 --connect 127.0.0.1:9000
python main.py --port 9001 --connect <multiaddress>

Commands (type in the terminal while the node is running):
balance — show all account balances
send <to> <amount> — send coins to another address
mine — mine a block from the mempool
peers — show connected peers
connect <host>:<port> — connect to another node
connect <multiaddr> — connect to another node
address — show this node's public key
help — show available commands
quit — shut down the node
Expand All @@ -26,6 +26,7 @@
from nacl.encoding import HexEncoder

from minichain import Transaction, Blockchain, Block, State, Mempool, P2PNetwork, mine_block
from minichain.rpc import JSONRPCServer
from minichain.validators import is_valid_receiver
from minichain.block import calculate_receipt_root

Expand Down Expand Up @@ -113,53 +114,128 @@ def mine_and_process_block(chain, mempool, miner_pk):
# Network message handler
# ──────────────────────────────────────────────

def make_network_handler(chain, mempool):
def make_network_handler(chain, mempool, network):
"""Return an async callback that processes incoming P2P messages."""

async def handler(data):
msg_type = data.get("type")
payload = data.get("data")
peer_addr = data.get("_peer_addr", "unknown")

if msg_type == "sync":
peer_host = peer_addr.rsplit(":", 1)[0] if ":" in peer_addr else peer_addr
peer_host = peer_host.strip("[]")
is_trusted = peer_addr in TRUSTED_PEERS or peer_host in TRUSTED_PEERS
is_localhost = peer_host in LOCALHOST_PEERS
if chain.state.accounts and not (is_trusted or is_localhost):
logger.warning("🔒 Rejected sync from untrusted peer %s", peer_addr)
return
if payload is None and msg_type in ("hello", "chain_request", "chain_response"):
return

# Merge remote state into local state (for accounts we don't have yet)
remote_accounts = payload.get("accounts") if isinstance(payload, dict) else None
if not isinstance(remote_accounts, dict):
logger.warning("🔒 Rejected sync from %s with invalid accounts payload", peer_addr)
if msg_type == "hello":
peer_chain_id = payload.get("chain_id")
peer_gen_hash = payload.get("genesis_hash")
if peer_chain_id != chain.chain_id:
logger.warning("🔒 Disconnecting peer %s: chain_id mismatch (got %s, expected %s)", peer_addr, peer_chain_id, chain.chain_id)
asyncio.create_task(network.disconnect_peer(peer_addr))
return
if peer_gen_hash != chain.chain[0].hash:
logger.warning("🔒 Disconnecting peer %s: genesis hash mismatch", peer_addr)
asyncio.create_task(network.disconnect_peer(peer_addr))
return

for addr, acc in remote_accounts.items():
if not isinstance(acc, dict):
logger.warning("🔒 Skipping malformed account %r from %s", addr, peer_addr)
continue
if addr not in chain.state.accounts:
chain.state.accounts[addr] = acc
logger.info("🔄 Synced account %s... (balance=%d)", addr[:12], acc.get("balance", 0))
logger.info("🔄 Accepted state sync from %s — %d accounts", peer_addr, len(chain.state.accounts))
logger.info("🔄 Handshake successful with %s", peer_addr)
peer_tip = payload.get("latest_block_index", 0)
if peer_tip > chain.last_block.index:
logger.info("📡 Peer %s is ahead (%d > %d). Initiating chunked sync...", peer_addr, peer_tip, chain.last_block.index)
req = {"type": "chain_request", "data": {"start_index": chain.last_block.index + 1, "limit": 500}}
asyncio.create_task(network._broadcast_raw(req))
Comment thread
coderabbitai[bot] marked this conversation as resolved.

elif msg_type == "tx":
tx = Transaction.from_dict(payload)
if mempool.add_transaction(tx):
logger.info("📥 Received tx from %s... (amount=%s)", tx.sender[:8], tx.amount)
try:
tx = Transaction.from_dict(payload)
if getattr(tx, "chain_id", None) != chain.chain_id:
logger.warning("Invalid chain_id in tx from %s", peer_addr)
return
if mempool.add_transaction(tx):
logger.info("📥 Received tx from %s... (amount=%s)", tx.sender[:8], tx.amount)
except Exception as e:
logger.warning("Invalid tx payload from %s: %s", peer_addr, e)

elif msg_type == "block":
block = Block.from_dict(payload)
try:
block = Block.from_dict(payload)
except Exception as e:
logger.warning("Invalid block payload from %s: %s", peer_addr, e)
return

if chain.add_block(block):
logger.info("📥 Received Block #%d — added to chain", block.index)

# Drop only confirmed transactions so higher nonces can remain queued.
mempool.remove_transactions(block.transactions)
else:
logger.warning("📥 Received Block #%s — rejected", block.index)
if block.index > chain.last_block.index + 1:
logger.warning("📥 Received Block #%s — ahead of us (tip: %s). Requesting chunked sync...", block.index, chain.last_block.index)
req = {"type": "chain_request", "data": {"start_index": chain.last_block.index + 1, "limit": 500}}
asyncio.create_task(network._broadcast_raw(req))
else:
logger.warning("📥 Received Block #%s — rejected. Fork detected, trigger reorg sync.", block.index)
# For a fork, request the full chain to use resolve_conflicts
req = {"type": "chain_request", "data": {"start_index": 0, "limit": 1000000}} # Request full chain for reorg
asyncio.create_task(network._broadcast_raw(req))

elif msg_type == "chain_request":
start_index = payload.get("start_index", 0)
limit = payload.get("limit", 500)
logger.info("📡 Peer requested blocks from %d (limit %d).", start_index, limit)

if start_index < len(chain.chain):
blocks_slice = chain.chain[start_index : start_index + limit]
blocks_dicts = [b.to_dict() for b in blocks_slice]
else:
blocks_dicts = []

resp_payload = {"type": "chain_response", "data": {"blocks": blocks_dicts, "requested_limit": limit}}
asyncio.create_task(network._unicast_raw(peer_addr, resp_payload))

elif msg_type == "chain_response":
blocks_payload = payload.get("blocks", [])
requested_limit = payload.get("requested_limit", 500)
if not blocks_payload:
return

new_chain = []
try:
new_chain = [Block.from_dict(b) for b in blocks_payload]
except Exception as e:
logger.warning("❌ Failed to parse chain_response: %s", e)
return

if new_chain:
# Distinguish between linear catch-up vs full reorg based on whether we received block 0
if new_chain[0].index == 0:
# Fork / Reorg sync
success, orphans = chain.resolve_conflicts(new_chain)
if success:
logger.info("🔄 Reorg complete! Restoring %d orphaned txs to mempool.", len(orphans))
for tx in orphans:
mempool.add_transaction(tx)
else:
# Linear Catch-up
all_added = True
for block in new_chain:
if block.index <= chain.last_block.index:
continue # Ignore already known blocks
if chain.add_block(block):
logger.info("📥 Synced Block #%d", block.index)
mempool.remove_transactions(block.transactions)
else:
logger.warning("❌ Sync failed at Block #%d. Fork detected. Requesting full chain.", block.index)
req = {"type": "chain_request", "data": {"start_index": 0, "limit": 1000000}}
asyncio.create_task(network._broadcast_raw(req))
all_added = False
break

# If we added all blocks and we hit the limit, request next batch
if all_added and len(new_chain) == requested_limit:
next_index = chain.last_block.index + 1
logger.info("📡 Requesting next batch from index %d", next_index)
req = {"type": "chain_request", "data": {"start_index": next_index, "limit": requested_limit}}
asyncio.create_task(network._broadcast_raw(req))

return handler

Expand Down Expand Up @@ -235,7 +311,7 @@ async def cli_loop(sk, pk, chain, mempool, network):
continue

nonce = chain.state.get_account(pk).get("nonce", 0)
tx = Transaction(sender=pk, receiver=receiver, amount=amount, nonce=nonce, fee=fee)
tx = Transaction(sender=pk, receiver=receiver, amount=amount, nonce=nonce, fee=fee, chain_id=chain.chain_id)
tx.sign(sk)

if mempool.add_transaction(tx):
Expand Down Expand Up @@ -269,7 +345,7 @@ async def cli_loop(sk, pk, chain, mempool, network):
continue

nonce = chain.state.get_account(pk).get("nonce", 0)
tx = Transaction(sender=pk, receiver=None, amount=amount, nonce=nonce, fee=fee, data=code)
tx = Transaction(sender=pk, receiver=None, amount=amount, nonce=nonce, fee=fee, data=code, chain_id=chain.chain_id)
tx.sign(sk)

if mempool.add_transaction(tx):
Expand Down Expand Up @@ -301,7 +377,7 @@ async def cli_loop(sk, pk, chain, mempool, network):
continue

nonce = chain.state.get_account(pk).get("nonce", 0)
tx = Transaction(sender=pk, receiver=receiver, amount=amount, nonce=nonce, fee=fee, data=payload)
tx = Transaction(sender=pk, receiver=receiver, amount=amount, nonce=nonce, fee=fee, data=payload, chain_id=chain.chain_id)
tx.sign(sk)

if mempool.add_transaction(tx):
Expand All @@ -323,19 +399,14 @@ async def cli_loop(sk, pk, chain, mempool, network):
# ── connect ──
elif cmd == "connect":
if len(parts) < 2:
print(" Usage: connect <host>:<port>")
print(" Usage: connect <multiaddress>")
continue
try:
host, port_str = parts[1].rsplit(":", 1)
port = int(port_str)
except ValueError:
print(" Invalid format. Use host:port")
continue
success = await network.connect_to_peer(host, port)
maddr_str = parts[1]
success = await network.connect_to_peer(maddr_str)
if success:
print(f" Connected to {host}:{port}")
print(f" Attempting to dial {maddr_str}...")
else:
print(f" Failed to connect to {host}:{port}")
print(f" Failed to initiate connection to {maddr_str}")

# ── address ──
elif cmd == "address":
Expand Down Expand Up @@ -389,23 +460,34 @@ async def run_node(port: int, host: str, connect_to: str | None, fund: int, data
mempool = Mempool()
network = P2PNetwork()

handler = make_network_handler(chain, mempool)
handler = make_network_handler(chain, mempool, network)
network.register_handler(handler)

rpc_server = JSONRPCServer(chain, mempool, network)

# When a new peer connects, send our state so they can sync
# When a new peer connects, send our hello so they can handshake
async def on_peer_connected(writer):
import json as _json
sync_msg = _json.dumps({
"type": "sync",
"data": {"accounts": chain.state.accounts}
"type": "hello",
"data": {
"chain_id": chain.chain_id,
"genesis_hash": chain.chain[0].hash,
"latest_block_index": chain.last_block.index,
"latest_block_hash": chain.last_block.hash
}
}) + "\n"
writer.write(sync_msg.encode())
await writer.drain()
logger.info("🔄 Sent state sync to new peer")
logger.info("🔄 Sent hello handshake to new peer")

network.register_on_peer_connected(on_peer_connected)

await network.start(port=port, host=host)

# Start RPC server on a port correlated to the node port (e.g. 8545 if P2P is 9000)
rpc_port = 8545 + (port - 9000)
await rpc_server.start(host="127.0.0.1", port=rpc_port)

# Fund this node's wallet so it can transact in the demo
if fund > 0:
Expand All @@ -414,11 +496,7 @@ async def on_peer_connected(writer):

# Connect to a seed peer if requested
if connect_to:
try:
host, peer_port = connect_to.rsplit(":", 1)
await network.connect_to_peer(host, int(peer_port))
except ValueError:
logger.error("Invalid --connect format. Use host:port")
await network.connect_to_peer(connect_to)

try:
await cli_loop(sk, pk, chain, mempool, network)
Expand All @@ -431,14 +509,16 @@ async def on_peer_connected(writer):
logger.info("Chain saved to '%s'", datadir)
except Exception as e:
logger.error("Failed to save chain during shutdown: %s", e)

await rpc_server.stop()
await network.stop()


def main():
parser = argparse.ArgumentParser(description="MiniChain Node — Testnet Demo")
parser.add_argument("--host", type=str, default="127.0.0.1", help="Host/IP to bind the P2P server (default: 127.0.0.1)")
parser.add_argument("--port", type=int, default=9000, help="TCP port to listen on (default: 9000)")
parser.add_argument("--connect", type=str, default=None, help="Peer address to connect to (host:port)")
parser.add_argument("--connect", type=str, default=None, help="Peer address to connect to (multiaddr)")
parser.add_argument("--fund", type=int, default=100, help="Initial coins to fund this wallet (default: 100)")
parser.add_argument("--datadir", type=str, default=None, help="Directory to save/load blockchain state (enables persistence)")
args = parser.parse_args()
Expand Down
Loading