[Hackathon] feat: add streaming HTTP/WebSocket sources and an LLM agent operator #5103
Draft
nathant27 wants to merge 1 commit into
Adds four first-class Texera operators ("PulseFlow") for building live
analytical workflows over real-time data feeds:
1. PollingHttpSource (source)
- Polls an HTTP/REST endpoint at a fixed interval and emits each
response as a tuple.
- Configurable method (GET/POST/PUT/PATCH/DELETE), headers, request
body, interval, and an optional maxIterations cap (0 = forever).
- Output schema: response_body, status_code, polled_at.
- Implemented via a forever-running Iterator that sleeps between
polls; works around Texera's bounded-source model without engine
changes.
2. WebSocketSource (source)
- Connects to a ws:// or wss:// endpoint and emits each received
frame as a tuple, forever.
- Uses JDK 11+ java.net.http WebSocket; supports an initial
subscribe message and arbitrary handshake headers.
- Permissive URI handling: trims whitespace and percent-encodes the
'@' character (common in Binance-style stream names) so users can
paste provider URLs verbatim.
- Requests Long.MaxValue messages up front to avoid per-frame
back-pressure bookkeeping; reassembles partial text frames.
- Output schema: message, received_at.
3. HttpRequest (transformer)
- For each input tuple, performs a configurable HTTP call with
${fieldName} interpolation in URL and body templates.
- Appends http_request_status, http_request_body, and
http_request_error to the input schema (namespaced to avoid
collisions with upstream columns like response_body).
- failOnError toggle controls whether non-2xx responses crash the
workflow or are surfaced inline.
4. LLMAgent (transformer)
- Calls an Anthropic Messages or OpenAI Chat Completions endpoint
per tuple with a templated system + user prompt.
- Provider enum (LLMProvider: ANTHROPIC, OPENAI) switches request
body shape and reply-text extraction path
(content[0].text vs choices[0].message.content).
- Request body built via Jackson ObjectNode so user-supplied prompt
content is automatically JSON-escaped — no broken templates from
embedded quotes or newlines.
- API key sourced from the operator field, falling back to the
ANTHROPIC_API_KEY / OPENAI_API_KEY environment variable.
- Appends a configurable output column (default "llm_response") and
"llm_error" to the input schema.
Shared utilities (operator/http/util/):
- HttpClientFactory: lazy singleton java.net.http.HttpClient reused
by all operators.
- HttpMethod: enum with @JsonValue so the UI renders a dropdown.
- KeyValuePair: Jackson-friendly header entry class.
- TemplateInterpolator: ${fieldName} substitution from a Tuple.
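A minimal sketch of `${fieldName}` substitution, assuming a row can be viewed as a `Map` from field name to value. `TemplateSketch` is a hypothetical name, not the `TemplateInterpolator` class in this PR, and replacing unknown fields with an empty string is an assumption, since the commit message does not specify that behavior.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for ${fieldName} substitution; not the PR's class. */
final class TemplateSketch {
    // Matches ${name} and captures the field name between the braces.
    private static final Pattern FIELD = Pattern.compile("\\$\\{([^}]+)\\}");

    static String interpolate(String template, Map<String, ?> row) {
        Matcher m = FIELD.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object value = row.get(m.group(1));
            // Assumption: a missing or null field becomes an empty string.
            String text = value == null ? "" : value.toString();
            // quoteReplacement keeps '$' and '\' in field values literal.
            m.appendReplacement(out, Matcher.quoteReplacement(text));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, `TemplateSketch.interpolate("https://api.example.com/u/${id}", Map.of("id", 7))` yields `https://api.example.com/u/7`.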
Registration: each operator gets a single @JsonSubTypes entry in
LogicalOp.scala; the metadata/palette refreshes automatically from
that registry.
Operator icons: 128x128 PNGs added under frontend assets — clock for
PollingHttpSource, dashed stream for WebSocketSource, curly-brace
API glyph for HttpRequest, and a stylized brain for LLMAgent.
Frontend fix (result-panel cell click):
- When clicking a cell in the result table, the modal now receives
the table's row data as a fallback and displays it immediately,
overwriting only if the paginated server lookup returns a
non-empty tuple. Previously, clicking a cell when the paginated
result service was not yet initialized produced a permanently
blank modal because the request to fetch the full row never
fired (?. short-circuit on undefined service).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
https://drive.google.com/file/d/14WV4hxlNoN3keYo8xOTaiYfMO4pfEiC9/view?usp=sharing
Codecov Report❌ Patch coverage is
Coverage Diff

| Metric | main | #5103 | +/- |
|---|---|---|---|
| Coverage | 42.85% | 42.92% | +0.07% |
| Complexity | 2207 | 2209 | +2 |
| Files | 1045 | 1045 | |
| Lines | 40146 | 40178 | +32 |
| Branches | 4240 | 4250 | +10 |
| Hits | 17203 | 17245 | +42 |
| Misses | 21878 | 21866 | -12 |
| Partials | 1065 | 1067 | +2 |
What changes were proposed in this PR?
Video/Demo Link
My Problem:
When I first used Texera, what I most wanted was to interact with data in real time. Some existing operators already talk to external services, such as the Twitch and Reddit ones, but they can reach only a limited set of endpoints and cannot access data in real time. I also wanted to analyze or summarize that data with LLMs instead of doing it by hand.
My Solution:
PulseFlow, my extension to Texera that enables real-time LLM analysis of data from external APIs.
For this extension I implemented four main operators for building live analytical workflows over real-time data feeds:
Implementation Details
PollingHttpSource (source)
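The commit message describes this source as a forever-running Iterator that sleeps between polls, with `maxIterations = 0` meaning "forever". A minimal Java sketch of that idea follows. It is not the operator code in this PR: `PollingIterator` is a hypothetical name, and the `Supplier` stands in for the real HTTP call so the loop can be shown without a network dependency.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Hypothetical sketch: an Iterator that polls at a fixed interval. */
final class PollingIterator<T> implements Iterator<T> {
    private final Supplier<T> poll;     // one poll, e.g. an HTTP request (assumed)
    private final long intervalMillis;  // pause between consecutive polls
    private final long maxIterations;   // 0 = never stop
    private long emitted = 0;

    PollingIterator(Supplier<T> poll, long intervalMillis, long maxIterations) {
        this.poll = poll;
        this.intervalMillis = intervalMillis;
        this.maxIterations = maxIterations;
    }

    @Override
    public boolean hasNext() {
        return maxIterations == 0 || emitted < maxIterations;
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        if (emitted > 0) {              // no sleep before the very first poll
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("polling interrupted", e);
            }
        }
        emitted++;
        return poll.get();
    }
}
```

Because the consumer pulls from the iterator, the sleep happens on the consuming thread, which is what lets an unbounded feed fit a pull-based source without engine changes.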
WebSocketSource (source)
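The three behaviors listed in the commit message (permissive URI handling, a single up-front `request`, and reassembly of partial text frames) can be sketched with the JDK `WebSocket.Listener` interface. This is an illustration under assumptions, not the PR's implementation: `FrameCollector` and its `messages` queue are hypothetical, and the `@` replacement is deliberately naive (it would also encode an `@` in a user-info component).

```java
import java.net.URI;
import java.net.http.WebSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the listener side of a WebSocket source. */
final class FrameCollector implements WebSocket.Listener {
    final BlockingQueue<String> messages = new LinkedBlockingQueue<>();
    private final StringBuilder partial = new StringBuilder();

    /** Trim whitespace and percent-encode '@' so pasted provider URLs parse. */
    static URI normalize(String raw) {
        return URI.create(raw.trim().replace("@", "%40"));
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        // Request an effectively unbounded number of messages once, instead of
        // calling request(1) after every frame.
        webSocket.request(Long.MAX_VALUE);
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        partial.append(data);           // copy the fragment into our own buffer
        if (last) {                     // final fragment: emit the whole message
            messages.add(partial.toString());
            partial.setLength(0);
        }
        return null;                    // null: the CharSequence may be reclaimed now
    }
}
```

A listener like this would be attached with `HttpClient.newHttpClient().newWebSocketBuilder().buildAsync(FrameCollector.normalize(url), listener)`, and the source's iterator would then take messages from the queue.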
HttpRequest (transformer)
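The `failOnError` toggle decides whether a non-2xx response crashes the workflow or is surfaced inline in the appended columns. A small sketch of that decision is below. `HttpOutcome` is a hypothetical type, and the exact error text and exception class are assumptions rather than what the operator emits.

```java
/** Hypothetical sketch of the failOnError decision; not the PR's code. */
final class HttpOutcome {
    final int status;     // would map to http_request_status
    final String body;    // would map to http_request_body
    final String error;   // would map to http_request_error (null on success)

    private HttpOutcome(int status, String body, String error) {
        this.status = status;
        this.body = body;
        this.error = error;
    }

    static HttpOutcome of(int status, String body, boolean failOnError) {
        boolean ok = status >= 200 && status < 300;
        if (ok) {
            return new HttpOutcome(status, body, null);
        }
        String message = "HTTP " + status;   // assumed error format
        if (failOnError) {
            // Crash path: the exception propagates and fails the workflow.
            throw new IllegalStateException(message);
        }
        // Inline path: keep the row and record the failure next to it.
        return new HttpOutcome(status, body, message);
    }
}
```

Keeping failures inline suits long-running feeds, where one bad response should not stop the stream.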
LLMAgent (transformer)
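The provider switch and API-key fallback described in the commit message can be sketched as an enum that carries the per-provider details. The enum name `LlmProviderSketch`, the `replyPath` strings as data, and the injected `env` lookup are illustrative assumptions; the PR's `LLMProvider` enum may be shaped differently.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical sketch of per-provider settings; not the PR's LLMProvider. */
enum LlmProviderSketch {
    ANTHROPIC("ANTHROPIC_API_KEY", "content[0].text"),
    OPENAI("OPENAI_API_KEY", "choices[0].message.content");

    final String envVar;     // environment variable used as the key fallback
    final String replyPath;  // where the reply text lives in the response JSON

    LlmProviderSketch(String envVar, String replyPath) {
        this.envVar = envVar;
        this.replyPath = replyPath;
    }

    /** Prefer the operator field; fall back to the provider's environment variable. */
    Optional<String> resolveApiKey(String fieldValue, Function<String, String> env) {
        if (fieldValue != null && !fieldValue.isBlank()) {
            return Optional.of(fieldValue.trim());
        }
        String fromEnv = env.apply(envVar);
        if (fromEnv == null || fromEnv.isBlank()) {
            return Optional.empty();  // caller decides how to report a missing key
        }
        return Optional.of(fromEnv.trim());
    }
}
```

In production the lookup would be `System::getenv`, as in `LlmProviderSketch.OPENAI.resolveApiKey(field, System::getenv)`. Passing the lookup as a function keeps the fallback testable without touching the real environment.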
Shared utilities (operator/http/util/):
Frontend fix (result-panel cell click):