65 changes: 65 additions & 0 deletions .github/workflows-examples/deploy.yml
@@ -0,0 +1,65 @@
# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository.
# It lives in workflows-examples/ because this examples repository does not deploy to a
# real Confluent Cloud environment. It tests, deploys Example_08_IntegrationAndDeployment,
# then lists the result. See the README's CI/CD section for the deployment model and the
# required secrets.
name: Deploy Table API Program

on:
  # Choose the trigger(s) that fit your deployment strategy. This template can be run
  # manually from the GitHub Actions UI; uncomment one of the others to deploy
  # automatically, for example on a merge to your main branch or on a release tag.
  # YAML does not allow the push: key twice, so to enable both, put branches and tags
  # under a single push: key.
  workflow_dispatch:
  # push:
  #   branches: [main]
  # push:
  #   tags: ['v*']

env:
  CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }}
  CLOUD_REGION: ${{ secrets.CLOUD_REGION }}
  FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }}
  FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }}
  ORG_ID: ${{ secrets.ORG_ID }}
  ENV_ID: ${{ secrets.ENV_ID }}
  COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }}
  TARGET_CATALOG: ${{ secrets.TARGET_CATALOG }}
  TARGET_DATABASE: ${{ secrets.TARGET_DATABASE }}
  # Fixed here because they define what this pipeline deploys (and a push trigger cannot
  # supply inputs); the manage workflow takes them as inputs instead.
  APPLICATION_NAME: marketplace-analytics
  STATEMENT_NAME: vendors-per-brand

jobs:
  test-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      - name: Run local unit tests and integration tests against Confluent Cloud
        run: ./mvnw -B --no-transfer-progress verify

      - name: Deploy to Confluent Cloud
        # --on-conflict replace redeploys a changed pipeline under the same name; it
        # requires --application-name. "Replace" deletes and recreates the statement, so a
        # stateful pipeline restarts from its configured source offsets without resuming
        # prior state (see the README CI/CD section).
        run: >
          java -jar target/flink-table-api-java-examples-1.0.jar
          --statement-name "$STATEMENT_NAME"
          --application-name "$APPLICATION_NAME"
          --on-conflict replace

      - name: Verify the deployed statement
        # Submitting a background statement returns once Confluent Cloud accepts it, so
        # list reports the statement and its phase for visibility in the log.
        run: >
          java -jar target/flink-table-api-java-examples-1.0.jar
          list
          --application-name "$APPLICATION_NAME"
71 changes: 71 additions & 0 deletions .github/workflows-examples/manage.yml
@@ -0,0 +1,71 @@
# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository.
# It lives in workflows-examples/ because this examples repository does not manage
# statements in a real Confluent Cloud environment. It runs the deployment JAR with one
# of the plugin's built-in lifecycle actions (list/describe/resume/stop/delete) chosen by
# the operator. See the README's CI/CD section for the deployment model and the required
# secrets. The input defaults below match the names deploy.yml uses; override them per run.
name: Manage Flink Statements

on:
  workflow_dispatch:
    inputs:
      action:
        description: 'Action to perform'
        required: true
        type: choice
        options:
          - list
          - describe
          - resume
          - stop
          - delete
      application-name:
        description: 'Application the statement belongs to (set at deploy time)'
        required: true
        default: marketplace-analytics
      statement-name:
        description: 'Statement name set at deploy time (leave empty for list to show all)'
        required: false
        default: vendors-per-brand

env:
  CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }}
  CLOUD_REGION: ${{ secrets.CLOUD_REGION }}
  FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }}
  FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }}
  ORG_ID: ${{ secrets.ORG_ID }}
  ENV_ID: ${{ secrets.ENV_ID }}
  COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }}

jobs:
  manage:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      - name: Build
        run: ./mvnw -B --no-transfer-progress -DskipTests package

      - name: Run lifecycle action
        # Inputs are passed via env, not interpolated into the script, so free-text input
        # cannot inject shell commands. The command is built as an array so each argument
        # stays a single word regardless of its content.
        env:
          ACTION: ${{ inputs.action }}
          APPLICATION_NAME: ${{ inputs.application-name }}
          STATEMENT_NAME: ${{ inputs.statement-name }}
        # --statement-name is added only when set (list then covers all statements).
        # --wait makes stop/resume/delete block until the target phase so the exit code
        # reflects the outcome; list and describe ignore it.
        run: |
          args=("$ACTION" --application-name "$APPLICATION_NAME" --wait)
          if [ -n "$STATEMENT_NAME" ]; then
            args+=(--statement-name "$STATEMENT_NAME")
          fi
          java -jar target/flink-table-api-java-examples-1.0.jar "${args[@]}"
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,27 @@
# Continuous integration for this repository.
#
# Runs entirely without Confluent Cloud connectivity: code format check (Spotless),
# compilation, local unit tests on Apache Flink, and the fat JAR build.
# Integration tests (*IT) require Confluent Cloud credentials and would fail without
# them, so this workflow skips them explicitly with -DskipITs.
name: CI

on:
  pull_request:
  push:
    branches: [master]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      - name: Build and test
        run: ./mvnw -B --no-transfer-progress verify -DskipITs
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,4 +4,5 @@ target/
.java-version
*.DS_Store
cloud.properties
dependency-reduced-pom.xml
dependency-reduced-pom.xml
*.env
112 changes: 112 additions & 0 deletions README.md
@@ -90,6 +90,20 @@ public static void main(String[] args) {
// pipeline.execute().await();
```

## Developer Journey

The examples in this repository follow the journey of taking a table program from a first
experiment all the way to a production deployment:

| Stage | What you do | Where to look |
|-------|-------------|---------------|
| Get started | Configure a connection to Confluent Cloud and run a first program | `Example_00` - `Example_02`, [Getting Started](#getting-started) |
| Build | Transform tables, build pipelines, work with data types, UDFs, and structured objects | `Example_03` - `Example_07`, `Example_09` - `Example_11`, `TableProgramTemplate` |
| Test locally | Run unit tests on mock data, without Confluent Cloud connectivity | `src/test/java/`, [Testing Table Programs](#testing-table-programs) |
| Test on Confluent Cloud | Run integration tests against the real service | `Example_08_IntegrationAndDeploymentIT`, [Testing Table Programs](#testing-table-programs) |
| Deploy | Submit statements with deterministic names from a CI/CD pipeline | `Example_08_IntegrationAndDeployment`, [CI/CD with GitHub Actions](#cicd-with-github-actions) |
| Operate | List, describe, stop, resume, and delete deployed statements | `.github/workflows-examples/manage.yml` |

## Getting Started

### Prerequisites
@@ -456,6 +470,104 @@ ConfluentSettings settings3 = ConfluentSettings.newBuilder()
.build();
```

## Testing Table Programs

Table programs can be tested in three tiers, from fastest feedback to highest fidelity:

1. **Unit tests on plain logic.** UDFs and other business logic are plain Java classes and can be
tested with JUnit alone: no Apache Flink, no Confluent Cloud connectivity, and no artifact
upload required. See `Example_09_FunctionsTest`.
2. **Local pipeline tests on Apache Flink.** Pipeline logic that is structured as
a function from input `Table`s to an output `Table` (see `VendorsPerBrand` in
`Example_08_IntegrationAndDeployment`) can be executed locally with mock data from
`fromValues()`, without Confluent Cloud connectivity. See
`Example_08_IntegrationAndDeploymentTest` and run with `./mvnw test`.
3. **Integration tests against Confluent Cloud.** The same pipeline logic runs on the real
service with the exact Confluent semantics, on a Kafka-backed table that is made bounded with
dynamic options. See `Example_08_IntegrationAndDeploymentIT` and run with `./mvnw verify`.
These tests require the connection environment variables (see
[Via Environment Variables](#via-environment-variables)) plus `TARGET_CATALOG` (the name of
your Confluent Cloud environment) and `TARGET_DATABASE` (the name of a Kafka cluster with
write access), and fail fast when any are missing, so a CI pipeline cannot silently skip its
verification step and still report success. To build deliberately without Confluent Cloud
credentials, skip the integration tests explicitly: `./mvnw verify -DskipITs`.
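
As an illustration of the first tier, the following is a minimal sketch of business logic that
can be checked with the JDK alone. `BrandNormalizer` and its behavior are hypothetical and are not
taken from `Example_09_FunctionsTest`; a real UDF would extend Flink's `ScalarFunction`, and the
check would normally be a JUnit test rather than a `main()` method.

```java
import java.util.Locale;

/** Hypothetical business logic: normalizes a brand name before it is aggregated. */
public class BrandNormalizer {

    // In a real UDF this method would sit in a class that extends Flink's ScalarFunction.
    // The superclass is left out so that the sketch compiles without any Flink dependency.
    public String eval(String rawBrand) {
        if (rawBrand == null) {
            return null;
        }
        return rawBrand.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        BrandNormalizer fn = new BrandNormalizer();
        System.out.println(fn.eval("  ACME ")); // prints "acme"
    }
}
```

Because the logic has no dependency on a `TableEnvironment`, a test of this kind needs neither
Apache Flink nor Confluent Cloud on the classpath.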

### How local testing works

The Confluent plugin executes all statements on Confluent Cloud; it does not run them locally.
Local tests therefore run on Apache Flink (planner, runtime, and an embedded mini-cluster), which
this project adds as test-scoped dependencies.

The plugin and the Apache Flink planner cannot share a runtime classpath: both register their
Executor and Planner factories under the identifier `default`, and `TableEnvironment.create(...)` fails with
`Multiple factories for identifier 'default'` if both are present. This project resolves the
conflict with classpath exclusions in the `pom.xml`:

- `./mvnw test` (surefire) excludes the Confluent plugin, so unit tests run on Apache Flink.
- `./mvnw verify` (failsafe, test classes named `*IT`) excludes the Apache Flink planner, so
integration tests run against Confluent Cloud.

Keep pipeline logic free of `io.confluent.flink.plugin` imports so that unit tests can execute it
locally.

NOTE: IDEs ignore these classpath exclusions, so run the tests via `./mvnw test` and
`./mvnw verify` instead of the IDE's test runner. To use the IDE's test runner anyway, replicate
the exclusion in the test's run configuration (IntelliJ IDEA: Modify options -> Modify classpath ->
Exclude): exclude the `confluent-flink-table-api-java-plugin` JAR for unit tests, or the
`flink-table-planner-loader` JAR for integration tests. For production projects, the cleaner
structure is a multi-module build: one module contains the pipeline logic with only
`flink-table-api-java` and the Apache Flink test dependencies, and another module adds the
Confluent plugin and the deployment entrypoints.

### Local testing limitations

Running locally on Apache Flink is not identical to Confluent Cloud:

- The `$rowtime` system column and other Confluent system columns do not exist locally.
- There is no local catalog mirroring your Confluent Cloud schemas. Mock tables are declared
manually with `fromValues()` and must be kept in sync with the real schemas.
- Confluent-specific SQL syntax (such as `DISTRIBUTED INTO ... BUCKETS`) and Confluent-provided
functions are not available.

Local tests give fast feedback on transformation logic; integration tests against Confluent Cloud
remain the source of truth.

## CI/CD with GitHub Actions

The repository contains workflows that show how a table program moves through a CI/CD pipeline:

- `.github/workflows/ci.yml` runs in this repository on every pull request and on every push to
`master`: code format check, compilation, local unit tests, and the fat JAR build. It requires no
Confluent Cloud credentials.
- `.github/workflows-examples/deploy.yml` is a template for your own repository: it runs the
integration tests against Confluent Cloud and then deploys the program by running its `main()`
method with `--statement-name`, `--application-name`, and `--on-conflict replace`. The statement
and application names are deployment configuration passed by the pipeline (not hardcoded in the
program), so the same name is used for deployment and for management; the application name is
prefixed to the statement name on submission (e.g. `marketplace-analytics-vendors-per-brand`).
Re-running with unchanged code is idempotent, and a changed pipeline replaces the existing
statement under the same name.

  `--on-conflict replace` deletes the existing statement and submits a new one: the new statement
  starts from its configured source offsets and does not resume the previous statement's state.
  For stateless pipelines (filters, projections, routing) this has no effect on results. For
  stateful pipelines (aggregations, joins, deduplication, including the aggregation in this
  example) the new statement rebuilds its state by reprocessing from the configured start
  position, so choose the redeploy timing and the source startup mode accordingly.
- `.github/workflows-examples/manage.yml` is a template for explicit lifecycle operations. It runs
the same deployment JAR with one of the plugin's built-in actions (`list`, `describe`, `stop`,
`resume`, `delete`) as the first argument; the plugin executes the action instead of deploying,
so no separate program is needed. Deployment and lifecycle management are separate concerns;
removing code does not imply that a running statement should be stopped or deleted.
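
To make the naming concrete, the sketch below reproduces the submitted name from the example in
the list above. The plugin does the prefixing itself; `StatementNames` is a hypothetical helper,
and the hyphen separator is inferred only from `marketplace-analytics-vendors-per-brand`, so treat
it as an assumption rather than the plugin's documented rule.

```java
/** Hypothetical helper that shows which name to look for after a deployment. */
public class StatementNames {

    // Assumption: application name and statement name are joined with a single hyphen,
    // as the README example suggests. The plugin may apply further normalization.
    public static String fullName(String applicationName, String statementName) {
        return applicationName + "-" + statementName;
    }

    public static void main(String[] args) {
        // Values taken from the env block of deploy.yml.
        System.out.println(fullName("marketplace-analytics", "vendors-per-brand"));
        // prints "marketplace-analytics-vendors-per-brand"
    }
}
```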

The workflows authenticate via the environment variables described in
[Via Environment Variables](#via-environment-variables), mapped from GitHub Actions secrets. In
addition, the integration tests and the deploy entrypoint read `TARGET_CATALOG` (the name of the
Confluent Cloud environment) and `TARGET_DATABASE` (the name of the Kafka cluster) to determine
where tables may be created. For staging-to-production promotion, run the same deploy job against
different GitHub environments so that each environment provides its own secrets and protection
rules.
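
A secret that is not defined in GitHub Actions resolves to an empty string, so an entrypoint may
want to validate its settings before it submits anything. The following is a minimal sketch of
such a check; `RequiredSettings` and its methods are hypothetical and are not part of this
repository or of the Confluent plugin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical up-front validation of the variables a deployment reads. */
public class RequiredSettings {

    /** Returns the names from {@code required} that are absent or blank in {@code env}. */
    public static List<String> missing(Map<String, String> env, List<String> required) {
        List<String> result = new ArrayList<>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                result.add(name);
            }
        }
        return result;
    }

    /** Throws if any required variable is missing, so the job fails instead of continuing. */
    public static void requireAll(Map<String, String> env, List<String> required) {
        List<String> missing = missing(env, required);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing environment variables: " + missing);
        }
    }

    public static void main(String[] args) {
        // Variable names as used by the workflows in this change.
        requireAll(System.getenv(), List.of("TARGET_CATALOG", "TARGET_DATABASE"));
        System.out.println("All required settings are present.");
    }
}
```

Taking the environment as a `Map` parameter keeps the check testable without changing the real
process environment.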

## Documentation for Confluent Utilities

### Confluent Tools