Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 69 additions & 20 deletions be/src/exec/sink/vtablet_block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, B
if (invalid_count) {
// For string column, if in non-strict load mode(for both insert stmt and stream load),
// truncate the string to schema len.
// After truncation, still need to check if byte len of each row exceed the schema len,
// because currently the schema len is defined in bytes, and substring works by unit of chars.
// This is a workaround for now, need to improve it after better support of multi-byte chars.
// The substring function works by unit of characters, but the schema len is defined in
// bytes. If any rows still exceed the byte limit after the substring truncation (due to
// multi-byte UTF-8 characters where char count < byte limit < byte count), they will be
// manually truncated at valid UTF-8 character boundaries in the validation loop below.
if (type_str && !state->enable_insert_strict()) {
ColumnsWithTypeAndName argument_template;
auto input_type = remove_nullable(orig_type);
Expand Down Expand Up @@ -298,33 +299,81 @@ Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, B
null_map = tmp_column_ptr == nullptr ? nullptr
: tmp_column_ptr->get_null_map_data().data();
}
bool need_manual_truncation = false;
for (size_t j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
if (need_to_validate(j, row, filter_map, null_map)) {
auto str_val = column_string->get_data_at(j);
bool invalid = str_val.size > limit;
if (invalid) {
if (str_val.size > len) {
fmt::format_to(error_msg, "{}",
"the length of input is too long than schema. ");
fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
str_val.to_prefix(32));
fmt::format_to(error_msg, "schema length: {}; ", len);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
} else if (str_val.size > limit) {
fmt::format_to(
error_msg, "{}",
"the length of input string is too long than vec schema. ");
fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
str_val.to_prefix(32));
fmt::format_to(error_msg, "schema length: {}; ", len);
fmt::format_to(error_msg, "limit length: {}; ", limit);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
if (!state->enable_insert_strict() && type_str) {
// In non-strict mode, the string will be truncated at a valid
// UTF-8 character boundary within the byte limit (see below).
need_manual_truncation = true;
} else {
if (str_val.size > len) {
fmt::format_to(error_msg, "{}",
"the length of input is too long than schema. ");
fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
str_val.to_prefix(32));
fmt::format_to(error_msg, "schema length: {}; ", len);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
} else if (str_val.size > limit) {
fmt::format_to(
error_msg, "{}",
"the length of input string is too long than vec schema. ");
fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
str_val.to_prefix(32));
fmt::format_to(error_msg, "schema length: {}; ", len);
fmt::format_to(error_msg, "limit length: {}; ", limit);
fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
}
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
}
if (need_manual_truncation) {
// The substring call above works by unit of characters, so it may not fully
// truncate strings containing multi-byte UTF-8 characters to the byte limit.
// Here we manually truncate each string at a valid UTF-8 character boundary
// that fits within the byte limit.
auto mut_string_col = ColumnString::create();
for (size_t j = 0; j < row_count; ++j) {
auto str_val = column_string->get_data_at(j);
size_t new_len = str_val.size;
if (new_len > static_cast<size_t>(limit)) {
// Find the largest valid UTF-8 character boundary <= limit bytes.
// UTF-8 continuation bytes have the form 10xxxxxx (0x80-0xBF).
size_t truncate_pos = static_cast<size_t>(limit);
if (truncate_pos > str_val.size) {
truncate_pos = str_val.size;
} else {
while (truncate_pos > 0 &&
(static_cast<uint8_t>(str_val.data[truncate_pos]) & 0xC0) ==
0x80) {
--truncate_pos;
}
}
new_len = truncate_pos;
}
mut_string_col->insert_data(str_val.data, new_len);
}
if (tmp_column_ptr) {
orig_column = ColumnNullable::create(
std::move(mut_string_col),
IColumn::mutate(tmp_column_ptr->get_null_map_column_ptr()));
} else {
orig_column = std::move(mut_string_col);
}
tmp_column_ptr = check_and_get_column<ColumnNullable>(*orig_column);
tmp_real_column_ptr = tmp_column_ptr == nullptr
? orig_column
: tmp_column_ptr->get_nested_column_ptr();
column_string = assert_cast<const ColumnString*>(tmp_real_column_ptr.get());
null_map = tmp_column_ptr == nullptr ? nullptr
: tmp_column_ptr->get_null_map_data().data();
}
}
return Status::OK();
};
Expand Down
Loading