Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/bigframes/bigframes/blob/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,14 @@ def _create_udf(self):
\"\"\"
"""

bf_io_bigquery.start_query_with_client(
bf_io_bigquery.start_query_with_job(
self._session.bqclient,
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._session._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
publisher=self._session._publisher,
)

Expand Down
3 changes: 3 additions & 0 deletions packages/bigframes/bigframes/core/bq_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ def __post_init__(self):
# Optimization field, must be correct if set, don't put maybe-stale number here
n_rows: Optional[int] = None

def with_ordering(self, ordering: orderings.RowOrdering) -> BigqueryDataSource:
    """Return a copy of this data source with its ``ordering`` field replaced.

    Args:
        ordering: The row ordering to store on the returned copy.

    Returns:
        A new instance built with ``dataclasses.replace``; every field other
        than ``ordering`` is carried over from ``self``, which is left as-is.
    """
    reordered_source = dataclasses.replace(self, ordering=ordering)
    return reordered_source


_WORKER_TIME_INCREMENT = 0.05

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def from_table(
project_id: str,
dataset_id: str,
table_id: str,
uid_gen: guid.SequentialUIDGenerator,
uid_gen: guid.SequentialUIDGenerator | None = None,
columns: typing.Sequence[str] = (),
sql_predicate: typing.Optional[str] = None,
system_time: typing.Optional[datetime.datetime] = None,
Expand All @@ -202,6 +202,8 @@ def from_table(
if system_time
else None
)
if uid_gen is None:
uid_gen = guid.SequentialUIDGenerator()
table_alias = next(uid_gen.get_uid_stream("bft_"))
table_expr = sge.Table(
this=sql.identifier(table_id),
Expand Down
16 changes: 12 additions & 4 deletions packages/bigframes/bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,10 +846,10 @@ def remap_refs(
) -> ReadTableNode:
return self

def with_order_cols(self):
def pull_out_order(self):
# Maybe the ordering should be required to always be in the scan list, and then we won't need this?
if self.source.ordering is None:
return self, orderings.RowOrdering()
return self, RowOrdering()

order_cols = {col.sql for col in self.source.ordering.referenced_columns}
scan_cols = {col.source_id for col in self.scan_list.items}
Expand All @@ -863,10 +863,18 @@ def with_order_cols(self):
]
new_scan_list = ScanList(items=(*self.scan_list.items, *new_scan_cols))
new_order = self.source.ordering.remap_column_refs(
{identifiers.ColumnId(item.source_id): item.id for item in new_scan_cols},
{
identifiers.ColumnId(item.source_id): item.id
for item in new_scan_list.items
},
allow_partial_bindings=True,
)
return dataclasses.replace(self, scan_list=new_scan_list), new_order
new_node = dataclasses.replace(
self,
scan_list=new_scan_list,
source=self.source.with_ordering(RowOrdering()),
)
return new_node, new_order


@dataclasses.dataclass(frozen=True, eq=False)
Expand Down
3 changes: 2 additions & 1 deletion packages/bigframes/bigframes/core/rewrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from bigframes.core.rewrite.implicit_align import try_row_join
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
from bigframes.core.rewrite.nullity import simplify_join
from bigframes.core.rewrite.order import bake_order, defer_order
from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order
from bigframes.core.rewrite.pruning import column_pruning
from bigframes.core.rewrite.scan_reduction import (
try_reduce_to_local_scan,
Expand Down Expand Up @@ -50,6 +50,7 @@
"rewrite_range_rolling",
"try_reduce_to_table_scan",
"bake_order",
"pull_out_order",
"try_reduce_to_local_scan",
"fold_row_counts",
"pull_out_window_order",
Expand Down
13 changes: 11 additions & 2 deletions packages/bigframes/bigframes/core/rewrite/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ def bake_order(
return node


def pull_out_order(
    node: bigframes.core.nodes.BigFrameNode,
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
    """Rewrite slices in the tree, then pull its ordering out to the root.

    Args:
        node: Root of the expression tree to process.

    Returns:
        The result of ``_pull_up_order(..., order_root=True)`` on the
        slice-rewritten tree, annotated as a ``(node, ordering)`` pair.
    """
    # Function-scope import, kept as in the original.
    # NOTE(review): presumably this avoids a circular import - confirm.
    import bigframes.core.rewrite.slices

    slice_rewriter = bigframes.core.rewrite.slices.rewrite_slice
    # Apply the slice rewrite across the whole tree before extracting order.
    rewritten_root = node.bottom_up(slice_rewriter)
    return _pull_up_order(rewritten_root, order_root=True)


# Makes ordering explicit in window definitions
def _pull_up_order(
root: bigframes.core.nodes.BigFrameNode,
Expand Down Expand Up @@ -153,7 +162,7 @@ def pull_up_order_inner(
)
elif isinstance(node, bigframes.core.nodes.ReadTableNode):
if node.source.ordering is not None:
return node.with_order_cols()
return node.pull_out_order()
else:
# No defined ordering
return node, bigframes.core.ordering.RowOrdering()
Expand Down Expand Up @@ -272,7 +281,7 @@ def pull_up_order_inner(
offsets_id
)
return new_explode, child_order.join(inner_order)
raise ValueError(f"Unexpected node: {node}")
raise ValueError(f"Unexpected node type {type(node).__name__}")

def pull_order_concat(
node: bigframes.core.nodes.ConcatNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
# TODO(swast): plumb through the original, user-facing api_name.
import bigframes.session._io.bigquery

_, query_job = bigframes.session._io.bigquery.start_query_with_client(
_, query_job = bigframes.session._io.bigquery.start_query_with_job(
cast(bigquery.Client, self._session.bqclient),
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
location=None,
project=None,
timeout=None,
metrics=None,
query_with_job=True,
publisher=self._session._publisher,
)
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
Expand Down
3 changes: 1 addition & 2 deletions packages/bigframes/bigframes/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,9 @@ def __call__(self, *args, **kwargs):

args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
iter, job = bf_io_bigquery.start_query_with_client(
iter, job = bf_io_bigquery.start_query_with_job(
self._session.bqclient,
sql=sql,
query_with_job=True,
job_config=bigquery.QueryJobConfig(),
publisher=self._session._publisher,
) # type: ignore
Expand Down
6 changes: 2 additions & 4 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2310,15 +2310,14 @@ def _start_query_ml_ddl(
# so we must reset any encryption set in the job config
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
job_config.destination_encryption_configuration = None
iterator, query_job = bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_job(
self.bqclient,
sql,
job_config=job_config,
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
publisher=self._publisher,
session=self,
Expand All @@ -2340,15 +2339,14 @@ def _create_object_table(self, path: str, connection: str) -> str:
uris = ['{path}']);
"""
)
bf_io_bigquery.start_query_with_client(
bf_io_bigquery.start_query_with_job(
self.bqclient,
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
publisher=self._publisher,
session=self,
)
Expand Down
Loading
Loading