Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2412,12 +2412,17 @@ struct controller_impl {
return;
}

// wire latency is block_timestamp -> first network arrival at this node (received_time, set by net_plugin),
// distinct from latency which is block_timestamp -> apply complete. Their difference is the local apply-queue
// / in-producing-mode delay. wire latency is 0 when received_time is unset (replay, loaded from disk).
const auto& received_time = chain_head.internal()->received_time;
ilog("Received block {}... #{} @ {} signed by {} " // "Received" instead of "Applied" so it matches existing log output
"[trxs: {}, lib: {}, net: {}, cpu: {} us, elapsed: {} us, applying time: {} us, latency: {} ms]",
"[trxs: {}, lib: {}, net: {}, cpu: {} us, elapsed: {} us, applying time: {} us, wire latency: {} ms, latency: {} ms]",
chain_head.id().short_id(), chain_head.block_num(), chain_head.timestamp(), chain_head.producer(),
chain_head.block()->transactions.size(), chain_head.irreversible_blocknum(),
br.total_net_usage, br.total_cpu_usage_us,
br.total_elapsed_time, now - br.start_time, (now - chain_head.timestamp()).count() / 1000);
br.total_net_usage, br.total_cpu_usage_us, br.total_elapsed_time, now - br.start_time,
received_time != fc::time_point() ? (received_time - chain_head.block_time()).count() / 1000 : 0,
(now - chain_head.timestamp()).count() / 1000);

