Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,38 @@ object PaginatedResultEvent {
): PaginatedResultEvent = {
PaginatedResultEvent(req.requestID, req.operatorID, req.pageIndex, table, schema)
}

def apply(
req: ResultPaginationRequest,
table: List[ObjectNode],
schema: List[Attribute],
totalNumTuples: Long,
sortSkipped: Boolean
): PaginatedResultEvent = {
PaginatedResultEvent(
req.requestID,
req.operatorID,
req.pageIndex,
table,
schema,
Some(totalNumTuples),
sortSkipped
)
}
}

case class PaginatedResultEvent(
requestID: String,
operatorID: String,
pageIndex: Int,
table: List[ObjectNode],
schema: List[Attribute]
schema: List[Attribute],
// Total rows matching the request's filters / rowSearch. Optional because the
// unfiltered fast path doesn't bother recomputing it — the frontend already
// tracks totalNumTuples from WebPaginationUpdate in that case.
totalNumTuples: Option[Long] = None,
// True when sort was requested but the filtered row count exceeded the
// configured `storage.result.sort.max-rows` cap; the table comes back in
// scan order so the UI can prompt the user to narrow their filter.
sortSkipped: Boolean = false
) extends TexeraWebSocketEvent
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@

package org.apache.texera.web.model.websocket.request

import org.apache.texera.amber.core.storage.model.{ColumnFilter, SortSpec}

case class ResultPaginationRequest(
requestID: String,
operatorID: String,
pageIndex: Int,
pageSize: Int,
// The columnOffset / columnLimit / columnSearch fields predate ag-grid's
// built-in column virtualization and column-toggle UI. Kept defaulted for
// wire compatibility with the Python SDK; new frontends do not set them.
columnOffset: Int = 0,
columnLimit: Int = Int.MaxValue,
columnSearch: Option[String] = None
columnSearch: Option[String] = None,
// Phase 2 / Phase 3 of the result-pane upgrade — row-level pushdown.
filters: Seq[ColumnFilter] = Seq.empty,
sorts: Seq[SortSpec] = Seq.empty,
rowSearch: Option[String] = None
) extends TexeraWebSocketRequest
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.pekko.actor.Cancellable
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.config.{ApplicationConfig, StorageConfig}
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.storage.result._
import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
Expand Down Expand Up @@ -454,16 +454,44 @@ class ExecutionResultService(
)
}

