Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,13 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// HTTP thread utilization
HTTP_THREAD_UTILIZATION("httpThreadUtilization", true),
// Track the concurrent executions of the API resources that use @ManagedAsync
MANAGED_ASYNC_ACTIVE_THREADS("threads", true);
MANAGED_ASYNC_ACTIVE_THREADS("threads", true),

// Per-table flag (1/0) indicating whether the table's schema uses the deprecated TimeFieldSpec.
TABLE_USES_DEPRECATED_TIME_FIELD_SPEC("tableUsesDeprecatedTimeFieldSpec", false),

// Global count of tables (across this controller's leadership set) whose schema uses the deprecated TimeFieldSpec.
TABLES_WITH_DEPRECATED_TIME_FIELD_SPEC("tablesWithDeprecatedTimeFieldSpec", true);


private final String _gaugeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.apache.pinot.controller.util.BrokerServiceHelper;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
import org.apache.pinot.controller.validation.DeprecatedFieldSpecChecker;
import org.apache.pinot.controller.validation.DiskUtilizationChecker;
import org.apache.pinot.controller.validation.OfflineSegmentValidationManager;
import org.apache.pinot.controller.validation.RealtimeOffsetAutoResetManager;
Expand Down Expand Up @@ -210,6 +211,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected RealtimeOffsetAutoResetManager _realtimeOffsetAutoResetManager;
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
protected BrokerResourceValidationManager _brokerResourceValidationManager;
protected DeprecatedFieldSpecChecker _deprecatedFieldSpecChecker;
protected SegmentRelocator _segmentRelocator;
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
Expand Down Expand Up @@ -442,6 +444,10 @@ public BrokerResourceValidationManager getBrokerResourceValidationManager() {
return _brokerResourceValidationManager;
}

/**
 * Returns the {@link DeprecatedFieldSpecChecker} periodic task held by this starter.
 *
 * @return the value of the {@code _deprecatedFieldSpecChecker} field; presumably {@code null} until the periodic
 *         tasks have been set up -- TODO confirm against the startup order
 */
public DeprecatedFieldSpecChecker getDeprecatedFieldSpecChecker() {
return _deprecatedFieldSpecChecker;
}

/**
 * Returns the {@link PinotHelixTaskResourceManager} held by this starter.
 *
 * @return the value of the {@code _helixTaskResourceManager} field
 */
