From 41b85c7061cdf4acea21aabf322d0c078ec49b25 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Mon, 29 Jun 2026 14:52:37 +0200 Subject: [PATCH] Add examples for local testing and CI/CD lifecycle management --- .github/workflows-examples/deploy.yml | 65 +++++ .github/workflows-examples/manage.yml | 71 +++++ .github/workflows/ci.yml | 27 ++ .gitignore | 3 +- README.md | 112 ++++++++ pom.xml | 79 +++++- .../Example_08_IntegrationAndDeployment.java | 254 ++++++------------ ...Example_08_IntegrationAndDeploymentIT.java | 189 +++++++++++++ ...ample_08_IntegrationAndDeploymentTest.java | 108 ++++++++ .../table/Example_09_FunctionsTest.java | 43 +++ 10 files changed, 770 insertions(+), 181 deletions(-) create mode 100644 .github/workflows-examples/deploy.yml create mode 100644 .github/workflows-examples/manage.yml create mode 100644 .github/workflows/ci.yml create mode 100644 src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java create mode 100644 src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java create mode 100644 src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java diff --git a/.github/workflows-examples/deploy.yml b/.github/workflows-examples/deploy.yml new file mode 100644 index 0000000..6816b63 --- /dev/null +++ b/.github/workflows-examples/deploy.yml @@ -0,0 +1,65 @@ +# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository. +# It lives in workflows-examples/ because this examples repository does not deploy to a +# real Confluent Cloud environment. It tests, deploys Example_08_IntegrationAndDeployment, +# then lists the result. See the README's CI/CD section for the deployment model and the +# required secrets. +name: Deploy Table API Program + +on: + # Choose the trigger(s) that fit your deployment strategy. This template can be run + # manually from the GitHub Actions UI; uncomment the others to deploy automatically, + # for example on a merge to your main branch or on a release tag. + workflow_dispatch: + # push: + # branches: [main] + # push: + # tags: ['v*'] + +env: + CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }} + CLOUD_REGION: ${{ secrets.CLOUD_REGION }} + FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }} + FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }} + ORG_ID: ${{ secrets.ORG_ID }} + ENV_ID: ${{ secrets.ENV_ID }} + COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }} + TARGET_CATALOG: ${{ secrets.TARGET_CATALOG }} + TARGET_DATABASE: ${{ secrets.TARGET_DATABASE }} + # Fixed here because they define what this pipeline deploys (and a push trigger cannot + # supply inputs); the manage workflow takes them as inputs instead. + APPLICATION_NAME: marketplace-analytics + STATEMENT_NAME: vendors-per-brand + +jobs: + test-and-deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Run local unit tests and integration tests against Confluent Cloud + run: ./mvnw -B --no-transfer-progress verify + + - name: Deploy to Confluent Cloud + # --on-conflict replace redeploys a changed pipeline under the same name; it + # requires --application-name. "Replace" deletes and recreates the statement, so a + # stateful pipeline restarts from its configured source offsets without resuming + # prior state (see the README CI/CD section). + run: > + java -jar target/flink-table-api-java-examples-1.0.jar + --statement-name "$STATEMENT_NAME" + --application-name "$APPLICATION_NAME" + --on-conflict replace + + - name: Verify the deployed statement + # Submitting a background statement returns once Confluent Cloud accepts it, so + # list reports the statement and its phase for visibility in the log. + run: > + java -jar target/flink-table-api-java-examples-1.0.jar + list + --application-name "$APPLICATION_NAME" diff --git a/.github/workflows-examples/manage.yml b/.github/workflows-examples/manage.yml new file mode 100644 index 0000000..5b727c2 --- /dev/null +++ b/.github/workflows-examples/manage.yml @@ -0,0 +1,71 @@ +# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository. +# It lives in workflows-examples/ because this examples repository does not manage +# statements in a real Confluent Cloud environment. It runs the deployment JAR with one +# of the plugin's built-in lifecycle actions (list/describe/resume/stop/delete) chosen by +# the operator. See the README's CI/CD section for the deployment model and the required +# secrets. The action defaults below match what deploy.yml uses; override them per run. +name: Manage Flink Statements + +on: + workflow_dispatch: + inputs: + action: + description: 'Action to perform' + required: true + type: choice + options: + - list + - describe + - resume + - stop + - delete + application-name: + description: 'Application the statement belongs to (set at deploy time)' + required: true + default: marketplace-analytics + statement-name: + description: 'Statement name set at deploy time (leave empty for list to show all)' + required: false + default: vendors-per-brand + +env: + CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }} + CLOUD_REGION: ${{ secrets.CLOUD_REGION }} + FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }} + FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }} + ORG_ID: ${{ secrets.ORG_ID }} + ENV_ID: ${{ secrets.ENV_ID }} + COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }} + +jobs: + manage: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Build + run: ./mvnw -B --no-transfer-progress -DskipTests package + + - name: Run lifecycle action + # Inputs are passed via env, not interpolated into the script, so free-text input + # cannot inject shell commands. The command is built as an array so each argument + # stays a single word regardless of its content. + env: + ACTION: ${{ inputs.action }} + APPLICATION_NAME: ${{ inputs.application-name }} + STATEMENT_NAME: ${{ inputs.statement-name }} + # --statement-name is added only when set (list then covers all statements). + # --wait makes stop/resume/delete block until the target phase so the exit code + # reflects the outcome; list and describe ignore it. + run: | + args=("$ACTION" --application-name "$APPLICATION_NAME" --wait) + if [ -n "$STATEMENT_NAME" ]; then + args+=(--statement-name "$STATEMENT_NAME") + fi + java -jar target/flink-table-api-java-examples-1.0.jar "${args[@]}" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..fc8e584 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,27 @@ +# Continuous integration for this repository. +# +# Runs entirely without Confluent Cloud connectivity: code format check (Spotless), +# compilation, local unit tests on Apache Flink, and the fat JAR build. +# Integration tests (*IT) require Confluent Cloud credentials and would fail without +# them, so this workflow skips them explicitly with -DskipITs. +name: CI + +on: + pull_request: + push: + branches: [master] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Build and test + run: ./mvnw -B --no-transfer-progress verify -DskipITs diff --git a/.gitignore b/.gitignore index ae8186c..a45b90d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ target/ .java-version *.DS_Store cloud.properties -dependency-reduced-pom.xml \ No newline at end of file +dependency-reduced-pom.xml +*.env diff --git a/README.md b/README.md index ffada6a..acb3a09 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,20 @@ public static void main(String[] args) { // pipeline.execute().await(); ``` +## Developer Journey + +The examples in this repository follow the journey of taking a table program from a first +experiment all the way to a production deployment: + +| Stage | What you do | Where to look | +|-------|-------------|---------------| +| Get started | Configure a connection to Confluent Cloud and run a first program | `Example_00` - `Example_02`, [Getting Started](#getting-started) | +| Build | Transform tables, build pipelines, work with data types, UDFs, and structured objects | `Example_03` - `Example_07`, `Example_09` - `Example_11`, `TableProgramTemplate` | +| Test locally | Run unit tests on mock data, without Confluent Cloud connectivity | `src/test/java/`, [Testing Table Programs](#testing-table-programs) | +| Test on Confluent Cloud | Run integration tests against the real service | `Example_08_IntegrationAndDeploymentIT`, [Testing Table Programs](#testing-table-programs) | +| Deploy | Submit statements with deterministic names from a CI/CD pipeline | `Example_08_IntegrationAndDeployment`, [CI/CD with GitHub Actions](#cicd-with-github-actions) | +| Operate | List, describe, stop, resume, and delete deployed statements | `.github/workflows-examples/manage.yml` | + ## Getting Started ### Prerequisites @@ -456,6 +470,104 @@ ConfluentSettings settings3 = ConfluentSettings.newBuilder() .build(); ``` +## Testing Table Programs + +Table programs can be tested in three tiers, from fastest feedback to highest fidelity: + +1. **Unit tests on plain logic.** UDFs and other business logic are plain Java classes and can be + tested with JUnit alone: no Apache Flink, no Confluent Cloud connectivity, and no artifact + upload required. See `Example_09_FunctionsTest`. +2. **Local pipeline tests on Apache Flink.** Pipeline logic that is structured as + a function from input `Table`s to an output `Table` (see `VendorsPerBrand` in + `Example_08_IntegrationAndDeployment`) can be executed locally with mock data from + `fromValues()`, without Confluent Cloud connectivity. See + `Example_08_IntegrationAndDeploymentTest` and run with `./mvnw test`. +3. **Integration tests against Confluent Cloud.** The same pipeline logic runs on the real + service with the exact Confluent semantics, on a Kafka-backed table that is made bounded with + dynamic options. See `Example_08_IntegrationAndDeploymentIT` and run with `./mvnw verify`. + These tests require the connection environment variables (see + [Via Environment Variables](#via-environment-variables)) plus `TARGET_CATALOG` (the name of + your Confluent Cloud environment) and `TARGET_DATABASE` (the name of a Kafka cluster with + write access), and fail fast when any are missing, so a CI pipeline cannot silently skip its + verification step and still report success. To build without Confluent Cloud credentials on + purpose, skip them explicitly: `./mvnw verify -DskipITs`. + +### How local testing works + +The Confluent plugin executes all statements on Confluent Cloud; it does not run them locally. +Local tests therefore run on Apache Flink (planner, runtime, and an embedded mini-cluster), which +this project adds as test-scoped dependencies. + +The plugin and the Apache Flink planner cannot share a runtime classpath: both register their +Executor and Planner factories under the identifier `default`, and `TableEnvironment.create(...)` fails with +`Multiple factories for identifier 'default'` if both are present. This project resolves the +conflict with classpath exclusions in the `pom.xml`: + +- `./mvnw test` (surefire) excludes the Confluent plugin, so unit tests run on Apache Flink. +- `./mvnw verify` (failsafe, test classes named `*IT`) excludes the Apache Flink planner, so + integration tests run against Confluent Cloud. + +Keep pipeline logic free of `io.confluent.flink.plugin` imports so that unit tests can execute it +locally. + +NOTE: IDEs ignore these classpath exclusions, so run the tests via `./mvnw test` and +`./mvnw verify` instead of the IDE's test runner. To use the IDE's test runner anyway, replicate +the exclusion in the test's run configuration (IntelliJ IDEA: Modify options -> Modify classpath -> +Exclude): exclude the `confluent-flink-table-api-java-plugin` JAR for unit tests, or the +`flink-table-planner-loader` JAR for integration tests. For production projects, the cleaner +structure is a multi-module build: one module contains the pipeline logic with only +`flink-table-api-java` and the Apache Flink test dependencies, and another module adds the +Confluent plugin and the deployment entrypoints. + +### Local testing limitations + +Running locally on Apache Flink is not identical to Confluent Cloud: + +- The `$rowtime` system column and other Confluent system columns do not exist locally. +- There is no local catalog mirroring your Confluent Cloud schemas. Mock tables are declared + manually with `fromValues()` and must be kept in sync with the real schemas. +- Confluent-specific SQL syntax (such as `DISTRIBUTED INTO ... BUCKETS`) and Confluent-provided + functions are not available. + +Local tests give fast feedback on transformation logic; integration tests against Confluent Cloud +remain the source of truth. + +## CI/CD with GitHub Actions + +The repository contains workflows that show how a table program moves through a CI/CD pipeline: + +- `.github/workflows/ci.yml` runs in this repository on every pull request: code format check, + compilation, local unit tests, and the fat JAR build. It requires no Confluent Cloud + credentials. +- `.github/workflows-examples/deploy.yml` is a template for your own repository: it runs the + integration tests against Confluent Cloud and then deploys the program by running its `main()` + method with `--statement-name`, `--application-name`, and `--on-conflict replace`. The statement + and application names are deployment configuration passed by the pipeline (not hardcoded in the + program), so the same name is used for deployment and for management; the application name is + prefixed to the statement name on submission (e.g. `marketplace-analytics-vendors-per-brand`). + Re-running with unchanged code is idempotent, and a changed pipeline replaces the existing + statement under the same name. + + `--on-conflict replace` deletes the existing statement and submits a new one: the new statement + starts from its configured source offsets and does not resume the previous statement's state. + For stateless pipelines (filters, projections, routing) this has no effect on results. For + stateful pipelines (aggregations, joins, deduplication, including the aggregation in this + example) the new statement rebuilds its state by reprocessing from the configured start + position, so choose the redeploy timing and the source startup mode accordingly. +- `.github/workflows-examples/manage.yml` is a template for explicit lifecycle operations. It runs + the same deployment JAR with one of the plugin's built-in actions (`list`, `describe`, `stop`, + `resume`, `delete`) as the first argument; the plugin executes the action instead of deploying, + so no separate program is needed. Deployment and lifecycle management are separate concerns; + removing code does not imply that a running statement should be stopped or deleted. + +The workflows authenticate via the environment variables described in +[Via Environment Variables](#via-environment-variables), mapped from GitHub Actions secrets. In +addition, the integration tests and the deploy entrypoint read `TARGET_CATALOG` (the name of the +Confluent Cloud environment) and `TARGET_DATABASE` (the name of the Kafka cluster) to determine +where tables may be created. For staging-to-production promotion, run the same deploy job against +different GitHub environments so that each environment provides its own secrets and protection +rules. + ## Documentation for Confluent Utilities ### Confluent Tools diff --git a/pom.xml b/pom.xml index 4a39dcd..ab53c67 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,8 @@ Apache FlinkĀ® Table API Java Examples on Confluent Cloud - 2.2.1 + 2.3.0 + 2.2-24 17 UTF-8 @@ -18,6 +19,9 @@ ${target.java.version} 2.26.0 3.7.0 + 5.12.2 + 3.27.3 + 3.5.4 @@ -53,6 +57,40 @@ ${confluent-plugin.version} + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + test + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + org.apache.flink + flink-clients + ${flink.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + @@ -119,6 +157,45 @@ + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire.version} + + + io.confluent.flink:confluent-flink-table-api-java-plugin + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${surefire.version} + + + org.apache.flink:flink-table-planner-loader + + + + + + integration-test + verify + + + + + diff --git a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java index e71c1cc..3f06621 100644 --- a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java +++ b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java @@ -4,218 +4,114 @@ import io.confluent.flink.plugin.ConfluentTools; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; - -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; +import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit; -import static org.apache.flink.table.api.Expressions.withAllColumns; /** - * An example that illustrates how to embed a table program into a CI/CD pipeline for continuous - * testing and rollout. - * - *

