diff --git a/be/src/exec/sink/vtablet_block_convertor.cpp b/be/src/exec/sink/vtablet_block_convertor.cpp index 134aa2a214049a..5f6770e0ed149f 100644 --- a/be/src/exec/sink/vtablet_block_convertor.cpp +++ b/be/src/exec/sink/vtablet_block_convertor.cpp @@ -255,9 +255,10 @@ Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, B if (invalid_count) { // For string column, if in non-strict load mode(for both insert stmt and stream load), // truncate the string to schema len. - // After truncation, still need to check if byte len of each row exceed the schema len, - // because currently the schema len is defined in bytes, and substring works by unit of chars. - // This is a workaround for now, need to improve it after better support of multi-byte chars. + // The substring function works by unit of characters, but the schema len is defined in + // bytes. If any rows still exceed the byte limit after the substring truncation (due to + // multi-byte UTF-8 characters where char count < byte limit < byte count), they will be + // manually truncated at valid UTF-8 character boundaries in the validation loop below. if (type_str && !state->enable_insert_strict()) { ColumnsWithTypeAndName argument_template; auto input_type = remove_nullable(orig_type); @@ -298,33 +299,81 @@ Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, B null_map = tmp_column_ptr == nullptr ? nullptr : tmp_column_ptr->get_null_map_data().data(); } + bool need_manual_truncation = false; for (size_t j = 0; j < row_count; ++j) { auto row = rows ? (*rows)[j] : j; if (need_to_validate(j, row, filter_map, null_map)) { auto str_val = column_string->get_data_at(j); bool invalid = str_val.size > limit; if (invalid) { - if (str_val.size > len) { - fmt::format_to(error_msg, "{}", - "the length of input is too long than schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", len); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); - } else if (str_val.size > limit) { - fmt::format_to( - error_msg, "{}", - "the length of input string is too long than vec schema. "); - fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", - str_val.to_prefix(32)); - fmt::format_to(error_msg, "schema length: {}; ", len); - fmt::format_to(error_msg, "limit length: {}; ", limit); - fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + if (!state->enable_insert_strict() && type_str) { + // In non-strict mode, the string will be truncated at a valid + // UTF-8 character boundary within the byte limit (see below). + need_manual_truncation = true; + } else { + if (str_val.size > len) { + fmt::format_to(error_msg, "{}", + "the length of input is too long than schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", len); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } else if (str_val.size > limit) { + fmt::format_to( + error_msg, "{}", + "the length of input string is too long than vec schema. "); + fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", + str_val.to_prefix(32)); + fmt::format_to(error_msg, "schema length: {}; ", len); + fmt::format_to(error_msg, "limit length: {}; ", limit); + fmt::format_to(error_msg, "actual length: {}; ", str_val.size); + } + RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } - RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); } } } + if (need_manual_truncation) { + // The substring call above works by unit of characters, so it may not fully + // truncate strings containing multi-byte UTF-8 characters to the byte limit. + // Here we manually truncate each string at a valid UTF-8 character boundary + // that fits within the byte limit. + auto mut_string_col = ColumnString::create(); + for (size_t j = 0; j < row_count; ++j) { + auto str_val = column_string->get_data_at(j); + size_t new_len = str_val.size; + if (new_len > static_cast(limit)) { + // Find the largest valid UTF-8 character boundary <= limit bytes. + // UTF-8 continuation bytes have the form 10xxxxxx (0x80-0xBF). + size_t truncate_pos = static_cast(limit); + if (truncate_pos > str_val.size) { + truncate_pos = str_val.size; + } else { + while (truncate_pos > 0 && + (static_cast(str_val.data[truncate_pos]) & 0xC0) == + 0x80) { + --truncate_pos; + } + } + new_len = truncate_pos; + } + mut_string_col->insert_data(str_val.data, new_len); + } + if (tmp_column_ptr) { + orig_column = ColumnNullable::create( + std::move(mut_string_col), + IColumn::mutate(tmp_column_ptr->get_null_map_column_ptr())); + } else { + orig_column = std::move(mut_string_col); + } + tmp_column_ptr = check_and_get_column(*orig_column); + tmp_real_column_ptr = tmp_column_ptr == nullptr + ? orig_column + : tmp_column_ptr->get_nested_column_ptr(); + column_string = assert_cast(tmp_real_column_ptr.get()); + null_map = tmp_column_ptr == nullptr ? nullptr + : tmp_column_ptr->get_null_map_data().data(); + } } return Status::OK(); };