val paginationIterable = {
virtualDocument
val hasQuery =
request.filters.nonEmpty || request.sorts.nonEmpty || request.rowSearch.exists(_.nonEmpty)

if (!hasQuery) {
val paginationIterable = virtualDocument
.getRange(from, from + request.pageSize, columns)
.to(Iterable)
val mappedResults = convertTuplesToJson(paginationIterable)
val attributes = paginationIterable.headOption
.map(_.getSchema.getAttributes)
.getOrElse(List.empty)
PaginatedResultEvent.apply(request, mappedResults, attributes)
} else {
// Filter / sort / rowSearch path: compute totalNumTuples up front so the
// frontend datasource can size the infinite scrollbar after the user's
// filter applies. If sort was requested and the matched row count blows
// the cap, IcebergDocument silently returns scan order — we surface that
// via `sortSkipped` so the UI can banner.
val totalMatching = virtualDocument.countWithQuery(request.filters, request.rowSearch)
val sortRequested = request.sorts.nonEmpty
val sortSkipped = sortRequested && totalMatching > StorageConfig.resultSortMaxRows

val paginationIterable = virtualDocument
.getRangeWithQuery(
from,
from + request.pageSize,
columns,
request.filters,
request.sorts,
request.rowSearch
)
.to(Iterable)
val mappedResults = convertTuplesToJson(paginationIterable)
val attributes = paginationIterable.headOption
.map(_.getSchema.getAttributes)
.getOrElse(List.empty)
PaginatedResultEvent.apply(request, mappedResults, attributes, totalMatching, sortSkipped)
}
val mappedResults = convertTuplesToJson(paginationIterable)
val attributes = paginationIterable.headOption
.map(_.getSchema.getAttributes)
.getOrElse(List.empty)
PaginatedResultEvent.apply(request, mappedResults, attributes)

case None =>
// Handle the case when storageUri is empty
Expand Down
13 changes: 13 additions & 0 deletions common/config/src/main/resources/storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
# See PR https://github.com/Texera/texera/pull/3326 for configuration guidelines.
storage {

# Configuration for result pane query behavior.
# The result pane (frontend) lets users sort by any column; Iceberg cannot push
# ORDER BY into the file reader, so sorts are evaluated in process memory. To
# prevent OOM on very large operator outputs, sorting is skipped once the
# post-filter row count exceeds this cap — the response carries a flag so the UI
# can prompt the user to narrow the filter.
result {
sort {
max-rows = 100000
max-rows = ${?STORAGE_RESULT_SORT_MAX_ROWS}
}
}

# Configuration for Apache Iceberg, used for storing the workflow results & stats
iceberg {
catalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ object StorageConfig {
val jdbcUsername: String = conf.getString("storage.jdbc.username")
val jdbcPassword: String = conf.getString("storage.jdbc.password")

// Result-pane query specifics
val resultSortMaxRows: Long = conf.getLong("storage.result.sort.max-rows")

// Iceberg specifics
val icebergCatalogType: String = conf.getString("storage.iceberg.catalog.type")
val icebergRESTCatalogUri: String = conf.getString("storage.iceberg.catalog.rest.uri")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.core.storage.model

/**
* Row-level filter predicate for a VirtualDocument query.
*
* Values are sent as strings over the wire for simplicity; storage backends are
* responsible for parsing them against the column's declared type (see
* IcebergPredicateBuilder for the canonical implementation).
*
* Supported `op` values: eq, ne, lt, le, gt, ge, contains, startsWith, endsWith,
* isNull, isNotNull, in.
*/
case class ColumnFilter(
columnName: String,
op: String,
value: Option[String] = None,
values: Option[Seq[String]] = None
)

/** Sort specification for a single column. `direction` must be "asc" or "desc". */
case class SortSpec(columnName: String, direction: String)
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,40 @@ abstract class VirtualDocument[T] extends ReadonlyVirtualDocument[T] {
def getRange(from: Int, until: Int, columns: Option[Seq[String]] = None): Iterator[T] =
throw new NotImplementedError("getRange method is not implemented")

/**
* Get an iterator over rows matching `filters` and `rowSearch`, ordered by `sorts`,
* sliced to `[from, until)` of the filtered/sorted view.
*
* Unlike `getRange`, the `from`/`until` indices here address the post-filter,
* post-sort sequence — not the underlying stored order. Implementations that don't
* support query pushdown should leave the default in place, which ignores
* filters/sorts/rowSearch and falls through to `getRange`.
*
* @param from index (inclusive) into the filtered, sorted view
* @param until index (exclusive) into the filtered, sorted view
* @param columns columns to project (None = all columns)
* @param filters row predicates ANDed together
* @param sorts sort keys applied in order
* @param rowSearch free-text substring; implementation defines how it expands across columns
*/
def getRangeWithQuery(
from: Int,
until: Int,
columns: Option[Seq[String]] = None,
filters: Seq[ColumnFilter] = Seq.empty,
sorts: Seq[SortSpec] = Seq.empty,
rowSearch: Option[String] = None
): Iterator[T] = getRange(from, until, columns)

/**
* Count rows that satisfy `filters` and `rowSearch`. Used by paginated UIs to
* size the scrollbar after a filter is applied. Default falls back to total count.
*/
def countWithQuery(
filters: Seq[ColumnFilter] = Seq.empty,
rowSearch: Option[String] = None
): Long = getCount

/**
* get an iterator of all items after the specified index `offset`
* @param offset the starting index (exclusive)
Expand Down
Loading
Loading