overriding microbatch macro for capturing the compiled code of the mi…#1000
👋 @GuyEshdat
> **Note: Reviews paused.** It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the […]. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 Walkthrough

This PR adds support for capturing compiled code from dbt microbatch incremental models through a cache-backed macro system. New macros capture compiled code during model execution, store it in a scoped cache, and integrate with existing compiled code retrieval via a fallback mechanism. Integration tests validate capture with and without an override macro.

Changes: Microbatch Compiled Code Capture and Retrieval
Sequence Diagram

```mermaid
sequenceDiagram
    participant TestRunner as Test Runner
    participant DbtExecution as dbt Model<br/>Execution
    participant Macro as get_incremental_<br/>microbatch_sql
    participant Cache as Elementary<br/>Cache
    participant Retrieval as get_compiled_<br/>code Utility
    TestRunner->>DbtExecution: Run microbatch model
    DbtExecution->>Macro: Invoke during incremental resolution
    Macro->>Macro: Capture current model's compiled_code
    Macro->>Cache: Store compiled_code by unique_id
    Cache-->>Macro: Cache updated
    Macro-->>DbtExecution: Return dispatched SQL
    DbtExecution-->>TestRunner: Execution complete
    TestRunner->>Retrieval: Query compiled_code for node
    Retrieval->>Retrieval: Check dispatched result
    alt Dispatched compiled_code exists
        Retrieval-->>TestRunner: Return compiled_code
    else Dispatched compiled_code missing
        Retrieval->>Cache: Lookup by unique_id
        Cache-->>Retrieval: Return cached compiled_code
        Retrieval-->>TestRunner: Return cached compiled_code
    end
```
**Estimated Code Review Effort:** 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

✅ Passed checks (4 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
🧹 Nitpick comments (2)
integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py (2)
**Lines 30-33: ⚡ Quick win — Avoid hardcoding the project/package segment in `unique_id`.**

Line 30 assumes `model.elementary_tests.*`. If the project name changes, the test can fail even when the run results are correct.

Proposed fix:

```diff
-    unique_id = f"model.elementary_tests.{test_id}"
     run_results = dbt_project.read_table(
         "dbt_run_results",
-        where=f"unique_id = '{unique_id}' and status = 'success'",
+        where=f"name = '{test_id}' and resource_type = 'model' and status = 'success'",
         order_by="generated_at desc",
         limit=1,
     )
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py` around lines 30-33: the test hardcodes the package segment in unique_id ("model.elementary_tests.{test_id}"), which breaks if the project/package name changes; update the construction of unique_id in test_microbatch_compiled_code.py to derive the package/project segment dynamically (e.g., obtain the package/project name from the dbt_project fixture or manifest) before calling dbt_project.read_table, so unique_id is built as f"model.{package_name}.{test_id}" (refer to the unique_id variable and the dbt_project.read_table call to locate where to change this).
**Line 5: ⚡ Quick win — Restore `disable_run_results` after the test.**

Line 5 mutates shared runner state and does not restore it. This can create cross-test coupling when the fixture scope is not function-level.

Proposed fix:

```diff
 def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject):
-    dbt_project.dbt_runner.vars["disable_run_results"] = False
+    previous_disable_run_results = dbt_project.dbt_runner.vars.get("disable_run_results")
+    dbt_project.dbt_runner.vars["disable_run_results"] = False
-    model_sql = """
+    try:
+        model_sql = """
     {{ config(
         materialized='incremental',
         incremental_strategy='microbatch',
@@
     from {{ ref('one') }}
     """
-    with dbt_project.create_temp_model_for_existing_table(
-        test_id, raw_code=model_sql
-    ) as model_path:
-        dbt_project.dbt_runner.run(select=str(model_path))
+        with dbt_project.create_temp_model_for_existing_table(
+            test_id, raw_code=model_sql
+        ) as model_path:
+            dbt_project.dbt_runner.run(select=str(model_path))
-    unique_id = f"model.elementary_tests.{test_id}"
-    run_results = dbt_project.read_table(
-        "dbt_run_results",
-        where=f"unique_id = '{unique_id}' and status = 'success'",
-        order_by="generated_at desc",
-        limit=1,
-    )
-    assert run_results, "Expected a successful run result row for microbatch model"
-    assert run_results[0]["compiled_code"], (
-        "Expected compiled_code to be populated for successful microbatch model run result"
-    )
+        unique_id = f"model.elementary_tests.{test_id}"
+        run_results = dbt_project.read_table(
+            "dbt_run_results",
+            where=f"unique_id = '{unique_id}' and status = 'success'",
+            order_by="generated_at desc",
+            limit=1,
+        )
+        assert run_results, "Expected a successful run result row for microbatch model"
+        assert run_results[0]["compiled_code"], (
+            "Expected compiled_code to be populated for successful microbatch model run result"
+        )
+    finally:
+        if previous_disable_run_results is None:
+            dbt_project.dbt_runner.vars.pop("disable_run_results", None)
+        else:
+            dbt_project.dbt_runner.vars["disable_run_results"] = previous_disable_run_results
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py` at line 5: the test mutates shared state by setting dbt_project.dbt_runner.vars["disable_run_results"] = False and does not restore it; fix by saving the original value (orig = dbt_project.dbt_runner.vars.get("disable_run_results")), set the key for the test, and ensure restoration in a finally block (or use pytest fixture/monkeypatch to set and revert) so dbt_project.dbt_runner.vars["disable_run_results"] is reset to orig (or deleted if it was absent) after the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py`:
- Around line 30-33: The test hardcodes the package segment in unique_id
("model.elementary_tests.{test_id}") which breaks if the project/package name
changes; update the construction of unique_id in
test_microbatch_compiled_code.py to derive the package/project segment
dynamically (e.g., obtain the package/project name from the dbt_project fixture
or manifest) before calling dbt_project.read_table, so unique_id is built as
f"model.{package_name}.{test_id}" (refer to the unique_id variable and the
dbt_project.read_table call to locate where to change this).
- Line 5: The test mutates shared state by setting
dbt_project.dbt_runner.vars["disable_run_results"] = False and does not restore
it; fix by saving the original value (orig =
dbt_project.dbt_runner.vars.get("disable_run_results")), set the key for the
test, and ensure restoration in a finally block (or use pytest
fixture/monkeypatch to set and revert) so
dbt_project.dbt_runner.vars["disable_run_results"] is reset to orig (or deleted
if it was absent) after the test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1446de91-45dc-4082-95f1-1e83579ba7cc
📒 Files selected for processing (1)
integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py
```jinja
{% if not compiled_code and node and node.get("unique_id") %}
  {% set compiled_code = elementary.get_cache(
    "microbatch_compiled_code_by_unique_id", {}
  ).get(node.get("unique_id")) %}
{% endif %}
```
🔴 Microbatch cache fallback bypasses Redshift % → %% escaping
On Redshift, redshift__get_compiled_code (macros/utils/graph/get_compiled_code.sql:21-25) replaces % with %% to prevent SQL formatting errors when the compiled code is inserted as a string literal. The new microbatch cache fallback path (lines 3-7) retrieves compiled code directly from the cache without going through adapter.dispatch, so the Redshift-specific escaping is never applied. If a microbatch model's compiled SQL contains % characters, this will produce unescaped % in the INSERT into dbt_run_results, which can cause runtime SQL errors on Redshift (e.g., incomplete format or similar adapter-level failures).
Prompt for agents
In macros/utils/graph/get_compiled_code.sql, the new microbatch cache fallback (lines 3-7) returns raw compiled code, bypassing adapter-specific transformations such as the Redshift percent-escaping in redshift__get_compiled_code (lines 21-25). To fix this, the cached compiled code should be run through the same adapter-specific post-processing. One approach: after retrieving the code from the cache, apply the same adapter.dispatch or at minimum replicate the Redshift escaping logic. For example, you could introduce a new dispatchable macro like elementary.post_process_compiled_code(compiled_code) that is a no-op by default but does .replace("%", "%%") on Redshift, and call it on the cache-retrieved value. Alternatively, store the already-escaped code in the cache by calling the capture during redshift__get_compiled_code, but that would couple the cache to a specific adapter.
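A minimal sketch of the dispatchable post-processing macro described above. The macro name `post_process_compiled_code` comes from the suggestion itself and does not exist in the codebase; the dispatch pattern is assumed to follow the standard dbt `adapter.dispatch` convention:

```jinja
{# Hypothetical macros/utils/graph/post_process_compiled_code.sql #}
{% macro post_process_compiled_code(compiled_code) %}
  {{ return(adapter.dispatch("post_process_compiled_code", "elementary")(compiled_code)) }}
{% endmacro %}

{# Default implementation: no-op passthrough #}
{% macro default__post_process_compiled_code(compiled_code) %}
  {{ return(compiled_code) }}
{% endmacro %}

{# Redshift: escape % as %% so the compiled code can be inserted
   as a string literal without being treated as a format spec #}
{% macro redshift__post_process_compiled_code(compiled_code) %}
  {{ return(compiled_code.replace("%", "%%")) }}
{% endmacro %}
```

The cache-fallback branch in `get_compiled_code` would then wrap the cached value in `elementary.post_process_compiled_code(...)`, keeping the adapter-specific escaping in one place instead of duplicating it per retrieval path.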
```jinja
  {% endif %}

  {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
  {% do elementary.set_cache(
```
This `set_cache` could race if multiple dbt models run in parallel and all of them update the initial dict.
I think instead you should create the `microbatch_compiled_code_by_unique_id` dict in the `init_elementary_graph` macro, and then I think you don't even need to call `set_cache` here.
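A minimal sketch of the suggested restructuring. The macro names `init_elementary_graph`, `get_cache`, and `set_cache` are taken from the comment and the quoted code, not verified against the codebase:

```jinja
{# In init_elementary_graph: create the dict once, before models start running #}
{% do elementary.set_cache("microbatch_compiled_code_by_unique_id", {}) %}

{# In the microbatch capture macro: fetch the shared dict and mutate it
   in place. No set_cache call is needed per model, so there is no
   read-modify-write race on the cache entry itself. #}
{% set compiled_code_by_unique_id = elementary.get_cache("microbatch_compiled_code_by_unique_id", {}) %}
{% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
```

This only removes the race on the cache slot; concurrent `update` calls on the shared dict itself are still assumed to be safe for distinct keys.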
…_unique_id map. Instead, we will initialize it when initializing the graph, and only update the dict itself instead of updating the cache
…ity into microbatch-compiled-code
…crobatch models
Summary by CodeRabbit
New Features
Tests