diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index b19de09d70..6d7c70a75c 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -2412,12 +2412,17 @@ struct controller_impl { return; } + // wire latency is block_timestamp -> first network arrival at this node (received_time, set by net_plugin), + // distinct from latency which is block_timestamp -> apply complete. Their difference is the local apply-queue + // / in-producing-mode delay. wire latency is 0 when received_time is unset (replay, loaded from disk). + const auto& received_time = chain_head.internal()->received_time; ilog("Received block {}... #{} @ {} signed by {} " // "Received" instead of "Applied" so it matches existing log output - "[trxs: {}, lib: {}, net: {}, cpu: {} us, elapsed: {} us, applying time: {} us, latency: {} ms]", + "[trxs: {}, lib: {}, net: {}, cpu: {} us, elapsed: {} us, applying time: {} us, wire latency: {} ms, latency: {} ms]", chain_head.id().short_id(), chain_head.block_num(), chain_head.timestamp(), chain_head.producer(), chain_head.block()->transactions.size(), chain_head.irreversible_blocknum(), - br.total_net_usage, br.total_cpu_usage_us, - br.total_elapsed_time, now - br.start_time, (now - chain_head.timestamp()).count() / 1000); + br.total_net_usage, br.total_cpu_usage_us, br.total_elapsed_time, now - br.start_time, + received_time != fc::time_point() ? 
(received_time - chain_head.block_time()).count() / 1000 : 0, + (now - chain_head.timestamp()).count() / 1000); if (_update_incoming_block_metrics) { _update_incoming_block_metrics({.trxs_incoming_total = chain_head.block()->transactions.size(), @@ -2886,7 +2891,8 @@ struct controller_impl { // thread safe, expected to be called from thread other than the main thread // tuple - controller::accepted_block_result create_block_state_i( const block_id_type& id, const signed_block_ptr& b, const block_state& prev ) { + controller::accepted_block_result create_block_state_i( const block_id_type& id, const signed_block_ptr& b, const block_state& prev, + fc::time_point received_time ) { std::optional qc = verify_basic_block_invariants(id, b, prev); log_and_drop_future verify_qc_future; if (qc) { @@ -2917,6 +2923,8 @@ struct controller_impl { SYS_ASSERT( id == bsp->id(), block_validate_exception, "provided id {} does not match block id {}", id, bsp->id() ); + bsp->received_time = received_time; + assert(!!qc == verify_qc_future.valid()); if (qc) { verify_qc_future.get(); @@ -2940,7 +2948,8 @@ struct controller_impl { } // thread safe, expected to be called from thread other than the main thread - controller::accepted_block_result create_block_handle( const block_id_type& id, const signed_block_ptr& b ) { + controller::accepted_block_result create_block_handle( const block_id_type& id, const signed_block_ptr& b, + fc::time_point received_time ) { SYS_ASSERT( b, block_validate_exception, "null block" ); if (auto bsp = fork_db_.get_block(id, include_root_t::yes)) @@ -2950,7 +2959,7 @@ struct controller_impl { if( !prev ) return controller::accepted_block_result{.add_result = fork_db_add_t::failure, .block{}}; - return create_block_state_i( id, b, *prev ); + return create_block_state_i( id, b, *prev, received_time ); } // thread safe, QC already verified by verify_proper_block_exts @@ -3955,8 +3964,9 @@ boost::asio::io_context& controller::get_thread_pool() { return 
my->thread_pool.get_executor(); } -controller::accepted_block_result controller::accept_block( const block_id_type& id, const signed_block_ptr& b ) const { - return my->create_block_handle( id, b ); +controller::accepted_block_result controller::accept_block( const block_id_type& id, const signed_block_ptr& b, + fc::time_point received_time ) const { + return my->create_block_handle( id, b, received_time ); } transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, diff --git a/libraries/chain/include/sysio/chain/block_handle.hpp b/libraries/chain/include/sysio/chain/block_handle.hpp index fcd3d60c86..f94239425f 100644 --- a/libraries/chain/include/sysio/chain/block_handle.hpp +++ b/libraries/chain/include/sysio/chain/block_handle.hpp @@ -37,6 +37,11 @@ struct block_handle { void write(const std::filesystem::path& state_file); bool read(const std::filesystem::path& state_file); + + // Returns true if `id` is an ancestor of this block within the finality_core's tracking range; false if the handle is empty. NOTE(review): callers also test `head.id() == id`, so presumably the block's own id is not matched - confirm. 
+ bool extends(const block_id_type& id) const { + return _bsp && _bsp->core.extends(id); + } }; } // namespace sysio::chain diff --git a/libraries/chain/include/sysio/chain/block_state.hpp b/libraries/chain/include/sysio/chain/block_state.hpp index 6e6442f59c..8a0c9afe5c 100644 --- a/libraries/chain/include/sysio/chain/block_state.hpp +++ b/libraries/chain/include/sysio/chain/block_state.hpp @@ -75,6 +75,8 @@ struct block_state : public block_header_state { // block_header_state provi weak_digest_t weak_digest; // finalizer_digest (weak, cached so we can quickly validate votes) aggregating_qc_t aggregating_qc; // where we accumulate votes we receive std::optional valid; + fc::time_point received_time; // wall-clock time the block was first received off the wire by net_plugin; + // default-initialized for blocks that were produced locally or loaded from disk // ------ updated for votes, used for fork_db ordering ------------------------------ private: diff --git a/libraries/chain/include/sysio/chain/controller.hpp b/libraries/chain/include/sysio/chain/controller.hpp index ad2140295e..ad59d85859 100644 --- a/libraries/chain/include/sysio/chain/controller.hpp +++ b/libraries/chain/include/sysio/chain/controller.hpp @@ -238,7 +238,11 @@ namespace sysio::chain { std::optional block; // empty optional if block is unlinkable }; // thread-safe - accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const; + // received_time, if set, is the wall-clock time the block was first received off the wire; recorded on the + // block_state for diagnostic logging. Default-init for callers that aren't network-receiving the block (replay, + // tests, locally-produced blocks). 
+ accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b, + fc::time_point received_time = fc::time_point() ) const; /// Apply any blocks that are ready from the fork_db struct apply_blocks_result_t { diff --git a/plugins/net_plugin/src/net_plugin.cpp b/plugins/net_plugin/src/net_plugin.cpp index 0208c60710..4e158b088f 100644 --- a/plugins/net_plugin/src/net_plugin.cpp +++ b/plugins/net_plugin/src/net_plugin.cpp @@ -932,7 +932,7 @@ namespace sysio { void handle_message( const block_nack_request_message& msg ); void handle_message( const sync_request_message& msg ); void handle_message( const signed_block& msg ) = delete; // signed_block_ptr overload used instead - void handle_message( const block_id_type& id, signed_block_ptr ptr ); + void handle_message( const block_id_type& id, signed_block_ptr ptr, fc::time_point received_time ); void handle_message( const transaction_message& msg ) = delete; // handled via process_next_trx_message void handle_message( const packed_transaction_ptr& trx ); void handle_message( const vote_message_ptr& msg ); @@ -3222,7 +3222,7 @@ namespace sysio { shared_ptr ptr = std::make_shared(); fc::raw::unpack( ds, *ptr ); - handle_message( blk_id, std::move( ptr ) ); + handle_message( blk_id, std::move( ptr ), now ); return true; } @@ -3795,9 +3795,18 @@ namespace sysio { std::chrono::nanoseconds rec{msg.rec}; int64_t offset = (double((rec - org).count()) + double(msg_xmt.count() - msg.dst)) / 2.0; + // Outbound = our send to peer's receive (rec - org). Inbound = peer's reply send to our receive (dst - xmt). + // offset = (outbound - inbound) / 2, so its sign shows the larger direction and its magnitude is half the + // difference. Possible causes: (a) our send queue is backed up (large outbound); (b) our connection-strand + // processing is behind real-time (large inbound); (c) actual clock skew between this node and the peer (NTP + // drift), which shifts outbound and inbound in opposite directions and so looks like (a) or (b). 
Persistent same-sign offsets across many peers point at (a)/(b); + // a single peer drifting alone points at (c). if (std::abs(offset) > block_interval_ns) { - peer_wlog(p2p_msg_log, this, "Clock offset is {}us, calculation: (rec {} - org {} + xmt {} - dst {})/2", - offset / 1000, rec.count(), org.count(), msg_xmt.count(), msg.dst); + const int64_t outbound_ms = (rec.count() - org.count()) / 1'000'000; + const int64_t inbound_ms = (msg.dst - msg_xmt.count()) / 1'000'000; + peer_wlog(p2p_msg_log, this, + "Peer message latency asymmetry or clock skew: outbound {} ms, inbound {} ms, offset {} ms", + outbound_ms, inbound_ms, offset / 1'000'000); } } org = std::chrono::nanoseconds{0}; @@ -3891,9 +3900,18 @@ namespace sysio { if (before_lib || my_impl->dispatcher.have_block(msg.id)) { if (block_num - 1 == block_header::num_from_id(last_block_nack)) { + // log when consecutive nacks cross the threshold and bcast_block switches us to notice-only for this peer + if (consecutive_blocks_nacks == consecutive_block_nacks_threshold) { + peer_ilog(p2p_blk_log, this, "switching to block_notice mode (peer ahead of us, consecutive_nacks={})", + consecutive_blocks_nacks + 1); + } ++consecutive_blocks_nacks; adjust_peer_score(peer_scoring::block_nack); } else { + if (consecutive_blocks_nacks > consecutive_block_nacks_threshold) { + peer_ilog(p2p_blk_log, this, "resuming full block broadcast (consecutive_nacks reset from {})", + consecutive_blocks_nacks); + } consecutive_blocks_nacks = 0; } if (!before_lib) { @@ -4071,10 +4089,10 @@ namespace sysio { } // called from connection strand - void connection::handle_message( const block_id_type& id, signed_block_ptr ptr ) { + void connection::handle_message( const block_id_type& id, signed_block_ptr ptr, fc::time_point received_time ) { // post to dispatcher strand so that we don't have multiple threads validating the block header peer_dlog(p2p_blk_log, this, "posting block {} to dispatcher strand", ptr->block_num()); - 
boost::asio::dispatch(my_impl->dispatcher.strand, [id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable { + boost::asio::dispatch(my_impl->dispatcher.strand, [id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id, received_time]() mutable { if (app().is_quiting()) // large sync span can have many of these queued up, exit quickly return; controller& cc = my_impl->chain_plug->chain(); @@ -4090,7 +4108,7 @@ namespace sysio { "received a block from the future, rejecting it: {}", id); } // this will return empty optional if block is not linkable - controller::accepted_block_result abh = cc.accept_block( id, ptr ); + controller::accepted_block_result abh = cc.accept_block( id, ptr, received_time ); fork_db_add_result = abh.add_result; obh = std::move(abh.block); unlinkable = fork_db_add_result == fork_db_add_t::failure; diff --git a/plugins/producer_plugin/src/producer_plugin.cpp b/plugins/producer_plugin/src/producer_plugin.cpp index 59a577f612..b71e48b088 100644 --- a/plugins/producer_plugin/src/producer_plugin.cpp +++ b/plugins/producer_plugin/src/producer_plugin.cpp @@ -710,6 +710,14 @@ class producer_plugin_impl : public std::enable_shared_from_this _producers; chain::db_read_mode _db_read_mode = db_read_mode::HEAD; pending_block_mode _pending_block_mode = pending_block_mode::speculating; + // Tracks blocks signed during the current producing round so we can summarize on-head-vs-orphaned at round exit. 
+ struct producing_round_state { + account_name producer; + block_timestamp_type round_start; + block_num_type first_block_num{}; + std::vector signed_blocks; + }; + std::optional _producing_round; unapplied_transaction_queue _unapplied_transactions; alignas(hardware_destructive_interference_sz) std::atomic _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool @@ -2265,6 +2273,34 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { _time_tracker.clear(); // make sure we start tracking block time after `apply_blocks()` + // Round transition diagnostics: at round-start slots, summarize the previous round (if any) and log entry for the + // new one. On-head check uses head.extends, valid for blocks still above LIB which is true at next-round-start; + // sub-second fork switches after round end could shift the count by a block or two. + const bool round_start_slot = + (block_timestamp_type(block_time).slot % chain::config::producer_repetitions) == 0; + if (round_start_slot) { + if (_producing_round) { + size_t on_head = 0; + for (const auto& id : _producing_round->signed_blocks) { + if (head.id() == id || head.extends(id)) + ++on_head; + } + const auto signed_count = _producing_round->signed_blocks.size(); + ilog("Round complete for {} starting #{} at {}: signed {}, on head {}, orphaned {}", + _producing_round->producer, _producing_round->first_block_num, _producing_round->round_start, + signed_count, on_head, signed_count - on_head); + _producing_round.reset(); + } + if (in_producing_mode()) { + const auto fhead = chain.fork_db_head(); + ilog("Entering producing round for {} at {}: head #{} {}, fhead #{} {}, {} blocks unapplied", + scheduled_producer.producer_name, block_time, head.block_num(), head.id().short_id(), + fhead.block_num(), fhead.id().short_id(), fhead.block_num() - head.block_num()); + _producing_round.emplace(scheduled_producer.producer_name, block_time, head.block_num() + 1); + 
_producing_round->signed_blocks.reserve(config::producer_repetitions); + } + } + const auto& preprocess_deadline = _pending_block_deadline; const block_num_type head_block_num = head.block_num(); @@ -2959,6 +2995,8 @@ void producer_plugin_impl::produce_block() { chain.commit_block(); const signed_block_ptr new_b = chain.head().block(); + if (_producing_round) + _producing_round->signed_blocks.push_back(chain.head().id()); fc::time_point now = fc::time_point::now(); _time_tracker.add_other_time(now); _time_tracker.report(new_b->block_num(), new_b->producer, now);