diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/PaginatedResultEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/PaginatedResultEvent.scala
index 081d072180b..910d9f4bc94 100644
--- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/PaginatedResultEvent.scala
+++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/PaginatedResultEvent.scala
@@ -31,6 +31,24 @@ object PaginatedResultEvent {
   ): PaginatedResultEvent = {
     PaginatedResultEvent(req.requestID, req.operatorID, req.pageIndex, table, schema)
   }
+
+  def apply(
+      req: ResultPaginationRequest,
+      table: List[ObjectNode],
+      schema: List[Attribute],
+      totalNumTuples: Long,
+      sortSkipped: Boolean
+  ): PaginatedResultEvent = {
+    PaginatedResultEvent(
+      req.requestID,
+      req.operatorID,
+      req.pageIndex,
+      table,
+      schema,
+      Some(totalNumTuples),
+      sortSkipped
+    )
+  }
 }
 
 case class PaginatedResultEvent(
@@ -38,5 +56,13 @@ case class PaginatedResultEvent(
     operatorID: String,
     pageIndex: Int,
     table: List[ObjectNode],
-    schema: List[Attribute]
+    schema: List[Attribute],
+    // Total rows matching the request's filters / rowSearch. Optional because the
+    // unfiltered fast path doesn't bother recomputing it — the frontend already
+    // tracks totalNumTuples from WebPaginationUpdate in that case.
+    totalNumTuples: Option[Long] = None,
+    // True when sort was requested but the filtered row count exceeded the
+    // configured `storage.result.sort.max-rows` cap; the table comes back in
+    // scan order so the UI can prompt the user to narrow their filter.
+    sortSkipped: Boolean = false
 ) extends TexeraWebSocketEvent
diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala
index 4a3e0a58a3e..195d9bbc965 100644
--- a/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala
+++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/request/ResultPaginationRequest.scala
@@ -19,12 +19,21 @@
 package org.apache.texera.web.model.websocket.request
 
+import org.apache.texera.amber.core.storage.model.{ColumnFilter, SortSpec}
+
 case class ResultPaginationRequest(
     requestID: String,
     operatorID: String,
     pageIndex: Int,
     pageSize: Int,
+    // The columnOffset / columnLimit / columnSearch fields predate ag-grid's
+    // built-in column virtualization and column-toggle UI. Kept defaulted for
+    // wire compatibility with the Python SDK; new frontends do not set them.
    columnOffset: Int = 0,
    columnLimit: Int = Int.MaxValue,
-    columnSearch: Option[String] = None
+    columnSearch: Option[String] = None,
+    // Phase 2 / Phase 3 of the result-pane upgrade — row-level pushdown.
+    filters: Seq[ColumnFilter] = Seq.empty,
+    sorts: Seq[SortSpec] = Seq.empty,
+    rowSearch: Option[String] = None
 ) extends TexeraWebSocketRequest
diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
index b335ed0c3c7..00d16789af4 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionResultService.scala
@@ -23,7 +23,7 @@ import org.apache.pekko.actor.Cancellable
 import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
 import com.fasterxml.jackson.databind.node.ObjectNode
 import com.typesafe.scalalogging.LazyLogging
-import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.config.{ApplicationConfig, StorageConfig}
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.storage.result._
 import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
@@ -454,16 +454,44 @@ class ExecutionResultService(
           )
         }
 
-        val paginationIterable = {
-          virtualDocument
+        val hasQuery =
+          request.filters.nonEmpty || request.sorts.nonEmpty || request.rowSearch.exists(_.nonEmpty)
+
+        if (!hasQuery) {
+          val paginationIterable = virtualDocument
             .getRange(from, from + request.pageSize, columns)
             .to(Iterable)
+          val mappedResults = convertTuplesToJson(paginationIterable)
+          val attributes = paginationIterable.headOption
+            .map(_.getSchema.getAttributes)
+            .getOrElse(List.empty)
+          PaginatedResultEvent.apply(request, mappedResults, attributes)
+        } else {
+          // Filter / sort / rowSearch path: compute totalNumTuples up front so the
+          // frontend datasource can size the infinite scrollbar after the user's
+          // filter applies. If sort was requested and the matched row count blows
+          // the cap, IcebergDocument silently returns scan order — we surface that
+          // via `sortSkipped` so the UI can show a banner.
+          val totalMatching = virtualDocument.countWithQuery(request.filters, request.rowSearch)
+          val sortRequested = request.sorts.nonEmpty
+          val sortSkipped = sortRequested && totalMatching > StorageConfig.resultSortMaxRows
+
+          val paginationIterable = virtualDocument
+            .getRangeWithQuery(
+              from,
+              from + request.pageSize,
+              columns,
+              request.filters,
+              request.sorts,
+              request.rowSearch
+            )
+            .to(Iterable)
+
+          val mappedResults = convertTuplesToJson(paginationIterable)
+          val attributes = paginationIterable.headOption
+            .map(_.getSchema.getAttributes)
+            .getOrElse(List.empty)
+          PaginatedResultEvent.apply(request, mappedResults, attributes, totalMatching, sortSkipped)
         }
-        val mappedResults = convertTuplesToJson(paginationIterable)
-        val attributes = paginationIterable.headOption
-          .map(_.getSchema.getAttributes)
-          .getOrElse(List.empty)
-        PaginatedResultEvent.apply(request, mappedResults, attributes)
 
       case None =>
         // Handle the case when storageUri is empty
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 29d5f7be512..95b56bb2fb8 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -18,6 +18,19 @@
 # See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
 storage {
+  # Configuration for result pane query behavior.
+  # The result pane (frontend) lets users sort by any column; Iceberg cannot push
+  # ORDER BY into the file reader, so sorts are evaluated in memory in the server
+  # process. To prevent OOM on very large operator outputs, sorting is skipped once
+  # the post-filter row count exceeds this cap — the response carries a flag so the
+  # UI can prompt the user to narrow the filter.
+  result {
+    sort {
+      max-rows = 100000
+      max-rows = ${?STORAGE_RESULT_SORT_MAX_ROWS}
+    }
+  }
+
 # Configuration for Apache Iceberg, used for storing the workflow results & stats
 iceberg {
   catalog {
diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index 728e3c0c2de..10644e9d009 100644
--- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -34,6 +34,9 @@ object StorageConfig {
   val jdbcUsername: String = conf.getString("storage.jdbc.username")
   val jdbcPassword: String = conf.getString("storage.jdbc.password")
 
+  // Result-pane query specifics
+  val resultSortMaxRows: Long = conf.getLong("storage.result.sort.max-rows")
+
   // Iceberg specifics
   val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type")
   val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri")
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/QueryClauses.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/QueryClauses.scala
new file mode 100644
index 00000000000..09e4986d5e7
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/QueryClauses.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage.model
+
+/**
+ * Row-level filter predicate for a VirtualDocument query.
+ *
+ * Values are sent as strings over the wire for simplicity; storage backends are
+ * responsible for parsing them against the column's declared type (see
+ * IcebergPredicateBuilder for the canonical implementation).
+ *
+ * Supported `op` values: eq, ne, lt, le, gt, ge, contains, startsWith, endsWith,
+ * isNull, isNotNull, in.
+ */
+case class ColumnFilter(
+    columnName: String,
+    op: String,
+    value: Option[String] = None,
+    values: Option[Seq[String]] = None
+)
+
+/** Sort specification for a single column. `direction` must be "asc" or "desc". */
+case class SortSpec(columnName: String, direction: String)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala
index 8e72c52a803..7f2ca9a263d 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/model/VirtualDocument.scala
@@ -61,6 +61,40 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {
   def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] =
     throw new NotImplementedError("getRange method is not implemented")
 
+  /**
+   * Get an iterator over rows matching `filters` and `rowSearch`, ordered by `sorts`,
+   * sliced to `[from, until)` of the filtered/sorted view.
+   *
+   * Unlike `getRange`, the `from`/`until` indices here address the post-filter,
+   * post-sort sequence — not the underlying stored order. Implementations that don't
+   * support query pushdown should leave the default in place, which ignores
+   * filters/sorts/rowSearch and falls through to `getRange`.
+   *
+   * @param from      index (inclusive) into the filtered, sorted view
+   * @param until     index (exclusive) into the filtered, sorted view
+   * @param columns   columns to project (None = all columns)
+   * @param filters   row predicates ANDed together
+   * @param sorts     sort keys applied in order
+   * @param rowSearch free-text substring; implementation defines how it expands across columns
+   */
+  def getRangeWithQuery(
+      from: Int,
+      until: Int,
+      columns: Option[Seq[String]] = None,
+      filters: Seq[ColumnFilter] = Seq.empty,
+      sorts: Seq[SortSpec] = Seq.empty,
+      rowSearch: Option[String] = None
+  ): Iterator[T] = getRange(from, until, columns)
+
+  /**
+   * Count rows that satisfy `filters` and `rowSearch`. Used by paginated UIs to
+   * size the scrollbar after a filter is applied. Default falls back to total count.
+   */
+  def countWithQuery(
+      filters: Seq[ColumnFilter] = Seq.empty,
+      rowSearch: Option[String] = None
+  ): Long = getCount
+
   /**
    * get an iterator of all items after the specified index `offset`
    * @param offset the starting index (exclusive)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e10152cdaeb..380493b7394 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -19,14 +19,16 @@
 package org.apache.texera.amber.core.storage.result.iceberg
 
+import org.apache.texera.amber.config.StorageConfig
 import org.apache.texera.amber.core.storage.IcebergCatalogInstance
-import org.apache.texera.amber.core.storage.model.{BufferedItemWriter, VirtualDocument}
+import org.apache.texera.amber.core.storage.model.{BufferedItemWriter, ColumnFilter, SortSpec, VirtualDocument}
 import org.apache.texera.amber.core.storage.util.StorageUtil.{withLock, withReadLock, withWriteLock}
 import org.apache.texera.amber.util.IcebergUtil
 import org.apache.commons.io.IOUtils
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.Record
 import org.apache.iceberg.exceptions.NoSuchTableException
+import org.apache.iceberg.expressions.Expression
 import org.apache.iceberg.types.{Conversions, Types}
 import org.apache.iceberg.{FileScanTask, Table}
 
@@ -128,6 +130,187 @@
     table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum
   }
 
+  /**
+   * Query-aware overload of getRange. Translates `filters` to Iceberg predicates for
+   * pushdown into the Parquet reader, then applies residual filters (`contains`,
+   * `endsWith`), `rowSearch`, and `sorts` in memory. The `from`/`until` slice is
+   * taken from the post-filter, post-sort view.
+   *
+   * Sort is capped by `storage.result.sort.max-rows`: when the filtered count
+   * exceeds the cap, results are returned unsorted in scan order. Callers can
+   * detect this by comparing `countWithQuery` to the cap.
+   */
+  override def getRangeWithQuery(
+      from: Int,
+      until: Int,
+      columns: Option[Seq[String]] = None,
+      filters: Seq[ColumnFilter] = Seq.empty,
+      sorts: Seq[SortSpec] = Seq.empty,
+      rowSearch: Option[String] = None
+  ): Iterator[T] = {
+    if (filters.isEmpty && sorts.isEmpty && rowSearch.isEmpty) {
+      return getRange(from, until, columns)
+    }
+
+    withReadLock(lock) {
+      val tableOpt = IcebergUtil.loadTableMetadata(catalog, tableNamespace, tableName)
+      if (tableOpt.isEmpty) return Iterator.empty
+      val table = tableOpt.get
+      table.refresh()
+
+      val (pushdownExpr, residualFilters) =
+        IcebergPredicateBuilder.buildPushdownAndResidual(filters, table.schema())
+
+      // Sort and rowSearch may need columns not present in `columns` (e.g. sort by
+      // a hidden column, or rowSearch across all strings). Materialize each scan
+      // with the full schema so residual evaluation works; we re-project at the end.
+      val records = scanRecords(table, pushdownExpr)
+      val filtered = records
+        .filter(rec => matchesAll(rec, residualFilters, table.schema()))
+        .filter(rec => matchesRowSearch(rec, rowSearch, table.schema()))
+
+      val ordered: Iterator[Record] =
+        if (sorts.isEmpty) filtered
+        else sortBoundedOrFallback(filtered, sorts, table.schema())
+
+      val sliced = ordered.slice(from, until)
+
+      val projectionSchema = columns match {
+        case Some(cols) => tableSchema.select(cols.asJava)
+        case None => tableSchema
+      }
+      sliced.map(rec => deserde(projectionSchema, rec))
+    }
+  }
+
+  /**
+   * Count of rows matching `filters` and `rowSearch`. Used by paginated UIs to size
+   * the scrollbar after a filter is applied; returns `getCount` (file-stat sum) when
+   * no predicates are present, otherwise scans the filtered iterator.
+   */
+  override def countWithQuery(
+      filters: Seq[ColumnFilter] = Seq.empty,
+      rowSearch: Option[String] = None
+  ): Long = {
+    if (filters.isEmpty && rowSearch.isEmpty) return getCount
+
+    withReadLock(lock) {
+      val tableOpt = IcebergUtil.loadTableMetadata(catalog, tableNamespace, tableName)
+      if (tableOpt.isEmpty) return 0L
+      val table = tableOpt.get
+      table.refresh()
+      val (pushdownExpr, residualFilters) =
+        IcebergPredicateBuilder.buildPushdownAndResidual(filters, table.schema())
+      val records = scanRecords(table, pushdownExpr)
+      records
+        .filter(rec => matchesAll(rec, residualFilters, table.schema()))
+        .count(rec => matchesRowSearch(rec, rowSearch, table.schema())).toLong
+    }
+  }
+
+  /**
+   * Plan scan tasks under the given pushdown expression, sorted by file sequence
+   * number so repeated scans return rows in a stable order; the pagination slice
+   * in getRangeWithQuery depends on that determinism.
+   */
+  private def scanRecords(table: Table, pushdownExpr: Option[Expression]): Iterator[Record] = {
+    val baseScan = table.newScan()
+    val scan = pushdownExpr.fold(baseScan)(baseScan.filter)
+    val fileTasks = scan.planFiles().iterator().asScala.toSeq.sortBy(_.file().fileSequenceNumber())
+    fileTasks.iterator.flatMap { task =>
+      IcebergUtil.readDataFileAsIterator(task.file(), tableSchema, table)
+    }
+  }
+
+  private def matchesAll(record: Record, filters: Seq[ColumnFilter], schema: org.apache.iceberg.Schema): Boolean = {
+    filters.forall(f => matchesOne(record, f, schema))
+  }
+
+  private def matchesOne(record: Record, filter: ColumnFilter, schema: org.apache.iceberg.Schema): Boolean = {
+    val raw = record.getField(filter.columnName)
+    val needle = filter.value.getOrElse("")
+    filter.op match {
+      case "contains" => raw != null && raw.toString.contains(needle)
+      case "endsWith" => raw != null && raw.toString.endsWith(needle)
+      case _ => true // pushdown-handled ops never reach here
+    }
+  }
+
+  /**
+   * rowSearch is a case-insensitive substring match against the string rendering of
+   * every column. We evaluate it on the raw Record so a non-matching row can be
+   * rejected without deserializing the whole row first.
+   */
+  private def matchesRowSearch(
+      record: Record,
+      rowSearch: Option[String],
+      schema: org.apache.iceberg.Schema
+  ): Boolean = {
+    rowSearch match {
+      case None | Some("") => true
+      case Some(needle) =>
+        val n = needle.toLowerCase
+        schema.columns().asScala.exists { col =>
+          val v = record.getField(col.name())
+          v != null && v.toString.toLowerCase.contains(n)
+        }
+    }
+  }
+
+  /**
+   * In-memory sort, capped at `storage.result.sort.max-rows`. When the filtered
+   * count exceeds the cap we return the iterator in scan order — the caller is
+   * expected to check `countWithQuery` against the cap and surface a UI warning.
+   */
+  private def sortBoundedOrFallback(
+      records: Iterator[Record],
+      sorts: Seq[SortSpec],
+      schema: org.apache.iceberg.Schema
+  ): Iterator[Record] = {
+    val cap = StorageConfig.resultSortMaxRows
+    val buffer = mutable.ArrayBuffer.empty[Record]
+    val it = records
+    var overCap = false
+    while (it.hasNext && !overCap) {
+      val r = it.next()
+      buffer += r
+      if (buffer.size > cap) overCap = true
+    }
+    if (overCap) {
+      // Concatenate what we've buffered with the remainder, unsorted.
+      buffer.iterator ++ it
+    } else {
+      val ordering = sorts.foldRight(Ordering.by[Record, Int](_ => 0)) { (spec, base) =>
+        val cmp = recordOrdering(spec.columnName, spec.direction)
+        new Ordering[Record] {
+          override def compare(a: Record, b: Record): Int = {
+            val r = cmp.compare(a, b)
+            if (r != 0) r else base.compare(a, b)
+          }
+        }
+      }
+      buffer.sorted(ordering).iterator
+    }
+  }
+
+  private def recordOrdering(columnName: String, direction: String): Ordering[Record] = {
+    val base: Ordering[Record] = new Ordering[Record] {
+      override def compare(a: Record, b: Record): Int = {
+        val va = a.getField(columnName)
+        val vb = b.getField(columnName)
+        (va, vb) match {
+          case (null, null) => 0
+          case (null, _) => -1
+          case (_, null) => 1
+          case (x: java.lang.Comparable[_], y) =>
+            x.asInstanceOf[java.lang.Comparable[Any]].compareTo(y)
+          case (x, y) =>
+            x.toString.compareTo(y.toString)
+        }
+      }
+    }
+    if (direction.equalsIgnoreCase("desc")) base.reverse else base
+  }
+
   /**
    * Creates a BufferedItemWriter for writing data to the table.
    *
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergPredicateBuilder.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergPredicateBuilder.scala
new file mode 100644
index 00000000000..a1a41fd9f1f
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergPredicateBuilder.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.storage.result.iceberg
+
+import org.apache.iceberg.Schema
+import org.apache.iceberg.expressions.{Expression, Expressions}
+import org.apache.iceberg.types.Type
+import org.apache.iceberg.types.Types
+import org.apache.texera.amber.core.storage.model.ColumnFilter
+
+/**
+ * Translates [[ColumnFilter]]s (whose values arrive as strings over the wire) into
+ * Iceberg [[Expression]]s suitable for predicate pushdown via `Scan.filter(...)`.
+ *
+ * Iceberg pushes the resulting predicates into the Parquet reader and prunes whole
+ * data files using min/max stats, so accurate type parsing here is the single biggest
+ * lever for read performance on filtered queries. Filters whose semantics Iceberg
+ * cannot express (`contains`, `endsWith`) are reported back to the caller via
+ * [[buildPushdownAndResidual]] so the caller can run them as an in-memory pass.
+ */
+object IcebergPredicateBuilder {
+
+  /** Operators we can fully express as an Iceberg [[Expression]]. */
+  private val PushdownOps: Set[String] =
+    Set("eq", "ne", "lt", "le", "gt", "ge", "startsWith", "isNull", "isNotNull", "in")
+
+  /** Operators we evaluate after the scan because Iceberg has no native equivalent. */
+  private val ResidualOps: Set[String] = Set("contains", "endsWith")
+
+  case class ParseError(columnName: String, value: String, expectedType: String) extends RuntimeException(
+    s"Cannot interpret '$value' as $expectedType for column '$columnName'"
+  )
+
+  /**
+   * Split the filters into (pushdownExpression, residualFilters):
+   *   - pushdownExpression: ANDed Iceberg predicate suitable for `Scan.filter`
+   *   - residualFilters: filters that must be applied in memory over scan output
+   *
+   * Throws [[ParseError]] for malformed values so the caller can surface a typed
+   * error back to the UI rather than silently returning wrong rows.
+   */
+  def buildPushdownAndResidual(
+      filters: Seq[ColumnFilter],
+      schema: Schema
+  ): (Option[Expression], Seq[ColumnFilter]) = {
+    if (filters.isEmpty) return (None, Seq.empty)
+
+    val pushable = filters.filter(f => PushdownOps.contains(f.op))
+    val residual = filters.filterNot(f => PushdownOps.contains(f.op))
+
+    residual.foreach { f =>
+      if (!ResidualOps.contains(f.op)) {
+        throw new IllegalArgumentException(s"Unsupported filter op '${f.op}' on column '${f.columnName}'")
+      }
+    }
+
+    val expression = pushable
+      .map(toExpression(_, schema))
+      .reduceOption[Expression]((acc, e) => Expressions.and(acc, e))
+
+    (expression, residual)
+  }
+
+  /**
+   * Convert a single pushdown-capable filter to an Iceberg [[Expression]].
+   * Caller is responsible for filtering to PushdownOps first.
+   */
+  def toExpression(filter: ColumnFilter, schema: Schema): Expression = {
+    val field = Option(schema.findField(filter.columnName))
+      .getOrElse(throw new IllegalArgumentException(s"Unknown column: ${filter.columnName}"))
+    val icebergType = field.`type`()
+
+    filter.op match {
+      case "isNull" => Expressions.isNull(filter.columnName)
+      case "isNotNull" => Expressions.notNull(filter.columnName)
+      case "in" =>
+        val parsed = filter.values
+          .getOrElse(
+            throw new IllegalArgumentException(s"`in` filter requires `values` (column: ${filter.columnName})")
+          )
+          .map(v => parseValue(filter.columnName, v, icebergType))
+        Expressions.in(filter.columnName, parsed: _*)
+      case op =>
+        val raw = filter.value.getOrElse(
+          throw new IllegalArgumentException(s"`$op` filter requires `value` (column: ${filter.columnName})")
+        )
+        val parsed = parseValue(filter.columnName, raw, icebergType)
+        op match {
+          case "eq" => Expressions.equal(filter.columnName, parsed)
+          case "ne" => Expressions.notEqual(filter.columnName, parsed)
+          case "lt" => Expressions.lessThan(filter.columnName, parsed)
+          case "le" => Expressions.lessThanOrEqual(filter.columnName, parsed)
+          case "gt" => Expressions.greaterThan(filter.columnName, parsed)
+          case "ge" => Expressions.greaterThanOrEqual(filter.columnName, parsed)
+          case "startsWith" => Expressions.startsWith(filter.columnName, raw)
+          case _ => throw new IllegalArgumentException(s"Op `$op` is not pushdown-capable")
+        }
+    }
+  }
+
+  /**
+   * Parse a string value into the JVM type Iceberg expects for the given column type.
+   * Throws [[ParseError]] when the value doesn't fit the type — letting the websocket
+   * layer translate that into a structured client error.
+   */
+  def parseValue(columnName: String, raw: String, icebergType: Type): AnyRef = {
+    try {
+      icebergType match {
+        case _ if icebergType == Types.IntegerType.get() => Integer.valueOf(raw.trim)
+        case _ if icebergType == Types.LongType.get() => java.lang.Long.valueOf(raw.trim)
+        case _ if icebergType == Types.DoubleType.get() => java.lang.Double.valueOf(raw.trim)
+        case _ if icebergType == Types.FloatType.get() => java.lang.Float.valueOf(raw.trim)
+        case _ if icebergType == Types.BooleanType.get() => java.lang.Boolean.valueOf(raw.trim)
+        case _ if icebergType == Types.StringType.get() => raw
+        case _ if icebergType == Types.TimestampType.withoutZone() => parseTimestampMicros(raw)
+        case _ if icebergType == Types.TimestampType.withZone() => parseTimestampMicros(raw)
+        case _ if icebergType == Types.DateType.get() => Integer.valueOf(java.time.LocalDate.parse(raw.trim).toEpochDay.toInt)
+        case _ => raw
+      }
+    } catch {
+      case _: NumberFormatException | _: java.time.format.DateTimeParseException =>
+        throw ParseError(columnName, raw, icebergType.toString)
+    }
+  }
+
+  /**
+   * Iceberg stores TIMESTAMP as microseconds-since-epoch. Accept either a numeric
+   * micros value, an ISO-8601 instant, or a millis-since-epoch number (sniffed by
+   * magnitude). All three are common shapes for ag-grid's date filter values.
+   */
+  private def parseTimestampMicros(raw: String): java.lang.Long = {
+    val trimmed = raw.trim
+    if (trimmed.forall(_.isDigit) || (trimmed.startsWith("-") && trimmed.drop(1).forall(_.isDigit))) {
+      val n = java.lang.Long.parseLong(trimmed)
+      // Magnitude sniff: numeric values below 1e14 are taken as millis from JS
+      // Date.getTime() (1e14 ms falls around the year 5138); anything larger is
+      // treated as already-micros.
+      if (math.abs(n) < 100000000000000L) java.lang.Long.valueOf(n * 1000L) else java.lang.Long.valueOf(n)
+    } else {
+      val instant = java.time.Instant.parse(trimmed)
+      java.lang.Long.valueOf(instant.getEpochSecond * 1000000L + instant.getNano / 1000L)
+    }
+  }
+}
diff --git a/frontend/package.json b/frontend/package.json
index 08b298260e3..e04da490196 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -41,6 +41,8 @@
     "@ngneat/until-destroy": "8.1.4",
     "@ngx-formly/core": "6.3.12",
     "@ngx-formly/ng-zorro-antd": "6.3.12",
+    "ag-grid-angular": "^33",
+    "ag-grid-community": "^33",
     "ai": "5.0.93",
     "ajv": "8.10.0",
     "concaveman": "2.0.0",
diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.html b/frontend/src/app/workspace/component/result-panel/result-panel.component.html
index d2c6a535493..cfca0afe42c 100644
--- a/frontend/src/app/workspace/component/result-panel/result-panel.component.html
+++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.html
@@ -22,18 +22,6 @@
   nz-menu
   id="result-buttons"
   [ngClass]="{'shadow': !width}">
-
-
-
-
+  [class.hidden]="!width"
+  (nzResize)="onResize($event)">
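
Usage sketch: the snippet below shows how the pieces added in this diff compose end to end. The `fetchFilteredPage` helper and its column names are hypothetical, made up for illustration; `ColumnFilter`, `SortSpec`, `getRangeWithQuery`, `countWithQuery`, and `StorageConfig.resultSortMaxRows` are the APIs introduced above, and the `sortSkipped` computation mirrors what ExecutionResultService does per request.

    import org.apache.texera.amber.config.StorageConfig
    import org.apache.texera.amber.core.storage.model.{ColumnFilter, SortSpec, VirtualDocument}

    // Hypothetical helper: page through one operator's results with a filter and a sort.
    def fetchFilteredPage[T](doc: VirtualDocument[T], pageIndex: Int, pageSize: Int): (Seq[T], Long, Boolean) = {
      val filters = Seq(
        ColumnFilter("price", "ge", value = Some("100")),    // typed, pushed into the Parquet scan
        ColumnFilter("name", "contains", value = Some("ab")) // residual, evaluated in memory post-scan
      )
      val sorts = Seq(SortSpec("price", "desc"))

      // Count first: it sizes the scrollbar and tells us whether the sort cap was hit.
      val total = doc.countWithQuery(filters)
      val sortSkipped = sorts.nonEmpty && total > StorageConfig.resultSortMaxRows

      val from = pageIndex * pageSize
      val rows = doc
        .getRangeWithQuery(from, from + pageSize, filters = filters, sorts = sorts)
        .toSeq
      (rows, total, sortSkipped)
    }

Counting before fetching costs a second scan when predicates are present, but it is what lets the caller distinguish "sorted" from "cap exceeded, scan order" before rendering the page.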