Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libraries/chain/authorization_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ namespace sysio { namespace chain {
using section_t = typename decltype(utils)::index_t::value_type;

snapshot->read_section<section_t>([this, &read_row_count]( auto& section ) {
decltype(utils)::preallocate(_db, section.row_count());
bool more = !section.empty();
while(more) {
decltype(utils)::create(_db, [this, &section, &more]( auto &row ) {
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ struct controller_impl {
using utils_t = decltype(utils);
using value_t = typename decltype(utils)::index_t::value_type;
snapshot->read_section<value_t>([this, &read_row_count]( auto& section ) {
utils_t::preallocate(db, section.row_count());
bool more = !section.empty();
while (more) {
utils_t::create(db, [this, &section, &more](auto& row) {
Expand Down Expand Up @@ -1682,6 +1683,7 @@ struct controller_impl {
// TODO:
}
snapshot->read_section<value_t>([this,&rows_loaded]( auto& section ) {
decltype(utils)::preallocate(db, section.row_count());
bool more = !section.empty();
while(more) {
decltype(utils)::create(db, [this, &section, &more]( auto &row ) {
Expand Down
8 changes: 8 additions & 0 deletions libraries/chain/include/sysio/chain/snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ namespace sysio { namespace chain {
return _reader.empty();
}

size_t row_count() const {
return _reader.section_row_count();
}

private:
friend class snapshot_reader;
section_reader(snapshot_reader& _reader)
Expand Down Expand Up @@ -348,6 +352,7 @@ namespace sysio { namespace chain {
virtual void return_to_header() = 0;

virtual size_t total_row_count() = 0;
virtual size_t section_row_count() const = 0;

virtual bool supports_threading() const {return false;}

Expand Down Expand Up @@ -391,6 +396,7 @@ namespace sysio { namespace chain {
void clear_section() override;
void return_to_header() override;
size_t total_row_count() override;
size_t section_row_count() const override;
bool has_section( const std::string& section_name ) const override;

private:
Expand Down Expand Up @@ -494,6 +500,7 @@ namespace sysio { namespace chain {
void clear_section() override;
void return_to_header() override;
size_t total_row_count() override;
size_t section_row_count() const override;

private:
bool validate_section() const;
Expand All @@ -519,6 +526,7 @@ namespace sysio { namespace chain {
void clear_section() override;
void return_to_header() override;
size_t total_row_count() override;
size_t section_row_count() const override;
bool supports_threading() const override {return true;}

bool has_section( const std::string& section_name ) const override;
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/resource_limits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void resource_limits_manager::add_to_snapshot( const snapshot_writer_ptr& snapsh
void resource_limits_manager::read_from_snapshot( const snapshot_reader_ptr& snapshot, std::atomic_size_t& read_row_count, boost::asio::io_context& ctx ) {
resource_index_set::walk_indices_via_post(ctx, [this, &snapshot, &read_row_count]( auto utils ){
snapshot->read_section<typename decltype(utils)::index_t::value_type>([this, &read_row_count]( auto& section ) {
decltype(utils)::preallocate(_db, section.row_count());
bool more = !section.empty();
while(more) {
decltype(utils)::create(_db, [this, &section, &more]( auto &row ) {
Expand Down
13 changes: 13 additions & 0 deletions libraries/chain/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ void variant_snapshot_reader::return_to_header() {
clear_section();
}

size_t variant_snapshot_reader::section_row_count() const {
if (!cur_section) return 0;
return (*cur_section)["rows"].get_array().size();
}

size_t variant_snapshot_reader::total_row_count() {
size_t total = 0;

Expand Down Expand Up @@ -436,6 +441,10 @@ void istream_json_snapshot_reader::return_to_header() {
clear_section();
}

size_t istream_json_snapshot_reader::section_row_count() const {
return impl->num_rows;
}

size_t istream_json_snapshot_reader::total_row_count() {
size_t total = 0;

Expand Down Expand Up @@ -610,6 +619,10 @@ void threaded_snapshot_reader::return_to_header() {
clear_section();
}

size_t threaded_snapshot_reader::section_row_count() const {
return num_rows;
}

size_t threaded_snapshot_reader::total_row_count() {
SYS_ASSERT(index_loaded_, snapshot_exception, "Snapshot index must be loaded before querying row count");

Expand Down
74 changes: 28 additions & 46 deletions libraries/chaindb/include/chainbase/chainbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,34 +132,13 @@ namespace chainbase {
template<typename MultiIndexType>
using generic_index = multi_index_to_undo_index<MultiIndexType>;

class abstract_session {
public:
virtual ~abstract_session(){};
virtual void push() = 0;
virtual void squash() = 0;
virtual void undo() = 0;
};

template<typename SessionType>
class session_impl : public abstract_session
{
public:
session_impl( SessionType&& s ):_session( std::move( s ) ){}

virtual void push() override { _session.push(); }
virtual void squash() override{ _session.squash(); }
virtual void undo() override { _session.undo(); }
private:
SessionType _session;
};

class abstract_index
{
public:
abstract_index( void* i ):_idx_ptr(i){}
virtual ~abstract_index(){}
virtual void set_revision( uint64_t revision ) = 0;
virtual unique_ptr<abstract_session> start_undo_session( bool enabled ) = 0;
virtual void add_undo_session() = 0;

virtual int64_t revision()const = 0;
virtual void undo()const = 0;
Expand All @@ -184,9 +163,7 @@ namespace chainbase {
public:
index_impl( BaseIndex& base ):abstract_index( &base ),_base(base){}

virtual unique_ptr<abstract_session> start_undo_session( bool enabled ) override {
return unique_ptr<abstract_session>(new session_impl<typename BaseIndex::session>( _base.start_undo_session( enabled ) ) );
}
virtual void add_undo_session() override { _base.add_session(); }

virtual void set_revision( uint64_t revision ) override { _base.set_revision( revision ); }
virtual int64_t revision()const override { return _base.revision(); }
Expand Down Expand Up @@ -273,38 +250,36 @@ namespace chainbase {

struct session {
public:
session( session&& s ):_index_sessions( std::move(s._index_sessions) ){}
session( vector<std::unique_ptr<abstract_session>>&& s ):_index_sessions( std::move(s) )
{
}
session( const session& ) = delete;
session& operator=( const session& ) = delete;

session( session&& s ) noexcept : _db(s._db), _apply(s._apply) { s._apply = false; }
explicit session( database& db ) : _db(&db), _apply(true) {}

~session() {
undo();
}

void push()
{
for( auto& i : _index_sessions ) i->push();
_index_sessions.clear();
}

void squash()
{
for( auto& i : _index_sessions ) i->squash();
_index_sessions.clear();
session& operator=(session&& s) noexcept {
if (this != &s) {
undo();
_db = s._db;
_apply = s._apply;
s._apply = false;
}
return *this;
}

void undo()
{
for( auto& i : _index_sessions ) i->undo();
_index_sessions.clear();
}
void push() { _apply = false; }
void squash() { if (_apply) _db->squash_from_session(); _apply = false; }
void undo() { if (_apply) _db->undo_from_session(); _apply = false; }

private:
friend class database;
session(){}
session() : _db(nullptr), _apply(false) {}

vector< std::unique_ptr<abstract_session> > _index_sessions;
database* _db;
bool _apply;
};

session start_undo_session( bool enabled );
Expand Down Expand Up @@ -559,6 +534,13 @@ namespace chainbase {
}

private:
// Session cleanup must work even when _read_only_mode is true (e.g. SIGTERM
// during a read window while a block-building session is still alive).
// The old per-index session design bypassed the read_only_mode check; these
// methods preserve that behavior.
void undo_from_session();
void squash_from_session();

pinnable_mapped_file _db_file;
bool _read_only = false;

Expand Down
5 changes: 2 additions & 3 deletions libraries/chaindb/include/chainbase/shared_cow_string.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ namespace chainbase {
void dec_refcount(Alloc&& alloc) {
if (_data && --_data->reference_count == 0) {
assert(_data->size); // if size == 0, _data should be nullptr
std::forward<Alloc>(alloc).deallocate((char*)&*_data, sizeof(impl) + _data->size + 1);
std::forward<Alloc>(alloc).deallocate((char*)&*_data, sizeof(impl) + _data->size);
}
}

Expand All @@ -229,12 +229,11 @@ namespace chainbase {
void _alloc(Alloc&& alloc, const char* ptr, std::size_t size) {
impl* new_data = nullptr;
if (size > 0) {
new_data = (impl*)&*std::forward<Alloc>(alloc).allocate(sizeof(impl) + size + 1);
new_data = (impl*)&*std::forward<Alloc>(alloc).allocate(sizeof(impl) + size);
new_data->reference_count = 1;
new_data->size = size;
if (ptr)
std::memcpy(new_data->data, ptr, size);
new_data->data[size] = '\0';
}
dec_refcount(std::forward<Alloc>(alloc));
_data = new_data;
Expand Down
41 changes: 34 additions & 7 deletions libraries/chaindb/include/chainbase/small_size_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <functional>
#include <mutex>
#include <boost/interprocess/offset_ptr.hpp>

namespace bip = boost::interprocess;
Expand All @@ -25,7 +25,7 @@ namespace detail {
// - allocates in batch from `backing_allocator` (see `allocation_batch_size`)
// - freed buffers are linked into a free list for fast further allocations
// - allocated buffers are never returned to the `backing_allocator`
// - thread-safe
// - thread-safe via spinlock (required for parallel snapshot loading)
// ---------------------------------------------------------------------------------------
template <class backing_allocator>
class allocator {
Expand All @@ -37,7 +37,7 @@ class allocator {
, _back_alloc(back_alloc) {}

pointer allocate() {
std::lock_guard g(_m);
auto guard = spin_lock();
if (_block_start == _block_end && _freelist == nullptr) {
get_some();
}
Expand All @@ -55,22 +55,49 @@ class allocator {
}

void deallocate(const pointer& p) {
std::lock_guard g(_m);
auto guard = spin_lock();
_freelist = new (&*p) list_item{_freelist};
++_freelist_size;
}

size_t freelist_memory_usage() const {
std::lock_guard g(_m);
auto guard = spin_lock();
return _freelist_size * _sz + (_block_end - _block_start);
}

size_t num_blocks_allocated() const {
std::lock_guard g(_m);
auto guard = spin_lock();
return _num_blocks_allocated;
}

private:
// Parallel snapshot loading creates objects across multiple threads, each
// allocating shared_blob storage through different index types concurrently.
// TTAS with bounded pause then yield: avoids cache-line ping-pong on the
// test_and_set and prevents livelock when the holder is preempted (common
// under ASan / heavily-loaded CI runners).
struct spin_guard {
std::atomic_flag& _flag;
spin_guard(std::atomic_flag& f) : _flag(f) {
while (_flag.test_and_set(std::memory_order_acquire)) {
for (unsigned spins = 0; _flag.test(std::memory_order_relaxed); ++spins) {
if (spins < 16) {
#if defined(__x86_64__) || defined(__i386__)
__builtin_ia32_pause();
#elif defined(__aarch64__)
asm volatile("yield" ::: "memory");
#endif
} else {
std::this_thread::yield();
}
}
}
}
~spin_guard() { _flag.clear(std::memory_order_release); }
};

spin_guard spin_lock() const { return spin_guard{_lock}; }

struct list_item { bip::offset_ptr<list_item> _next; };
static constexpr size_t max_allocation_batch_size = 512;

Expand All @@ -93,7 +120,7 @@ class allocator {
size_t _allocation_batch_size = 32;
size_t _freelist_size = 0;
size_t _num_blocks_allocated = 0; // number of blocks allocated from boost segment allocator
mutable std::mutex _m;
mutable std::atomic_flag _lock = ATOMIC_FLAG_INIT;
};

} // namespace detail
Expand Down
23 changes: 12 additions & 11 deletions libraries/chaindb/include/chainbase/undo_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,18 @@ namespace chainbase {
return session{*this, enabled};
}

// Starts a new undo session without creating a session RAII object.
// Used by database::start_undo_session to avoid per-index heap allocations.
// Exception safety: strong
int64_t add_session() {
_undo_stack.emplace_back();
_undo_stack.back().old_values_end = _old_values.empty()?nullptr:&*_old_values.begin();
_undo_stack.back().removed_values_end = _removed_values.empty()?nullptr:&*_removed_values.begin();
_undo_stack.back().old_next_id = _next_id;
_undo_stack.back().ctime = ++_monotonic_revision;
return ++_revision;
}

void set_revision( uint64_t revision ) {
if( _undo_stack.size() != 0 )
BOOST_THROW_EXCEPTION( std::logic_error("cannot set revision while there is an existing undo stack") );
Expand Down Expand Up @@ -680,17 +692,6 @@ namespace chainbase {
[this](pointer p) { dispose_node(*p); });
}

// starts a new undo session.
// Exception safety: strong
int64_t add_session() {
_undo_stack.emplace_back();
_undo_stack.back().old_values_end = _old_values.empty()?nullptr:&*_old_values.begin();
_undo_stack.back().removed_values_end = _removed_values.empty()?nullptr:&*_removed_values.begin();
_undo_stack.back().old_next_id = _next_id;
_undo_stack.back().ctime = ++_monotonic_revision;
return ++_revision;
}

template<int N = 0>
bool insert_impl(value_type& p) {
if constexpr (N < sizeof...(Indices)) {
Expand Down
Loading
Loading