if (_update_incoming_block_metrics) {
_update_incoming_block_metrics({.trxs_incoming_total = chain_head.block()->transactions.size(),
Expand Down Expand Up @@ -2886,7 +2891,8 @@ struct controller_impl {

// thread safe, expected to be called from thread other than the main thread
// tuple<bool best_head, block_handle new_block_handle>
controller::accepted_block_result create_block_state_i( const block_id_type& id, const signed_block_ptr& b, const block_state& prev ) {
controller::accepted_block_result create_block_state_i( const block_id_type& id, const signed_block_ptr& b, const block_state& prev,
fc::time_point received_time ) {
std::optional<qc_t> qc = verify_basic_block_invariants(id, b, prev);
log_and_drop_future<void> verify_qc_future;
if (qc) {
Expand Down Expand Up @@ -2917,6 +2923,8 @@ struct controller_impl {
SYS_ASSERT( id == bsp->id(), block_validate_exception,
"provided id {} does not match block id {}", id, bsp->id() );

bsp->received_time = received_time;

assert(!!qc == verify_qc_future.valid());
if (qc) {
verify_qc_future.get();
Expand All @@ -2940,7 +2948,8 @@ struct controller_impl {
}

// thread safe, expected to be called from thread other than the main thread
controller::accepted_block_result create_block_handle( const block_id_type& id, const signed_block_ptr& b ) {
controller::accepted_block_result create_block_handle( const block_id_type& id, const signed_block_ptr& b,
fc::time_point received_time ) {
SYS_ASSERT( b, block_validate_exception, "null block" );

if (auto bsp = fork_db_.get_block(id, include_root_t::yes))
Expand All @@ -2950,7 +2959,7 @@ struct controller_impl {
if( !prev )
return controller::accepted_block_result{.add_result = fork_db_add_t::failure, .block{}};

return create_block_state_i( id, b, *prev );
return create_block_state_i( id, b, *prev, received_time );
}

// thread safe, QC already verified by verify_proper_block_exts
Expand Down Expand Up @@ -3955,8 +3964,9 @@ boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}

controller::accepted_block_result controller::accept_block( const block_id_type& id, const signed_block_ptr& b ) const {
return my->create_block_handle( id, b );
controller::accepted_block_result controller::accept_block( const block_id_type& id, const signed_block_ptr& b,
fc::time_point received_time ) const {
return my->create_block_handle( id, b, received_time );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx,
Expand Down
5 changes: 5 additions & 0 deletions libraries/chain/include/sysio/chain/block_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ struct block_handle {

void write(const std::filesystem::path& state_file);
bool read(const std::filesystem::path& state_file);

// Returns true if `id` is in this block's ancestry (or is this block itself within the finality_core's tracking range).
bool extends(const block_id_type& id) const {
return _bsp && _bsp->core.extends(id);
}
};

} // namespace sysio::chain
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/include/sysio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ struct block_state : public block_header_state { // block_header_state provi
weak_digest_t weak_digest; // finalizer_digest (weak, cached so we can quickly validate votes)
aggregating_qc_t aggregating_qc; // where we accumulate votes we receive
std::optional<valid_t> valid;
fc::time_point received_time; // wall-clock time the block was first received off the wire by net_plugin;
// default-initialized for blocks that were produced locally or loaded from disk

// ------ updated for votes, used for fork_db ordering ------------------------------
private:
Expand Down
6 changes: 5 additions & 1 deletion libraries/chain/include/sysio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ namespace sysio::chain {
std::optional<block_handle> block; // empty optional if block is unlinkable
};
// thread-safe
accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const;
// received_time, if set, is the wall-clock time the block was first received off the wire; recorded on the
// block_state for diagnostic logging. Default-init for callers that aren't network-receiving the block (replay,
// tests, locally-produced blocks).
accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b,
fc::time_point received_time = fc::time_point() ) const;

/// Apply any blocks that are ready from the fork_db
struct apply_blocks_result_t {
Expand Down
32 changes: 25 additions & 7 deletions plugins/net_plugin/src/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ namespace sysio {
void handle_message( const block_nack_request_message& msg );
void handle_message( const sync_request_message& msg );
void handle_message( const signed_block& msg ) = delete; // signed_block_ptr overload used instead
void handle_message( const block_id_type& id, signed_block_ptr ptr );
void handle_message( const block_id_type& id, signed_block_ptr ptr, fc::time_point received_time );
void handle_message( const transaction_message& msg ) = delete; // handled via process_next_trx_message
void handle_message( const packed_transaction_ptr& trx );
void handle_message( const vote_message_ptr& msg );
Expand Down Expand Up @@ -3222,7 +3222,7 @@ namespace sysio {
shared_ptr<signed_block> ptr = std::make_shared<signed_block>();
fc::raw::unpack( ds, *ptr );

handle_message( blk_id, std::move( ptr ) );
handle_message( blk_id, std::move( ptr ), now );
return true;
}

Expand Down Expand Up @@ -3795,9 +3795,18 @@ namespace sysio {
std::chrono::nanoseconds rec{msg.rec};
int64_t offset = (double((rec - org).count()) + double(msg_xmt.count() - msg.dst)) / 2.0;

// Outbound = our send to peer's receive (org -> rec). Inbound = peer's reply to our handle (xmt -> dst).
// Sign of offset shows direction; magnitude is the asymmetry. Possible causes: (a) our send queue is backed
// up (large outbound); (b) our connection-strand processing is behind real-time (large inbound); (c) actual
// clock skew between this node and the peer (NTP drift) which the offset formula assumes is symmetric and
// would attribute to either direction. Persistent same-sign offsets across many peers point at (a)/(b);
// a single peer drifting alone points at (c).
if (std::abs(offset) > block_interval_ns) {
peer_wlog(p2p_msg_log, this, "Clock offset is {}us, calculation: (rec {} - org {} + xmt {} - dst {})/2",
offset / 1000, rec.count(), org.count(), msg_xmt.count(), msg.dst);
const int64_t outbound_ms = (rec.count() - org.count()) / 1'000'000;
const int64_t inbound_ms = (msg.dst - msg_xmt.count()) / 1'000'000;
peer_wlog(p2p_msg_log, this,
"Peer message latency asymmetry or clock skew: outbound {} ms, inbound {} ms, offset {} ms",
outbound_ms, inbound_ms, offset / 1'000'000);
}
}
org = std::chrono::nanoseconds{0};
Expand Down Expand Up @@ -3891,9 +3900,18 @@ namespace sysio {

if (before_lib || my_impl->dispatcher.have_block(msg.id)) {
if (block_num - 1 == block_header::num_from_id(last_block_nack)) {
// log when consecutive nacks cross the threshold and bcast_block switches us to notice-only for this peer
if (consecutive_blocks_nacks == consecutive_block_nacks_threshold) {
peer_ilog(p2p_blk_log, this, "switching to block_notice mode (peer ahead of us, consecutive_nacks={})",
consecutive_blocks_nacks + 1);
}
++consecutive_blocks_nacks;
adjust_peer_score(peer_scoring::block_nack);
} else {
if (consecutive_blocks_nacks > consecutive_block_nacks_threshold) {
peer_ilog(p2p_blk_log, this, "resuming full block broadcast (consecutive_nacks reset from {})",
consecutive_blocks_nacks);
}
consecutive_blocks_nacks = 0;
}
if (!before_lib) {
Expand Down Expand Up @@ -4071,10 +4089,10 @@ namespace sysio {
}

// called from connection strand
void connection::handle_message( const block_id_type& id, signed_block_ptr ptr ) {
void connection::handle_message( const block_id_type& id, signed_block_ptr ptr, fc::time_point received_time ) {
// post to dispatcher strand so that we don't have multiple threads validating the block header
peer_dlog(p2p_blk_log, this, "posting block {} to dispatcher strand", ptr->block_num());
boost::asio::dispatch(my_impl->dispatcher.strand, [id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable {
boost::asio::dispatch(my_impl->dispatcher.strand, [id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id, received_time]() mutable {
if (app().is_quiting()) // large sync span can have many of these queued up, exit quickly
return;
controller& cc = my_impl->chain_plug->chain();
Expand All @@ -4090,7 +4108,7 @@ namespace sysio {
"received a block from the future, rejecting it: {}", id);
}
// this will return empty optional<block_handle> if block is not linkable
controller::accepted_block_result abh = cc.accept_block( id, ptr );
controller::accepted_block_result abh = cc.accept_block( id, ptr, received_time );
fork_db_add_result = abh.add_result;
obh = std::move(abh.block);
unlinkable = fork_db_add_result == fork_db_add_t::failure;
Expand Down
38 changes: 38 additions & 0 deletions plugins/producer_plugin/src/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,14 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
std::set<chain::account_name> _producers;
chain::db_read_mode _db_read_mode = db_read_mode::HEAD;
pending_block_mode _pending_block_mode = pending_block_mode::speculating;
// Tracks blocks signed during the current producing round so we can summarize on-head-vs-orphaned at round exit.
struct producing_round_state {
account_name producer;
block_timestamp_type round_start;
block_num_type first_block_num{};
std::vector<block_id_type> signed_blocks;
};
std::optional<producing_round_state> _producing_round;
unapplied_transaction_queue _unapplied_transactions;
alignas(hardware_destructive_interference_sz)
std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
Expand Down Expand Up @@ -2265,6 +2273,34 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

_time_tracker.clear(); // make sure we start tracking block time after `apply_blocks()`

// Round transition diagnostics: at round-start slots, summarize the previous round (if any) and log entry for the
// new one. On-head check uses head.extends, valid for blocks still above LIB which is true at next-round-start;
// sub-second fork switches after round end could shift the count by a block or two.
const bool round_start_slot =
(block_timestamp_type(block_time).slot % chain::config::producer_repetitions) == 0;
if (round_start_slot) {
if (_producing_round) {
size_t on_head = 0;
for (const auto& id : _producing_round->signed_blocks) {
if (head.id() == id || head.extends(id))
++on_head;
}
const auto signed_count = _producing_round->signed_blocks.size();
ilog("Round complete for {} starting #{} at {}: signed {}, on head {}, orphaned {}",
_producing_round->producer, _producing_round->first_block_num, _producing_round->round_start,
signed_count, on_head, signed_count - on_head);
_producing_round.reset();
}
if (in_producing_mode()) {
const auto fhead = chain.fork_db_head();
ilog("Entering producing round for {} at {}: head #{} {}, fhead #{} {}, {} blocks unapplied",
scheduled_producer.producer_name, block_time, head.block_num(), head.id().short_id(),
fhead.block_num(), fhead.id().short_id(), fhead.block_num() - head.block_num());
_producing_round.emplace(scheduled_producer.producer_name, block_time, head.block_num() + 1);
_producing_round->signed_blocks.reserve(config::producer_repetitions);
}
}

const auto& preprocess_deadline = _pending_block_deadline;

const block_num_type head_block_num = head.block_num();
Expand Down Expand Up @@ -2959,6 +2995,8 @@ void producer_plugin_impl::produce_block() {
chain.commit_block();

const signed_block_ptr new_b = chain.head().block();
if (_producing_round)
_producing_round->signed_blocks.push_back(chain.head().id());
fc::time_point now = fc::time_point::now();
_time_tracker.add_other_time(now);
_time_tracker.report(new_b->block_num(), new_b->producer, now);
Expand Down