diff --git a/docs.json b/docs.json index 77070ab78..5fb2e002d 100644 --- a/docs.json +++ b/docs.json @@ -3278,6 +3278,7 @@ "group": "Data ingestion", "pages": [ "integrations/connectors/data-ingestion/etl-tools/airbyte-and-clickhouse", + "integrations/connectors/data-ingestion/etl-tools/airflow-and-clickhouse", "integrations/connectors/data-ingestion/etl-tools/apify-and-clickhouse", { "expanded": false, diff --git a/integrations/connectors/data-ingestion/etl-tools/airflow-and-clickhouse.mdx b/integrations/connectors/data-ingestion/etl-tools/airflow-and-clickhouse.mdx new file mode 100644 index 000000000..edc42eba5 --- /dev/null +++ b/integrations/connectors/data-ingestion/etl-tools/airflow-and-clickhouse.mdx @@ -0,0 +1,212 @@ +--- +sidebarTitle: 'Apache Airflow' +slug: /integrations/airflow +description: 'Orchestrate ClickHouse queries and data loads from Apache Airflow using the ClickHouse provider' +title: 'Connect Apache Airflow to ClickHouse' +doc_type: 'guide' +integration: + - support_level: 'core' + - category: 'data_ingestion' + - website: 'https://airflow.apache.org/' +keywords: ['Apache Airflow', 'orchestration', 'DAG', 'clickhouse-connect', 'SQLExecuteQueryOperator', 'ClickHouseHook', 'workflow', 'scheduler'] +--- + +import ClickHouseSupportedBadge from "/snippets/components/ClickHouseSupported/ClickHouseSupported.jsx"; + + + +[Apache Airflow](https://airflow.apache.org/) is an open-source platform for authoring, scheduling, and monitoring workflows as code. Workflows are defined as directed acyclic graphs (DAGs) of tasks written in Python. + +The `apache-airflow-providers-clickhousedb` provider connects Airflow to ClickHouse, letting you run queries, create tables, and load data as part of a DAG. It connects over the [HTTP interface](/interfaces/http) using the [`clickhouse-connect`](/integrations/python) client, and exposes ClickHouse through Airflow's common SQL framework, so the standard `SQLExecuteQueryOperator` handles DDL, DML, and analytical queries with no ClickHouse-specific operator required. + +## Install the provider {#install-the-provider} + +Install the provider into the environment where your Airflow scheduler and workers run: + +```bash +pip install apache-airflow-providers-clickhousedb +``` + +The provider depends on `apache-airflow-providers-common-sql` and `clickhouse-connect`, which are installed alongside it. To pass query results to pandas or polars DataFrames, install the optional extras: + +```bash +pip install 'apache-airflow-providers-common-sql[pandas,polars]' +``` + +## Create a ClickHouse connection {#create-a-clickhouse-connection} + +The provider registers a connection type of `clickhouse`. Create a connection from the Airflow UI under **Admin > Connections**, or define one through the CLI or an environment variable. + +In the UI, select **ClickHouse** as the connection type and fill in the fields: + +| Field | Description | Default | +|--------------------------|--------------------------------------------------------------------------|-------------------------------| +| **Host** | ClickHouse server hostname, for example `abc123.clickhouse.cloud` | `localhost` | +| **Port** | HTTP(S) port | `8123` (plain), `8443` (TLS) | +| **Login** | ClickHouse username | `default` | +| **Password** | ClickHouse user password | (empty) | +| **Database** | Default database for the connection. The UI labels this **Database**; it is the `schema` field when you define the connection by URI or JSON. | `default` | + +For [ClickHouse Cloud](/cloud/overview) or any self-hosted cluster with TLS enabled, set `secure` to `true` in the **Extra** field and use the TLS port (`8443`). + +### Extra connection options {#extra-connection-options} + +The provider exposes additional options as dedicated fields in the connection form. When you define the connection by URI, JSON, or environment variable instead, supply them as keys in the `extra` JSON object. All are optional: + +| `extra` key | UI field | Default | Description | +|------------------------|------------------------------|---------|----------------------------------------------------------------------------------------------| +| `secure` | Use TLS (HTTPS) | `false` | Enable HTTPS/TLS. | +| `verify` | Verify SSL Certificate | `true` | Verify the server TLS certificate when `secure` is `true`. Set to `false` for self-signed certificates. | +| `connect_timeout` | Connection Timeout (seconds) | `10` | HTTP connection timeout in seconds. | +| `send_receive_timeout` | Query Timeout (seconds) | `300` | Query read/write timeout in seconds. Increase this for long-running analytical queries. | +| `compress` | Enable LZ4 Compression | `true` | Enable LZ4 result compression. | +| `client_name` | Client Name | (empty) | A label appended to the Airflow version identifier in the ClickHouse `User-Agent` and the `client_name` column of [`system.query_log`](/operations/system-tables/query_log). | +| `session_settings` | Session Settings (JSON) | (empty) | [ClickHouse session settings](/operations/settings/settings) applied to every query on the connection, for example `{"max_execution_time": 300, "max_threads": 8}`. | +| `client_kwargs` | Client kwargs (JSON) | (empty) | Additional keyword arguments forwarded to `clickhouse_connect.get_client()`, for example an `http_proxy`. | + +### Define a connection without the UI {#define-a-connection-without-the-ui} + +Set the connection through an environment variable. The URI form covers host, credentials, and database: + +```bash +export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:password@localhost:8123/my_database' +``` + +All components of the URI must be URL-encoded. For TLS, timeouts, or session settings, use the JSON form, which exposes the **Extra** fields: + +```bash +export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{ + "conn_type": "clickhouse", + "host": "abc123.clickhouse.cloud", + "port": 8443, + "login": "default", + "password": "secret", + "schema": "my_database", + "extra": { + "secure": true, + "session_settings": { + "max_execution_time": 300, + "max_memory_usage": 10000000000 + } + } +}' +``` + +All hooks and operators use the connection ID `clickhouse_default` unless you specify another. + +## Run queries with SQLExecuteQueryOperator {#run-queries} + +Set the operator's `conn_id` to your ClickHouse connection. The following DAG creates a table, inserts rows, reads them back, and drops the table: + +```python +from datetime import datetime + +from airflow import DAG +from airflow.providers.common.sql.hooks.sql import fetch_all_handler +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator + +CLICKHOUSE_CONN_ID = "clickhouse_default" +CLICKHOUSE_TABLE = "airflow_example" + +with DAG( + dag_id="example_clickhouse", + start_date=datetime(2021, 1, 1), + default_args={"conn_id": CLICKHOUSE_CONN_ID}, + schedule="@once", + catchup=False, +) as dag: + create_table = SQLExecuteQueryOperator( + task_id="create_table", + sql=f""" + CREATE TABLE IF NOT EXISTS {CLICKHOUSE_TABLE} ( + id UInt32, + name String, + ts DateTime DEFAULT now() + ) ENGINE = MergeTree() + ORDER BY id + """, + ) + + insert_rows = SQLExecuteQueryOperator( + task_id="insert_rows", + sql=f""" + INSERT INTO {CLICKHOUSE_TABLE} (id, name) VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie') + """, + ) + + read_rows = SQLExecuteQueryOperator( + task_id="read_rows", + sql=f"SELECT id, name FROM {CLICKHOUSE_TABLE} ORDER BY id", + handler=fetch_all_handler, + ) + + drop_table = SQLExecuteQueryOperator( + task_id="drop_table", + sql=f"DROP TABLE IF EXISTS {CLICKHOUSE_TABLE}", + ) + + create_table >> insert_rows >> read_rows >> drop_table +``` + +Query results are fetched with the default `handler` (`fetch_all_handler`). To return something other than the full result set, pass a different handler, such as `fetch_one_handler` for the first row only. + +### Target a different database per task {#target-a-different-database} + +When one connection points at a cluster and individual tasks query different databases, override the database through `hook_params` instead of creating a separate connection: + +```python +read_rows = SQLExecuteQueryOperator( + task_id="read_rows", + conn_id=CLICKHOUSE_CONN_ID, + sql="SELECT count() FROM events", + hook_params={"database": "analytics"}, +) +``` + +## Use the hook directly {#use-the-hook-directly} + +For work that doesn't fit a SQL operator — bulk inserts, streaming, or ClickHouse-specific client calls — use `ClickHouseHook` inside a Python task. + +The hook's `bulk_insert_rows` method uses the native columnar insert path in `clickhouse-connect`, which is much faster than row-by-row inserts for large datasets. Set `batch_size` to bound peak memory on very large inputs: + +```python +from airflow.providers.clickhousedb.hooks.clickhouse import ClickHouseHook + +hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default") + +hook.bulk_insert_rows( + table="events", + rows=[("user1", "click"), ("user2", "view")], + column_names=["user_id", "action"], + batch_size=1000, +) +``` + +Call `get_client()` to reach the underlying `clickhouse-connect` client for anything the hook doesn't expose directly: + +```python +client = hook.get_client() +total = client.query("SELECT count() FROM events").result_rows[0][0] +``` + +### Apply session settings {#apply-session-settings} + +Pass [session settings](/operations/settings/settings) when constructing the hook, either directly or through an operator's `hook_params`. Settings passed to the constructor are merged on top of any `session_settings` defined in the connection's **Extra** field, and the constructor values win on conflicting keys: + +```python +hook = ClickHouseHook( + clickhouse_conn_id="clickhouse_default", + session_settings={"max_execution_time": 60, "max_threads": 4}, +) +``` + +## Related content {#related-content} + +- [`clickhouse-connect` Python client](/integrations/python) +- [ClickHouse HTTP interface](/interfaces/http) +- [ClickHouse session settings reference](/operations/settings/settings) +- [`apache-airflow-providers-clickhousedb` reference docs](https://airflow.apache.org/docs/apache-airflow-providers-clickhousedb/) +- [Provider package on PyPI](https://pypi.org/project/apache-airflow-providers-clickhousedb/) diff --git a/integrations/connectors/data-ingestion/index.mdx b/integrations/connectors/data-ingestion/index.mdx index 9b8a9a565..ef5fda073 100644 --- a/integrations/connectors/data-ingestion/index.mdx +++ b/integrations/connectors/data-ingestion/index.mdx @@ -1,6 +1,6 @@ --- slug: /integrations/data-ingestion-overview -keywords: [ 'Airbyte', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ] +keywords: [ 'Airbyte', 'Apache Airflow', 'Airflow', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ] title: 'Data ingestion' description: 'Landing page for the data ingestion section' doc_type: 'landing-page' @@ -12,6 +12,7 @@ For more information check out the pages below: | Data Ingestion Tool | Description | |------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [Airbyte](/integrations/connectors/data-ingestion/etl-tools/airbyte-and-clickhouse) | An open-source data integration platform. It allows the creation of ELT data pipelines and is shipped with more than 140 out-of-the-box connectors. | +| [Apache Airflow](/integrations/airflow) | An open-source platform for authoring, scheduling, and monitoring workflows as code, with a provider for running ClickHouse queries and data loads from DAGs. | | [Apache Spark](/integrations/connectors/data-ingestion/apache-spark/index) | A multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters | | [Apache Flink](/integrations/connectors/data-ingestion/apache-flink) | Real-time data ingestion and processing into ClickHouse through Flink's DataStream API with support for batch writes | | [Amazon Glue](/integrations/connectors/data-ingestion/AWS/glue) | A fully managed, serverless data integration service provided by Amazon Web Services (AWS) simplifying the process of discovering, preparing, and transforming data for analytics, machine learning, and application development. |