diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 55f46be8f99d5..37b277efbaf2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -59,6 +59,7 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; @@ -568,12 +569,10 @@ protected TSStatus loadFileV2( private TSStatus loadTsFileAsync(final String dataBaseName, final List absolutePaths) throws IOException { final Map loadAttributes = - ActiveLoadPathHelper.buildAttributes( + buildLoadTsFileAttributesForAsync( dataBaseName, - null, shouldConvertDataTypeOnTypeMismatch, validateTsFile.get(), - null, shouldMarkAsPipeRequest.get()); if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) { @@ -582,17 +581,38 @@ private TSStatus loadTsFileAsync(final String dataBaseName, final List a return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + static Map buildLoadTsFileAttributesForAsync( + final String dataBaseName, + final boolean shouldConvertDataTypeOnTypeMismatch, + final boolean validateTsFile, + final boolean shouldMarkAsPipeRequest) { + return ActiveLoadPathHelper.buildAttributes( + dataBaseName, + LoadTsFileStatement.getDatabaseLevelByTreeDatabase(dataBaseName), + shouldConvertDataTypeOnTypeMismatch, + validateTsFile, + null, + shouldMarkAsPipeRequest); + } + private TSStatus loadTsFileSync(final String dataBaseName, final String fileAbsolutePath) throws FileNotFoundException { + return executeStatementAndClassifyExceptions( + buildLoadTsFileStatementForSync(dataBaseName, fileAbsolutePath, validateTsFile.get())); + } + + static LoadTsFileStatement buildLoadTsFileStatementForSync( + final String dataBaseName, final String fileAbsolutePath, final boolean validateTsFile) + throws FileNotFoundException { final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(fileAbsolutePath); statement.setDeleteAfterLoad(true); statement.setConvertOnTypeMismatch(true); - statement.setVerifySchema(validateTsFile.get()); + statement.setVerifySchema(validateTsFile); statement.setAutoCreateDatabase( IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()); statement.setDatabase(dataBaseName); - - return executeStatementAndClassifyExceptions(statement); + statement.updateDatabaseLevelByTreeDatabase(); + return statement; } private TSStatus loadSchemaSnapShot( @@ -845,12 +865,7 @@ private TSStatus executeStatementAndClassifyExceptions( return STATEMENT_STATUS_VISITOR.process(statement, result); } } catch (final Exception e) { - PipeLogger.log( - LOGGER::warn, - e, - "Receiver id = %s: Exception encountered while executing statement %s: ", - receiverId.get(), - statement.getPipeLoggingString()); + logStatementExceptionIfNecessary(statement, e); return STATEMENT_EXCEPTION_VISITOR.process(statement, e); } finally { if (Objects.nonNull(allocatedMemoryBlock)) { @@ -860,6 +875,29 @@ private TSStatus executeStatementAndClassifyExceptions( } } + private void logStatementExceptionIfNecessary(final Statement statement, final Exception e) { + if (shouldLogStatementException(receiverId.get(), statement, e)) { + PipeLogger.log( + LOGGER::warn, + e, + "Receiver id = %s: Exception encountered while executing statement %s: ", + receiverId.get(), + Objects.isNull(statement) ? null : statement.getPipeLoggingString()); + } + } + + static boolean shouldLogStatementException( + final long receiverId, final Statement statement, final Exception e) { + // Use the reducer cache as a gate. The actual stack trace is logged only when it passes. + return PipePeriodicalLogReducer.log( + message -> {}, + "Receiver id = %s, statement = %s, exception = %s, message = %s", + receiverId, + Objects.isNull(statement) ? null : statement.getPipeLoggingString(), + e.getClass().getName(), + e.getMessage()); + } + private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch( final Statement statement) { if (statement == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java index 42b1f0e5b1e3c..8e590c5847c0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.exception.SemanticException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; @@ -52,6 +53,12 @@ public TSStatus visitNode(final StatementNode node, final Exception context) { if (context instanceof AccessDeniedException) { return new TSStatus(TSStatusCode.NO_PERMISSION.getStatusCode()) .setMessage(context.getMessage()); + } else if (context instanceof IoTDBRuntimeException + && ((IoTDBRuntimeException) context).getErrorCode() + == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); } return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) .setMessage(context.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 59c7f9a57ef50..73133611ed283 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.statement.crud; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -44,6 +46,7 @@ import java.util.List; import java.util.Map; +import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY; @@ -337,6 +340,28 @@ private void initAttributes(final Map loadAttributes) { } } + public void updateDatabaseLevelByTreeDatabase() { + final Integer databaseLevel = getDatabaseLevelByTreeDatabase(database); + if (databaseLevel != null) { + this.databaseLevel = databaseLevel; + } + } + + public static Integer getDatabaseLevelByTreeDatabase(final String database) { + if (database == null) { + return null; + } + try { + final String[] nodes = PathUtils.splitPathToDetachedNodes(database); + if (nodes.length > 1 && PATH_ROOT.equals(nodes[0])) { + return nodes.length - 1; + } + } catch (final IllegalPathException ignored) { + // Keep the configured database level when database is not a legal tree path. + } + return null; + } + public boolean reconstructStatementIfMiniFileConverted(final List isMiniTsFile) { int lastNonMiniTsFileIndex = -1; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java index 2b20f1d91efb8..756d11818251f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; @@ -62,4 +63,17 @@ public void testTTLIdempotency() { StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) .getCode()); } + + @Test + public void testDatabaseNotExistRuntimeExceptionClassification() { + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR + .process( + new InsertRowsStatement(), + new IoTDBRuntimeException( + "Create DataPartition failed because the database: root.test.sg_0 is not exists", + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode())) + .getCode()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java new file mode 100644 index 0000000000000..f41c44763f997 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.protocol.thrift; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; +import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +public class IoTDBDataNodeReceiverTest { + + @Test + public void testLoadTsFileSyncStatementUsesTreeDatabaseLevelFromDatabaseName() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-tree-database-level", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true); + + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileAsyncAttributesUseTreeDatabaseLevelFromDatabaseName() throws Exception { + final Path tsFile = Files.createTempFile("pipe-async-load-tree-database-level", ".tsfile"); + try { + final Map attributes = + IoTDBDataNodeReceiver.buildLoadTsFileAttributesForAsync( + "root.test.sg_0", true, true, true); + + Assert.assertEquals( + "root.test.sg_0", attributes.get(LoadTsFileConfigurator.DATABASE_NAME_KEY)); + Assert.assertEquals("2", attributes.get(LoadTsFileConfigurator.DATABASE_LEVEL_KEY)); + + final LoadTsFileStatement statement = LoadTsFileStatement.createUnchecked(tsFile.toString()); + ActiveLoadPathHelper.applyAttributesToStatement(attributes, statement, true); + Assert.assertEquals("root.test.sg_0", statement.getDatabase()); + Assert.assertEquals(2, statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testLoadTsFileSyncStatementKeepsDefaultDatabaseLevelWhenDatabaseNameIsNull() + throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-default-database-level", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(null, tsFile.toString(), true); + + Assert.assertNull(statement.getDatabase()); + Assert.assertEquals( + IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(), + statement.getDatabaseLevel()); + } finally { + Files.deleteIfExists(tsFile); + } + } + + @Test + public void testRepeatedStatementExceptionLogIsReduced() throws Exception { + final Path tsFile = Files.createTempFile("pipe-load-log-reducer", ".tsfile"); + try { + final LoadTsFileStatement statement = + IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync( + "root.test.sg_0", tsFile.toString(), true); + final long receiverId = System.nanoTime(); + final Exception exception = new RuntimeException("repeated receiver exception " + receiverId); + + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception)); + Assert.assertFalse( + IoTDBDataNodeReceiver.shouldLogStatementException(receiverId, statement, exception)); + Assert.assertTrue( + IoTDBDataNodeReceiver.shouldLogStatementException( + receiverId, statement, new RuntimeException("another receiver exception"))); + } finally { + Files.deleteIfExists(tsFile); + } + } +}