public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
return _helixTaskResourceManager;
}
Expand Down Expand Up @@ -1028,6 +1034,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
_brokerResourceValidationManager =
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
periodicTasks.add(_brokerResourceValidationManager);
_deprecatedFieldSpecChecker =
new DeprecatedFieldSpecChecker(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
periodicTasks.add(_deprecatedFieldSpecChecker);
_segmentStatusChecker =
new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_tableSizeReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ public static class ControllerPeriodicTasksConf {
"controller.broker.resource.validation.frequencyPeriod";
public static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS =
"controller.broker.resource.validation.initialDelayInSeconds";
public static final String DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD =
"controller.deprecatedFieldSpecChecker.frequencyPeriod";
public static final String DEPRECATED_FIELD_SPEC_CHECKER_INITIAL_DELAY_IN_SECONDS =
"controller.deprecatedFieldSpecChecker.initialDelayInSeconds";
public static final String STATUS_CHECKER_FREQUENCY_PERIOD = "controller.statuschecker.frequencyPeriod";
public static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD =
"controller.statuschecker.waitForPushTimePeriod";
Expand Down Expand Up @@ -289,6 +293,7 @@ public static long getRandomInitialDelayInSeconds() {
public static final String DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD = "1h";
public static final String DEFAULT_REALTIME_OFFSET_AUTO_RESET_BACKFILL_FREQUENCY_PERIOD = "1h";
public static final String DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD = "1h";
public static final String DEFAULT_DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD = "1h";
public static final String DEFAULT_STATUS_CHECKER_FREQUENCY_PERIOD = "5m";
public static final String DEFAULT_REBALANCE_CHECKER_FREQUENCY_PERIOD = "5m";
public static final String DEFAULT_TENANT_REBALANCE_CHECKER_FREQUENCY_PERIOD = "5m";
Expand Down Expand Up @@ -752,6 +757,20 @@ public long getBrokerResourceValidationInitialDelayInSeconds() {
getPeriodicTaskInitialDelayInSeconds());
}

/**
 * Resolves how often the deprecated-field-spec checker runs.
 * <p>
 * Reads {@link ControllerPeriodicTasksConf#DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD}; when the key is unset,
 * or its value is rejected by {@code isValidPeriodWithLogging}, the value of
 * {@link ControllerPeriodicTasksConf#DEFAULT_DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD} is used instead.
 *
 * @return the effective period converted to seconds
 */
public int getDeprecatedFieldSpecCheckerFrequencyInSeconds() {
  final String propertyKey = ControllerPeriodicTasksConf.DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD;
  final String fallbackPeriod = ControllerPeriodicTasksConf.DEFAULT_DEPRECATED_FIELD_SPEC_CHECKER_FREQUENCY_PERIOD;

  String configuredPeriod = getProperty(propertyKey, fallbackPeriod);
  // An unparseable period is reported by the validator; fall back rather than fail.
  String effectivePeriod = isValidPeriodWithLogging(propertyKey, configuredPeriod) ? configuredPeriod : fallbackPeriod;
  return (int) convertPeriodToSeconds(effectivePeriod);
}

/**
 * Resolves the initial delay before the deprecated-field-spec checker first runs.
 *
 * @return the value configured under
 *         {@link ControllerPeriodicTasksConf#DEPRECATED_FIELD_SPEC_CHECKER_INITIAL_DELAY_IN_SECONDS}, or the
 *         result of {@link #getPeriodicTaskInitialDelayInSeconds()} when that key is not set
 */
public long getDeprecatedFieldSpecCheckerInitialDelayInSeconds() {
  // Evaluate the shared default exactly once, up front, then let the task-specific key override it.
  long sharedDefaultDelayInSeconds = getPeriodicTaskInitialDelayInSeconds();
  return getProperty(ControllerPeriodicTasksConf.DEPRECATED_FIELD_SPEC_CHECKER_INITIAL_DELAY_IN_SECONDS,
      sharedDefaultDelayInSeconds);
}

public int getStatusCheckerFrequencyInSeconds() {
String period = getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD,
ControllerPeriodicTasksConf.DEFAULT_STATUS_CHECKER_FREQUENCY_PERIOD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,30 @@ private void validateSchemaName(Schema schema) {
}
}

/**
 * Guards schema creation against the deprecated TIME field: a schema carrying a
 * {@link org.apache.pinot.spi.data.TimeFieldSpec} is rejected unless a schema of the same name that also carries a
 * TimeFieldSpec is already stored, in which case the call is treated as a re-apply and allowed.
 * <p>
 * New schemas are expected to use DATE_TIME (DateTimeFieldSpec). The check is meant for the POST (create) path
 * only; the PUT (update) endpoint and the {@code force=true} escape hatch stay lenient so that restore / reconcile
 * workflows for existing legacy schemas keep working. This partial block is the first step toward removing
 * TimeFieldSpec; see https://github.com/apache/pinot/issues/2756.
 *
 * @param schema the schema submitted for creation
 * @throws ControllerApplicationException with status 400 (BAD_REQUEST) when the schema has a TimeFieldSpec and no
 *         stored legacy schema of the same name exists
 */
@SuppressWarnings("deprecation")
private void rejectDeprecatedTimeFieldSpec(Schema schema) {
  boolean usesTimeFieldSpec = schema.getTimeFieldSpec() != null;
  if (!usesTimeFieldSpec) {
    return;
  }

  // A stored schema of the same name that already has a TimeFieldSpec means this is a re-apply, not a creation.
  Schema storedSchema = _pinotHelixResourceManager.getSchema(schema.getSchemaName());
  boolean reapplyingLegacySchema = storedSchema != null && storedSchema.getTimeFieldSpec() != null;
  if (!reapplyingLegacySchema) {
    String reason = "Invalid schema: " + schema.getSchemaName()
        + ". Reason: TimeFieldSpec (fieldType=TIME) is deprecated; "
        + "use DateTimeFieldSpec (fieldType=DATE_TIME) instead.";
    throw new ControllerApplicationException(LOGGER, reason, Response.Status.BAD_REQUEST);
  }
}

private void validateSchemaInternal(Schema schema) {
try {
List<TableConfig> tableConfigs = _pinotHelixResourceManager.getTableConfigsForSchema(schema.getSchemaName());
Expand All @@ -424,6 +448,9 @@ private void validateSchemaInternal(Schema schema) {
*/
private SuccessResponse addSchema(Schema schema, boolean override, boolean force) {
String schemaName = schema.getSchemaName();
if (!force) {
rejectDeprecatedTimeFieldSpec(schema);
}
validateSchemaInternal(schema);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.validation;

import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/// Periodic cluster health check that flags tables whose schema still uses the deprecated
/// [org.apache.pinot.spi.data.TimeFieldSpec].
///
/// For each table the current controller leads, it loads the table schema and:
///
/// - logs a warning naming the table and schema if a `TimeFieldSpec` is present;
/// - emits a per-table gauge [ControllerGauge#TABLE_USES_DEPRECATED_TIME_FIELD_SPEC] (1 if affected, else 0);
/// - emits a global gauge [ControllerGauge#TABLES_WITH_DEPRECATED_TIME_FIELD_SPEC] with the count of affected
/// raw tables this controller saw on the latest run (hybrid tables count once).
///
/// The count is naturally partitioned across lead controllers; summing the gauge across controllers gives the
/// cluster-wide total. Per-table gauges are cleared via [#nonLeaderCleanup] when this controller loses leadership
/// for a table so the partitioning stays sound. See https://github.com/apache/pinot/issues/2756 for the
/// deprecation plan.
public class DeprecatedFieldSpecChecker extends ControllerPeriodicTask<DeprecatedFieldSpecChecker.Context> {
private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedFieldSpecChecker.class);

public DeprecatedFieldSpecChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
super("DeprecatedFieldSpecChecker", config.getDeprecatedFieldSpecCheckerFrequencyInSeconds(),
config.getDeprecatedFieldSpecCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
}

@Override
protected Context preprocess(Properties periodicTaskProperties) {
return new Context();
}

@Override
@SuppressWarnings("deprecation")
protected void processTable(String tableNameWithType, Context context) {
Schema schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
if (schema == null) {
// Distinguish "not affected" (gauge=0) from "could not check" — do NOT overwrite the prior gauge here;
// a transient ZK miss should not silently flip a previously-flagged table to clean.
LOGGER.warn("No schema found for table {}; skipping deprecated-field check this run", tableNameWithType);
return;
}
if (schema.getTimeFieldSpec() != null) {
// Hybrid tables share a single raw schema; dedupe on raw table name so the global count is per-table.
if (context._affectedRawTables.add(TableNameBuilder.extractRawTableName(tableNameWithType))) {
LOGGER.warn("Table {} uses deprecated TimeFieldSpec (schema: {}); migrate to DateTimeFieldSpec. "
+ "See https://github.com/apache/pinot/issues/2756", tableNameWithType, schema.getSchemaName());
}
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_USES_DEPRECATED_TIME_FIELD_SPEC, 1);
} else {
_controllerMetrics.setValueOfTableGauge(tableNameWithType,
ControllerGauge.TABLE_USES_DEPRECATED_TIME_FIELD_SPEC, 0);
}
}

@Override
protected void postprocess(Context context) {
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLES_WITH_DEPRECATED_TIME_FIELD_SPEC,
context._affectedRawTables.size());
if (!context._affectedRawTables.isEmpty()) {
LOGGER.warn("{} table(s) led by this controller still use deprecated TimeFieldSpec",
context._affectedRawTables.size());
}
}

@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
// Once this controller is no longer leader for a table, drop the per-table gauge so summing the gauge across
// controllers does not double-count after a leadership shift.
for (String tableNameWithType : tableNamesWithType) {
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_USES_DEPRECATED_TIME_FIELD_SPEC);
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 222fa93. nonLeaderCleanup now detects the "all previously-led tables are being cleaned up" case and resets the global gauge to 0; added a regression test testGlobalGaugeResetWhenAllLeadershipLost.

// If every previously-led table is being cleaned up, ControllerPeriodicTask.runTask did NOT call processTables
// (and hence postprocess) this cycle, so the global gauge would otherwise stay stuck at its old value.
if (tableNamesWithType.size() == _prevLeaderOfTables.size()) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ControllerPeriodicTask.runTask() calls processTables() before nonLeaderCleanup(). If leadership fully rotates in one cycle (all old tables lost, new tables gained), this size check still matches and resets TABLES_WITH_DEPRECATED_TIME_FIELD_SPEC to 0 after postprocess() already set the count for the new leaders. That leaves the global gauge wrong until the next run. This reset needs to be gated on the current leader set actually being empty, or moved into a code path that can see the current leaders.

_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLES_WITH_DEPRECATED_TIME_FIELD_SPEC, 0);
}
}

public static final class Context {
private final Set<String> _affectedRawTables = new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,45 @@ public void testUnrecognizedPropertiesFileEndpoints()
"{\"unrecognizedProperties\":{\"/illegalKey1\":1},\"status\":\"transcript2 successfully added\"}");
}

