feat(dao): add operator_port_cache table #5967
Adds the operator_port_cache table (texera_ddl.sql + Liquibase migration sql/updates/26.sql), keyed by (workflow_id, global_port_id, cache_key) with ON DELETE CASCADE to workflow. The cache read/write logic and its tests land with the cache service that uses it. Part of apache#5882.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #5967 +/- ##
============================================
+ Coverage 56.28% 56.65% +0.37%
- Complexity 2992 3023 +31
============================================
Files 1120 1121 +1
Lines 43217 43294 +77
Branches 4662 4667 +5
============================================
+ Hits 24326 24530 +204
+ Misses 17472 17325 -147
- Partials 1419 1439 +20
@carloea2 could you help review this one when you get a chance? Thanks!
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 376 | 0.229 | 24,770/37,608/37,608 us | 🔴 +5.1% / 🔴 +149.5% |
| 🔴 | bs=100 sw=10 sl=64 | 779 | 0.475 | 127,712/149,927/149,927 us | 🔴 +5.5% / 🔴 +39.3% |
| ⚪ | bs=1000 sw=10 sl=64 | 915 | 0.558 | 1,089,736/1,141,300/1,141,300 us | ⚪ within ±5% / 🔴 +11.0% |
Baseline details
Latest main a24d1d1 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 376 tuples/sec | 393 tuples/sec | 777.62 tuples/sec | -4.3% | -51.6% |
| bs=10 sw=10 sl=64 | MB/s | 0.229 MB/s | 0.24 MB/s | 0.475 MB/s | -4.6% | -51.8% |
| bs=10 sw=10 sl=64 | p50 | 24,770 us | 24,108 us | 12,612 us | +2.7% | +96.4% |
| bs=10 sw=10 sl=64 | p95 | 37,608 us | 35,777 us | 15,070 us | +5.1% | +149.5% |
| bs=10 sw=10 sl=64 | p99 | 37,608 us | 35,777 us | 18,360 us | +5.1% | +104.8% |
| bs=100 sw=10 sl=64 | throughput | 779 tuples/sec | 818 tuples/sec | 988.31 tuples/sec | -4.8% | -21.2% |
| bs=100 sw=10 sl=64 | MB/s | 0.475 MB/s | 0.499 MB/s | 0.603 MB/s | -4.8% | -21.3% |
| bs=100 sw=10 sl=64 | p50 | 127,712 us | 121,033 us | 101,066 us | +5.5% | +26.4% |
| bs=100 sw=10 sl=64 | p95 | 149,927 us | 144,950 us | 107,594 us | +3.4% | +39.3% |
| bs=100 sw=10 sl=64 | p99 | 149,927 us | 144,950 us | 115,830 us | +3.4% | +29.4% |
| bs=1000 sw=10 sl=64 | throughput | 915 tuples/sec | 908 tuples/sec | 1,019 tuples/sec | +0.8% | -10.2% |
| bs=1000 sw=10 sl=64 | MB/s | 0.558 MB/s | 0.554 MB/s | 0.622 MB/s | +0.7% | -10.3% |
| bs=1000 sw=10 sl=64 | p50 | 1,089,736 us | 1,097,356 us | 986,982 us | -0.7% | +10.4% |
| bs=1000 sw=10 sl=64 | p95 | 1,141,300 us | 1,152,889 us | 1,028,491 us | -1.0% | +11.0% |
| bs=1000 sw=10 sl=64 | p99 | 1,141,300 us | 1,152,889 us | 1,058,493 us | -1.0% | +7.8% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,532.61,200,128000,376,0.229,24769.58,37607.52,37607.52
1,100,10,64,20,2568.28,2000,1280000,779,0.475,127711.79,149927.22,149927.22
2,1000,10,64,20,21862.33,20000,12800000,915,0.558,1089736.19,1141300.33,1141300.33
@Xiao-zhen-Liu please link issue properly
Address review: result implies a direction, storage_uri is clearer. tuple_count is kept (immutable per row, populated at materialization, read by the coordinator alongside the cache lookup so cached-region stats need no extra storage round-trip).
Thanks @Yicong-Huang, replies inline. Renamed
Address review: spell out that cache_key is the hash/lookup key and cache_key_json is the JSON it was computed from (collision check); that a changed upstream computation (e.g. operator version) yields a new cache_key and a new row rather than overwriting; and why tuple_count is kept.
…ey_hash Address review (Carlos, Yicong): make the hash explicit. cache_key_hash is the SHA-256 hash / lookup key; cache_key_json stays as the JSON it was computed from.
What changes were proposed in this PR?
Adds the `operator_port_cache` table that records a materialized output port
result so it can be reused across executions. It is keyed by
`(workflow_id, global_port_id, cache_key)` and stores the JSON the cache key was
computed from, the result location, an optional tuple count and source execution
id, and a database-managed `updated_at`. The foreign key to `workflow(wid)` is
`ON DELETE CASCADE`. The stored JSON (`cache_key_json`) lets a lookup confirm a
hash match by comparing the full JSON, so a hash collision never reuses the wrong
result.
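A minimal sketch of the keyed lookup with the JSON collision check, for illustration only. It uses Python's built-in `sqlite3` as a stand-in for Postgres; the real definition lives in `sql/texera_ddl.sql`. The column types, the `source_execution_id` name, the helper functions, and the port-id and URI strings in the usage note are assumptions, not the PR's actual code.

```python
import hashlib
import json
import sqlite3


def make_cache_key(key_fields: dict) -> tuple[str, str]:
    """Return (SHA-256 hex digest, canonical JSON) for the key fields."""
    canonical = json.dumps(key_fields, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest(), canonical


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with an assumed shape of the table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite-only; Postgres always enforces FKs
    conn.executescript("""
        CREATE TABLE workflow (wid INTEGER PRIMARY KEY);
        CREATE TABLE operator_port_cache (
            workflow_id         INTEGER NOT NULL
                                REFERENCES workflow(wid) ON DELETE CASCADE,
            global_port_id      TEXT    NOT NULL,
            cache_key           TEXT    NOT NULL,  -- hash used for lookup
            cache_key_json      TEXT    NOT NULL,  -- JSON the hash was computed from
            storage_uri         TEXT    NOT NULL,  -- result location
            tuple_count         INTEGER,           -- optional
            source_execution_id INTEGER,           -- optional (name assumed)
            updated_at          TEXT    NOT NULL DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (workflow_id, global_port_id, cache_key)
        );
    """)
    return conn


def store(conn, workflow_id, global_port_id, key_fields, storage_uri, tuple_count=None):
    """Record a materialized result under its cache key."""
    cache_key, canonical = make_cache_key(key_fields)
    conn.execute(
        "INSERT OR REPLACE INTO operator_port_cache "
        "(workflow_id, global_port_id, cache_key, cache_key_json, storage_uri, tuple_count) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (workflow_id, global_port_id, cache_key, canonical, storage_uri, tuple_count),
    )


def lookup(conn, workflow_id, global_port_id, key_fields):
    """Return the storage URI on a true hit, else None."""
    cache_key, canonical = make_cache_key(key_fields)
    row = conn.execute(
        "SELECT cache_key_json, storage_uri FROM operator_port_cache "
        "WHERE workflow_id = ? AND global_port_id = ? AND cache_key = ?",
        (workflow_id, global_port_id, cache_key),
    ).fetchone()
    # A hash match alone is not trusted: the stored JSON must match too.
    if row is None or row[0] != canonical:
        return None
    return row[1]
```

In this sketch, `store(conn, 1, "op-1:out0", {"operator_version": 1}, "file:///results/a")` followed by a `lookup` with the same fields returns the URI, while a lookup with `{"operator_version": 2}` hashes to a different key and misses. Comparing canonical JSON strings is one way to do the check; the cache service may compare the JSON differently.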
The change is additive: a new table in `sql/texera_ddl.sql` (fresh installs) plus
a Liquibase migration `sql/updates/26.sql` registered in `sql/changelog.xml`
(existing deployments). No code reads or writes the table yet; the cache read/write
logic and its tests land with the cache service that uses it, following the
convention of testing a table through its consumer (as `feedback` is tested via
`FeedbackResourceSpec`).
Any related issues, documentation, discussions?
Closes #5969. Part of the storage foundation #5882 (umbrella #5881). Design discussion: #5880.
How was this PR tested?
Verified the schema directly against Postgres: the migration applies cleanly, the
columns and primary key `(workflow_id, global_port_id, cache_key)` are correct,
the foreign key's delete rule is `CASCADE`, the schema file and the migration
define identical columns/keys, and `changelog.xml` is well-formed and registers
`26.sql`. The generated jOOQ classes build from the table. The table's runtime
behavior is exercised by the cache service tests in the follow-up PR.
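The delete-rule check can be reproduced in miniature. The sketch below is a hypothetical stand-in that uses Python's `sqlite3` rather than Postgres and a trimmed, assumed column set; it only shows what `ON DELETE CASCADE` implies for cache rows when their workflow is deleted, and is not the verification that was actually run.

```python
import sqlite3


def cascade_demo() -> tuple[int, int]:
    """Return cache row counts (before, after) deleting the parent workflow."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite-only switch for FK enforcement
    conn.executescript("""
        CREATE TABLE workflow (wid INTEGER PRIMARY KEY);
        -- Trimmed to the key columns plus the JSON; the other columns are omitted.
        CREATE TABLE operator_port_cache (
            workflow_id    INTEGER NOT NULL
                           REFERENCES workflow(wid) ON DELETE CASCADE,
            global_port_id TEXT NOT NULL,
            cache_key      TEXT NOT NULL,
            cache_key_json TEXT NOT NULL,
            PRIMARY KEY (workflow_id, global_port_id, cache_key)
        );
    """)
    conn.execute("INSERT INTO workflow (wid) VALUES (1)")
    conn.execute(
        "INSERT INTO operator_port_cache VALUES (1, 'port-a', 'k1', '{}')"
    )
    count = "SELECT COUNT(*) FROM operator_port_cache"
    before = conn.execute(count).fetchone()[0]
    conn.execute("DELETE FROM workflow WHERE wid = 1")  # should cascade to the cache row
    after = conn.execute(count).fetchone()[0]
    conn.close()
    return before, after
```

Against Postgres, the same property can be read from the foreign key's delete rule in the catalog instead of deleting a row.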
Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8 (Claude Code)