diff --git a/Cargo.lock b/Cargo.lock index 86851059fce5e..10cec38aca6fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,6 +172,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "antithesis_sdk" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18dbd97a5b6c21cc9176891cf715f7f0c273caf3959897f43b9bd1231939e675" +dependencies = [ + "libc", + "libloading", + "linkme", + "once_cell", + "rand 0.8.5", + "rustc_version_runtime", + "serde", + "serde_json", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -5120,6 +5136,26 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "linkme" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83272d46373fb8decca684579ac3e7c8f3d71d4cc3aa693df8759e260ae41cf" +dependencies = [ + "linkme-impl", +] + +[[package]] +name = "linkme-impl" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d59e20403c7d08fe62b4376edfe5c7fb2ef1e6b1465379686d0f21c8df444b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5779,6 +5815,7 @@ dependencies = [ name = "mz-catalog" version = "0.0.0" dependencies = [ + "antithesis_sdk", "anyhow", "async-trait", "base64 0.22.1", @@ -7167,6 +7204,7 @@ dependencies = [ name = "mz-persist-client" version = "26.25.0-dev.0" dependencies = [ + "antithesis_sdk", "anyhow", "arrayvec 0.7.6", "arrow", @@ -7942,6 +7980,7 @@ dependencies = [ name = "mz-storage" version = "0.0.0" dependencies = [ + "antithesis_sdk", "anyhow", "arrow", "arrow-ipc", @@ -10661,6 +10700,16 @@ dependencies = [ "semver", ] +[[package]] +name = "rustc_version_runtime" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d" +dependencies = [ + "rustc_version", + "semver", +] + [[package]] name = "rustix" version = "0.38.44" diff --git a/Cargo.toml b/Cargo.toml index 8ba97cb61b290..5d38ff3d8124b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -263,6 +263,7 @@ ahash = { version = "0.8.12", default-features = false } aho-corasick = "1.1.4" allocation-counter = "0" anyhow = "1.0.102" +antithesis_sdk = "0.2.8" array-concat = "0.5.5" arrayvec = "0.7.6" arrow = { version = "57", default-features = false } diff --git a/bin/ci-builder b/bin/ci-builder index 066bf273130a9..6d53be5cad2f5 100755 --- a/bin/ci-builder +++ b/bin/ci-builder @@ -18,6 +18,9 @@ set -euo pipefail NIGHTLY_RUST_DATE=2026-05-06 +# Allow overriding the container runtime (e.g. MZ_DEV_CI_BUILDER_RUNTIME=podman). +DOCKER="${MZ_DEV_CI_BUILDER_RUNTIME:-docker}" + workdir=$(pwd) cd "$(dirname "$0")/.." @@ -128,10 +131,14 @@ gid=$(id -g) [[ "$gid" -lt 500 ]] && gid=$uid build() { + local cache_args=() + if [[ "$DOCKER" != "podman" ]]; then + cache_args+=(--cache-from=materialize/ci-builder:"$cache_tag") + cache_args+=(--cache-to=type=inline,mode=max) + fi # shellcheck disable=SC2086 # intentional splitting of build args string - docker buildx build --pull \ - --cache-from=materialize/ci-builder:"$cache_tag" \ - --cache-to=type=inline,mode=max \ + "$DOCKER" buildx build --pull \ + "${cache_args[@]}" \ $docker_build_args \ --tag materialize/ci-builder:"$tag" \ --tag ghcr.io/materializeinc/materialize/ci-builder:"$tag" \ @@ -181,13 +188,13 @@ case "$cmd" in build "$@" ;; exists) - docker manifest inspect "$image_registry"/ci-builder:"$tag" &> /dev/null + "$DOCKER" manifest inspect "$image_registry"/ci-builder:"$tag" &> /dev/null ;; tag) echo "$tag" ;; push) - docker login ghcr.io -u materialize-bot --password "$GITHUB_GHCR_TOKEN" + "$DOCKER" login ghcr.io -u materialize-bot --password "$GITHUB_GHCR_TOKEN" build --push "$@" ;; run) @@ -274,6 +281,7 @@ case "$cmd" in --env AZURE_SERVICE_ACCOUNT_PASSWORD --env AZURE_SERVICE_ACCOUNT_TENANT --env GCP_SERVICE_ACCOUNT_JSON + --env ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON --env GITHUB_TOKEN --env GITHUB_GHCR_TOKEN --env GPG_KEY @@ -372,20 +380,26 @@ case "$cmd" in ) fi if [[ "$(uname -s)" = Linux ]]; then - args+=( - --user "$(id -u):$(stat -c %g /var/run/docker.sock)" - ) + if [[ "${MZ_DEV_CI_BUILDER_RUNTIME:-docker}" == "podman" ]]; then + args+=(--userns=keep-id) + else + args+=( + --user "$(id -u):$(stat -c %g /var/run/docker.sock)" + ) + fi if [[ $secrets == "true" ]]; then # Allow Docker-in-Docker by mounting the Docker socket in the # container. Host networking allows us to see ports created by # containers that we launch. args+=( - --volume "/var/run/docker.sock:/var/run/docker.sock" --network host --env "DOCKER_TLS_VERIFY=${DOCKER_TLS_VERIFY-}" --env "DOCKER_HOST=${DOCKER_HOST-}" ) + if [[ -S /var/run/docker.sock ]]; then + args+=(--volume "/var/run/docker.sock:/var/run/docker.sock") + fi # Forward Docker configuration too, if available. docker_dir=${DOCKER_CONFIG:-$HOME/.docker} @@ -431,14 +445,22 @@ case "$cmd" in image="$image_registry/ci-builder:$tag" # Try downloading the image a few times in case of registry flakiness if [[ "${CI:-}" ]]; then - if ! docker inspect "$image" > /dev/null 2>&1; then - docker pull "$image" || (sleep 3 && docker pull "$image") || (sleep 3 && docker pull "$image") || sleep 3 + if ! "$DOCKER" inspect "$image" > /dev/null 2>&1; then + "$DOCKER" pull "$image" || (sleep 3 && "$DOCKER" pull "$image") || (sleep 3 && "$DOCKER" pull "$image") || sleep 3 fi fi - docker run "${args[@]}" "$image" eatmydata "${docker_command[@]}" + if [[ "$DOCKER" == "podman" ]]; then + # --userns=keep-id already maps the host UID/GID into the + # container, so autouseradd is unnecessary. Override the + # entrypoint to skip it. + args+=(--entrypoint eatmydata) + "$DOCKER" run "${args[@]}" "$image" "${docker_command[@]}" + else + "$DOCKER" run "${args[@]}" "$image" eatmydata "${docker_command[@]}" + fi ;; root-shell) - docker exec --interactive --tty --user 0:0 "$(<"$cid_file")" eatmydata ci/builder/root-shell.sh + "$DOCKER" exec --interactive --tty --user 0:0 "$(<"$cid_file")" eatmydata ci/builder/root-shell.sh ;; *) printf "unknown command %q\n" "$cmd" diff --git a/ci/builder/Dockerfile b/ci/builder/Dockerfile index be1da20d8591f..eb6b71be277a4 100644 --- a/ci/builder/Dockerfile +++ b/ci/builder/Dockerfile @@ -399,6 +399,11 @@ ENV CARGO_HOME=/cargo RUN mkdir /cargo && chmod 777 /cargo VOLUME /cargo +# Antithesis coverage instrumentation library (used when --antithesis is passed) +RUN curl -sSL https://antithesis.com/assets/instrumentation/libvoidstar.so \ + -o /usr/lib/libvoidstar.so \ + && ldconfig + # Stage 3: Build a lightweight CI Builder image for console/playwright jobs. FROM ubuntu:noble-20260324 AS ci-builder-console diff --git a/ci/mkpipeline.py b/ci/mkpipeline.py index 79fcb7bd2a0c9..d6be6018c7532 100644 --- a/ci/mkpipeline.py +++ b/ci/mkpipeline.py @@ -121,6 +121,12 @@ def main() -> int: type=Sanitizer, choices=Sanitizer, ) + parser.add_argument( + "--antithesis", + action="store_true", + default=ui.env_is_truthy("CI_ANTITHESIS"), + help="enable Antithesis coverage instrumentation", + ) parser.add_argument( "--priority", type=int, @@ -166,6 +172,7 @@ def get_hashes(arch: Arch) -> tuple[str, bool]: arch=arch, coverage=args.coverage, sanitizer=args.sanitizer, + antithesis=args.antithesis, ) deps = repo.resolve_dependencies(image for image in repo if image.publish) check = deps.check() @@ -209,6 +216,7 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) trim_ci_glue_exempt_steps(pipeline) else: @@ -218,9 +226,11 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) truncate_skip_length(pipeline) handle_sanitizer_skip(pipeline, args.sanitizer) + handle_antithesis_skip(pipeline, args.antithesis) increase_agents_timeouts(pipeline, args.sanitizer, args.coverage) prioritize_pipeline(pipeline, args.priority) switch_jobs_to_aws(pipeline, args.priority) @@ -240,6 +250,7 @@ def fetch_hashes() -> None: args.coverage, args.sanitizer, lto, + args.antithesis, ) add_nightly_deploy_dependency(pipeline, args.pipeline) remove_dependencies_on_prs(pipeline, args.pipeline, hash_check) @@ -328,6 +339,21 @@ def handle_sanitizer_skip(pipeline: Any, sanitizer: Sanitizer) -> None: step["skip"] = True +def handle_antithesis_skip(pipeline: Any, antithesis: bool) -> None: + if antithesis: + pipeline.setdefault("env", {})["CI_ANTITHESIS"] = "1" + + for step in steps(pipeline): + if step.get("antithesis") == "skip": + step["skip"] = True + + else: + + for step in steps(pipeline): + if step.get("antithesis") == "only": + step["skip"] = True + + def increase_agents_timeouts( pipeline: Any, sanitizer: Sanitizer, coverage: bool ) -> None: @@ -711,6 +737,7 @@ def trim_tests_pipeline( coverage: bool, sanitizer: Sanitizer, lto: bool, + antithesis: bool = False, ) -> None: """Trim pipeline steps whose inputs have not changed in this branch. @@ -731,6 +758,7 @@ def trim_tests_pipeline( profile=mzbuild.Profile.RELEASE if lto else mzbuild.Profile.OPTIMIZED, coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, ) deps = repo.resolve_dependencies(image for image in repo) @@ -917,6 +945,7 @@ def add_cargo_test_dependency( coverage: bool, sanitizer: Sanitizer, lto: bool, + antithesis: bool = False, ) -> None: """Cargo Test normally doesn't have to wait for the build to complete, but it requires a few images (ubuntu-base, postgres), which are rarely changed. So only add a dependency when those images are not on Dockerhub yet.""" if pipeline_name not in ("test", "nightly"): @@ -933,6 +962,7 @@ def add_cargo_test_dependency( profile=mzbuild.Profile.RELEASE if lto else mzbuild.Profile.OPTIMIZED, coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, ) composition = Composition(repo, name="cargo-test") deps = composition.dependencies @@ -1090,6 +1120,8 @@ def remove_mz_specific_keys(pipeline: Any) -> None: del step["coverage"] if "sanitizer" in step: del step["sanitizer"] + if "antithesis" in step: + del step["antithesis"] if "ci_glue_exempt" in step: del step["ci_glue_exempt"] if ( diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index d10055451b451..b3c3068e04970 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -65,6 +65,29 @@ steps: branches: "main" skip: "currently broken" + - id: build-x86_64-antithesis + label: ":rust: Build x86_64 (Antithesis)" + # Regenerate the antithesis compose YAML before building so the + # `antithesis-config` image's fingerprint captures the same + # materialized fingerprint we're about to publish — otherwise + # Antithesis would try to pull a stale `materialized:mzbuild-…` + # whenever the committed YAML lagged behind source changes. + command: bin/ci-builder run stable ci/test/build-antithesis.sh + inputs: + - "*" + depends_on: [] + timeout_in_minutes: 90 + agents: + queue: l-builder-linux-x86_64 + env: + CI_ANTITHESIS: "1" + # Antithesis-flavored images get distinct mzbuild fingerprints, so + # they coexist with regular GHCR tags. The build is x86_64-only — + # Antithesis runs amd64 sandboxes. + sanitizer: skip + coverage: skip + antithesis: skip + - id: build-rust-latest-beta label: "Build with Latest Rust Beta" command: bin/ci-builder run stable ci/test/rust-beta-build.sh diff --git a/ci/test/build-antithesis.sh b/ci/test/build-antithesis.sh new file mode 100755 index 0000000000000..23d9480ad8188 --- /dev/null +++ b/ci/test/build-antithesis.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# build-antithesis.sh — antithesis-flavored build + Antithesis-registry push. +# +# 1. Write `.env` so `antithesis-config` bakes in compose refs that point +# at the Antithesis GCP Artifact Registry (where we'll mirror to). The +# .env content is one of antithesis-config's mzbuild inputs, so the +# image fingerprint tracks the source it references — self-consistent. +# 2. Run the standard `ci.test.build` to compile antithesis-flavored Rust +# binaries and build the docker images (pushed to GHCR via mzbuild). +# 3. `docker login` the Antithesis GCP Artifact Registry using +# `ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON` (a service account scoped to +# `materialize-storage@molten-verve-216720.iam.gserviceaccount.com` — +# kept distinct from `GCP_SERVICE_ACCOUNT_JSON` which is used elsewhere +# for unrelated GCP integrations). +# 4. Retag + push `materialized`, `antithesis-workload`, and +# `antithesis-config` to the Antithesis registry. Public images +# referenced by the compose (postgres, minio, kafka stack) stay on +# their upstream registries — Antithesis can reach those directly. + +set -euo pipefail + +: "${CI_ANTITHESIS:?build-antithesis.sh expects CI_ANTITHESIS=1}" + +# GCP Artifact Registry path for Antithesis. Tags pushed under +# $ANTITHESIS_REGISTRY/:mzbuild-. +ANTITHESIS_REGISTRY="${ANTITHESIS_REGISTRY:-us-central1-docker.pkg.dev/molten-verve-216720/materialize-repository}" + +echo "--- Writing test/antithesis/config/.env (registry: $ANTITHESIS_REGISTRY)" +bin/pyactivate test/antithesis/export-env.py \ + --registry "$ANTITHESIS_REGISTRY" \ + > test/antithesis/config/.env + +echo "--- Building antithesis-flavored mzbuild images" +bin/pyactivate -m ci.test.build + +echo "--- Authenticating to Antithesis registry" +if [[ -z "${ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON:-}" ]]; then + echo "ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON is unset — pushing to the Antithesis registry will fail." >&2 + echo "Provision it as a Buildkite-agent env var (see bin/ci-builder env-forwarding)." >&2 + exit 1 +fi +echo "$ANTITHESIS_GCP_SERVICE_ACCOUNT_JSON" \ + | docker login -u _json_key --password-stdin "https://${ANTITHESIS_REGISTRY%%/*}" + +echo "--- Pushing Materialize-built images to the Antithesis registry" +bin/pyactivate test/antithesis/push-antithesis.py --registry "$ANTITHESIS_REGISTRY" diff --git a/ci/test/build.py b/ci/test/build.py index d91e82ffe2734..89d9402aab08f 100755 --- a/ci/test/build.py +++ b/ci/test/build.py @@ -34,18 +34,36 @@ def main() -> None: set_build_status("pending") coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED") sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")] + antithesis = ui.env_is_truthy("CI_ANTITHESIS") repo = mzbuild.Repository( Path("."), coverage=coverage, sanitizer=sanitizer, + antithesis=antithesis, image_registry="materialize", ) # Build and push any images that are not already available on Docker Hub, # so they are accessible to other build agents. print("--- Acquiring mzbuild images") - deps = repo.resolve_dependencies(image for image in repo if image.publish) + if antithesis: + # Antithesis only consumes these three images; everything else in + # the repo (balancerd, sqllogictest, testdrive, ...) is wasted CI + # time for this pipeline. resolve_dependencies walks depends_on + # transitively, so anything materialized actually needs still + # comes along. Keep this list in sync with ANTITHESIS_IMAGES in + # test/antithesis/push-antithesis.py. + antithesis_images = [ + "materialized", + "antithesis-workload", + "antithesis-config", + ] + deps = repo.resolve_dependencies( + repo.images[name] for name in antithesis_images + ) + else: + deps = repo.resolve_dependencies(image for image in repo if image.publish) deps.ensure(pre_build=lambda images: upload_debuginfo(repo, images)) set_build_status("success") annotate_buildkite_with_tags(repo.rd.arch, deps) diff --git a/ci/test/lint-main/checks/check-antithesis-compose.sh b/ci/test/lint-main/checks/check-antithesis-compose.sh new file mode 100755 index 0000000000000..55c54f0bccfba --- /dev/null +++ b/ci/test/lint-main/checks/check-antithesis-compose.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# check-antithesis-compose.sh — ensure test/antithesis/config/docker-compose.yaml +# is in sync with test/antithesis/mzcompose.py. +# +# Image refs in the committed YAML are `${MATERIALIZED_IMAGE}` style +# placeholders (resolved from `.env` at compose-parse time), so the file is +# stable across materialized source changes. A plain diff catches any +# composition (services/ports/env/deps) drift. + +set -euo pipefail + +cd "$(dirname "$0")/../../../.." + +. misc/shlib/shlib.bash + +check_antithesis_compose() { + local committed=test/antithesis/config/docker-compose.yaml + local generated rc=0 + generated=$(mktemp) + + bin/pyactivate test/antithesis/export-compose.py > "$generated" + + if ! diff -u "$committed" "$generated"; then + echo + echo "$committed is out of sync with test/antithesis/mzcompose.py." + echo "Regenerate with:" + echo " bin/pyactivate test/antithesis/export-compose.py > $committed" + rc=1 + fi + + rm -f "$generated" + return $rc +} + +try check_antithesis_compose + +try_status_report diff --git a/ci/test/lint-main/checks/check-pipeline.sh b/ci/test/lint-main/checks/check-pipeline.sh index baed7ae9a717c..95da47ae547c8 100755 --- a/ci/test/lint-main/checks/check-pipeline.sh +++ b/ci/test/lint-main/checks/check-pipeline.sh @@ -28,6 +28,7 @@ unset CI_TEST_IDS unset CI_TEST_SELECTION unset CI_SANITIZER unset CI_COVERAGE_ENABLED +unset CI_ANTITHESIS unset CI_WAITING_FOR_BUILD pids=() diff --git a/misc/python/materialize/mzbuild.py b/misc/python/materialize/mzbuild.py index f653b84abc4a9..08ca9bb43c943 100644 --- a/misc/python/materialize/mzbuild.py +++ b/misc/python/materialize/mzbuild.py @@ -187,6 +187,7 @@ def __init__( sanitizer: Sanitizer, image_registry: str, image_prefix: str, + antithesis: bool = False, ): self.root = root self.arch = arch @@ -196,6 +197,7 @@ def __init__( self.cargo_workspace = cargo.Workspace(root) self.image_registry = image_registry self.image_prefix = image_prefix + self.antithesis = antithesis def build( self, @@ -471,13 +473,21 @@ def __init__(self, rd: RepositoryDetails, path: Path, config: dict[str, Any]): def run(self, prep: Any) -> None: super().run(prep) + source = Path(self.source) for src in self.inputs(): - dst = self.path / self.destination / src + rel = Path(src).relative_to(source) + dst = self.path / self.destination / rel dst.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(self.rd.root / self.source / src, dst) + shutil.copy(self.rd.root / src, dst) def inputs(self) -> set[str]: - return set(git.expand_globs(self.rd.root / self.source, self.matching)) + # Return repo-root-relative paths so that `ResolvedImage.fingerprint` + # (which resolves each input as `rd.root / rel_path`) can lstat them. + source = Path(self.source) + return { + str(source / p) + for p in git.expand_globs(self.rd.root / self.source, self.matching) + } class CargoPreImage(PreImage): @@ -513,6 +523,8 @@ def extra(self) -> str: flags += "optimized" if self.rd.coverage: flags += "coverage" + if self.rd.antithesis: + flags += ["antithesis"] if self.rd.sanitizer != Sanitizer.none: flags += self.rd.sanitizer.value flags.sort() @@ -547,15 +559,14 @@ def generate_cargo_build_command( examples: list[str], features: list[str] | None = None, ) -> list[str]: - rustflags = ( - rustc_flags.coverage - if rd.coverage - else ( - rustc_flags.sanitizer[rd.sanitizer] - if rd.sanitizer != Sanitizer.none - else ["--cfg=tokio_unstable"] - ) - ) + if rd.antithesis: + rustflags = rustc_flags.antithesis + elif rd.coverage: + rustflags = rustc_flags.coverage + elif rd.sanitizer != Sanitizer.none: + rustflags = rustc_flags.sanitizer[rd.sanitizer] + else: + rustflags = ["--cfg=tokio_unstable"] cflags = ( [ f"--target={target(rd.arch)}", @@ -568,8 +579,8 @@ def generate_cargo_build_command( if rd.sanitizer != Sanitizer.none else [] ) - extra_env = ( - { + if rd.sanitizer != Sanitizer.none: + extra_env = { "CFLAGS": " ".join(cflags), "CXXFLAGS": " ".join(cflags), "LDFLAGS": " ".join(cflags), @@ -582,9 +593,8 @@ def generate_cargo_build_command( "PATH": f"/sanshim:/opt/x-tools/{target(rd.arch)}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "TSAN_OPTIONS": "report_bugs=0", # build-scripts fail } - if rd.sanitizer != Sanitizer.none - else {} - ) + else: + extra_env = {} cargo_build = rd.build( "build", channel=None, rustflags=rustflags, extra_env=extra_env @@ -672,7 +682,11 @@ def copy(src: Path, relative_dst: Path) -> None: exe_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(src, exe_path) - if self.strip: + if self.rd.antithesis: + # Antithesis needs full debug symbols for symbolization. + # Don't strip anything. + pass + elif self.strip: # The debug information is large enough that it slows down CI, # since we're packaging these binaries up into Docker images and # shipping them around. @@ -945,6 +959,7 @@ def _build_locked( "ARCH_GCC": str(self.image.rd.arch), "ARCH_GO": self.image.rd.arch.go_str(), "CI_SANITIZER": str(self.image.rd.sanitizer), + "ANTITHESIS": "1" if self.image.rd.antithesis else "", } f = self.write_dockerfile() @@ -1416,6 +1431,7 @@ def __init__( sanitizer: Sanitizer = Sanitizer.none, image_registry: str = image_registry(), image_prefix: str = "", + antithesis: bool = False, ): self.rd = RepositoryDetails( root, @@ -1425,6 +1441,7 @@ def __init__( sanitizer, image_registry, image_prefix, + antithesis=antithesis, ) self.images: dict[str, Image] = {} self.compositions: dict[str, Path] = {} @@ -1517,6 +1534,12 @@ def install_arguments(parser: argparse.ArgumentParser) -> None: default="", help="a prefix to apply to all Docker image names", ) + parser.add_argument( + "--antithesis", + help="whether to enable Antithesis coverage instrumentation", + default=ui.env_is_truthy("CI_ANTITHESIS"), + action="store_true", + ) @classmethod def from_arguments(cls, root: Path, args: argparse.Namespace) -> "Repository": @@ -1544,6 +1567,7 @@ def from_arguments(cls, root: Path, args: argparse.Namespace) -> "Repository": image_registry=args.image_registry, image_prefix=args.image_prefix, arch=args.arch, + antithesis=args.antithesis, ) @property diff --git a/misc/python/materialize/mzcompose/services/clusterd.py b/misc/python/materialize/mzcompose/services/clusterd.py index e07ca490a5355..bffe3ddc3e470 100644 --- a/misc/python/materialize/mzcompose/services/clusterd.py +++ b/misc/python/materialize/mzcompose/services/clusterd.py @@ -28,7 +28,7 @@ def __init__( options: list[str] = [], restart: str = "no", stop_grace_period: str = "120s", - scratch_directory: str = "/scratch", + scratch_directory: str | None = "/scratch", volumes: list[str] = [], workers: int = 1, process_names: list[str] = [], @@ -68,7 +68,13 @@ def __init__( f"CLUSTERD_STORAGE_TIMELY_CONFIG={storage_timely_config}", ] - options = ["clusterd", f"--scratch-directory={scratch_directory}", *options] + # `scratch_directory=None` omits the CLI flag entirely. clusterd + # treats this as "no scratch" — RocksDB switches to its in-memory + # env (`Env::mem_env()`), matching the production deployment shape + # where cluster replicas have no scratch disk attached. + options = ["clusterd", *options] + if scratch_directory is not None: + options.insert(1, f"--scratch-directory={scratch_directory}") config: ServiceConfig = {} diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index 3318bee4aeaf5..60161d9d1fe74 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1987,7 +1987,7 @@ def run(self, exe: Executor) -> bool: return False role_id = exe.db.role_id exe.db.role_id += 1 - role = Role(role_id) + role = Role(role_id, name_scope=exe.db.name_scope) role.create(exe) exe.db.roles.append(role) return True @@ -2026,6 +2026,13 @@ def run(self, exe: Executor) -> bool: class CreateClusterAction(Action): def run(self, exe: Executor) -> bool: + # In existing-cluster mode the Database wraps a pre-existing + # (caller-supplied) cluster, typically bootstrapped by the + # Antithesis compose, and we have no allocator for additional + # clusters tied to other pool members. Skip — the wrapped + # cluster is the entire test surface. + if exe.db.existing_cluster_name is not None: + return False with exe.db.lock: if len(exe.db.clusters) >= MAX_CLUSTERS: return False @@ -2037,6 +2044,7 @@ def run(self, exe: Executor) -> bool: size=self.rng.choice(["1", "2"]), replication_factor=self.rng.choice([1, 2]), introspection_interval="1s", + name_scope=exe.db.name_scope, ) cluster.create(exe) exe.db.clusters.append(cluster) @@ -2170,6 +2178,11 @@ def run(self, exe: Executor) -> bool: with exe.db.lock: # Keep cluster 0 with 1 replica for sources/sinks unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed] + # Pre-existing (pool) clusters: the framework didn't create them + # and won't mutate them. Skip. + unmanaged_clusters = [ + c for c in unmanaged_clusters if not c.is_pool_backed + ] if not unmanaged_clusters: return False cluster = self.rng.choice(unmanaged_clusters) @@ -2193,6 +2206,10 @@ def run(self, exe: Executor) -> bool: with exe.db.lock: # Keep cluster 0 with 1 replica for sources/sinks unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed] + # Pre-existing (pool) clusters: same reasoning as above. Skip. + unmanaged_clusters = [ + c for c in unmanaged_clusters if not c.is_pool_backed + ] if not unmanaged_clusters: return False cluster = self.rng.choice(unmanaged_clusters) diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py index bad0b4081bbde..7034be033dae8 100644 --- a/misc/python/materialize/parallel_workload/database.py +++ b/misc/python/materialize/parallel_workload/database.py @@ -7,6 +7,7 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +import dataclasses import random import threading import uuid @@ -885,18 +886,56 @@ def __str__(self) -> str: class Role: role_id: int lock: threading.Lock - - def __init__(self, role_id: int): + # Inserted between `role` and `{role_id}` in the generated name. Empty by + # default (giving the historical `role0` shape). When set, gives + # `role{name_scope}{role_id}` — used by callers like the Antithesis + # parallel-driver where many concurrent Database instances against one + # materialize would otherwise collide on the same `role0..roleN` names. + name_scope: str + + def __init__(self, role_id: int, name_scope: str = ""): self.role_id = role_id self.lock = threading.Lock() + self.name_scope = name_scope def __str__(self) -> str: + # Format: `role[-{name_scope}-]{role_id}`. The bracketed segment is + # only present when seed-scoping is on, so the historical `role0` + # shape (which non-Antithesis consumers parse) is preserved. + # Scoped names need identifier-quoting because dashes aren't valid + # in an unquoted identifier; unscoped names stay bare to match the + # original SQL the framework emits. + if self.name_scope: + return identifier(f"role-{self.name_scope}-{self.role_id}") return f"role{self.role_id}" def create(self, exe: Executor) -> None: exe.execute(f"CREATE ROLE {self}") +@dataclasses.dataclass(frozen=True) +class ClusterdPoolMember: + """Address+config of one external clusterd container the SUT will host an + unmanaged cluster replica on. + + Used by the Antithesis compose bootstrap (see test/antithesis/mzcompose.py) + to build the CREATE CLUSTER REPLICAS clause for each long-lived pool + cluster: one cluster per pool member, with this member as its sole + replica. After bootstrap the framework only references the cluster by + name (`existing_cluster_name`); pool members aren't passed into + `Database` directly. + + Default ports match clusterd's defaults; override per environment. + """ + + host: str + storagectl_port: int = 2100 + computectl_port: int = 2101 + compute_port: int = 2102 + storage_port: int = 2103 + workers: int = 4 + + class ClusterReplica: replica_id: int size: str @@ -904,7 +943,12 @@ class ClusterReplica: rename: int lock: threading.Lock - def __init__(self, replica_id: int, size: str, cluster: "Cluster"): + def __init__( + self, + replica_id: int, + size: str, + cluster: "Cluster", + ): self.replica_id = replica_id self.size = size self.cluster = cluster @@ -935,6 +979,18 @@ class Cluster: introspection_interval: str rename: int lock: threading.Lock + # Inserted between `cluster` and `-{cluster_id}` in the generated name. + # Empty by default (giving the historical `cluster-N` shape). When set, + # gives `cluster{name_scope}-N` — used by callers like the Antithesis + # parallel-driver, where many concurrent Database instances against one + # materialize would otherwise collide on the same `cluster-N` names. + name_scope: str + # When set, the cluster represents a pre-existing cluster the framework + # did not create and must not drop. `name()` returns this literally + # (bypassing cluster_id / rename / name_scope), and `create()` / `drop()` + # are no-ops. The replicas list is empty in this mode — the framework + # doesn't model the pre-existing replicas because it never touches them. + pre_existing_name: str | None def __init__( self, @@ -943,27 +999,66 @@ def __init__( size: str, replication_factor: int, introspection_interval: str, + name_scope: str = "", + pre_existing_name: str | None = None, ): self.cluster_id = cluster_id self.managed = managed self.size = size - self.replicas = [ - ClusterReplica(i, size, self) for i in range(replication_factor) - ] + self.pre_existing_name = pre_existing_name + if pre_existing_name is not None: + # Pre-existing cluster: framework only models its name. The actual + # replicas live in materialize's catalog from the bootstrap step + # that created the cluster (see test/antithesis/mzcompose.py). + # Empty replicas list flips `is_pool_backed` to True, which is + # what the action classes use to skip DDL on this cluster. + self.managed = False + self.replicas = [] + else: + self.replicas = [ + ClusterReplica(i, size, self) for i in range(replication_factor) + ] self.replica_id = len(self.replicas) self.introspection_interval = introspection_interval self.rename = 0 self.lock = threading.Lock() + self.name_scope = name_scope + + @property + def is_pool_backed(self) -> bool: + """True for clusters the framework didn't create itself and won't + mutate (replica count, drop). Currently set when `pre_existing_name` + was passed in. Action classes that would CREATE/ALTER/DROP REPLICA + check this and bail.""" + return self.pre_existing_name is not None def name(self) -> str: + # Pre-existing clusters: name is fixed by the caller (typically a + # pool-cluster the Antithesis compose bootstrapped). Don't apply + # naughtify / name_scope / rename — they don't apply to objects we + # didn't create. + if self.pre_existing_name is not None: + return self.pre_existing_name + # Format: `cluster[-{name_scope}]-{cluster_id}[-{rename}]`. The + # bracketed `-{name_scope}` segment is only present when seed- + # scoping is on, so the historical `cluster-0` / `cluster-0-1` + # shapes (which non-Antithesis consumers parse) are preserved. + prefix = ( + f"cluster-{self.name_scope}" if self.name_scope else "cluster" + ) if self.rename: - return naughtify(f"cluster-{self.cluster_id}-{self.rename}") - return naughtify(f"cluster-{self.cluster_id}") + return naughtify(f"{prefix}-{self.cluster_id}-{self.rename}") + return naughtify(f"{prefix}-{self.cluster_id}") def __str__(self) -> str: return identifier(self.name()) def create(self, exe: Executor) -> None: + # Pre-existing cluster: the SUT already has it (bootstrapped at + # compose-up). The framework's only responsibility for the cluster + # is to use its name; never DDL it. + if self.pre_existing_name is not None: + return query = f"CREATE CLUSTER {self} " if self.managed: query += f"SIZE = '{self.size}', REPLICATION FACTOR = {len(self.replicas)}, INTROSPECTION INTERVAL = '{self.introspection_interval}'" @@ -1025,12 +1120,35 @@ def __init__( complexity: Complexity, scenario: Scenario, naughty_identifiers: bool, + # When True, top-level objects whose names are not schema-qualified + # (clusters and roles) are scoped by the database seed so concurrent + # Database instances against one materialize don't collide. Off by + # default; opted into by the Antithesis parallel-driver where many + # invocations share the SUT. Tables / schemas / views are already + # qualified by DB.name() which includes the seed, so they don't + # need this. + seed_scoped_names: bool = False, + # When set, the Database runs against a pre-existing cluster the + # framework didn't create and won't drop. CreateClusterAction is + # disabled in this mode; the single initial cluster wraps the + # supplied name. Used by the Antithesis parallel-driver to bind + # each invocation to one of the long-lived pool clusters that the + # compose creates at bootstrap (see test/antithesis/mzcompose.py). + existing_cluster_name: str | None = None, ): self.host = host self.ports = ports self.complexity = complexity self.scenario = scenario self.seed = seed + self.seed_scoped_names = seed_scoped_names + self.existing_cluster_name = existing_cluster_name + # The bare seed (no leading/trailing punctuation) used by Cluster / + # Role / etc. to assemble their scoped names. Empty when seed-scoping + # is off, in which case those classes fall back to their historical + # `cluster-N` / `role0` shapes. See Cluster.name() and Role.__str__() + # for how the seed gets inlaid. + self.name_scope = seed if seed_scoped_names else "" set_naughty_identifiers(naughty_identifiers) self.s3_path = 0 @@ -1064,21 +1182,46 @@ def __init__( ) self.views.append(view) self.view_id = len(self.views) - self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))] - self.role_id = len(self.roles) - # At least one storage cluster required for WebhookSources - self.clusters = [ - Cluster( - i, - managed=rng.choice([True, False]), - size=rng.choice( - ["scale=1,workers=1", "scale=1,workers=4", "scale=2,workers=2"] - ), - replication_factor=1, - introspection_interval="1s", - ) - for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS)) + self.roles = [ + Role(i, name_scope=self.name_scope) + for i in range(rng.randint(0, MAX_INITIAL_ROLES)) ] + self.role_id = len(self.roles) + # At least one storage cluster required for WebhookSources. + # In existing-cluster mode the framework's sole initial cluster + # wraps a pre-existing cluster (typically a pool cluster the + # Antithesis compose bootstrapped). The wrapper's create()/drop() + # are no-ops; CreateClusterAction / CreateClusterReplicaAction / + # DropClusterReplicaAction are also disabled for it. + if existing_cluster_name is not None: + self.clusters = [ + Cluster( + 0, + # managed / size / replication_factor are ignored when + # `pre_existing_name` is set — the wrapper never emits + # CREATE CLUSTER. + managed=False, + size="", + replication_factor=1, + introspection_interval="1s", + name_scope=self.name_scope, + pre_existing_name=existing_cluster_name, + ) + ] + else: + self.clusters = [ + Cluster( + i, + managed=rng.choice([True, False]), + size=rng.choice( + ["scale=1,workers=1", "scale=1,workers=4", "scale=2,workers=2"] + ), + replication_factor=1, + introspection_interval="1s", + name_scope=self.name_scope, + ) + for i in range(rng.randint(1, MAX_INITIAL_CLUSTERS)) + ] self.cluster_id = len(self.clusters) self.indexes = set() self.webhook_sources = [ diff --git a/misc/python/materialize/rustc_flags.py b/misc/python/materialize/rustc_flags.py index 6353f83d3b68a..f6aac45573e14 100644 --- a/misc/python/materialize/rustc_flags.py +++ b/misc/python/materialize/rustc_flags.py @@ -25,6 +25,20 @@ ] +# Flags to enable Antithesis coverage instrumentation. +# Requires libvoidstar.so at /usr/lib/ (installed in ci-builder and +# the materialized Docker image). +# See: https://antithesis.com/docs/using_antithesis/sdk/rust/instrumentation/ +antithesis = [ + "-Ccodegen-units=1", + "-Cpasses=sancov-module", + "-Cllvm-args=-sanitizer-coverage-level=3", + "-Cllvm-args=-sanitizer-coverage-trace-pc-guard", + "-Clink-args=-Wl,--build-id", + "-lvoidstar", +] + + class Sanitizer(Enum): """What sanitizer to use""" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 6704bd79d8b06..3553217de30ed 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -10,6 +10,7 @@ publish = false workspace = true [dependencies] +antithesis_sdk.workspace = true anyhow.workspace = true async-trait.workspace = true base64.workspace = true diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index c93830e38d7e3..83d560c98004c 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -17,6 +17,7 @@ use std::str::FromStr; use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant}; +use antithesis_sdk::assert_always_greater_than; use async_trait::async_trait; use differential_dataflow::lattice::Lattice; use futures::{FutureExt, StreamExt}; @@ -41,6 +42,7 @@ use mz_repr::Diff; use mz_storage_client::controller::PersistEpoch; use mz_storage_types::StorageDiff; use mz_storage_types::sources::SourceData; +use serde_json::json; use sha2::Digest; use timely::progress::{Antichain, Timestamp as TimelyTimestamp}; use tracing::{debug, info, warn}; @@ -145,6 +147,21 @@ impl FenceableToken { current_token, fence_token, } => { + // The two `assert!` calls below are the natural placement + // for an Antithesis `assert_always!` covering the + // FenceableToken state-machine invariant. They are not + // wrapped today because Materialize does not run multiple + // concurrent environmentd processes against the same + // catalog shard, so the `Fenced` state is unreachable in + // every supported topology — including the Antithesis + // topology in this repo. Wrapping them would create + // assertions Antithesis cannot exercise, which is dead + // weight in coverage reports. If we ever ship multi- + // environmentd (e.g. for a 0DT-preflight Antithesis run), + // convert these to `assert_always!` with distinct + // messages so a violation becomes a reportable property + // failure rather than a panic. See the + // `epoch-fencing-prevents-split-brain` catalog entry. assert!( fence_token > current_token, "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}" @@ -1182,12 +1199,43 @@ impl UnopenedPersistCatalogState { "fencing previous catalogs" ); if matches!(self.mode, Mode::Writable) { + // Snapshot the prior durable epoch so the post-CaS anchor + // below can verify monotonicity. Captured before the write + // because `compare_and_append` may call `sync()` which + // reads new state into `self.fenceable_token`. + let prior_durable_epoch = self + .fenceable_token + .token() + .map(|t| t.epoch.get()) + .unwrap_or(0); match self .compare_and_append(fence_updates.clone(), commit_ts) .await { Ok(upper) => { commit_ts = upper; + // Antithesis anchor for `epoch-fencing-prevents- + // split-brain`: after our fence-token CaS commits, + // the freshly-minted epoch we just persisted must + // be strictly greater than the prior durable + // epoch. A regression here would mean a future + // lower-epoch writer would not be fenced out by + // the write we just made, opening the split-brain + // window the catalog is supposed to close. + let new_epoch = current_fenceable_token + .token() + .expect("freshly minted Unfenced token always has a current_token") + .epoch + .get(); + assert_always_greater_than!( + new_epoch, + prior_durable_epoch, + "catalog fencing: new durable epoch did not strictly increase after fence-token CaS", + &json!({ + "prior_durable_epoch": prior_durable_epoch, + "new_epoch": new_epoch, + }) + ); } Err(CompareAndAppendError::Fence(e)) => return Err(e.into()), Err(e @ CompareAndAppendError::UpperMismatch { .. }) => { diff --git a/src/materialized/ci/Dockerfile b/src/materialized/ci/Dockerfile index 18686251a7b07..e06aaf6bad0cf 100644 --- a/src/materialized/ci/Dockerfile +++ b/src/materialized/ci/Dockerfile @@ -20,6 +20,17 @@ COPY materialized entrypoint.sh /usr/local/bin/ USER root RUN ln -s /usr/local/bin/materialized /usr/local/bin/environmentd \ && ln -s /usr/local/bin/materialized /usr/local/bin/clusterd + +# Antithesis instrumentation (conditional on --build-arg ANTITHESIS=1) +ARG ANTITHESIS +RUN if [ -n "$ANTITHESIS" ]; then \ + curl -sSL https://antithesis.com/assets/instrumentation/libvoidstar.so \ + -o /usr/lib/libvoidstar.so \ + && ldconfig \ + && mkdir -p /symbols \ + && ln -s /usr/local/bin/materialized /symbols/materialized; \ + fi + USER materialize ENTRYPOINT ["tini", "--", "entrypoint.sh"] diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 0fad73a172d71..0d2b068964372 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -28,6 +28,7 @@ name = "benches" harness = false [dependencies] +antithesis_sdk.workspace = true anyhow.workspace = true arrayvec.workspace = true arrow.workspace = true diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs index a48982ff77eb9..5085b24b3d6fb 100644 --- a/src/persist-client/src/internal/apply.rs +++ b/src/persist-client/src/internal/apply.rs @@ -15,6 +15,9 @@ use std::ops::ControlFlow::{self, Break, Continue}; use std::sync::Arc; use std::time::Instant; +use antithesis_sdk::assert_always_greater_than; +use serde_json::json; + use crate::cache::{LockingTypedState, StateCache}; use crate::error::{CodecMismatch, InvalidUsage}; use crate::internal::gc::GcReq; @@ -598,6 +601,21 @@ where } } + // Antithesis-reportable form of the broader `persist-cas-monotonicity` + // catalog property: SeqNo must strictly increase across any committed + // state transition. The narrower equality check below (next == seqno) + // still panics on violation and stays in place to catch skip/regress + // in the same call. + assert_always_greater_than!( + new_state.seqno().0, + expected.0, + "persist: state seqno did not strictly increase across CaS apply", + &json!({ + "expected_prev": expected.0, + "computed_next": new_state.seqno().0, + "cmd": cmd.name, + }) + ); assert_eq!( expected.next(), new_state.seqno(), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index f96d9991511dc..2e7f4f4a37ab7 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -15,6 +15,7 @@ bench = false [dependencies] anyhow.workspace = true +antithesis_sdk.workspace = true async-stream.workspace = true async-trait.workspace = true aws-credential-types.workspace = true diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 60ab8b8928058..2f6e8d28f960e 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; +use antithesis_sdk::{assert_always, assert_unreachable}; use anyhow::anyhow; use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Hashable}; @@ -52,6 +53,7 @@ use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientContext, Message, TopicPartitionList}; use serde::{Deserialize, Serialize}; +use serde_json::json; use timely::PartialOrder; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; @@ -273,7 +275,13 @@ fn render_reader<'scope>( .iter() .map(|(_name, kind)| kind.clone()) .collect::>(), - _ => panic!("unexpected source export details: {:?}", details), + _ => { + assert_unreachable!( + "kafka: unexpected source export details", + &json!({"source_id": id.to_string()}) + ); + panic!("unexpected source export details: {:?}", details) + } }; let statistics = config @@ -888,6 +896,11 @@ fn render_reader<'scope>( } } // We can now put them back + assert_always!( + reader.partition_consumers.is_empty(), + "kafka: partition_consumers not drained at shutdown", + &json!({"remaining": reader.partition_consumers.len()}) + ); assert!(reader.partition_consumers.is_empty()); reader.partition_consumers = consumers; @@ -1139,6 +1152,20 @@ impl KafkaSourceReader { // Given the explicit consumer to partition assignment, we should never receive a message // for a partition for which we have no metadata + let partition_known = self + .last_offsets + .get(output_index) + .map(|m| m.contains_key(&partition)) + .unwrap_or(false); + assert_always!( + partition_known, + "kafka: partition missing from last_offsets", + &json!({ + "source_id": self.id.to_string(), + "partition": partition, + "output_index": output_index, + }) + ); assert!( self.last_offsets .get(output_index) @@ -1190,6 +1217,13 @@ fn construct_source_message( ) { let pid = msg.partition(); let Ok(offset) = u64::try_from(msg.offset()) else { + assert_unreachable!( + "kafka: negative offset from non-error message", + &json!({ + "partition": msg.partition(), + "raw_offset": msg.offset(), + }) + ); panic!( "got negative offset ({}) from otherwise non-error'd kafka message", msg.offset() diff --git a/src/storage/src/source/mysql/replication/partitions.rs b/src/storage/src/source/mysql/replication/partitions.rs index c4a6a9ba743bc..7aef48cb3c2c8 100644 --- a/src/storage/src/source/mysql/replication/partitions.rs +++ b/src/storage/src/source/mysql/replication/partitions.rs @@ -11,6 +11,8 @@ use std::collections::BTreeMap; +use antithesis_sdk::assert_unreachable; +use serde_json::json; use timely::progress::Antichain; use uuid::Uuid; @@ -92,6 +94,14 @@ impl GtidReplicationPartitions { // should only see GTID transaction-ids // in a monotonic order for each source, starting at that upper. if active_part.timestamp() > new_part.timestamp() { + assert_unreachable!( + "mysql: BinlogGtidMonotonicityViolation — received out-of-order GTID from multithreaded replica", + &json!({ + "source_uuid": source_id.to_string(), + "active_timestamp": format!("{:?}", active_part.timestamp()), + "new_timestamp": format!("{:?}", new_part.timestamp()), + }) + ); let err = DefiniteError::BinlogGtidMonotonicityViolation( source_id.to_string(), new_part.timestamp().clone(), diff --git a/src/storage/src/source/reclock.rs b/src/storage/src/source/reclock.rs index d4ab5ac4b312b..745115e5dbf72 100644 --- a/src/storage/src/source/reclock.rs +++ b/src/storage/src/source/reclock.rs @@ -10,11 +10,13 @@ /// The `ReclockOperator` observes the progress of a stream that is /// timestamped with some source time `FromTime` and generates bindings that describe how the /// collection should evolve in target time `IntoTime`. +use antithesis_sdk::assert_reachable; use differential_dataflow::consolidation; use differential_dataflow::lattice::Lattice; use mz_persist_client::error::UpperMismatch; use mz_repr::Diff; use mz_storage_client::util::remap_handle::RemapHandle; +use serde_json::json; use timely::order::PartialOrder; use timely::progress::Timestamp; use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain}; @@ -128,6 +130,12 @@ where upper: self.upper.clone(), }; + // Tracks whether append_batch hit an UpperMismatch during this mint + // invocation. If true and we still exit the while loop normally, + // we've exercised the retry path covered by the catalog property + // `reclock-mint-eventually-succeeds`. + let mut cas_retry_count: u64 = 0; + while *self.upper == [IntoTime::minimum()] || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper) && PartialOrder::less_than(&self.upper, &new_into_upper) @@ -159,12 +167,28 @@ where let new_batch = match self.append_batch(updates, &new_into_upper).await { Ok(trace_batch) => trace_batch, - Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await, + Err(UpperMismatch { current, .. }) => { + cas_retry_count = cas_retry_count.saturating_add(1); + self.sync(current.borrow()).await + } }; batch.updates.extend(new_batch.updates); batch.upper = new_batch.upper; } + // Reachability anchor for `reclock-mint-eventually-succeeds`: this + // line fires only when a CaS UpperMismatch was observed and the + // mint loop nonetheless terminated. That's the path the catalog + // wants Antithesis to observe at least once per run; reaching it + // is the signal, so the marker is unconditional `assert_reachable!` + // rather than `assert_sometimes!(true, …)`. + if cas_retry_count > 0 { + assert_reachable!( + "reclock: mint completed after at least one compare_and_append UpperMismatch", + &json!({"cas_retry_count": cas_retry_count}) + ); + } + batch } diff --git a/src/storage/src/source/reclock/compat.rs b/src/storage/src/source/reclock/compat.rs index a260e2dfcf060..607bbc4c5e680 100644 --- a/src/storage/src/source/reclock/compat.rs +++ b/src/storage/src/source/reclock/compat.rs @@ -15,6 +15,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; +use antithesis_sdk::assert_unreachable; use anyhow::Context; use differential_dataflow::lattice::Lattice; use fail::fail_point; @@ -33,6 +34,7 @@ use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader}; use mz_storage_types::StorageDiff; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::sources::{SourceData, SourceTimestamp}; +use serde_json::json; use timely::order::{PartialOrder, TotalOrder}; use timely::progress::Timestamp; use timely::progress::frontier::Antichain; @@ -303,7 +305,13 @@ where *self.shared_write_frontier.borrow_mut() = new_upper; return result; } - Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"), + Err(invalid_use) => { + assert_unreachable!( + "reclock: compare_and_append InvalidUsage", + &json!({"error": invalid_use.to_string()}) + ); + panic!("compare_and_append failed: {invalid_use}") + } } } diff --git a/src/storage/src/upsert/types.rs b/src/storage/src/upsert/types.rs index 2bf8270aa2c95..57a4b85033563 100644 --- a/src/storage/src/upsert/types.rs +++ b/src/storage/src/upsert/types.rs @@ -88,11 +88,13 @@ use std::num::Wrapping; use std::sync::Arc; use std::time::Instant; +use antithesis_sdk::{assert_always, assert_unreachable}; use bincode::Options; use itertools::Itertools; use mz_ore::error::ErrorExt; use mz_repr::{Diff, GlobalId}; use serde::{Serialize, de::DeserializeOwned}; +use serde_json::json; use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics}; use crate::statistics::SourceStatistics; @@ -294,6 +296,10 @@ impl StateValue { match self { Self::Value(value) => value, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: into_decoded on Consolidating StateValue", + &json!({"accessor": "into_decoded"}) + ); panic!("called `into_decoded without calling `ensure_decoded`") } } @@ -366,6 +372,10 @@ impl StateValue { }), }), StateValue::Consolidating(_) => { + assert_unreachable!( + "upsert: into_provisional_value on Consolidating StateValue", + &json!({"accessor": "into_provisional_value"}) + ); panic!("called `into_provisional_value` without calling `ensure_decoded`") } } @@ -400,6 +410,10 @@ impl StateValue { }), }), StateValue::Consolidating(_) => { + assert_unreachable!( + "upsert: into_provisional_tombstone on Consolidating StateValue", + &json!({"accessor": "into_provisional_tombstone"}) + ); panic!("called `into_provisional_tombstone` without calling `ensure_decoded`") } } @@ -413,6 +427,10 @@ impl StateValue { _ => None, }, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: provisional_order on Consolidating StateValue", + &json!({"accessor": "provisional_order"}) + ); panic!("called `provisional_order` without calling `ensure_decoded`") } } @@ -427,6 +445,10 @@ impl StateValue { _ => value.finalized.as_ref(), }, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: provisional_value_ref on Consolidating StateValue", + &json!({"accessor": "provisional_value_ref"}) + ); panic!("called `provisional_value_ref` without calling `ensure_decoded`") } } @@ -437,6 +459,10 @@ impl StateValue { match self { Self::Value(v) => v.finalized, Self::Consolidating(_) => { + assert_unreachable!( + "upsert: into_finalized_value on Consolidating StateValue", + &json!({"accessor": "into_finalized_value"}) + ); panic!("called `into_finalized_value` without calling `ensure_decoded`") } } @@ -577,7 +603,13 @@ impl StateValue { *acc ^= val; } } - _ => panic!("`merge_update_state` called with non-consolidating state"), + _ => { + assert_unreachable!( + "upsert: merge_update_state on non-Consolidating state", + &json!({"site": "merge_update_state"}) + ); + panic!("`merge_update_state` called with non-consolidating state") + } } } @@ -618,29 +650,61 @@ impl StateValue { }) .expect("invalid upsert state"); // Truncation is fine (using `as`) as this is just a checksum + let want_checksum = seahash::hash(value) as i64; + assert_always!( + consolidating.checksum_sum.0 == want_checksum, + "upsert: consolidating checksum_sum mismatch (diff_sum=1)", + &json!({ + "source_id": source_id.to_string(), + "checksum_sum": consolidating.checksum_sum.0, + "expected_seahash": want_checksum, + }) + ); assert_eq!( - consolidating.checksum_sum.0, - // Hash the value, not the full buffer, which may have extra 0's - seahash::hash(value) as i64, + consolidating.checksum_sum.0, want_checksum, "invalid upsert state: checksum_sum does not match, state: {}, {}", - consolidating, - source_id, + consolidating, source_id, ); *self = Self::finalized_value(bincode_opts.deserialize(value).unwrap()); } 0 => { + assert_always!( + consolidating.len_sum.0 == 0, + "upsert: consolidating len_sum nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "len_sum": consolidating.len_sum.0, + }) + ); assert_eq!( consolidating.len_sum.0, 0, "invalid upsert state: len_sum is non-0, state: {}, {}", consolidating, source_id, ); + assert_always!( + consolidating.checksum_sum.0 == 0, + "upsert: consolidating checksum_sum nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "checksum_sum": consolidating.checksum_sum.0, + }) + ); assert_eq!( consolidating.checksum_sum.0, 0, "invalid upsert state: checksum_sum is non-0, state: {}, {}", consolidating, source_id, ); + let all_zero = consolidating.value_xor.iter().all(|&x| x == 0); + assert_always!( + all_zero, + "upsert: consolidating value_xor nonzero (diff_sum=0)", + &json!({ + "source_id": source_id.to_string(), + "value_xor_len": consolidating.value_xor.len(), + }) + ); assert!( - consolidating.value_xor.iter().all(|&x| x == 0), + all_zero, "invalid upsert state: value_xor not all 0s with 0 diff. \ Non-zero positions: {:?}, state: {}, {}", consolidating @@ -669,6 +733,15 @@ impl StateValue { ), Err(_) => "Err(UpsertValueError)".to_string(), }); + assert_unreachable!( + "upsert: consolidating diff_sum not in {0,1}", + &json!({ + "source_id": source_id.to_string(), + "diff_sum": other, + "value_byte_len": value_byte_len, + "decodable": decode_ok, + }) + ); panic!( "invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}, \ key: {:?}, value_byte_len: {:?}, decodable: {:?}", @@ -1059,6 +1132,10 @@ where }); if completed && self.snapshot_completed { + assert_unreachable!( + "upsert: snapshot completion called twice", + &json!({"site": "consolidate_chunk"}) + ); panic!("attempted completion of already completed upsert snapshot") } diff --git a/src/storage/src/upsert_continual_feedback.rs b/src/storage/src/upsert_continual_feedback.rs index a4669d3a80099..5fb562a7aa08a 100644 --- a/src/storage/src/upsert_continual_feedback.rs +++ b/src/storage/src/upsert_continual_feedback.rs @@ -14,6 +14,7 @@ use std::cmp::Reverse; use std::fmt::Debug; use std::sync::Arc; +use antithesis_sdk::{assert_always, assert_unreachable}; use differential_dataflow::hashable::Hashable; use differential_dataflow::{AsCollection, VecCollection}; use indexmap::map::Entry; @@ -23,6 +24,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use serde_json::json; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; @@ -623,6 +625,11 @@ fn stage_input( } stash.extend(data.drain(..).map(|((key, value, order), time, diff)| { + assert_always!( + diff.is_positive(), + "upsert: input diff positive (cf v1)", + &json!({"diff": diff.into_inner()}) + ); assert!(diff.is_positive(), "invalid upsert input"); (time, key, Reverse(order), value) })); @@ -797,6 +804,10 @@ where let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) { command_state } else { + assert_unreachable!( + "upsert: key missing from commands_state (cf v1)", + &json!({"source_id": source_config.id.to_string()}) + ); panic!("key missing from commands_state"); }; diff --git a/src/storage/src/upsert_continual_feedback_v2.rs b/src/storage/src/upsert_continual_feedback_v2.rs index 32de9e3770086..8560ffd614603 100644 --- a/src/storage/src/upsert_continual_feedback_v2.rs +++ b/src/storage/src/upsert_continual_feedback_v2.rs @@ -65,6 +65,7 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; +use antithesis_sdk::{assert_always, assert_unreachable}; use differential_dataflow::difference::{IsZero, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; @@ -81,6 +82,7 @@ use mz_storage_types::errors::{DataflowError, EnvelopeError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; +use serde_json::json; use std::convert::Infallible; use timely::container::CapacityContainerBuilder; use timely::dataflow::StreamVec; @@ -312,6 +314,11 @@ where AsyncEvent::Data(cap, data) => { let mut pushed_any = false; for ((key, value, from_time), ts, diff) in data { + assert_always!( + diff.is_positive(), + "upsert: input diff positive (cf v2)", + &json!({"diff": diff.into_inner()}) + ); assert!(diff.is_positive(), "invalid upsert input"); if PartialOrder::less_equal(&input_upper, &resume_upper) && !resume_upper.less_equal(&ts) @@ -480,7 +487,13 @@ where (Some(a), Some(b)) => std::cmp::min(a, b).clone(), (Some(a), None) => a.clone(), (None, Some(b)) => b.clone(), - (None, None) => unreachable!(), + (None, None) => { + assert_unreachable!( + "upsert: cf v2 join produced (None, None)", + &json!({"site": "min_ts join"}) + ); + unreachable!() + } }; cap.downgrade(&min_ts); } else { diff --git a/test/antithesis/AGENTS.md b/test/antithesis/AGENTS.md new file mode 100644 index 0000000000000..b93956df1ea94 --- /dev/null +++ b/test/antithesis/AGENTS.md @@ -0,0 +1,21 @@ +Files relevant to running Materialize under Antithesis. + +Use the `antithesis-setup` skill to scaffold and manage this directory. Use the `antithesis-research` skill to analyze the system and build a property catalog. Use the `antithesis-workload` skill to implement assertions and test commands. + +**mzcompose.py** +Source of truth for the Antithesis topology. Standard mzcompose composition: services (`postgres-metadata`, `minio`, `redpanda`, `materialized`, `workload`), dependencies, env, ports. The generated `config/docker-compose.yaml` is derived from this. + +**export-compose.py** +Renders `mzcompose.py` into a flat docker-compose YAML that Antithesis can consume. Images are emitted as `ghcr.io/materializeinc/materialize/:mzbuild-` refs that Antithesis pulls directly from public GHCR. + +**workload/** +Mzbuild image (`antithesis-workload`) for the Python test driver. Dockerfile, entrypoint, and test-template scripts (`test/*.sh`) live here. Test command files must be prefixed with one of `parallel_driver_`, `singleton_driver_`, `serial_driver_`, `first_`, `eventually_`, `finally_`, `anytime_`; files prefixed with `helper_` are ignored by Test Composer. + +**config/** +Mzbuild image (`antithesis-config`) — a `FROM scratch` container holding the generated `docker-compose.yaml`. This is the image Antithesis points at to bring up the environment. + +**scratchbook/** +Antithesis scratchbook: system analysis, property catalog, topology plans, per-property evidence files (in `scratchbook/properties/`), property relationship maps, persistent integration notes. Keep up to date as Antithesis-related decisions change. + +**setup-complete.sh** (in `workload/`) +Inject this script into a Dockerfile to notify Antithesis that setup is complete. Should only run once the system under test is ready for testing — Antithesis will not run test commands until it receives this event. diff --git a/test/antithesis/Makefile b/test/antithesis/Makefile new file mode 100644 index 0000000000000..db16f58c565a3 --- /dev/null +++ b/test/antithesis/Makefile @@ -0,0 +1,163 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Local-dev helper for the Materialize Antithesis harness. +# +# Antithesis images ship via the standard mzbuild → GHCR flow; CI publishes +# the same images CI publishes for everything else, fingerprint-tagged with +# `mzbuild-`. Locally, we just acquire the mzbuild images, regenerate +# the compose YAML, and let `docker compose` find them by their canonical +# spec. +# +# Targets: +# make build # regenerate compose YAML, acquire local mzbuild images +# make up # build + bring up the stack +# make down # tear down (preserves volumes) +# make smoke # build + up + smoke test +# make test # smoke test against a running stack +# make clean # tear down + remove volumes +# +# make build-local # build for local dev (no --antithesis flavor) +# make up-local # build-local + bring up the stack +# +# The `-local` targets are for validating the workload + drivers without +# the Antithesis platform. They build the plain (non-antithesis-flavored) +# images, which (a) don't need libvoidstar.so locally and (b) cover all +# images including new transitive deps (e.g. testdrive) that CI doesn't +# yet publish under the antithesis flavor. + +SHELL := /usr/bin/env bash +.SHELLFLAGS := -eu -o pipefail -c + +PROJECT := materialize-antithesis +REPO_ROOT := $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/../..) + +# Pick podman if available, else docker. +ifndef RUNTIME + RUNTIME := $(shell command -v podman >/dev/null 2>&1 && echo podman || (command -v docker >/dev/null 2>&1 && echo docker || echo none)) +endif +ifeq ($(RUNTIME),none) + $(error neither podman nor docker found in PATH; set RUNTIME=docker or install podman) +endif +ifeq ($(RUNTIME),podman) + export MZ_DEV_CI_BUILDER_RUNTIME := podman +endif + +COMPOSE_FILE := $(REPO_ROOT)/test/antithesis/config/docker-compose.yaml +ENV_FILE := $(REPO_ROOT)/test/antithesis/config/.env +COMPOSE := $(RUNTIME) compose -p $(PROJECT) --env-file $(ENV_FILE) -f $(COMPOSE_FILE) +PSQL := $(COMPOSE) exec materialized psql -h localhost -p 6875 -U materialize + +# mzbuild images we need built locally. Third-party images (postgres, minio, +# kafka, …) are pulled by `docker compose` from their upstream registries. +MZBUILD_IMAGES := materialized antithesis-workload + +# --------------------------------------------------------------------------- +# Build +# --------------------------------------------------------------------------- +.PHONY: build export-compose export-env acquire-images + +build: export-compose export-env acquire-images + +export-compose: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-compose.py > $(COMPOSE_FILE) + @echo "Wrote $(COMPOSE_FILE)" + +export-env: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-env.py > $(ENV_FILE) + @echo "Wrote $(ENV_FILE)" + +acquire-images: + @# Force `--arch x86_64` to match what `export-env.py` writes into the + @# `.env` file. The Antithesis platform itself runs amd64-only — both + @# `export-env.py` and `export-compose.py` pin `arch=Arch.X86_64` — so + @# the fingerprints baked into the compose YAML are always for x86_64. + @# Without this flag, `bin/mzimage acquire` defaults to the host arch + @# (aarch64 on Apple Silicon), producing a different fingerprint than + @# the one the compose YAML references; the resulting image doesn't + @# match the compose's `image:` tag and the local stack fails to pull. + @# Also: aarch64 cross-compile of `--antithesis` builds needs an aarch64 + @# `libvoidstar.so` which isn't published — x86_64 is the only flavor + @# Antithesis ships. + @for image in $(MZBUILD_IMAGES); do \ + echo "--- Acquiring $$image (--antithesis --arch x86_64)"; \ + cd $(REPO_ROOT) && bin/mzimage acquire "$$image" --antithesis --arch x86_64; \ + done + +# --------------------------------------------------------------------------- +# Local (non-antithesis) targets +# --------------------------------------------------------------------------- +# +# Build and run the same compose topology without the Antithesis flavor. +# Used for validating the workload + drivers locally before pushing to CI. +# Plain (non-antithesis) mzbuild images: +# * don't need libvoidstar.so installed in the cross-sysroot +# * cover all transitive deps (e.g. testdrive), unlike the antithesis +# flavor which CI only publishes for materialized + antithesis-workload +# + antithesis-config. +# The fault-orchestrator service is a no-op outside Antithesis (its +# pause_faults.sh exits cleanly when ANTITHESIS_STOP_FAULTS is unset), so +# the topology behaves like a regular docker-compose stack. + +.PHONY: build-local export-compose-local export-env-local acquire-images-local up-local + +build-local: export-compose-local export-env-local acquire-images-local + +export-compose-local: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-compose.py \ + --no-antithesis > $(COMPOSE_FILE) + @echo "Wrote $(COMPOSE_FILE) (host arch)" + +export-env-local: + cd $(REPO_ROOT) && bin/pyactivate test/antithesis/export-env.py \ + --no-antithesis > $(ENV_FILE) + @echo "Wrote $(ENV_FILE) (non-antithesis)" + +acquire-images-local: + @# Use the host arch (no `--arch` flag) so the resulting workload image + @# runs natively. On Apple Silicon, running the x86_64 testdrive binary + @# under Docker's rosetta/qemu emulation segfaults inside the + @# foundationdb client init — native aarch64 sidesteps that entirely. + @# `export-env.py --no-antithesis` mirrors the same logic and emits + @# host-arch fingerprints to the .env file. + @for image in $(MZBUILD_IMAGES); do \ + echo "--- Acquiring $$image (plain, host arch)"; \ + cd $(REPO_ROOT) && bin/mzimage acquire "$$image"; \ + done + +up-local: build-local + $(COMPOSE) up -d + +# --------------------------------------------------------------------------- +# Up / Down +# --------------------------------------------------------------------------- +.PHONY: up down clean + +up: build + $(COMPOSE) up -d + +down: + $(COMPOSE) down + +clean: down + $(COMPOSE) down -v --remove-orphans 2>/dev/null || true + +# --------------------------------------------------------------------------- +# Test +# --------------------------------------------------------------------------- +.PHONY: test smoke + +test: + $(PSQL) -c "CREATE TABLE IF NOT EXISTS smoke_test (k INT, v TEXT)" + $(PSQL) -c "INSERT INTO smoke_test VALUES (1, 'hello'), (2, 'world')" + $(PSQL) -c "SELECT * FROM smoke_test ORDER BY k" + $(PSQL) -c "DROP TABLE smoke_test" + +smoke: up test + @echo "[smoke] passed" diff --git a/test/antithesis/config/.env b/test/antithesis/config/.env new file mode 100644 index 0000000000000..341a92886750c --- /dev/null +++ b/test/antithesis/config/.env @@ -0,0 +1,5 @@ +# GENERATED FILE — do not edit. Regenerate via: +# bin/pyactivate test/antithesis/export-env.py > test/antithesis/config/.env +# Consumed by test/antithesis/config/docker-compose.yaml at compose-parse time. +MATERIALIZED_IMAGE=ghcr.io/materializeinc/materialize/materialized:mzbuild-KCOH6PR3STRLZAH6O3VVGMJYMH3H3ZTJ +ANTITHESIS_WORKLOAD_IMAGE=ghcr.io/materializeinc/materialize/antithesis-workload:mzbuild-SSVB2N7XZF62H4MIKAY3G3JCPTPDZ3AX diff --git a/test/antithesis/config/Dockerfile b/test/antithesis/config/Dockerfile new file mode 100644 index 0000000000000..32fcb07e30460 --- /dev/null +++ b/test/antithesis/config/Dockerfile @@ -0,0 +1,18 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Antithesis config image: a FROM-scratch tarball holding the +# docker-compose.yaml that Antithesis uses to bring up the system under +# test, plus a `.env` mapping `${MATERIALIZED_IMAGE}` / +# `${ANTITHESIS_WORKLOAD_IMAGE}` to current mzbuild fingerprints. Compose +# loads `.env` automatically at parse time. See mzbuild.yml for +# regeneration instructions. + +FROM scratch +COPY docker-compose.yaml .env / diff --git a/test/antithesis/config/docker-compose.yaml b/test/antithesis/config/docker-compose.yaml new file mode 100644 index 0000000000000..d070d283b67ec --- /dev/null +++ b/test/antithesis/config/docker-compose.yaml @@ -0,0 +1,907 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# GENERATED FILE — do not edit. Regenerate via: +# bin/pyactivate test/antithesis/export-compose.py > test/antithesis/config/docker-compose.yaml +# Source of truth: test/antithesis/mzcompose.py. + +services: + postgres-metadata: + command: + - postgres + - -c + - wal_level=logical + - -c + - max_wal_senders=100 + - -c + - max_replication_slots=100 + - -c + - max_connections=5000 + ports: + - '26257' + environment: + - POSTGRESDB=postgres + - POSTGRES_PASSWORD=postgres + - PGPORT=26257 + - POSTGRES_HOST_AUTH_METHOD=trust + healthcheck: + test: + - CMD + - pg_isready + - -U + - postgres + interval: 1s + start_period: 30s + restart: 'no' + platform: linux/amd64 + image: postgres:17.7 + entrypoint: + - sh + - -c + - 'cat <<''SQL'' > /docker-entrypoint-initdb.d/z_setup_materialize.sql + + CREATE ROLE root WITH LOGIN PASSWORD ''root''; + + CREATE DATABASE root; + + GRANT ALL PRIVILEGES ON DATABASE root TO root; + + \c root + + CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION root; + + CREATE SCHEMA IF NOT EXISTS adapter AUTHORIZATION root; + + CREATE SCHEMA IF NOT EXISTS storage AUTHORIZATION root; + + CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION root; + + GRANT ALL PRIVILEGES ON SCHEMA public TO root; + + SQL + + exec docker-entrypoint.sh "$$@"' + - -- + container_name: postgres-metadata + hostname: postgres-metadata + networks: + - antithesis-net + minio: + entrypoint: + - sh + - -c + command: + - mkdir -p /data/persist && minio server /data --console-address :9001 + ports: + - 9000 + - 9001 + environment: + - MINIO_STORAGE_CLASS_STANDARD=EC:0 + - MINIO_HEAL_DISABLE=on + - MINIO_DISK_WATERMARK_LOW=1 + - MINIO_DISK_WATERMARK_HIGH=1 + healthcheck: + test: + - CMD + - curl + - --fail + - http://localhost:9000/minio/health/live + timeout: 5s + interval: 1s + start_period: 30s + platform: linux/amd64 + image: minio/minio:latest + container_name: minio + hostname: minio + networks: + - antithesis-net + zookeeper: + image: confluentinc/cp-zookeeper:7.9.4 + ports: + - 2181 + environment: + - ZOOKEEPER_CLIENT_PORT=2181 + healthcheck: + test: + - CMD + - nc + - -z + - localhost + - '2181' + interval: 1s + start_period: 120s + platform: linux/amd64 + container_name: zookeeper + hostname: zookeeper + networks: + - antithesis-net + kafka: + image: confluentinc/cp-kafka:7.9.4 + ports: + - '9092' + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false + - KAFKA_MIN_INSYNC_REPLICAS=1 + - KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 + - KAFKA_MESSAGE_MAX_BYTES=15728640 + - KAFKA_REPLICA_FETCH_MAX_BYTES=15728640 + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + - KAFKA_BROKER_ID=1 + - KAFKA_AUTO_CREATE_TOPICS_ENABLE=True + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + depends_on: + zookeeper: + condition: service_healthy + healthcheck: + test: + - CMD + - nc + - -z + - localhost + - '9092' + interval: 1s + start_period: 120s + platform: linux/amd64 + container_name: kafka + hostname: kafka + networks: + - antithesis-net + schema-registry: + image: confluentinc/cp-schema-registry:7.9.4 + ports: + - 8081 + networks: + - antithesis-net + environment: + - SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000 + - SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR=1 + - SCHEMA_REGISTRY_HOST_NAME=schema-registry + - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092 + command: + - /bin/bash + - -c + - . /etc/confluent/docker/bash-config && . /etc/confluent/docker/mesos-setup.sh + && . /etc/confluent/docker/apply-mesos-overrides && /etc/confluent/docker/configure + && exec /etc/confluent/docker/launch + depends_on: + kafka: + condition: service_healthy + healthcheck: + test: + - CMD + - curl + - -fu + - materialize:sekurity + - localhost:8081 + interval: 1s + start_period: 120s + platform: linux/amd64 + container_name: schema-registry + hostname: schema-registry + mysql: + init: true + ports: + - 3306 + environment: + - MYSQL_ROOT_PASSWORD=p@ssw0rd + command: + - --secure-file-priv=/var/lib/mysql-files + - --log-bin=mysql-bin + - --gtid_mode=ON + - --enforce_gtid_consistency=ON + - --binlog-format=row + - --binlog-row-image=full + - --binlog-row-metadata=full + - --server-id=1 + - --max-connections=500 + healthcheck: + test: + - CMD + - mysqladmin + - ping + - --password=p@ssw0rd + - --protocol=TCP + interval: 1s + start_period: 180s + volumes: + - mysqldata_primary:/var/lib/mysql + - mydata:/var/lib/mysql-files + image: mysql:9.5.0 + platform: linux/amd64 + container_name: mysql + hostname: mysql + networks: + - antithesis-net + mysql-replica: + init: true + ports: + - 3306 + environment: + - MYSQL_ROOT_PASSWORD=p@ssw0rd + command: + - --secure-file-priv=/var/lib/mysql-files + - --log-bin=mysql-bin + - --gtid_mode=ON + - --enforce_gtid_consistency=ON + - --binlog-format=row + - --binlog-row-image=full + - --binlog-row-metadata=full + - --server-id=2 + - --max-connections=500 + - --log-slave-updates + - --skip-replica-start + - --replica_parallel_workers=4 + - --replica_preserve_commit_order=ON + healthcheck: + test: + - CMD + - mysqladmin + - ping + - --password=p@ssw0rd + - --protocol=TCP + interval: 1s + start_period: 180s + volumes: + - mysqldata_replica:/var/lib/mysql + - mydata:/var/lib/mysql-files + image: mysql:9.5.0 + platform: linux/amd64 + container_name: mysql-replica + hostname: mysql-replica + networks: + - antithesis-net + postgres-source: + command: + - postgres + - -c + - wal_level=logical + - -c + - max_wal_senders=100 + - -c + - max_replication_slots=100 + - -c + - max_connections=5000 + - -c + - max_slot_wal_keep_size=64MB + ports: + - '5432' + environment: + - POSTGRESDB=postgres + - POSTGRES_PASSWORD=postgres + - POSTGRES_HOST_AUTH_METHOD=trust + healthcheck: + test: + - CMD + - pg_isready + - -U + - postgres + interval: 1s + start_period: 30s + restart: 'no' + platform: linux/amd64 + image: postgres:17.7 + container_name: postgres-source + hostname: postgres-source + networks: + antithesis-net: + aliases: + - postgres + clusterd1: + entrypoint: + - tini + - -- + command: + - clusterd + ports: + - 2100 + - 2101 + - 6878 + environment: + - CLUSTERD_GRPC_HOST=clusterd1 + - CLUSTERD_USE_CTP=true + - MZ_SOFT_ASSERTIONS=1 + - CLUSTERD_STORAGE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2100 + - CLUSTERD_COMPUTE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2101 + - CLUSTERD_INTERNAL_HTTP_LISTEN_ADDR=0.0.0.0:6878 + - CLUSTERD_SECRETS_READER=local-file + - CLUSTERD_SECRETS_READER_LOCAL_FILE_DIR=/mzdata/secrets + - LD_PRELOAD=libeatmydata.so + - CLUSTERD_PERSIST_PUBSUB_URL=http://materialized:6879 + - CLUSTERD_ENVIRONMENT_ID=mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0 + - CLUSTERD_PROCESS=0 + - 'CLUSTERD_COMPUTE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd1:2102"], + "arrangement_exert_proportionality": 16, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + - 'CLUSTERD_STORAGE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd1:2103"], + "arrangement_exert_proportionality": 1337, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + volumes: + - mzdata:/mzdata + - mydata:/var/lib/mysql-files + - tmp:/share/tmp + - scratch:/scratch + restart: 'no' + stop_grace_period: 120s + platform: linux/amd64 + image: ${MATERIALIZED_IMAGE} + pull_policy: never + container_name: clusterd1 + hostname: clusterd1 + networks: + - antithesis-net + clusterd2: + entrypoint: + - tini + - -- + command: + - clusterd + ports: + - 2100 + - 2101 + - 6878 + environment: + - CLUSTERD_GRPC_HOST=clusterd2 + - CLUSTERD_USE_CTP=true + - MZ_SOFT_ASSERTIONS=1 + - CLUSTERD_STORAGE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2100 + - CLUSTERD_COMPUTE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2101 + - CLUSTERD_INTERNAL_HTTP_LISTEN_ADDR=0.0.0.0:6878 + - CLUSTERD_SECRETS_READER=local-file + - CLUSTERD_SECRETS_READER_LOCAL_FILE_DIR=/mzdata/secrets + - LD_PRELOAD=libeatmydata.so + - CLUSTERD_PERSIST_PUBSUB_URL=http://materialized:6879 + - CLUSTERD_ENVIRONMENT_ID=mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0 + - CLUSTERD_PROCESS=0 + - 'CLUSTERD_COMPUTE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd2:2102"], + "arrangement_exert_proportionality": 16, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + - 'CLUSTERD_STORAGE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd2:2103"], + "arrangement_exert_proportionality": 1337, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + volumes: + - mzdata:/mzdata + - mydata:/var/lib/mysql-files + - tmp:/share/tmp + - scratch:/scratch + restart: 'no' + stop_grace_period: 120s + platform: linux/amd64 + image: ${MATERIALIZED_IMAGE} + pull_policy: never + container_name: clusterd2 + hostname: clusterd2 + networks: + - antithesis-net + clusterd-pool-0: + entrypoint: + - tini + - -- + command: + - clusterd + ports: + - 2100 + - 2101 + - 6878 + environment: + - CLUSTERD_GRPC_HOST=clusterd-pool-0 + - CLUSTERD_USE_CTP=true + - MZ_SOFT_ASSERTIONS=1 + - CLUSTERD_STORAGE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2100 + - CLUSTERD_COMPUTE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2101 + - CLUSTERD_INTERNAL_HTTP_LISTEN_ADDR=0.0.0.0:6878 + - CLUSTERD_SECRETS_READER=local-file + - CLUSTERD_SECRETS_READER_LOCAL_FILE_DIR=/mzdata/secrets + - LD_PRELOAD=libeatmydata.so + - CLUSTERD_PERSIST_PUBSUB_URL=http://materialized:6879 + - CLUSTERD_ENVIRONMENT_ID=mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0 + - CLUSTERD_PROCESS=0 + - 'CLUSTERD_COMPUTE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd-pool-0:2102"], + "arrangement_exert_proportionality": 16, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + - 'CLUSTERD_STORAGE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd-pool-0:2103"], + "arrangement_exert_proportionality": 1337, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + volumes: + - mzdata:/mzdata + - mydata:/var/lib/mysql-files + - tmp:/share/tmp + - scratch:/scratch + restart: 'no' + stop_grace_period: 120s + platform: linux/amd64 + image: ${MATERIALIZED_IMAGE} + pull_policy: never + container_name: clusterd-pool-0 + hostname: clusterd-pool-0 + networks: + - antithesis-net + clusterd-pool-1: + entrypoint: + - tini + - -- + command: + - clusterd + ports: + - 2100 + - 2101 + - 6878 + environment: + - CLUSTERD_GRPC_HOST=clusterd-pool-1 + - CLUSTERD_USE_CTP=true + - MZ_SOFT_ASSERTIONS=1 + - CLUSTERD_STORAGE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2100 + - CLUSTERD_COMPUTE_CONTROLLER_LISTEN_ADDR=0.0.0.0:2101 + - CLUSTERD_INTERNAL_HTTP_LISTEN_ADDR=0.0.0.0:6878 + - CLUSTERD_SECRETS_READER=local-file + - CLUSTERD_SECRETS_READER_LOCAL_FILE_DIR=/mzdata/secrets + - LD_PRELOAD=libeatmydata.so + - CLUSTERD_PERSIST_PUBSUB_URL=http://materialized:6879 + - CLUSTERD_ENVIRONMENT_ID=mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0 + - CLUSTERD_PROCESS=0 + - 'CLUSTERD_COMPUTE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd-pool-1:2102"], + "arrangement_exert_proportionality": 16, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + - 'CLUSTERD_STORAGE_TIMELY_CONFIG={"workers": 4, "process": 0, "addresses": ["clusterd-pool-1:2103"], + "arrangement_exert_proportionality": 1337, "enable_zero_copy": false, "enable_zero_copy_lgalloc": + false, "zero_copy_limit": null}' + volumes: + - mzdata:/mzdata + - mydata:/var/lib/mysql-files + - tmp:/share/tmp + - scratch:/scratch + restart: 'no' + stop_grace_period: 120s + platform: linux/amd64 + image: ${MATERIALIZED_IMAGE} + pull_policy: never + container_name: clusterd-pool-1 + hostname: clusterd-pool-1 + networks: + - antithesis-net + materialized: + hostname: materialized + depends_on: + minio: + condition: service_healthy + postgres-metadata: + condition: service_healthy + command: + - --unsafe-mode + - --environment-id=mzcompose-us-east-1-00000000-0000-0000-0000-000000000000-0 + - --persist-blob-url=s3://minioadmin:minioadmin@persist/persist?endpoint=http://minio:9000/®ion=minio + - --orchestrator-process-propagate-crashes + - --persist-consensus-url=postgres://root@postgres-metadata:26257?options=--search_path=consensus + - --orchestrator-process-tcp-proxy-listen-addr=0.0.0.0 + - --orchestrator-process-prometheus-service-discovery-directory=/mzdata/prometheus + ports: + - 6875 + - 6876 + - 6877 + - 6878 + - 6880 + - 6881 + - 26257 + environment: + - MZ_NO_TELEMETRY=1 + - MZ_NO_BUILTIN_CONSOLE=1 + - MZ_TEST_ONLY_DUMMY_SEGMENT_CLIENT=true + - MZ_SOFT_ASSERTIONS=1 + - MZ_ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR=0.0.0.0 + - MZ_ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY=/mzdata/prometheus + - MZ_BOOTSTRAP_ROLE=materialize + - MZ_INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR=0.0.0.0:6879 + - MZ_PERSIST_PUBSUB_URL=http://127.0.0.1:6879 + - MZ_AWS_CONNECTION_ROLE_ARN=arn:aws:iam::123456789000:role/MaterializeConnection + - MZ_AWS_EXTERNAL_ID_PREFIX=eb5cb59b-e2fe-41f3-87ca-d2176a495345 + - MZ_CATALOG_STORE=persist + - 'MZ_CLUSTER_REPLICA_SIZES={"bootstrap": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=2,workers=4": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 2, "workers": 4}, "scale=1,workers=1,legacy": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + false, "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=1,workers=2,legacy": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": false, "memory_limit": "4 GiB", "scale": + 1, "workers": 2}, "free": {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": + "1", "disabled": true, "disk_limit": null, "is_cc": true, "memory_limit": "4 + GiB", "scale": 1, "workers": 1}, "scale=1,workers=1": {"cpu_exclusive": false, + "cpu_limit": null, "credits_per_hour": "1", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=1,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=1,mem=32GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "32 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1,mem=1GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "1 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=2": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 2}, "scale=1,workers=2,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 2}, "scale=1,workers=2,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 2}, "scale=1,workers=2,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 2}, "scale=1,workers=2,mem=32GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "32 GiB", "scale": 1, "workers": 2}, "scale=2,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 2, "workers": 1}, "scale=2,workers=2": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 2, "workers": 2}, "scale=1,workers=2,mem=2GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "2 GiB", "scale": + 1, "workers": 2}, "scale=1,workers=4": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 4}, "scale=1,workers=4,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 4}, "scale=1,workers=4,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 4}, "scale=1,workers=4,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 4}, "scale=1,workers=4,mem=32GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "32 GiB", "scale": 1, "workers": 4}, "scale=4,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 4, "workers": 1}, "scale=4,workers=4": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "16", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 4, "workers": 4}, "scale=1,workers=8": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 8}, "scale=1,workers=8,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 8}, "scale=1,workers=8,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 8}, "scale=1,workers=8,mem=16GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "16 GiB", "scale": 1, "workers": 8}, "scale=1,workers=8,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 8}, "scale=8,workers=1": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 8, "workers": 1}, "scale=8,workers=8": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "64", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 8, "workers": 8}, "scale=1,workers=16": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "16", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 16}, "scale=1,workers=16,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 16}, "scale=1,workers=16,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "16", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 16}, "scale=1,workers=16,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 16}, "scale=1,workers=16,mem=32GiB": {"cpu_exclusive": false, + "cpu_limit": null, "credits_per_hour": "16", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "32 GiB", "scale": 1, "workers": 16}, "scale=16,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 16, "workers": 1}, "scale=16,workers=16": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "256", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 16, "workers": 16}, "scale=1,workers=32": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 32}, "scale=1,workers=32,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "32", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 32}, "scale=1,workers=32,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 32}, "scale=1,workers=32,mem=16GiB": {"cpu_exclusive": false, + "cpu_limit": null, "credits_per_hour": "32", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "16 GiB", "scale": 1, "workers": 32}, "scale=1,workers=32,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 32}, "scale=32,workers=1": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "32", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 32, "workers": 1}, "scale=32,workers=32": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1024", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 32, "workers": 32}}' + - MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE=bootstrap + - MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR=1 + - MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR=1 + - MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR=1 + - COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s + - COCKROACH_LOG_MAX_SYNC_DURATION=120s + - MZ_SYSTEM_PARAMETER_DEFAULT=unsafe_enable_unsafe_functions=true;allow_real_time_recency=true;constraint_based_timestamp_selection=verify;enable_compute_peek_response_stash=true;enable_0dt_deployment_panic_after_timeout=true;enable_0dt_deployment_sources=true;enable_alter_swap=true;enable_case_literal_transform=false;enable_cast_elimination=true;enable_coalesce_case_transform=true;enable_columnar_lgalloc=false;enable_columnation_lgalloc=false;enable_compute_correction_v2=true;enable_compute_logical_backpressure=true;enable_connection_validation_syntax=true;enable_copy_to_expr=true;enable_copy_from_remote=true;enable_create_table_from_source=true;enable_eager_delta_joins=true;enable_envelope_debezium_in_subscribe=true;enable_expressions_in_limit_syntax=true;enable_iceberg_sink=true;enable_introspection_subscribes=true;enable_kafka_sink_partition_by=true;enable_lgalloc=false;enable_load_generator_counter=true;enable_logical_compaction_window=true;enable_multi_worker_storage_persist_sink=true;enable_multi_replica_sources=true;enable_rbac_checks=true;enable_reduce_mfp_fusion=true;enable_refresh_every_mvs=true;enable_replacement_materialized_views=true;enable_cluster_schedule_refresh=true;enable_sql_server_source=true;enable_s3_tables_region_check=false;enable_statement_lifecycle_logging=true;enable_storage_introspection_logs=true;enable_compute_temporal_bucketing=true;enable_variadic_left_join_lowering=true;enable_worker_core_affinity=true;grpc_client_http2_keep_alive_timeout=5s;ore_overflowing_behavior=panic;unsafe_enable_table_keys=true;with_0dt_deployment_max_wait=1800s;persist_next_listen_batch_retryer_clamp=16s;persist_next_listen_batch_retryer_initial_backoff=100ms;persist_next_listen_batch_retryer_fixed_sleep=1200ms;persist_enable_arrow_lgalloc_noncc_sizes=true;persist_enable_s3_lgalloc_noncc_sizes=true;compute_correction_v2_chain_proportionality=3;compute_correction_v2_chunk_size=8192;compute_dataflow_max_inflight_bytes=134217728;compute_hydration_concurrency=2;compute_replica_expiration_offset=3d;compute_apply_column_demands=true;compute_peek_response_stash_threshold_bytes=1048576;compute_subscribe_snapshot_optimization=true;enable_compute_sync_mv_sink=true;enable_password_auth=true;enable_frontend_peek_sequencing=true;enable_frontend_subscribes=true;enable_upsert_v2=false;default_timestamp_interval=1s;force_source_table_syntax=false;persist_batch_columnar_format=structured;persist_batch_delete_enabled=true;persist_batch_structured_order=true;persist_batch_builder_structured=true;persist_batch_structured_key_lower_len=256;persist_batch_max_run_len=4;persist_catalog_force_compaction_fuel=1024;persist_catalog_force_compaction_wait=1s;persist_stats_audit_percent=100;persist_stats_audit_panic=true;persist_encoding_enable_dictionary=true;persist_fast_path_limit=1000;persist_fast_path_order=true;persist_gc_use_active_gc=true;persist_gc_min_versions=16;persist_gc_max_versions=128000;persist_inline_writes_single_max_bytes=4096;persist_inline_writes_total_max_bytes=1048576;persist_pubsub_client_enabled=true;persist_pubsub_push_diff_enabled=true;persist_record_compactions=true;persist_record_schema_id=true;persist_rollup_use_active_rollup=true;persist_blob_target_size=16777216;persist_compaction_memory_bound_bytes=83886080;persist_enable_incremental_compaction=true;persist_use_critical_since_catalog=true;persist_use_critical_since_snapshot=false;persist_use_critical_since_source=false;persist_part_decode_format=arrow;persist_blob_cache_scale_with_threads=true;persist_state_update_lease_timeout=1s;arrangement_size_history_collection_interval=1h;arrangement_size_history_retention_period=7d;persist_validate_part_bounds_on_read=false;persist_validate_part_bounds_on_write=false;statement_logging_default_sample_rate=1.0;statement_logging_max_data_credit=;statement_logging_max_sample_rate=1.0;statement_logging_target_data_rate=;storage_reclock_to_latest=true;storage_source_decode_fuel=100000;storage_statistics_collection_interval=1000;storage_statistics_interval=2000;storage_use_continual_feedback_upsert=true;default_cluster_replication_factor=1;unsafe_enable_unorchestrated_cluster_replicas=true + - MZ_TIMESTAMP_ORACLE_URL=postgres://root@postgres-metadata:26257?options=--search_path=tsoracle + - MZ_NO_BUILTIN_POSTGRES=1 + - MZ_NO_BUILTIN_COCKROACH=1 + - MZ_ADAPTER_STASH_URL=postgres://root@postgres-metadata:26257?options=--search_path=adapter + volumes: + - mzdata:/mzdata + - mydata:/var/lib/mysql-files + - tmp:/share/tmp + - scratch:/scratch + tmpfs: + - /tmp + healthcheck: + test: + - CMD + - curl + - -f + - localhost:6878/api/readyz + interval: 1s + start_period: 600s + stop_grace_period: 120s + platform: linux/amd64 + image: ${MATERIALIZED_IMAGE} + pull_policy: never + container_name: materialized + networks: + - antithesis-net + fault-orchestrator: + image: bash:5 + entrypoint: + - bash + - -c + command: + - "#!/usr/bin/env bash\n\n# Copyright Materialize, Inc. and contributors. All\ + \ rights reserved.\n#\n# Use of this software is governed by the Business Source\ + \ License\n# included in the LICENSE file at the root of this repository.\n\ + #\n# As of the Change Date specified in that file, in accordance with\n# the\ + \ Business Source License, use of this software will be governed\n# by the Apache\ + \ License, Version 2.0.\n\n# Drive Antithesis fault windows globally.\n#\n#\ + \ Antithesis injects faults into the system continuously by default.\n# Calling\ + \ `ANTITHESIS_STOP_FAULTS ` requests a quiet window \u2014\n# Antithesis\ + \ pauses fault injection for that many seconds. The Antithesis\n# engagement\ + \ team's recommendation: drive these quiet windows from a\n# single dedicated\ + \ container, not per-driver, otherwise overlapping\n# per-driver requests keep\ + \ the system in a quiet state most of the time\n# and we never actually fault.\n\ + #\n# This script alternates faults-OFF (quiet) and faults-ON (active)\n# windows\ + \ at randomized intervals so each timeline sees a different\n# cadence. Adapted\ + \ from the Antithesis hands-on tutorial:\n# https://github.com/antithesishq/hands-on-tutorial-1/blob/main/python/antithesis/pause_faults.sh\n\ + #\n# Outside Antithesis (snouty local validate) `ANTITHESIS_STOP_FAULTS` is\n\ + # unset; the script exits immediately so the rest of the compose works.\n\n\ + set -euo pipefail\n\nif [[ -z \"$${ANTITHESIS_STOP_FAULTS:-}\" ]]; then\n \ + \ echo \"ANTITHESIS_STOP_FAULTS not set; fault-orchestrator exiting (no-op)\"\ + \n exit 0\nfi\n\n# Tunable via the service `environment:` block. Defaults\ + \ sized so that:\n# * MAX_ON is comfortably shorter than any driver's CATCHUP_TIMEOUT_S\n\ + # (smallest is 60s in parallel_driver_upsert_latest_value) \u2014 a\n# \ + \ driver's catchup window can always span at least one full quiet\n# \ + \ period.\n# * MIN_OFF is long enough for materialized to commit a few timestamps\n\ + # and for sources to advance offset_committed past the most recent\n# \ + \ batch of produced offsets.\n# * START_DELAY gives setup-complete + bootstrap\ + \ a window of un-faulted\n# time before the alternation begins.\nSTART_DELAY=\"\ + $${START_DELAY:-30}\"\nMIN_ON=\"$${MIN_ON:-20}\"\nMAX_ON=\"$${MAX_ON:-40}\"\n\ + MIN_OFF=\"$${MIN_OFF:-20}\"\nMAX_OFF=\"$${MAX_OFF:-40}\"\n\necho \"fault-orchestrator:\ + \ ON $${MIN_ON}-$${MAX_ON}s / OFF $${MIN_OFF}-$${MAX_OFF}s, initial pause $${START_DELAY}s\"\ + \n\n# Initial quiet window so the rest of the stack reaches steady state\n#\ + \ before Antithesis starts faulting. Antithesis may or may not honour\n# this\ + \ depending on when fault injection begins relative to setup-\n# complete; either\ + \ way the local sleep gives drivers a clean start.\n\"$${ANTITHESIS_STOP_FAULTS}\"\ + \ \"$${START_DELAY}\"\nsleep \"$${START_DELAY}\"\n\nwhile true; do\n # Re-seed\ + \ $$RANDOM from /dev/urandom so successive iterations don't\n # repeat the\ + \ same on/off period (the shell's RANDOM is a 16-bit LCG;\n # without reseeding\ + \ it can produce predictable sequences).\n RANDOM=$$(od -An -N2 -tu2 /dev/urandom\ + \ | tr -d ' ')\n ON_PERIOD=$$((MIN_ON + (RANDOM % (MAX_ON - MIN_ON + 1))))\n\ + \ OFF_PERIOD=$$((MIN_OFF + (RANDOM % (MAX_OFF - MIN_OFF + 1))))\n\n echo\ + \ \"fault-orchestrator: faults OFF for $${OFF_PERIOD}s\"\n \"$${ANTITHESIS_STOP_FAULTS}\"\ + \ \"$${OFF_PERIOD}\"\n sleep \"$${OFF_PERIOD}\"\n\n echo \"fault-orchestrator:\ + \ faults ON for $${ON_PERIOD}s\"\n sleep \"$${ON_PERIOD}\"\ndone\n" + environment: + - START_DELAY=30 + - MIN_ON=20 + - MAX_ON=40 + - MIN_OFF=20 + - MAX_OFF=40 + depends_on: + materialized: + condition: service_healthy + restart: 'no' + platform: linux/amd64 + container_name: fault-orchestrator + hostname: fault-orchestrator + networks: + - antithesis-net + workload: + depends_on: + materialized: + condition: service_healthy + clusterd1: + condition: service_started + clusterd2: + condition: service_started + kafka: + condition: service_healthy + schema-registry: + condition: service_healthy + mysql: + condition: service_healthy + mysql-replica: + condition: service_healthy + postgres-source: + condition: service_healthy + environment: + - PGHOST=materialized + - PGPORT=6875 + - PGUSER=materialize + - PGPORT_INTERNAL=6877 + - PGUSER_INTERNAL=mz_system + - KAFKA_BROKER=kafka:9092 + - SCHEMA_REGISTRY_URL=http://schema-registry:8081 + - MZ_ANTITHESIS_CLUSTER=antithesis_cluster + - ANTITHESIS_CLUSTERD_POOL_SIZE=2 + - CLUSTERD_POOL_SIZE=2 + - CLUSTERD_WORKERS=4 + - MYSQL_HOST=mysql + - MYSQL_REPLICA_HOST=mysql-replica + - MYSQL_PASSWORD=p@ssw0rd + - PG_SOURCE_HOST=postgres-source + - PG_SOURCE_PORT=5432 + - PG_SOURCE_USER=postgres + - PG_SOURCE_PASSWORD=postgres + - PG_SOURCE_DATABASE=postgres + - 'CLUSTER_REPLICA_SIZES={"bootstrap": {"cpu_exclusive": false, "cpu_limit": null, + "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": true, + "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=2,workers=4": {"cpu_exclusive": + false, "cpu_limit": null, "credits_per_hour": "8", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "4 GiB", "scale": 2, "workers": 4}, "scale=1,workers=1,legacy": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": false, "memory_limit": "4 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=2,legacy": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + false, "memory_limit": "4 GiB", "scale": 1, "workers": 2}, "free": {"cpu_exclusive": + false, "cpu_limit": null, "credits_per_hour": "1", "disabled": true, "disk_limit": + null, "is_cc": true, "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=1,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=1,mem=16GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "16 GiB", "scale": 1, "workers": 1}, "scale=1,workers=1,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "1", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 1}, "scale=1,workers=1,mem=1GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "1 GiB", "scale": 1, "workers": 1}, "scale=1,workers=2": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 2}, "scale=1,workers=2,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 2}, "scale=1,workers=2,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 2}, "scale=1,workers=2,mem=16GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "16 GiB", "scale": 1, "workers": 2}, "scale=1,workers=2,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "2", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 2}, "scale=2,workers=1": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 2, "workers": 1}, "scale=2,workers=2": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 2, "workers": 2}, "scale=1,workers=2,mem=2GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "2", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "2 GiB", "scale": 1, "workers": 2}, "scale=1,workers=4": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 4}, "scale=1,workers=4,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 4}, "scale=1,workers=4,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 4}, "scale=1,workers=4,mem=16GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "16 GiB", "scale": 1, "workers": 4}, "scale=1,workers=4,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "4", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 4}, "scale=4,workers=1": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "4", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 4, "workers": 1}, "scale=4,workers=4": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 4, "workers": 4}, "scale=1,workers=8": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 8}, "scale=1,workers=8,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 8}, "scale=1,workers=8,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 8}, "scale=1,workers=8,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 8}, "scale=1,workers=8,mem=32GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "8", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "32 GiB", "scale": 1, "workers": 8}, "scale=8,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "8", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 8, "workers": 1}, "scale=8,workers=8": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "64", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 8, "workers": 8}, "scale=1,workers=16": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 16}, "scale=1,workers=16,mem=4GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "16", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 16}, "scale=1,workers=16,mem=8GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "8 GiB", "scale": + 1, "workers": 16}, "scale=1,workers=16,mem=16GiB": {"cpu_exclusive": false, + "cpu_limit": null, "credits_per_hour": "16", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "16 GiB", "scale": 1, "workers": 16}, "scale=1,workers=16,mem=32GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "16", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "32 GiB", "scale": + 1, "workers": 16}, "scale=16,workers=1": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "16", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 16, "workers": 1}, "scale=16,workers=16": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "256", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 16, "workers": 16}, "scale=1,workers=32": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "32", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 1, "workers": 32}, "scale=1,workers=32,mem=4GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 1, "workers": 32}, "scale=1,workers=32,mem=8GiB": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "32", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "8 GiB", "scale": 1, "workers": 32}, "scale=1,workers=32,mem=16GiB": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "16 GiB", "scale": + 1, "workers": 32}, "scale=1,workers=32,mem=32GiB": {"cpu_exclusive": false, + "cpu_limit": null, "credits_per_hour": "32", "disabled": false, "disk_limit": + null, "is_cc": true, "memory_limit": "32 GiB", "scale": 1, "workers": 32}, "scale=32,workers=1": + {"cpu_exclusive": false, "cpu_limit": null, "credits_per_hour": "32", "disabled": + false, "disk_limit": null, "is_cc": true, "memory_limit": "4 GiB", "scale": + 32, "workers": 1}, "scale=32,workers=32": {"cpu_exclusive": false, "cpu_limit": + null, "credits_per_hour": "1024", "disabled": false, "disk_limit": null, "is_cc": + true, "memory_limit": "4 GiB", "scale": 32, "workers": 32}}' + platform: linux/amd64 + image: ${ANTITHESIS_WORKLOAD_IMAGE} + pull_policy: never + container_name: workload + hostname: workload + networks: + - antithesis-net +networks: + antithesis-net: + driver: bridge +volumes: + mzdata: null + pgdata: null + mysqldata: null + mssqldata: null + sourcedata_512Mb: + driver_opts: + device: tmpfs + type: tmpfs + o: size=512m + mydata: null + tmp: null + secrets: null + scratch: null + mysqldata_primary: null + mysqldata_replica: null diff --git a/test/antithesis/config/mzbuild.yml b/test/antithesis/config/mzbuild.yml new file mode 100644 index 0000000000000..f3491f546dbb5 --- /dev/null +++ b/test/antithesis/config/mzbuild.yml @@ -0,0 +1,33 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# FROM-scratch image holding the docker-compose.yaml + .env for the +# Antithesis environment. Antithesis pulls this image and reads +# `/docker-compose.yaml` to bring up the system under test; `.env` supplies +# `${MATERIALIZED_IMAGE}` / `${ANTITHESIS_WORKLOAD_IMAGE}` at compose-parse +# time. +# +# The compose YAML (committed, topology-only) is generated from +# `test/antithesis/mzcompose.py` via `bin/pyactivate +# test/antithesis/export-compose.py`. Regenerate when topology changes; CI +# verifies the committed copy is up to date. +# +# `.env` (generated, gitignored) is written by +# `bin/pyactivate test/antithesis/export-env.py` at build time. Its content +# changes every materialized fingerprint shift, which is what propagates +# fresh fingerprints into this image without touching the committed YAML. +# +# `publish: false` keeps the standard `ci.test.build` flow from trying to +# build this image — it would fail on `COPY docker-compose.yaml .env /` +# because `.env` is gitignored and only `build-antithesis.sh` writes it. +# The antithesis nightly step builds and pushes the image directly via +# push-antithesis.py. + +name: antithesis-config +publish: false diff --git a/test/antithesis/export-compose.py b/test/antithesis/export-compose.py new file mode 100644 index 0000000000000..b2e68321b16d3 --- /dev/null +++ b/test/antithesis/export-compose.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Export the resolved docker-compose YAML for the Antithesis composition. + +Loads `test/antithesis/mzcompose.py` and dumps a docker-compose YAML to +stdout where Materialize-built images are emitted as compose env-var +placeholders (`${MATERIALIZED_IMAGE}`, `${ANTITHESIS_WORKLOAD_IMAGE}`). +The actual fingerprint values are supplied separately in a `.env` file +generated by `export-env.py`. This separation lets the committed YAML stay +stable across materialized source changes — only `.env` shifts per +fingerprint. + +Image-reference policy: + + * Materialize-built images (`materialized`, `antithesis-workload`) + become `${MATERIALIZED_IMAGE}` / `${ANTITHESIS_WORKLOAD_IMAGE}`. + Compose interpolates them from `.env` at parse time. The actual specs + are `ghcr.io/materializeinc/materialize/:mzbuild-` with + `antithesis=True` participating in the fingerprint. + + * Third-party `mzbuild` images (`postgres`, `minio`) are replaced with + the public upstream image. Our mzbuild variants bake in test-friendly + patches (eatmydata, no_fsync) that defeat Antithesis's fault injection; + Antithesis runs against vanilla. + +The script also strips mzcompose-only keys, host bind-mounts, and host-path +env vars that don't resolve inside the Antithesis sandbox, and inlines the +postgres bootstrap SQL into the entrypoint (the bind-mount path won't +exist). + +Usage: + bin/pyactivate test/antithesis/export-compose.py \\ + > test/antithesis/config/docker-compose.yaml +""" + +import argparse +import sys +from pathlib import Path +from typing import Any + +import yaml + +from materialize import MZ_ROOT +from materialize.mzbuild import Repository +from materialize.mzcompose.composition import Composition +from materialize.xcompile import Arch + +# mzbuild image names that we publish under our fingerprint. Each maps to +# the compose env-var placeholder; `.env` (export-env.py) supplies the +# concrete ref at compose-parse time. Keep in sync with `export-env.py`. +MATERIALIZE_IMAGES = { + "materialized": "${MATERIALIZED_IMAGE}", + "antithesis-workload": "${ANTITHESIS_WORKLOAD_IMAGE}", +} + +# Public-image fallbacks for mzbuild images whose Materialize-specific +# customizations subvert Antithesis (eatmydata, fsync no-ops, etc.). +# Antithesis can reach public registries — we just need to make sure the +# compose points at the upstream image, not our patched mzbuild build. +PUBLIC_FALLBACKS = { + "postgres": "postgres:17.7", + "minio": "minio/minio:latest", +} + +# Header prepended to the generated YAML so check-copyright passes and +# readers know the file isn't hand-edited. +HEADER = """\ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# GENERATED FILE — do not edit. Regenerate via: +# bin/pyactivate test/antithesis/export-compose.py > test/antithesis/config/docker-compose.yaml +# Source of truth: test/antithesis/mzcompose.py. + +""" + + +def resolve_mzbuild(svc: dict[str, Any]) -> None: + """Replace `mzbuild:` with a concrete or templated `image:` ref. + + For Materialize-built images we also set `pull_policy: never` so the + `make up-local` flow doesn't attempt a registry probe at compose + startup. The fingerprint tags only exist locally on the dev machine + that ran `make build-local` — they're never pushed to GHCR by that + flow, so the standard "check remote for newer digest" probe fails + with `unauthorized` and aborts the bring-up. Third-party images + (PUBLIC_FALLBACKS) genuinely come from upstream registries; for + those we leave the default pull policy alone. + + The Antithesis platform itself uses a separate registry (Antithesis's + GCP Artifact Registry) that it does have credentials for, so the + pull_policy never field doesn't affect a real Antithesis run. + """ + name = svc.pop("mzbuild") + if name in MATERIALIZE_IMAGES: + svc["image"] = MATERIALIZE_IMAGES[name] + svc["pull_policy"] = "never" + elif name in PUBLIC_FALLBACKS: + svc["image"] = PUBLIC_FALLBACKS[name] + else: + raise ValueError( + f"mzbuild image {name!r} has no Antithesis policy — add it to " + f"MATERIALIZE_IMAGES (use a `.env` placeholder) or " + f"PUBLIC_FALLBACKS (swap to a public image) in export-compose.py." + ) + + +def inline_postgres_setup(svc: dict[str, Any]) -> None: + """Replace the bind-mounted setup SQL with an inline entrypoint write. + + Antithesis has no host filesystem, so we can't mount the SQL file. + Read it from misc/postgres/setup_materialize.sql (one source of truth) + and bake it into the service entrypoint. + + The inline-setup transform only fires when the service originally + requested it (Postgres-ctor `setup_materialize=True`, which appears + here as a bind-mounted setup_materialize.sql). Plain postgres-image + services — e.g. a vanilla PG used as a CDC upstream — get the + common env-fixup (drop LD_PRELOAD, add HOST_AUTH_METHOD=trust) and + nothing else. + """ + if not svc.get("image", "").startswith("postgres:"): + return + + vols = svc.get("volumes", []) or [] + has_setup = any(isinstance(v, str) and "setup_materialize.sql" in v for v in vols) + + env = svc.setdefault("environment", []) + # eatmydata isn't installed in the public postgres image. + env[:] = [e for e in env if not e.startswith("LD_PRELOAD=")] + # Trust auth — Antithesis-internal traffic only. + env.append("POSTGRES_HOST_AUTH_METHOD=trust") + + if not has_setup: + return + + # Drop the bind-mounted setup SQL; we'll inline it. + vols[:] = [v for v in vols if "setup_materialize.sql" not in v] + if not vols: + svc.pop("volumes", None) + + setup_sql = (MZ_ROOT / "misc" / "postgres" / "setup_materialize.sql").read_text() + # Strip comment lines + collapse to one statement per output line so we + # can safely double-quote it inside the sh -c here. + setup_sql = "\n".join( + line for line in setup_sql.splitlines() if line and not line.startswith("--") + ) + svc["entrypoint"] = [ + "sh", + "-c", + # `$$@` survives compose's $-interpolation and arrives as `$@` at the + # shell, forwarding any args (e.g., the `postgres` CMD) verbatim. + f"cat <<'SQL' > /docker-entrypoint-initdb.d/z_setup_materialize.sql\n" + f"{setup_sql}\n" + f"SQL\n" + f'exec docker-entrypoint.sh "$$@"', + "--", + ] + + +def strip_host_bindmounts(svc: dict[str, Any]) -> None: + """Drop volume entries that bind-mount a host path.""" + if "volumes" not in svc: + return + svc["volumes"] = [ + v + for v in svc["volumes"] + if not isinstance(v, str) + or ":" not in v + or not v.split(":", 1)[0].startswith("/") + ] + if not svc["volumes"]: + del svc["volumes"] + + +def strip_incompatible_env(svc: dict[str, Any]) -> None: + """Drop env vars that are unsafe or unresolvable under Antithesis. + + - `MZ_EAT_MY_DATA` enables `libeatmydata.so` (fsync no-op) — fatal for + crash-recovery testing under fault injection. + - `MZ_LISTENERS_CONFIG_PATH` and `MZ_EXTERNAL_LOGIN_PASSWORD_*` reference + host paths or host secrets that don't exist in the sandbox. + - Bare env vars (no `=`) inherit from the host environment, which is + empty under Antithesis; drop them so materialized's built-in defaults + apply. + """ + if "environment" not in svc: + return + drop_prefixes = ( + "MZ_EAT_MY_DATA=", + "MZ_LISTENERS_CONFIG_PATH=", + "MZ_EXTERNAL_LOGIN_PASSWORD_", + ) + svc["environment"] = [ + e for e in svc["environment"] if "=" in e and not e.startswith(drop_prefixes) + ] + + +def strip_mzcompose_keys(svc: dict[str, Any]) -> None: + """Drop keys understood by mzcompose but not by docker/podman compose.""" + for key in ("propagate_uid_gid", "allow_host_ports", "publish"): + svc.pop(key, None) + + +# Single user-defined bridge network every service joins. Defining the +# network explicitly (rather than relying on docker-compose's auto- +# generated `default`) gives us deterministic container-DNS regardless +# of how the Antithesis platform's surrounding orchestration parses the +# compose file. Antithesis support flagged the auto-network as a likely +# cause of a kafka -> zookeeper UnknownHostException during setup; the +# fix is to make the network explicit. +# +# Must NOT set `internal: true` per Antithesis docker best practices — +# that would cut us off from the Antithesis-side network used for +# instrumentation. Plain bridge is the recommended shape. +ANTITHESIS_NETWORK = "antithesis-net" + + +# Extra docker-network aliases per service. The repository's +# `test/pg-cdc/*.td` files hard-code `@postgres` as the upstream hostname; +# the testdrive-runner drivers run those files unmodified by aliasing +# `postgres` to our `postgres-source` container at the network-DNS layer. +EXTRA_NETWORK_ALIASES: dict[str, list[str]] = { + "postgres-source": ["postgres"], +} + + +def assign_network(name: str, svc: dict[str, Any]) -> None: + """Place the service on the single named bridge network so docker-DNS + is deterministic. Overwrites any pre-existing `networks` entry — some + upstream Service classes set a vestigial `default: aliases: []` block + that we don't want carried through. + + Services that need additional names on the same network (see + `EXTRA_NETWORK_ALIASES`) use the long-form mapping syntax so we can + declare `aliases:` for them. Plain services keep the short list form. + """ + aliases = EXTRA_NETWORK_ALIASES.get(name) + if aliases: + svc["networks"] = {ANTITHESIS_NETWORK: {"aliases": aliases}} + else: + svc["networks"] = [ANTITHESIS_NETWORK] + + +def declare_top_level_network(compose: dict[str, Any]) -> None: + """Declare the bridge network at the compose top level. Overwrites any + pre-existing top-level `networks:` entry (mzcompose currently emits + an empty dict). + """ + compose["networks"] = { + ANTITHESIS_NETWORK: {"driver": "bridge"}, + } + + +def set_explicit_names(name: str, svc: dict[str, Any]) -> None: + """Set `container_name` and `hostname` to the service key. + + Per Antithesis docker best practices (https://antithesis.com/docs/ + best_practices/docker_best_practices/), every service should declare + its container_name and hostname explicitly and use the same value + for both. Triage reports attribute log lines and assertions by + `hostname`; if it isn't set, Antithesis falls back to an inferred + value (possibly the container id) that's harder to recognize. + + Set here at export time rather than per-service in mzcompose.py so + that local mzcompose runs aren't constrained to one global + container_name namespace. + + Asserts the service key is DNS-safe (no underscores, RFC-1123). + Docker Compose itself rejects underscored service keys, so this is + a sanity check, not a transform. + """ + if "_" in name: + raise ValueError( + f"service {name!r}: underscores in hostnames break DNS resolution " + f"under Antithesis (RFC-1123). Rename the service to use hyphens." + ) + svc["container_name"] = name + svc["hostname"] = name + + +def upgrade_started_to_healthy(compose: dict[str, Any]) -> None: + """For every `depends_on` entry that uses `condition: service_started` + against a dependency that declares a `healthcheck`, upgrade the + condition to `service_healthy`. + + Under the Antithesis platform, `service_started` proved unreliable as + a readiness gate during initial container startup: docker fires it as + soon as the dependency's container *process* starts, before the + dependency's DNS entry is reliably resolvable. The first run on the + fault-isolated topology saw kafka hit `UnknownHostException: zookeeper` + 148+ times in a row before its retry loop landed on a successful + lookup, with the same cascade downstream (schema-registry ↔ kafka). + Gating on the healthcheck (which probes the actual listen port) + eliminates that race. + + Dependencies without a healthcheck (e.g. clusterd, which has no + readiness signal we currently expose) are left as `service_started` + — there's nothing to wait on. + """ + services = compose.get("services", {}) + has_healthcheck = {name for name, svc in services.items() if "healthcheck" in svc} + for svc in services.values(): + deps = svc.get("depends_on") + if not isinstance(deps, dict): + continue + for dep_name, dep_spec in deps.items(): + if ( + isinstance(dep_spec, dict) + and dep_spec.get("condition") == "service_started" + and dep_name in has_healthcheck + ): + dep_spec["condition"] = "service_healthy" + + +def register_referenced_named_volumes(compose: dict[str, Any]) -> None: + """Declare any named volume referenced by a service that isn't already + declared at the top level. Docker Compose rejects the file otherwise. + + mzcompose's `Composition` only auto-declares the fixed `DEFAULT_MZ_VOLUMES` + set; per-service custom named volumes (e.g. `clusterd1_scratch`) reference + names that have no top-level entry and fail `docker compose config`. + """ + top_level: dict[str, Any] = compose.setdefault("volumes", {}) or {} + compose["volumes"] = top_level + + for svc in compose.get("services", {}).values(): + for entry in svc.get("volumes", []) or []: + if not isinstance(entry, str): + continue + # Bind mounts (`/host:/container`) start with `/`; named volumes + # are bare identifiers. We only auto-declare the latter. + if entry.startswith("/"): + continue + name = entry.split(":", 1)[0] + if not name or name in top_level: + continue + top_level[name] = None + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument( + "--no-antithesis", + action="store_true", + help=( + "Emit a compose YAML for local-dev (host arch) rather than the " + "Antithesis x86_64 platform. Mirrors `export-env.py --no-antithesis` " + "— together they let `make build-local` + `make up-local` run " + "the stack natively on Apple Silicon (the Antithesis-flavored " + "x86_64 testdrive binary segfaults inside Docker's rosetta/qemu " + "emulation)." + ), + ) + args = parser.parse_args() + arch = Arch.host() if args.no_antithesis else Arch.X86_64 + platform = "linux/amd64" if arch == Arch.X86_64 else "linux/arm64" + + # munge_services=False keeps ports bare (e.g., `6875` instead of + # `127.0.0.1::6875`) — Antithesis is container-to-container, no host + # binding. We do our own mzbuild→image substitution below and don't + # need fingerprint resolution since Materialize-built images become + # `${...}` placeholders. + repo = Repository(Path("."), arch=arch, antithesis=not args.no_antithesis) + c = Composition(repo, "antithesis", munge_services=False) + + for name, svc in c.compose["services"].items(): + svc["platform"] = platform + if "mzbuild" in svc: + resolve_mzbuild(svc) + inline_postgres_setup(svc) + strip_host_bindmounts(svc) + strip_incompatible_env(svc) + strip_mzcompose_keys(svc) + set_explicit_names(name, svc) + assign_network(name, svc) + + declare_top_level_network(c.compose) + upgrade_started_to_healthy(c.compose) + register_referenced_named_volumes(c.compose) + + sys.stdout.write(HEADER) + yaml.dump(c.compose, sys.stdout, default_flow_style=False, sort_keys=False) + + +if __name__ == "__main__": + main() diff --git a/test/antithesis/export-env.py b/test/antithesis/export-env.py new file mode 100644 index 0000000000000..c7611d100436f --- /dev/null +++ b/test/antithesis/export-env.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Emit the `.env` file consumed by Antithesis's docker-compose.yaml. + +The compose YAML (export-compose.py) is committed with `${MATERIALIZED_IMAGE}` +/ `${ANTITHESIS_WORKLOAD_IMAGE}` placeholders so it stays stable across +materialized source changes. This script writes the corresponding `.env` +with the current mzbuild fingerprints so compose can interpolate them. + +Run at CI build time (build-antithesis.sh) and at local-dev `make build`. +The `antithesis-config` mzbuild image copies in the .env produced by this +script, so the image's own fingerprint tracks the materialized fingerprint +transitively — same materialized → same .env → same antithesis-config. + +With `--registry`, the emitted refs use that registry prefix instead of +the default (whatever `spec()` returns based on `MZ_GHCR`). CI passes the +Antithesis GCP Artifact Registry so the compose Antithesis pulls +references images at the registry Antithesis can actually reach. + +Usage: + bin/pyactivate test/antithesis/export-env.py \\ + > test/antithesis/config/.env + bin/pyactivate test/antithesis/export-env.py \\ + --registry us-central1-docker.pkg.dev/molten-verve-216720/materialize-repository \\ + > test/antithesis/config/.env +""" + +import argparse +import sys +from pathlib import Path + +from materialize.mzbuild import Repository +from materialize.xcompile import Arch + +# Mapping of `.env` variable name → mzbuild image name. Keep in sync with +# MATERIALIZE_IMAGES in export-compose.py. +ENV_VARS = { + "MATERIALIZED_IMAGE": "materialized", + "ANTITHESIS_WORKLOAD_IMAGE": "antithesis-workload", +} + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument( + "--registry", + default=None, + help=( + "Registry prefix to use for emitted refs. If unset, uses the " + "default `spec()` (GHCR when MZ_GHCR=1, else Docker Hub)." + ), + ) + parser.add_argument( + "--no-antithesis", + action="store_true", + help=( + "Emit non-antithesis-flavored image fingerprints. Used by the " + "`make build-local` workflow that brings the compose up without " + "the Antithesis platform — the antithesis flavor needs a " + "libvoidstar.so we don't have locally, and the antithesis-only " + "deps (testdrive in particular) aren't published with the " + "antithesis flavor yet. CI sets neither (antithesis stays on)." + ), + ) + args = parser.parse_args() + + # Antithesis itself runs amd64-only, so the Antithesis-targeted build + # (CI default) is always x86_64. For local-dev `--no-antithesis` we use + # the host arch instead so the compose stack runs natively without + # rosetta/qemu emulation (which segfaults inside testdrive on Apple + # Silicon). + arch = Arch.host() if args.no_antithesis else Arch.X86_64 + repo = Repository(Path("."), arch=arch, antithesis=not args.no_antithesis) + images = [repo.images[name] for name in ENV_VARS.values()] + deps = repo.resolve_dependencies(images) + + sys.stdout.write( + "# GENERATED FILE — do not edit. Regenerate via:\n" + "# bin/pyactivate test/antithesis/export-env.py > test/antithesis/config/.env\n" + "# Consumed by test/antithesis/config/docker-compose.yaml at compose-parse time.\n" + ) + for var, image_name in ENV_VARS.items(): + if args.registry: + ref = ( + f"{args.registry}/{image_name}:mzbuild-{deps[image_name].fingerprint()}" + ) + else: + ref = deps[image_name].spec() + sys.stdout.write(f"{var}={ref}\n") + + +if __name__ == "__main__": + main() diff --git a/test/antithesis/fault-orchestrator/pause_faults.sh b/test/antithesis/fault-orchestrator/pause_faults.sh new file mode 100755 index 0000000000000..00cb4e910bc47 --- /dev/null +++ b/test/antithesis/fault-orchestrator/pause_faults.sh @@ -0,0 +1,76 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Drive Antithesis fault windows globally. +# +# Antithesis injects faults into the system continuously by default. +# Calling `ANTITHESIS_STOP_FAULTS ` requests a quiet window — +# Antithesis pauses fault injection for that many seconds. The Antithesis +# engagement team's recommendation: drive these quiet windows from a +# single dedicated container, not per-driver, otherwise overlapping +# per-driver requests keep the system in a quiet state most of the time +# and we never actually fault. +# +# This script alternates faults-OFF (quiet) and faults-ON (active) +# windows at randomized intervals so each timeline sees a different +# cadence. Adapted from the Antithesis hands-on tutorial: +# https://github.com/antithesishq/hands-on-tutorial-1/blob/main/python/antithesis/pause_faults.sh +# +# Outside Antithesis (snouty local validate) `ANTITHESIS_STOP_FAULTS` is +# unset; the script exits immediately so the rest of the compose works. + +set -euo pipefail + +if [[ -z "${ANTITHESIS_STOP_FAULTS:-}" ]]; then + echo "ANTITHESIS_STOP_FAULTS not set; fault-orchestrator exiting (no-op)" + exit 0 +fi + +# Tunable via the service `environment:` block. Defaults sized so that: +# * MAX_ON is comfortably shorter than any driver's CATCHUP_TIMEOUT_S +# (smallest is 60s in parallel_driver_upsert_latest_value) — a +# driver's catchup window can always span at least one full quiet +# period. +# * MIN_OFF is long enough for materialized to commit a few timestamps +# and for sources to advance offset_committed past the most recent +# batch of produced offsets. +# * START_DELAY gives setup-complete + bootstrap a window of un-faulted +# time before the alternation begins. +START_DELAY="${START_DELAY:-30}" +MIN_ON="${MIN_ON:-20}" +MAX_ON="${MAX_ON:-40}" +MIN_OFF="${MIN_OFF:-20}" +MAX_OFF="${MAX_OFF:-40}" + +echo "fault-orchestrator: ON ${MIN_ON}-${MAX_ON}s / OFF ${MIN_OFF}-${MAX_OFF}s, initial pause ${START_DELAY}s" + +# Initial quiet window so the rest of the stack reaches steady state +# before Antithesis starts faulting. Antithesis may or may not honour +# this depending on when fault injection begins relative to setup- +# complete; either way the local sleep gives drivers a clean start. +"${ANTITHESIS_STOP_FAULTS}" "${START_DELAY}" +sleep "${START_DELAY}" + +while true; do + # Re-seed $RANDOM from /dev/urandom so successive iterations don't + # repeat the same on/off period (the shell's RANDOM is a 16-bit LCG; + # without reseeding it can produce predictable sequences). + RANDOM=$(od -An -N2 -tu2 /dev/urandom | tr -d ' ') + ON_PERIOD=$((MIN_ON + (RANDOM % (MAX_ON - MIN_ON + 1)))) + OFF_PERIOD=$((MIN_OFF + (RANDOM % (MAX_OFF - MIN_OFF + 1)))) + + echo "fault-orchestrator: faults OFF for ${OFF_PERIOD}s" + "${ANTITHESIS_STOP_FAULTS}" "${OFF_PERIOD}" + sleep "${OFF_PERIOD}" + + echo "fault-orchestrator: faults ON for ${ON_PERIOD}s" + sleep "${ON_PERIOD}" +done diff --git a/test/antithesis/mzcompose.py b/test/antithesis/mzcompose.py new file mode 100644 index 0000000000000..8167aef466a89 --- /dev/null +++ b/test/antithesis/mzcompose.py @@ -0,0 +1,384 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +Antithesis test composition for Materialize. + +Topology exercised under Antithesis: + - postgres-metadata : consensus/catalog/timestamp-oracle store + - minio : S3-compatible blob storage for persist + - zookeeper + kafka : Kafka broker for source ingestion + - schema-registry : Avro/Protobuf schemas for kafka sources + - clusterd1, clusterd2 : two external compute+storage processes — each + backs one replica of `antithesis_cluster`, so + Antithesis killing either container exercises the + compute/storage-replica recovery and rebalancing + paths without taking the cluster offline. + - clusterd-pool-{0..N-1} : a configurable pool of external clusterd + containers that the parallel-workload driver + claims one-per-cluster to give each + parallel-workload cluster its own container. + Without this pool, parallel-workload clusters + would all share materialized's process orchestrator + and Antithesis could only fault the entire + container as a unit. Pool size is controlled by + the `ANTITHESIS_CLUSTERD_POOL_SIZE` env var (read + from the harness; defaults to 8). + - materialized : the SUT (environmentd; clusterd is external) + - workload : Python test driver wired to the Antithesis SDK + - fault-orchestrator : single bash container alternating quiet and + faulting windows globally via + `ANTITHESIS_STOP_FAULTS`. Centralising the + cadence avoids the failure mode where every + driver requests its own quiet window and the + union of overlapping requests keeps the system + in a quiet state most of the time. + +Usage: + bin/mzcompose --find antithesis run default # bring up the cluster + bin/pyactivate test/antithesis/export-compose.py > config/... # dump compose YAML +""" + +import json +import os +from pathlib import Path + +from materialize.mzcompose import cluster_replica_size_map +from materialize.mzcompose.composition import Composition +from materialize.mzcompose.service import Service, ServiceConfig +from materialize.mzcompose.services.clusterd import Clusterd +from materialize.mzcompose.services.kafka import Kafka +from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.minio import Minio +from materialize.mzcompose.services.mysql import MySql, create_mysql_server_args +from materialize.mzcompose.services.postgres import Postgres, PostgresMetadata +from materialize.mzcompose.services.schema_registry import SchemaRegistry +from materialize.mzcompose.services.zookeeper import Zookeeper + +# Number of pool clusterd containers reserved for parallel-workload clusters +# (one container per cluster, giving each its own container-level fault +# domain). Read from the env so CI/local runs can tune it without editing +# this file. Default 2 — the no-lock allocator (rng-picked slot per +# invocation) tolerates oversubscription, and a smaller pool keeps the +# topology closer to production replica counts. +CLUSTERD_POOL_SIZE = int(os.environ.get("ANTITHESIS_CLUSTERD_POOL_SIZE", "2")) + +# Timely worker threads per clusterd process. Reverted from 16 back to 4 +# on suspicion that Antithesis's deterministic hypervisor runs the whole +# fleet on a single core — 16 work-stealing Timely workers per process +# on one core would burn most of their wakeups on context-switch +# overhead and starve dependent steps, which would manifest as +# workloads never finishing. +# +# This value must stay in lockstep with the `WORKERS N` clause in every +# CREATE CLUSTER REPLICAS statement that targets these containers +# (workload-entrypoint.sh reads it from the CLUSTERD_WORKERS env var +# the Workload service passes through; the parallel-workload Python +# driver consumes the same env via the framework's pool-cluster +# wrapper). +CLUSTERD_WORKERS = 4 + + +class FaultOrchestrator(Service): + """Single bash container that drives Antithesis fault windows globally. + + Invokes `${ANTITHESIS_STOP_FAULTS} ` to open quiet windows, + then sleeps through faults-ON windows, on a randomised cadence + (MIN_ON..MAX_ON / MIN_OFF..MAX_OFF). The script is bundled in + `test/antithesis/fault-orchestrator/pause_faults.sh` and inlined into + the compose `command:` here so we don't need a new mzbuild image + just to ship 30 lines of bash. + + The Antithesis engagement team flagged per-driver quiet-period + requests as an anti-pattern: with many concurrent drivers each + asking for a quiet window, the union of overlapping windows leaves + the SUT mostly un-faulted. Centralising the cadence here means + faults arrive in one coordinated rhythm; drivers stay robust to + quiet/faulting transitions by relying on `wait_for_catchup` with + generous timeouts. + + Outside Antithesis `ANTITHESIS_STOP_FAULTS` is unset and the script + exits immediately, so this service is a no-op for local validate. + """ + + def __init__(self) -> None: + script_path = Path(__file__).parent / "fault-orchestrator" / "pause_faults.sh" + # Compose interpolates `${VAR}` in every string value at parse + # time, which would eat the script's shell variable references + # (`${RANDOM}`, `${MIN_ON}`, `${ANTITHESIS_STOP_FAULTS}`, etc.) + # before bash ever sees them. Double the `$` to pass through a + # literal `$` and let bash do its own expansion at runtime. The + # underlying .sh file stays normal so shellcheck and direct + # execution work. + script = script_path.read_text().replace("$", "$$") + config: ServiceConfig = { + # bash:5 is alpine-based and ships `bash`, `od`, `tr`, and + # `sleep` via busybox — everything the script uses. Public + # image, so it sails through export-compose.py untouched. + "image": "bash:5", + # `bash -c