diff --git a/libraries/chain/authorization_manager.cpp b/libraries/chain/authorization_manager.cpp index b7d4ae18fc..afc06f0e5d 100644 --- a/libraries/chain/authorization_manager.cpp +++ b/libraries/chain/authorization_manager.cpp @@ -104,6 +104,7 @@ namespace sysio { namespace chain { using section_t = typename decltype(utils)::index_t::value_type; snapshot->read_section([this, &read_row_count]( auto& section ) { + decltype(utils)::preallocate(_db, section.row_count()); bool more = !section.empty(); while(more) { decltype(utils)::create(_db, [this, &section, &more]( auto &row ) { diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index ac3222d3e1..cb5fd37dc7 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -1560,6 +1560,7 @@ struct controller_impl { using utils_t = decltype(utils); using value_t = typename decltype(utils)::index_t::value_type; snapshot->read_section([this, &read_row_count]( auto& section ) { + utils_t::preallocate(db, section.row_count()); bool more = !section.empty(); while (more) { utils_t::create(db, [this, &section, &more](auto& row) { @@ -1682,6 +1683,7 @@ struct controller_impl { // TODO: } snapshot->read_section([this,&rows_loaded]( auto& section ) { + decltype(utils)::preallocate(db, section.row_count()); bool more = !section.empty(); while(more) { decltype(utils)::create(db, [this, &section, &more]( auto &row ) { diff --git a/libraries/chain/include/sysio/chain/snapshot.hpp b/libraries/chain/include/sysio/chain/snapshot.hpp index 65fc2f9239..9dd5409aa5 100644 --- a/libraries/chain/include/sysio/chain/snapshot.hpp +++ b/libraries/chain/include/sysio/chain/snapshot.hpp @@ -320,6 +320,10 @@ namespace sysio { namespace chain { return _reader.empty(); } + size_t row_count() const { + return _reader.section_row_count(); + } + private: friend class snapshot_reader; section_reader(snapshot_reader& _reader) @@ -348,6 +352,7 @@ namespace sysio { namespace chain { virtual void return_to_header() = 0; 
virtual size_t total_row_count() = 0; + virtual size_t section_row_count() const = 0; virtual bool supports_threading() const {return false;} @@ -391,6 +396,7 @@ namespace sysio { namespace chain { void clear_section() override; void return_to_header() override; size_t total_row_count() override; + size_t section_row_count() const override; bool has_section( const std::string& section_name ) const override; private: @@ -494,6 +500,7 @@ namespace sysio { namespace chain { void clear_section() override; void return_to_header() override; size_t total_row_count() override; + size_t section_row_count() const override; private: bool validate_section() const; @@ -519,6 +526,7 @@ namespace sysio { namespace chain { void clear_section() override; void return_to_header() override; size_t total_row_count() override; + size_t section_row_count() const override; bool supports_threading() const override {return true;} bool has_section( const std::string& section_name ) const override; diff --git a/libraries/chain/resource_limits.cpp b/libraries/chain/resource_limits.cpp index 16ea7627f5..88bb38396d 100644 --- a/libraries/chain/resource_limits.cpp +++ b/libraries/chain/resource_limits.cpp @@ -94,6 +94,7 @@ void resource_limits_manager::add_to_snapshot( const snapshot_writer_ptr& snapsh void resource_limits_manager::read_from_snapshot( const snapshot_reader_ptr& snapshot, std::atomic_size_t& read_row_count, boost::asio::io_context& ctx ) { resource_index_set::walk_indices_via_post(ctx, [this, &snapshot, &read_row_count]( auto utils ){ snapshot->read_section([this, &read_row_count]( auto& section ) { + decltype(utils)::preallocate(_db, section.row_count()); bool more = !section.empty(); while(more) { decltype(utils)::create(_db, [this, &section, &more]( auto &row ) { diff --git a/libraries/chain/snapshot.cpp b/libraries/chain/snapshot.cpp index 2e6c0f2891..cfa27e81a6 100644 --- a/libraries/chain/snapshot.cpp +++ b/libraries/chain/snapshot.cpp @@ -143,6 +143,11 @@ void 
variant_snapshot_reader::return_to_header() { clear_section(); } +size_t variant_snapshot_reader::section_row_count() const { + if (!cur_section) return 0; + return (*cur_section)["rows"].get_array().size(); +} + size_t variant_snapshot_reader::total_row_count() { size_t total = 0; @@ -436,6 +441,10 @@ void istream_json_snapshot_reader::return_to_header() { clear_section(); } +size_t istream_json_snapshot_reader::section_row_count() const { + return impl->num_rows; +} + size_t istream_json_snapshot_reader::total_row_count() { size_t total = 0; @@ -610,6 +619,10 @@ void threaded_snapshot_reader::return_to_header() { clear_section(); } +size_t threaded_snapshot_reader::section_row_count() const { + return num_rows; +} + size_t threaded_snapshot_reader::total_row_count() { SYS_ASSERT(index_loaded_, snapshot_exception, "Snapshot index must be loaded before querying row count"); diff --git a/libraries/chaindb/include/chainbase/chainbase.hpp b/libraries/chaindb/include/chainbase/chainbase.hpp index ad5f03129d..7b1a609a32 100644 --- a/libraries/chaindb/include/chainbase/chainbase.hpp +++ b/libraries/chaindb/include/chainbase/chainbase.hpp @@ -132,34 +132,13 @@ namespace chainbase { template using generic_index = multi_index_to_undo_index; - class abstract_session { - public: - virtual ~abstract_session(){}; - virtual void push() = 0; - virtual void squash() = 0; - virtual void undo() = 0; - }; - - template - class session_impl : public abstract_session - { - public: - session_impl( SessionType&& s ):_session( std::move( s ) ){} - - virtual void push() override { _session.push(); } - virtual void squash() override{ _session.squash(); } - virtual void undo() override { _session.undo(); } - private: - SessionType _session; - }; - class abstract_index { public: abstract_index( void* i ):_idx_ptr(i){} virtual ~abstract_index(){} virtual void set_revision( uint64_t revision ) = 0; - virtual unique_ptr start_undo_session( bool enabled ) = 0; + virtual void add_undo_session() = 
0; virtual int64_t revision()const = 0; virtual void undo()const = 0; @@ -184,9 +163,7 @@ namespace chainbase { public: index_impl( BaseIndex& base ):abstract_index( &base ),_base(base){} - virtual unique_ptr start_undo_session( bool enabled ) override { - return unique_ptr(new session_impl( _base.start_undo_session( enabled ) ) ); - } + virtual void add_undo_session() override { _base.add_session(); } virtual void set_revision( uint64_t revision ) override { _base.set_revision( revision ); } virtual int64_t revision()const override { return _base.revision(); } @@ -273,38 +250,36 @@ namespace chainbase { struct session { public: - session( session&& s ):_index_sessions( std::move(s._index_sessions) ){} - session( vector>&& s ):_index_sessions( std::move(s) ) - { - } + session( const session& ) = delete; + session& operator=( const session& ) = delete; + + session( session&& s ) noexcept : _db(s._db), _apply(s._apply) { s._apply = false; } + explicit session( database& db ) : _db(&db), _apply(true) {} ~session() { undo(); } - void push() - { - for( auto& i : _index_sessions ) i->push(); - _index_sessions.clear(); - } - - void squash() - { - for( auto& i : _index_sessions ) i->squash(); - _index_sessions.clear(); + session& operator=(session&& s) noexcept { + if (this != &s) { + undo(); + _db = s._db; + _apply = s._apply; + s._apply = false; + } + return *this; } - void undo() - { - for( auto& i : _index_sessions ) i->undo(); - _index_sessions.clear(); - } + void push() { _apply = false; } + void squash() { if (_apply) _db->squash_from_session(); _apply = false; } + void undo() { if (_apply) _db->undo_from_session(); _apply = false; } private: friend class database; - session(){} + session() : _db(nullptr), _apply(false) {} - vector< std::unique_ptr > _index_sessions; + database* _db; + bool _apply; }; session start_undo_session( bool enabled ); @@ -559,6 +534,13 @@ namespace chainbase { } private: + // Session cleanup must work even when _read_only_mode is true 
(e.g. SIGTERM + // during a read window while a block-building session is still alive). + // The old per-index session design bypassed the read_only_mode check; these + // methods preserve that behavior. + void undo_from_session(); + void squash_from_session(); + pinnable_mapped_file _db_file; bool _read_only = false; diff --git a/libraries/chaindb/include/chainbase/shared_cow_string.hpp b/libraries/chaindb/include/chainbase/shared_cow_string.hpp index 32e139f0c3..163f9d593d 100644 --- a/libraries/chaindb/include/chainbase/shared_cow_string.hpp +++ b/libraries/chaindb/include/chainbase/shared_cow_string.hpp @@ -203,7 +203,7 @@ namespace chainbase { void dec_refcount(Alloc&& alloc) { if (_data && --_data->reference_count == 0) { assert(_data->size); // if size == 0, _data should be nullptr - std::forward(alloc).deallocate((char*)&*_data, sizeof(impl) + _data->size + 1); + std::forward(alloc).deallocate((char*)&*_data, sizeof(impl) + _data->size); } } @@ -229,12 +229,11 @@ namespace chainbase { void _alloc(Alloc&& alloc, const char* ptr, std::size_t size) { impl* new_data = nullptr; if (size > 0) { - new_data = (impl*)&*std::forward(alloc).allocate(sizeof(impl) + size + 1); + new_data = (impl*)&*std::forward(alloc).allocate(sizeof(impl) + size); new_data->reference_count = 1; new_data->size = size; if (ptr) std::memcpy(new_data->data, ptr, size); - new_data->data[size] = '\0'; } dec_refcount(std::forward(alloc)); _data = new_data; diff --git a/libraries/chaindb/include/chainbase/small_size_allocator.hpp b/libraries/chaindb/include/chainbase/small_size_allocator.hpp index 0688f1ccea..d93c8b578e 100644 --- a/libraries/chaindb/include/chainbase/small_size_allocator.hpp +++ b/libraries/chaindb/include/chainbase/small_size_allocator.hpp @@ -5,10 +5,10 @@ #include #include #include +#include #include #include #include -#include #include namespace bip = boost::interprocess; @@ -25,7 +25,7 @@ namespace detail { // - allocates in batch from `backing_allocator` (see 
`allocation_batch_size`) // - freed buffers are linked into a free list for fast further allocations // - allocated buffers are never returned to the `backing_allocator` -// - thread-safe +// - thread-safe via spinlock (required for parallel snapshot loading) // --------------------------------------------------------------------------------------- template class allocator { @@ -37,7 +37,7 @@ class allocator { , _back_alloc(back_alloc) {} pointer allocate() { - std::lock_guard g(_m); + auto guard = spin_lock(); if (_block_start == _block_end && _freelist == nullptr) { get_some(); } @@ -55,22 +55,49 @@ class allocator { } void deallocate(const pointer& p) { - std::lock_guard g(_m); + auto guard = spin_lock(); _freelist = new (&*p) list_item{_freelist}; ++_freelist_size; } size_t freelist_memory_usage() const { - std::lock_guard g(_m); + auto guard = spin_lock(); return _freelist_size * _sz + (_block_end - _block_start); } size_t num_blocks_allocated() const { - std::lock_guard g(_m); + auto guard = spin_lock(); return _num_blocks_allocated; } private: + // Parallel snapshot loading creates objects across multiple threads, each + // allocating shared_blob storage through different index types concurrently. + // TTAS with bounded pause then yield: avoids cache-line ping-pong on the + // test_and_set and prevents livelock when the holder is preempted (common + // under ASan / heavily-loaded CI runners). 
+ struct spin_guard { + std::atomic_flag& _flag; + spin_guard(std::atomic_flag& f) : _flag(f) { + while (_flag.test_and_set(std::memory_order_acquire)) { + for (unsigned spins = 0; _flag.test(std::memory_order_relaxed); ++spins) { + if (spins < 16) { +#if defined(__x86_64__) || defined(__i386__) + __builtin_ia32_pause(); +#elif defined(__aarch64__) + asm volatile("yield" ::: "memory"); +#endif + } else { + std::this_thread::yield(); + } + } + } + } + ~spin_guard() { _flag.clear(std::memory_order_release); } + }; + + spin_guard spin_lock() const { return spin_guard{_lock}; } + struct list_item { bip::offset_ptr _next; }; static constexpr size_t max_allocation_batch_size = 512; @@ -93,7 +120,7 @@ class allocator { size_t _allocation_batch_size = 32; size_t _freelist_size = 0; size_t _num_blocks_allocated = 0; // number of blocks allocated from boost segment allocator - mutable std::mutex _m; + mutable std::atomic_flag _lock = ATOMIC_FLAG_INIT; }; } // namespace detail diff --git a/libraries/chaindb/include/chainbase/undo_index.hpp b/libraries/chaindb/include/chainbase/undo_index.hpp index 665d414513..7934460698 100644 --- a/libraries/chaindb/include/chainbase/undo_index.hpp +++ b/libraries/chaindb/include/chainbase/undo_index.hpp @@ -489,6 +489,18 @@ namespace chainbase { return session{*this, enabled}; } + // Starts a new undo session without creating a session RAII object. + // Used by database::start_undo_session to avoid per-index heap allocations. 
+ // Exception safety: strong + int64_t add_session() { + _undo_stack.emplace_back(); + _undo_stack.back().old_values_end = _old_values.empty()?nullptr:&*_old_values.begin(); + _undo_stack.back().removed_values_end = _removed_values.empty()?nullptr:&*_removed_values.begin(); + _undo_stack.back().old_next_id = _next_id; + _undo_stack.back().ctime = ++_monotonic_revision; + return ++_revision; + } + void set_revision( uint64_t revision ) { if( _undo_stack.size() != 0 ) BOOST_THROW_EXCEPTION( std::logic_error("cannot set revision while there is an existing undo stack") ); @@ -680,17 +692,6 @@ namespace chainbase { [this](pointer p) { dispose_node(*p); }); } - // starts a new undo session. - // Exception safety: strong - int64_t add_session() { - _undo_stack.emplace_back(); - _undo_stack.back().old_values_end = _old_values.empty()?nullptr:&*_old_values.begin(); - _undo_stack.back().removed_values_end = _removed_values.empty()?nullptr:&*_removed_values.begin(); - _undo_stack.back().old_next_id = _next_id; - _undo_stack.back().ctime = ++_monotonic_revision; - return ++_revision; - } - template bool insert_impl(value_type& p) { if constexpr (N < sizeof...(Indices)) { diff --git a/libraries/chaindb/src/chainbase.cpp b/libraries/chaindb/src/chainbase.cpp index 810fc3cf8f..44dd935e40 100644 --- a/libraries/chaindb/src/chainbase.cpp +++ b/libraries/chaindb/src/chainbase.cpp @@ -27,20 +27,26 @@ namespace chainbase { { if ( _read_only_mode ) BOOST_THROW_EXCEPTION( std::logic_error( "attempting to undo in read-only mode" ) ); - for( auto& item : _index_list ) - { - item->undo(); - } + undo_from_session(); } void database::squash() { if ( _read_only_mode ) BOOST_THROW_EXCEPTION( std::logic_error( "attempting to squash in read-only mode" ) ); + squash_from_session(); + } + + void database::undo_from_session() + { + for( auto& item : _index_list ) + item->undo(); + } + + void database::squash_from_session() + { for( auto& item : _index_list ) - { item->squash(); - } } void 
database::commit( int64_t revision ) @@ -68,12 +74,9 @@ namespace chainbase { if ( _read_only_mode ) BOOST_THROW_EXCEPTION( std::logic_error( "attempting to start_undo_session in read-only mode" ) ); if( enabled ) { - vector< std::unique_ptr > _sub_sessions; - _sub_sessions.reserve( _index_list.size() ); - for( auto& item : _index_list ) { - _sub_sessions.push_back( item->start_undo_session( enabled ) ); - } - return session( std::move( _sub_sessions ) ); + for( auto& item : _index_list ) + item->add_undo_session(); + return session( *this ); } else { return session(); } diff --git a/libraries/chaindb/test/test.cpp b/libraries/chaindb/test/test.cpp index b0563b13fd..83ef0f468e 100644 --- a/libraries/chaindb/test/test.cpp +++ b/libraries/chaindb/test/test.cpp @@ -525,7 +525,6 @@ BOOST_AUTO_TEST_CASE(shared_cow_string_apis) { BOOST_REQUIRE_EQUAL(s3, test_string); shared_cow_string s4 { test_string.size(), boost::container::default_init_t() }; - BOOST_REQUIRE_EQUAL(s4.data()[test_string.size()], 0); // null terminator should be added by constructor std::memcpy(s4.mutable_data(), test_string.c_str(), test_string.size()); BOOST_REQUIRE_EQUAL(s4, test_string); diff --git a/unittests/subjective_billing_tests.cpp b/unittests/subjective_billing_tests.cpp index 250aa09108..9a67fd676d 100644 --- a/unittests/subjective_billing_tests.cpp +++ b/unittests/subjective_billing_tests.cpp @@ -368,15 +368,23 @@ BOOST_AUTO_TEST_CASE( subjective_billing_integration_test ) { BOOST_TEST(sub_bill.get_subjective_bill(other, b->timestamp).count() > 0); sub_bill.set_subjective_account_cpu_allowed(fc::microseconds{1000}); - const size_t num_itrs = 1000; + // Push failing transactions until other's subjective bill reaches the threshold. + // Check at the top of each iteration so we never push when billing has already + // crossed the limit; doing so would throw tx_cpu_usage_exceeded instead of + // sysio_assert_message_exception and trip the BOOST_CHECK_THROW below. 
+ // update_billed_cpu_time always contributes >= 1 us to other per transaction + // (delta_per_action = (remaining / num_actions) + 1, minimum = 1), so the + // threshold of 1000 us is reached in at most 1000 iterations; 5000 gives 5x + // headroom for any rounding variation across runtimes and sanitizer builds. + const size_t num_itrs = 5000; size_t i = 0; for (; i < num_itrs; ++i) { - ptrx = create_trx(0, {1,1,1,0}); - BOOST_CHECK_THROW(push_trx( ptrx, fc::time_point::maximum(), {}, false ), sysio_assert_message_exception); if (sub_bill.get_subjective_bill(other, b->timestamp) >= sub_bill.get_subjective_account_cpu_allowed()) break; + ptrx = create_trx(0, {1,1,1,0}); + BOOST_CHECK_THROW(push_trx( ptrx, fc::time_point::maximum(), {}, false ), sysio_assert_message_exception); } - BOOST_REQUIRE(i < num_itrs); // failed to accumulate subjective billing + BOOST_REQUIRE_MESSAGE(i < num_itrs, "other's subjective billing failed to reach the threshold"); ptrx = create_trx(0, {1,1,1,1}); BOOST_CHECK_EXCEPTION(push_trx( ptrx, fc::time_point::maximum(), {}, false ), tx_cpu_usage_exceeded, fc_exception_message_contains("Authorized account other exceeded subjective CPU limit"));