/**
 * Exercises the TimeFieldSpec guard on schema creation: a plain POST of a new legacy schema is rejected, while
 * POST with the force flag, PUT on an existing legacy schema, and a plain POST re-apply of an existing legacy
 * schema are all accepted.
 */
@Test
public void testRejectDeprecatedTimeFieldSpec()
    throws Exception {
  PinotAdminClient adminClient = TEST_INSTANCE.getOrCreateAdminClient();
  String legacySchemaJson = "{\n"
      + " \"schemaName\" : \"legacyTimeSchema\",\n"
      + " \"dimensionFieldSpecs\" : [ { \"name\" : \"d1\", \"dataType\" : \"STRING\" } ],\n"
      + " \"timeFieldSpec\" : {\n"
      + " \"incomingGranularitySpec\" : { \"name\" : \"ts\", \"dataType\" : \"LONG\","
      + " \"timeType\" : \"MILLISECONDS\" }\n"
      + " }\n"
      + "}";

  // 1. Plain POST of a brand-new schema with a TimeFieldSpec is refused with a validation error.
  RuntimeException thrown = expectThrows(RuntimeException.class,
      () -> runUnchecked(() -> adminClient.getSchemaClient().createSchema(legacySchemaJson)));
  Throwable rootCause = unwrap(thrown);
  assertTrue(rootCause instanceof PinotAdminValidationException, "Unexpected exception: " + rootCause);
  assertTrue(rootCause.getMessage().contains("TimeFieldSpec"),
      "Expected TimeFieldSpec error, got: " + rootCause.getMessage());

  // 2. The force flag is the operator escape hatch: the same legacy shape is accepted under a second name.
  String forcedName = "legacyTimeSchemaForced";
  String forcedJson = legacySchemaJson.replace("legacyTimeSchema", forcedName);
  adminClient.getSchemaClient().createSchema(forcedJson, true, true);
  try {
    // 3. PUT stays lenient, so a legacy schema that already exists can still be updated.
    adminClient.getSchemaClient().updateSchema(forcedName, forcedJson);
    Schema roundTripped = adminClient.getSchemaClient().getSchemaObject(forcedName);
    assertEquals(roundTripped.getSchemaName(), forcedName);

    // 4. Plain POST (no force) of the now-existing legacy schema is a re-apply and must go through, keeping
    // restore/reconcile workflows working.
    adminClient.getSchemaClient().createSchema(forcedJson);
  } finally {
    // Always remove the schema created in step 2 so later tests start clean.
    adminClient.getSchemaClient().deleteSchema(forcedName);
  }
}

@Test
public void testSchemaDeletionWithLogicalTable()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ControllerStarter createControllerStarter() {
}

private class MockControllerStarter extends ControllerStarter {
private static final int NUM_PERIODIC_TASKS = 13;
private static final int NUM_PERIODIC_TASKS = 14;

public MockControllerStarter() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,12 +800,14 @@ public void addDummySchema(String tableName)
}

/**
* Add a schema to the controller.
* Add a schema to the controller. Test infrastructure path: passes {@code force=true} so legacy test fixtures
* (e.g. shared schemas that still use TimeFieldSpec) are accepted -- production callers go through the strict
* default path.
*/
public void addSchema(Schema schema)
throws IOException {
try {
getOrCreateAdminClient().getSchemaClient().createSchema(schema.toSingleLineJsonString());
getOrCreateAdminClient().getSchemaClient().createSchema(schema.toSingleLineJsonString(), true, true);
} catch (PinotAdminException e) {
throw new IOException(e);
}
Expand Down
Loading
Loading