Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -3278,6 +3278,7 @@
"group": "Data ingestion",
"pages": [
"integrations/connectors/data-ingestion/etl-tools/airbyte-and-clickhouse",
"integrations/connectors/data-ingestion/etl-tools/airflow-and-clickhouse",
"integrations/connectors/data-ingestion/etl-tools/apify-and-clickhouse",
{
"expanded": false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
---
sidebarTitle: 'Apache Airflow'
slug: /integrations/airflow
description: 'Orchestrate ClickHouse queries and data loads from Apache Airflow using the ClickHouse provider'
title: 'Connect Apache Airflow to ClickHouse'
doc_type: 'guide'
integration:
- support_level: 'core'
- category: 'data_ingestion'
- website: 'https://airflow.apache.org/'
keywords: ['Apache Airflow', 'orchestration', 'DAG', 'clickhouse-connect', 'SQLExecuteQueryOperator', 'ClickHouseHook', 'workflow', 'scheduler']
---

import ClickHouseSupportedBadge from "/snippets/components/ClickHouseSupported/ClickHouseSupported.jsx";

<ClickHouseSupportedBadge/>

[Apache Airflow](https://airflow.apache.org/) is an open-source platform for authoring, scheduling, and monitoring workflows as code. Workflows are defined as directed acyclic graphs (DAGs) of tasks written in Python.

The `apache-airflow-providers-clickhousedb` provider connects Airflow to ClickHouse, letting you run queries, create tables, and load data as part of a DAG. It connects over the [HTTP interface](/interfaces/http) using the [`clickhouse-connect`](/integrations/python) client, and exposes ClickHouse through Airflow's common SQL framework, so the standard `SQLExecuteQueryOperator` handles DDL, DML, and analytical queries with no ClickHouse-specific operator required.

## Install the provider {#install-the-provider}

Install the provider into the environment where your Airflow scheduler and workers run:

```bash
pip install apache-airflow-providers-clickhousedb
```

The provider depends on `apache-airflow-providers-common-sql` and `clickhouse-connect`, which are installed alongside it. To pass query results to pandas or polars DataFrames, install the optional extras:

```bash
pip install 'apache-airflow-providers-common-sql[pandas,polars]'
```

## Create a ClickHouse connection {#create-a-clickhouse-connection}

The provider registers a connection type of `clickhouse`. Create a connection from the Airflow UI under **Admin > Connections**, or define one through the CLI or an environment variable.

In the UI, select **ClickHouse** as the connection type and fill in the fields:

| Field | Description | Default |
|--------------------------|--------------------------------------------------------------------------|-------------------------------|
| **Host** | ClickHouse server hostname, for example `abc123.clickhouse.cloud` | `localhost` |
| **Port** | HTTP(S) port | `8123` (plain), `8443` (TLS) |
| **Login** | ClickHouse username | `default` |
| **Password** | ClickHouse user password | (empty) |
| **Database** | Default database for the connection. The UI labels this **Database**; it is the `schema` field when you define the connection by URI or JSON. | `default` |

For [ClickHouse Cloud](/cloud/overview) or any self-hosted cluster with TLS enabled, set `secure` to `true` in the **Extra** field and use the TLS port (`8443`).

### Extra connection options {#extra-connection-options}

The provider exposes additional options as dedicated fields in the connection form. When you define the connection by URI, JSON, or environment variable instead, supply them as keys in the `extra` JSON object. All are optional:

| `extra` key | UI field | Default | Description |
|------------------------|------------------------------|---------|----------------------------------------------------------------------------------------------|
| `secure` | Use TLS (HTTPS) | `false` | Enable HTTPS/TLS. |
| `verify` | Verify SSL Certificate | `true` | Verify the server TLS certificate when `secure` is `true`. Set to `false` for self-signed certificates. |
| `connect_timeout` | Connection Timeout (seconds) | `10` | HTTP connection timeout in seconds. |
| `send_receive_timeout` | Query Timeout (seconds) | `300` | Query read/write timeout in seconds. Increase this for long-running analytical queries. |
| `compress` | Enable LZ4 Compression | `true` | Enable LZ4 result compression. |
| `client_name` | Client Name | (empty) | A label appended to the Airflow version identifier in the ClickHouse `User-Agent` and the `client_name` column of [`system.query_log`](/operations/system-tables/query_log). |
| `session_settings` | Session Settings (JSON) | (empty) | [ClickHouse session settings](/operations/settings/settings) applied to every query on the connection, for example `{"max_execution_time": 300, "max_threads": 8}`. |
| `client_kwargs` | Client kwargs (JSON) | (empty) | Additional keyword arguments forwarded to `clickhouse_connect.get_client()`, for example an `http_proxy`. |

### Define a connection without the UI {#define-a-connection-without-the-ui}

Set the connection through an environment variable. The URI form covers host, credentials, and database:

```bash
export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:password@localhost:8123/my_database'
```

All components of the URI must be URL-encoded. For TLS, timeouts, or session settings, use the JSON form, which exposes the **Extra** fields:

```bash
export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{
"conn_type": "clickhouse",
"host": "abc123.clickhouse.cloud",
"port": 8443,
"login": "default",
"password": "secret",
"schema": "my_database",
"extra": {
"secure": true,
"session_settings": {
"max_execution_time": 300,
"max_memory_usage": 10000000000
}
}
}'
```

All hooks and operators use the connection ID `clickhouse_default` unless you specify another.

## Run queries with SQLExecuteQueryOperator {#run-queries}

Set the operator's `conn_id` to your ClickHouse connection. The following DAG creates a table, inserts rows, reads them back, and drops the table:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

CLICKHOUSE_CONN_ID = "clickhouse_default"
CLICKHOUSE_TABLE = "airflow_example"

with DAG(
dag_id="example_clickhouse",
start_date=datetime(2021, 1, 1),
default_args={"conn_id": CLICKHOUSE_CONN_ID},
schedule="@once",
catchup=False,
) as dag:
create_table = SQLExecuteQueryOperator(
task_id="create_table",
sql=f"""
CREATE TABLE IF NOT EXISTS {CLICKHOUSE_TABLE} (
id UInt32,
name String,
ts DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY id
""",
)

insert_rows = SQLExecuteQueryOperator(
task_id="insert_rows",
sql=f"""
INSERT INTO {CLICKHOUSE_TABLE} (id, name) VALUES
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie')
""",
)

read_rows = SQLExecuteQueryOperator(
task_id="read_rows",
sql=f"SELECT id, name FROM {CLICKHOUSE_TABLE} ORDER BY id",
handler=fetch_all_handler,
)

drop_table = SQLExecuteQueryOperator(
task_id="drop_table",
sql=f"DROP TABLE IF EXISTS {CLICKHOUSE_TABLE}",
)

create_table >> insert_rows >> read_rows >> drop_table
```

Query results are fetched with the default `handler` (`fetch_all_handler`). To return something other than the full result set, pass a different handler, such as `fetch_one_handler` for the first row only.

### Target a different database per task {#target-a-different-database}

When one connection points at a cluster and individual tasks query different databases, override the database through `hook_params` instead of creating a separate connection:

```python
read_rows = SQLExecuteQueryOperator(
task_id="read_rows",
conn_id=CLICKHOUSE_CONN_ID,
sql="SELECT count() FROM events",
hook_params={"database": "analytics"},
)
```

## Use the hook directly {#use-the-hook-directly}

For work that doesn't fit a SQL operator — bulk inserts, streaming, or ClickHouse-specific client calls — use `ClickHouseHook` inside a Python task.

The hook's `bulk_insert_rows` method uses the native columnar insert path in `clickhouse-connect`, which is much faster than row-by-row inserts for large datasets. Set `batch_size` to bound peak memory on very large inputs:

```python
from airflow.providers.clickhousedb.hooks.clickhouse import ClickHouseHook

hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default")

hook.bulk_insert_rows(
table="events",
rows=[("user1", "click"), ("user2", "view")],
column_names=["user_id", "action"],
batch_size=1000,
)
```

Call `get_client()` to reach the underlying `clickhouse-connect` client for anything the hook doesn't expose directly:

```python
client = hook.get_client()
total = client.query("SELECT count() FROM events").result_rows[0][0]
```

### Apply session settings {#apply-session-settings}

Pass [session settings](/operations/settings/settings) when constructing the hook, either directly or through an operator's `hook_params`. Settings passed to the constructor are merged on top of any `session_settings` defined in the connection's **Extra** field, and the constructor values win on conflicting keys:

```python
hook = ClickHouseHook(
clickhouse_conn_id="clickhouse_default",
session_settings={"max_execution_time": 60, "max_threads": 4},
)
```

## Related content {#related-content}

- [`clickhouse-connect` Python client](/integrations/python)
- [ClickHouse HTTP interface](/interfaces/http)
- [ClickHouse session settings reference](/operations/settings/settings)
- [`apache-airflow-providers-clickhousedb` reference docs](https://airflow.apache.org/docs/apache-airflow-providers-clickhousedb/)
- [Provider package on PyPI](https://pypi.org/project/apache-airflow-providers-clickhousedb/)
3 changes: 2 additions & 1 deletion integrations/connectors/data-ingestion/index.mdx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
slug: /integrations/data-ingestion-overview
keywords: [ 'Airbyte', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ]
keywords: [ 'Airbyte', 'Apache Airflow', 'Airflow', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ]
title: 'Data ingestion'
description: 'Landing page for the data ingestion section'
doc_type: 'landing-page'
Expand All @@ -12,6 +12,7 @@ For more information check out the pages below:
| Data Ingestion Tool | Description |
|------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [Airbyte](/integrations/connectors/data-ingestion/etl-tools/airbyte-and-clickhouse) | An open-source data integration platform. It allows the creation of ELT data pipelines and is shipped with more than 140 out-of-the-box connectors. |
| [Apache Airflow](/integrations/airflow) | An open-source platform for authoring, scheduling, and monitoring workflows as code, with a provider for running ClickHouse queries and data loads from DAGs. |
| [Apache Spark](/integrations/connectors/data-ingestion/apache-spark/index) | A multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters |
| [Apache Flink](/integrations/connectors/data-ingestion/apache-flink) | Real-time data ingestion and processing into ClickHouse through Flink's DataStream API with support for batch writes |
| [Amazon Glue](/integrations/connectors/data-ingestion/AWS/glue) | A fully managed, serverless data integration service provided by Amazon Web Services (AWS) simplifying the process of discovering, preparing, and transforming data for analytics, machine learning, and application development. |
Expand Down