Because we cannot rely on production data in this example, the program sets up some - * Kafka-backed tables with data during the {@code setup} phase. + * An example that illustrates how to structure, test, and deploy a table program for production use + * in a CI/CD pipeline. * - *

Afterward, the program can operate in two modes: one for integration testing ({@code test} - * phase) and one for deployment ({@code deploy} phase). + *

The example separates two concerns that often end up entangled: * - *

A CI/CD workflow could execute the following: + *

    + *
  • The pipeline logic in {@link VendorsPerBrand} is plain Table API code without any + * Confluent-specific dependencies. It receives its input table as a parameter instead of + * resolving it from a catalog. This makes the logic executable on Apache Flink in local unit + * tests, where the input is mocked with {@code fromValues()} (see {@code + * Example_08_IntegrationAndDeploymentTest}), as well as on Confluent Cloud for Apache Flink, + * where the input is a Kafka-backed table (see {@code + * Example_08_IntegrationAndDeploymentIT}). + *
  • The {@link #main(String[])} method is the deployment entrypoint. It wires the pipeline to + * Confluent Cloud and submits it as a long-running background statement. + *
* - *
- *     export EXAMPLE_JAR=./target/flink-table-api-java-examples-1.0.jar
- *     export EXAMPLE_CLASS=io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS setup
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS test
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS deploy
- * 
+ *

The program is configured with {@link ConfluentSettings#fromArgs(String[])}, which reads + * configuration from command-line arguments, with environment variables as a fallback. This also + * enables the plugin's built-in CI/CD lifecycle actions: when the JAR is run with an action as the + * first argument ({@code list}, {@code describe}, {@code resume}, {@code stop}, or {@code delete}), + * the plugin executes the action and exits before the deployment logic runs, so the same JAR both + * deploys and manages (see {@code .github/workflows-examples/manage.yml}). * - *

NOTE: This example requires write access to a Kafka cluster. Fill out the given variables - * below with target catalog/database if this is fine for you. + *

The statement name and application name are deployment configuration, not source constants: + * the pipeline provides them via {@code --statement-name} / {@code --application-name}, which keeps + * a single source of truth for both deploy and management and is required for the lifecycle actions + * (they read the name at startup, before {@code main()} runs). A program that submits several + * statements instead names each one in code via {@link + * ConfluentTools#setStatementName(TableEnvironment, String)}. * - *

ALSO NOTE: The example submits an unbounded background statement. Make sure to stop the - * statement in the Web UI afterward to clean up resources. + *

Re-running the deployment with unchanged code is idempotent. When the pipeline changed, pass + * {@code --on-conflict replace} to replace the existing statement; see the README's CI/CD section + * for what that means for stateful pipelines. The README also covers configuration, environment + * promotion, and the full set of workflow steps. * - *

The complete CI/CD workflow performs the following steps: + *

NOTE: This example requires write access to a Kafka cluster, configured via the environment + * variables TARGET_CATALOG (environment name) and TARGET_DATABASE (Kafka cluster name). * - *

    - *
  1. Create Kafka table 'ProductsMock' and 'VendorsPerBrand'. - *
  2. Fill Kafka table 'ProductsMock' with data from marketplace examples table 'products'. - *
  3. Test the given SQL on a subset of data in 'ProductsMock' with the help of dynamic options. - *
  4. Deploy an unbounded version of the tested SQL that write into 'VendorsPerBrand'. - *
+ *

ALSO NOTE: The example submits an unbounded background statement. Use the lifecycle actions + * (see {@code .github/workflows-examples/manage.yml}) or the Web UI to stop and delete the + * statement afterward to clean up resources. */ public class Example_08_IntegrationAndDeployment { - // Fill this with an environment you have write access to - static final String TARGET_CATALOG = ""; - - // Fill this with a Kafka cluster you have write access to - static final String TARGET_DATABASE = ""; - - // Fill this with names of the Kafka Topics you want to create - static final String SOURCE_TABLE = "ProductsMock"; + // Name of the table that stores the results static final String TARGET_TABLE = "VendorsPerBrand"; - // The following SQL will be tested on a finite subset of data before - // it gets deployed to production. - // In production, it will run on unbounded input. - // The '%s' parameterizes the SQL for testing. - static final String SQL = - "SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand"; - - // All logic is defined in a main() method. It can run both in an IDE or CI/CD system. - public static void main(String[] args) throws Exception { - if (args.length == 0) { - throw new IllegalArgumentException( - "No mode specified. Possible values are 'setup', 'test', or 'deploy'."); - } - - EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties"); - TableEnvironment env = TableEnvironment.create(settings); - env.useCatalog(TARGET_CATALOG); - env.useDatabase(TARGET_DATABASE); - - String mode = args[0]; - switch (mode) { - case "setup": - setupProgram(env); - break; - case "test": - testProgram(env); - break; - case "deploy": - deployProgram(env); - break; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); + /** + * The pipeline logic under test: counts the number of vendors per brand. + * + *

This class must not reference any {@code io.confluent.flink.plugin} classes so that unit + * tests can run it on Apache Flink without the plugin on the classpath. + */ + public static class VendorsPerBrand { + public static Table buildPipeline(Table products) { + return products.groupBy($("brand")).select($("brand"), lit(1).count().as("vendors")); } } - // -------------------------------------------------------------------------------------------- - // Setup Phase - // -------------------------------------------------------------------------------------------- + // The main() method performs the deployment, unless an action argument is present, in which + // case ConfluentSettings.fromArgs(...) below executes that action and exits before the rest + // of this method runs. + public static void main(String[] args) { + // In GitHub Actions, the connection variables map naturally to repository or environment + // secrets; see the README for the full list. + EnvironmentSettings settings = ConfluentSettings.fromArgs(args); + TableEnvironment env = TableEnvironment.create(settings); - private static void setupProgram(TableEnvironment env) throws Exception { - System.out.println("Running setup..."); + env.useCatalog(requireEnv("TARGET_CATALOG")); + env.useDatabase(requireEnv("TARGET_DATABASE")); - System.out.println("Creating table..." + SOURCE_TABLE); - // Create a mock table that has exactly the same schema as the example `products` table. - // The LIKE clause is very convenient for this task which is why we use SQL here. - // Since we use little data, a bucket of 1 is important to satisfy the `scan.bounded.mode` - // during testing. + System.out.println("Creating table... " + TARGET_TABLE); + // The pipeline owns its output table and creates it on the first deployment. env.executeSql( String.format( "CREATE TABLE IF NOT EXISTS `%s`\n" - + "DISTRIBUTED INTO 1 BUCKETS\n" - + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)", - SOURCE_TABLE)); - - System.out.println("Start filling table..."); - // Let Flink copy generated data into the mock table. Note that the statement is unbounded - // and submitted as a background statement by default. - TableResult pipelineResult = - env.from("`examples`.`marketplace`.`products`") - .select(withAllColumns()) - .insertInto(SOURCE_TABLE) - .execute(); - - System.out.println("Waiting for at least 200 elements in table..."); - // We start a second Flink statement for monitoring how the copying progresses - TableResult countResult = env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute(); - // This waits for the condition to be met: - try (CloseableIterator iterator = countResult.collect()) { - while (iterator.hasNext()) { - Row row = iterator.next(); - long count = row.getFieldAs("c"); - if (count >= 200L) { - System.out.println("200 elements reached. Stopping..."); - break; - } - } - } - - // By using a closable iterator, the foreground statement will be stopped automatically when - // the iterator is closed. But the background statement still needs a manual stop. - ConfluentTools.stopStatement(pipelineResult); - - System.out.println("Creating table..." + TARGET_TABLE); - // Create a table for storing the results after deployment. - env.executeSql( - String.format( - "CREATE TABLE IF NOT EXISTS `%s` \n" + "(brand STRING, vendors BIGINT, PRIMARY KEY(brand) NOT ENFORCED)\n" + "DISTRIBUTED INTO 1 BUCKETS", TARGET_TABLE)); - } - - // -------------------------------------------------------------------------------------------- - // Test Phase - // -------------------------------------------------------------------------------------------- - private static void testProgram(TableEnvironment env) { - System.out.println("Running test..."); - - // Dynamic options allow influencing parts of a table scan. In this case, they define a - // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively, - // they make the table bounded. If all tables are finite, the statement can terminate. - // This allows us to run checks on the result. - String dynamicOptions = - "/*+ OPTIONS(\n" - + "'scan.startup.mode' = 'specific-offsets',\n" - + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n" - + "'scan.bounded.mode' = 'specific-offsets',\n" - + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n" - + ") */"; - - System.out.println("Requesting test data..."); - TableResult result = env.executeSql(String.format(SQL, dynamicOptions)); - List rows = ConfluentTools.collectMaterialized(result); + System.out.println("Deploying statement..."); + // The same pipeline logic that was tested locally and against Confluent Cloud now runs + // unbounded on the continuously generated rows of the examples catalog. + Table products = env.from("`examples`.`marketplace`.`products`"); + TableResult result = + VendorsPerBrand.buildPipeline(products).insertInto(TARGET_TABLE).execute(); + // Print the final submitted name (application prefix included) for use with the lifecycle + // actions. If no name was configured, the plugin generates one, which is not addressable + // for later management; CI/CD deployments should always pass --statement-name. System.out.println( - "Test data:\n" - + rows.stream().map(Row::toString).collect(Collectors.joining("\n"))); - - // Use the testing framework of your choice and add checks to verify the - // correctness of the test data - boolean testSuccessful = - rows.stream() - .map(r -> r.getFieldAs("brand")) - .anyMatch(brand -> brand.equals("Apple")); - if (testSuccessful) { - System.out.println("Success. Ready for deployment."); - } else { - throw new IllegalStateException("Test was not successful"); - } + "Statement has been deployed as: " + ConfluentTools.getStatementName(result)); } - // -------------------------------------------------------------------------------------------- - // Deploy Phase - // -------------------------------------------------------------------------------------------- - - private static void deployProgram(TableEnvironment env) { - System.out.println("Running deploy..."); - - // It is possible to give a better statement name for deployment but make sure that the name - // is unique across environment and region. - String statementName = "vendors-per-brand-" + UUID.randomUUID(); - env.getConfig().set("client.statement-name", statementName); - - // Execute the SQL without dynamic options. - // The result is unbounded and piped into the target table. - TableResult insertIntoResult = - env.sqlQuery(String.format(SQL, "")).insertInto(TARGET_TABLE).execute(); - - // The API might add suffixes to manual statement names such as '-sql' or '-api'. - // For the final submitted name, use the provided tools. - String finalName = ConfluentTools.getStatementName(insertIntoResult); - - System.out.println("Statement has been deployed as: " + finalName); + private static String requireEnv(String name) { + String value = System.getenv(name); + if (value == null || value.isBlank()) { + throw new IllegalArgumentException("Environment variable '" + name + "' is required."); + } + return value; } } diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java new file mode 100644 index 0000000..53dab7f --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java @@ -0,0 +1,189 @@ +package io.confluent.flink.examples.table; + +import io.confluent.flink.plugin.ConfluentPluginOptions; +import io.confluent.flink.plugin.ConfluentSettings; +import io.confluent.flink.plugin.ConfluentTools; + +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.Expressions.lit; +import static org.apache.flink.table.api.Expressions.withAllColumns; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Integration tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}, executed + * against Confluent Cloud. + * + *

While the unit tests (see {@code Example_08_IntegrationAndDeploymentTest}) verify the logic + * locally, these tests verify it on the real service: the exact Confluent SQL semantics, the + * Confluent catalog, and Kafka-backed tables. + * + *

The tests run during {@code ./mvnw verify} and fail fast when the required environment + * variables are not set, so a CI pipeline cannot silently skip its verification step and still + * report success. Builds without Confluent Cloud credentials skip them with {@code ./mvnw verify + * -DskipITs}. They require the standard connection variables (see the README's "Via Environment + * Variables" section) plus TARGET_CATALOG and TARGET_DATABASE pointing to an environment and Kafka + * cluster with write access. + * + *

Because we cannot rely on production data in this example, the test fixture creates a mock + * Kafka-backed table and fills it with data from the marketplace examples table. Dynamic options + * then make the table bounded, so the pipeline terminates and its result can be asserted. + * + *

NOTE: Running from the IDE needs the opposite classpath exclusion from the unit tests (the + * {@code flink-table-planner-loader} JAR) plus the environment variables in the run configuration; + * see the README's testing section. + */ +class Example_08_IntegrationAndDeploymentIT { + + // Name of the mock Kafka topic that emulates the production input + static final String SOURCE_TABLE = "ProductsMock"; + + static TableEnvironment env; + + @BeforeAll + // The timeout runs the setup in a separate thread so that it can be interrupted even while + // blocked on statement results, e.g. when the compute pool has no capacity for the fill + // statement. Without it, a stuck setup would hang until the CI job timeout. + @Timeout(value = 15, unit = TimeUnit.MINUTES, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) + static void setUpMockTable() throws Exception { + requireEnvironment(); + env = TableEnvironment.create(ConfluentSettings.fromGlobalVariables()); + env.useCatalog(System.getenv("TARGET_CATALOG")); + env.useDatabase(System.getenv("TARGET_DATABASE")); + + System.out.println("Creating table... " + SOURCE_TABLE); + // Create a mock table that has exactly the same schema as the example `products` table. + // The LIKE clause is very convenient for this task which is why we use SQL here. + // Since we use little data, a bucket of 1 is important to satisfy the + // `scan.bounded.mode` during testing. + env.executeSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s`\n" + + "DISTRIBUTED INTO 1 BUCKETS\n" + + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)", + SOURCE_TABLE)); + + System.out.println("Start filling table..."); + // Let Flink copy generated data into the mock table. Note that the statement is + // unbounded and submitted as a background statement by default. + TableResult pipelineResult = + env.from("`examples`.`marketplace`.`products`") + .select(withAllColumns()) + .insertInto(SOURCE_TABLE) + .execute(); + + long count = 0; + try { + System.out.println("Waiting for at least 200 elements in table..."); + // A second Flink statement monitors how the copying progresses. The foreground + // statement is stopped automatically when its iterator is closed. + TableResult countResult = + env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute(); + try (CloseableIterator iterator = countResult.collect()) { + while (count < 200L && iterator.hasNext()) { + count = iterator.next().getFieldAs("c"); + } + } + } finally { + // The fill statement is unbounded and must always be cleaned up, even when the wait + // above fails, as it would otherwise keep running and consuming CFUs. It is also + // deleted rather than just stopped: it gets a random name on every run, and stopped + // statements would accumulate in the environment. + String fillStatement = ConfluentTools.getStatementName(pipelineResult); + ConfluentTools.stopStatement(env, fillStatement); + ConfluentTools.deleteStatement(env, fillStatement); + } + if (count < 200L) { + fail( + "The mock table only reached " + + count + + " elements before the monitoring statement terminated."); + } + System.out.println("200 elements reached."); + } + + // Fails fast with a clear message instead of skipping, so that a CI pipeline with missing + // secrets cannot report a successful verification that never ran. The connection variable + // names come from ConfluentPluginOptions, so the list cannot drift from the plugin contract. + private static void requireEnvironment() { + List required = new ArrayList<>(); + // A properties file referenced via FLINK_PROPERTIES is a valid alternative to the + // discrete connection variables (see the README's "Configuration" section). + if (isBlank(System.getenv(ConfluentPluginOptions.VAR_FLINK_PROPERTIES))) { + required.add(ConfluentPluginOptions.VAR_CLOUD_PROVIDER); + required.add(ConfluentPluginOptions.VAR_CLOUD_REGION); + required.add(ConfluentPluginOptions.VAR_FLINK_API_KEY); + required.add(ConfluentPluginOptions.VAR_FLINK_API_SECRET); + required.add(ConfluentPluginOptions.VAR_ORG_ID); + required.add(ConfluentPluginOptions.VAR_ENV_ID); + required.add(ConfluentPluginOptions.VAR_COMPUTE_POOL_ID); + } + required.add("TARGET_CATALOG"); + required.add("TARGET_DATABASE"); + List missing = + required.stream() + .filter(name -> isBlank(System.getenv(name))) + .collect(Collectors.toList()); + if (!missing.isEmpty()) { + fail( + "Integration tests verify the pipeline against Confluent Cloud and require the" + + " environment variables " + + missing + + ". Set them (see the README section 'Via Environment Variables') or" + + " skip the integration tests explicitly with -DskipITs."); + } + } + + private static boolean isBlank(String value) { + return value == null || value.isBlank(); + } + + @Test + void countsVendorsPerBrandOnBoundedData() { + // Dynamic options allow influencing parts of a table scan. In this case, they define a + // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively, + // they make the table bounded. If all tables are finite, the statement can terminate. + // This allows us to run checks on the result. + Table boundedProducts = + env.sqlQuery( + String.format( + "SELECT * FROM `%s` /*+ OPTIONS(\n" + + "'scan.startup.mode' = 'specific-offsets',\n" + + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n" + + "'scan.bounded.mode' = 'specific-offsets',\n" + + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n" + + ") */", + SOURCE_TABLE)); + + // The exact same pipeline logic that the unit tests run locally + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(boundedProducts); + + List rows = ConfluentTools.collectMaterialized(result.execute()); + + assertThat(rows).isNotEmpty(); + assertThat(rows) + .allSatisfy( + row -> { + assertThat(row.getFieldAs("brand")).isNotBlank(); + assertThat(row.getFieldAs("vendors")).isPositive(); + }); + // The examples data generator produces a fixed set of brands. Checking for a known one + // guards against reading the wrong data, not just producing plausible-looking rows. + assertThat(rows).extracting(row -> row.getFieldAs("brand")).contains("Apple"); + } +} diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java new file mode 100644 index 0000000..5e62572 --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java @@ -0,0 +1,108 @@ +package io.confluent.flink.examples.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}. + * + *

These tests run entirely locally on Apache Flink with mock data from {@code fromValues()}. No + * Confluent Cloud connectivity, credentials, or compute pool are required, which makes them + * suitable for fast feedback during development and for CI runs on pull requests. + * + *

NOTE: The Confluent plugin and the Apache Flink planner cannot share a runtime classpath (both + * register Executor and Planner factories under the identifier 'default'), so these tests must be + * executed via {@code ./mvnw test}, where the surefire configuration excludes the plugin. Running + * them directly from the IDE fails with "Multiple factories for identifier 'default'"; see the + * README's testing section for the IDE run-configuration setup. + * + *

ALSO NOTE: Running locally on Apache Flink is not identical to Confluent Cloud. + * Confluent-specific features such as the {@code $rowtime} system column, the Confluent catalog, + * and Confluent SQL extensions are not available locally. Use the integration tests (see {@code + * Example_08_IntegrationAndDeploymentIT}) to verify behavior against the real service. + */ +class Example_08_IntegrationAndDeploymentTest { + + private static Table mockProducts(TableEnvironment env) { + return env.fromValues( + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("brand", DataTypes.STRING())), + row("MacBook", "Apple"), + row("iPhone", "Apple"), + row("Galaxy", "Samsung")); + } + + @Test + void countsVendorsPerBrandInBatchMode() throws Exception { + // Batch mode computes the final result over the finite mock data, which makes + // assertions straightforward. + TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline( + mockProducts(env)); + + assertThat(collectRows(result)) + .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L)); + } + + @Test + void countsVendorsPerBrandInStreamingMode() throws Exception { + // Streaming mode emits a changelog: an insert for the first product of a brand, + // followed by update_before/update_after pairs as more products arrive. This mirrors + // how the statement behaves on Confluent Cloud. + TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline( + mockProducts(env)); + + List changelog = collectRows(result); + assertThat(materialize(changelog)) + .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L)); + } + + private static List collectRows(Table table) throws Exception { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = table.execute().collect()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } + + // Applies the changelog to derive the final result, similar to what + // ConfluentTools.collectMaterialized() does for statements running on Confluent Cloud. + // Rows are copied so that the caller's changelog is left untouched. + private static List materialize(List changelog) { + List state = new ArrayList<>(); + for (Row row : changelog) { + Row copy = Row.copy(row); + copy.setKind(RowKind.INSERT); + switch (row.getKind()) { + case INSERT: + case UPDATE_AFTER: + state.add(copy); + break; + case UPDATE_BEFORE: + case DELETE: + state.remove(copy); + break; + } + } + return state; + } +} diff --git a/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java new file mode 100644 index 0000000..db66f3f --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java @@ -0,0 +1,43 @@ +package io.confluent.flink.examples.table; + +import org.apache.flink.api.common.functions.util.ListCollector; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for the User-Defined Functions of {@link Example_09_Functions}. + * + *

UDFs are plain Java classes, so their logic can be tested with JUnit alone: no Apache Flink, + * no Confluent Cloud connectivity, and no artifact upload are required. This is the fastest test + * tier and should cover the bulk of a UDF's business logic before it is registered and exercised on + * Confluent Cloud. + */ +class Example_09_FunctionsTest { + + @Test + void customTaxReturnsRatePerLocation() { + Example_09_Functions.CustomTax tax = new Example_09_Functions.CustomTax(); + + assertThat(tax.eval("USA")).isEqualTo(10); + assertThat(tax.eval("EU")).isEqualTo(5); + assertThat(tax.eval("Mars")).isEqualTo(0); + } + + @Test + void explodeEmitsOneRowPerElement() { + Example_09_Functions.Explode explode = new Example_09_Functions.Explode(); + + // Table functions emit rows via a collector, which tests can replace with a list + List collected = new ArrayList<>(); + explode.setCollector(new ListCollector<>(collected)); + + explode.eval(List.of("Apples", "Bananas")); + + assertThat(collected).containsExactly("Apples", "Bananas"); + } +}