Provides ClickHouseOperator, ClickHouseHook and ClickHouseSqlSensor for Apache Airflow based on mymarilyn/clickhouse-driver.
Top-1% downloads on PyPI.
- SQL queries are templated.
- Can run multiple SQL queries per single ClickHouseOperator.
- Result of the last query of ClickHouseOperator instance is pushed to XCom.
- Executed queries are logged in a pretty form.
- Uses efficient native ClickHouse TCP protocol thanks to clickhouse-driver. Does not support HTTP protocol.
- Supports extra ClickHouse connection parameters such as various timeouts, compression, secure, etc. through Airflow Connection.extra property.
pip install -U airflow-clickhouse-plugin
Dependencies: apache-airflow with apache-airflow-providers-common-sql (usually pre-packed with Airflow) and clickhouse-driver.
Different versions of the plugin support different combinations of Python and Airflow versions. We primarily support Airflow 2.0+ and Python 3.7+. If you need to use the plugin with older Python-Airflow combinations, pick a suitable plugin version:
airflow-clickhouse-plugin version | Airflow version | Python version |
---|---|---|
0.9.0 | ~=2.0.0,>=2.2.0,<2.5.0 | ~=3.7 |
0.8.2 | >=2.0.0,<2.4.0 | ~=3.7 |
0.8.0,0.8.1 | >=2.0.0,<2.3.0 | ~=3.6 |
0.7.0 | >=2.0.0,<2.2.0 | ~=3.6 |
0.6.0 | ~=2.0.1 | ~=3.6 |
>=0.5.4,<0.6.0 | ~=1.10.6 | >=2.7 or >=3.5.* |
>=0.5.0,<0.5.4 | ==1.10.6 | >=2.7 or >=3.5.* |
~= means compatible release, see PEP 440 for an explanation.
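To make the ~= rows in the table easier to read, here is a minimal sketch of the compatible release rule for plain numeric versions. The helper names are made up for illustration, and pre-releases, epochs and other PEP 440 details are deliberately ignored.

```python
def parse_version(text):
    """Turn '2.0.1' into (2, 0, 1); plain numeric releases only."""
    return tuple(int(part) for part in text.split("."))


def is_compatible_release(candidate, spec):
    """Check `candidate` against `~=spec`.

    `~=X.Y.Z` is shorthand for `>=X.Y.Z, ==X.Y.*`: the last component of the
    spec may grow, every component before it must stay the same.
    """
    wanted = parse_version(spec)
    if len(wanted) < 2:
        raise ValueError("~= needs at least two release components")
    found = parse_version(candidate)
    # pad a shorter candidate with zeros so that '2' is treated as '2.0.0'
    found = found + (0,) * (len(wanted) - len(found))
    prefix = wanted[:-1]
    return found >= wanted and found[:len(prefix)] == prefix


print(is_compatible_release("2.0.2", "2.0.0"))  # ~=2.0.0 accepts 2.0.2
print(is_compatible_release("2.1.0", "2.0.0"))  # ~=2.0.0 rejects 2.1.0
print(is_compatible_release("3.9", "3.7"))      # ~=3.7 accepts 3.9
```

For real dependency resolution, rely on pip itself rather than on a hand-written check like this one.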
Starting from Airflow 2.2, pandas is an extra requirement. To install airflow-clickhouse-plugin with pandas support, use pip install airflow-clickhouse-plugin[pandas].
Important: this works only with pip 21+. To handle the pandas dependency properly, you may first need to upgrade pip using pip install -U pip.
If you are not able to upgrade pip to 21+, install the dependency directly using pip install apache-airflow[pandas]== (specifying the current Airflow version). Simple one-liner:
pip install "apache-airflow[pandas]==$(pip freeze | grep apache-airflow== | cut -d'=' -f3)"
To see examples scroll down. To run them, create an Airflow connection to ClickHouse.
To import ClickHouseOperator use:
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
Supported kwargs:
- sql: templated query (if the argument is a single str) or queries (if an iterable of str's).
- clickhouse_conn_id: connection id. Connection schema is described below.
- parameters: passed to clickhouse-driver execute method.
  - If multiple queries are provided via sql then the parameters are passed to all of them.
  - Parameters are not templated.
- database: if present, overrides database defined by connection.
- Other kwargs (including the required task_id) are inherited from Airflow BaseOperator.
The result of the last query is pushed to XCom.
See example below.
To import ClickHouseHook use:
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
Supported kwargs of constructor (__init__ method):
- clickhouse_conn_id: connection id. Connection schema is described below.
- database: if present, overrides database defined by connection.
Supports all the methods of the Airflow BaseHook including:
- get_records(sql: str, parameters: dict=None): returns result of the query as a list of tuples. Materializes all the records in memory.
- get_first(sql: str, parameters: dict=None): returns the first row of the result. Does not load the whole dataset into memory because of using execute_iter. If the dataset is empty then returns None following fetchone semantics.
- run(sql, parameters): runs a single query (specified argument of type str) or multiple queries (if iterable of str). parameters can have any form supported by execute method of clickhouse-driver.
  - If a single query is run then returns its result. If multiple queries are run then returns the result of the last of them.
  - If multiple queries are given then parameters are passed to all of them.
  - Materializes all the records in memory (uses simple execute but not execute_iter).
    - To achieve results streaming by execute_iter use it directly via hook.get_conn().execute_iter(…) (see execute_iter reference).
  - Every run call uses a new connection which is closed when finished.
- get_conn(): returns the underlying clickhouse_driver.Client instance.
See example below.
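The semantics of run and get_first described above can be illustrated without a ClickHouse server. The sketch below is not the plugin's code: FakeClient is a hypothetical stand-in that mimics only the execute, execute_iter and disconnect calls of clickhouse_driver.Client, and the two functions are simplified approximations of the documented behaviour.

```python
class FakeClient:
    """Hypothetical stand-in for clickhouse_driver.Client that only records calls."""

    def __init__(self):
        self.calls = []
        self.rows_produced = 0
        self.disconnected = False

    def execute(self, query, params=None):
        self.calls.append((query, params))
        return [(len(self.calls),)]  # pretend every query returns one row

    def execute_iter(self, query, params=None):
        for n in range(1000):  # rows are produced lazily, one at a time
            self.rows_produced += 1
            yield (n,)

    def disconnect(self):
        self.disconnected = True


def run(client, sql, parameters=None):
    """Run one query (str) or several (iterable of str); return the last result."""
    queries = [sql] if isinstance(sql, str) else list(sql)
    result = None
    try:
        for query in queries:
            # the same parameters object is handed to every query
            result = client.execute(query, parameters)
    finally:
        client.disconnect()  # the connection is closed once the call finishes
    return result


def get_first(client, sql, parameters=None):
    """Return the first row, or None for an empty result, without reading the rest."""
    try:
        return next(iter(client.execute_iter(sql, parameters)), None)
    finally:
        client.disconnect()


run_client = FakeClient()
last = run(run_client, ("SELECT 1", "SELECT 2"), {"limit": 10})

streaming_client = FakeClient()
first = get_first(streaming_client, "SELECT number FROM system.numbers")

print(last, first, streaming_client.rows_produced)
```

Only one row is pulled from the generator in get_first, which is the point of using execute_iter there, while run keeps the whole result of the last query in memory.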
The sensor fully inherits from Airflow SqlSensor and therefore fully implements its interface, using ClickHouseHook to fetch the SQL execution result. It supports templating of the sql argument.
See example below.
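As a rough mental model of how an SQL sensor decides whether to stop waiting, the sketch below checks the first cell of the first returned row, optionally through a success callable. It is a simplified approximation written for this README, not Airflow's actual implementation, and it leaves out the failure and fail_on_empty options of the real sensor.

```python
def poke(records, success=None):
    """Simplified sensor check: `records` is what the hook returned for the query."""
    if not records:
        return False  # empty result: keep waiting
    first_cell = records[0][0]
    if success is not None:
        # a user-supplied criterion decides, e.g. lambda cnt: cnt > 10000
        return bool(success(first_cell))
    return bool(first_cell)  # default: any truthy value counts as success


print(poke([(10001,)], success=lambda cnt: cnt > 10000))
print(poke([(42,)], success=lambda cnt: cnt > 10000))
print(poke([]))
```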
As a type of a new connection, choose SQLite. host should be set to ClickHouse host's IP or domain name. There is no special ClickHouse connection type yet, so we use SQLite as the closest one.
The rest of the connection details may be skipped as they have defaults defined by clickhouse-driver. If you use non-default values, set them according to the connection schema.
If you use a secure connection to ClickHouse (this requires additional configuration on the ClickHouse side), set extra to {"secure":true}.
clickhouse_driver.Client is initialized with attributes stored in Airflow Connection attributes. The mapping of the attributes is listed below:
Airflow Connection attribute | Client.__init__ argument |
---|---|
host | host |
port | port |
schema | database |
login | user |
password | password |
extra | **kwargs |
database argument of ClickHouseOperator or ClickHouseHook overrides schema attribute of the Airflow connection.
You may also pass additional arguments, such as timeouts, compression, secure, etc. through the Connection.extra attribute. The attribute should contain a JSON object which will be deserialized, and all of its properties will be passed as-is to the Client.
For example, if the Airflow connection contains extra={"secure":true} then Client.__init__ will receive the secure=True keyword argument in addition to other non-empty connection attributes.
You should install several packages to support compression. For example, for lz4:
pip3 install clickhouse-cityhash lz4
Then include the compression parameter in the extra attribute of the Airflow connection: extra={"compression":"lz4"}. You can get additional information about extra options from the official documentation of clickhouse-driver.
The connection URI should look like the example below:
clickhouse://login:password@host:port/?compression=lz4
See official documentation to get more info about connections in Airflow.
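To see which part of such a URI ends up in which connection attribute, the standard library can split it apart. This is only an approximation of the idea, not Airflow's own URI parser, and the host localhost and port 9000 are assumed values chosen for the illustration.

```python
from urllib.parse import parse_qsl, urlsplit

# assumed example values: replace login, password, host and port with your own
uri = "clickhouse://login:password@localhost:9000/?compression=lz4"

parts = urlsplit(uri)
connection = {
    "host": parts.hostname,
    "port": parts.port,
    "login": parts.username,
    "password": parts.password,
    "schema": parts.path.lstrip("/") or None,  # empty path: no database given
    "extra": dict(parse_qsl(parts.query)),     # query parameters become extra options
}
print(connection)
```

The query string is what carries options such as compression, so anything placed after the ? is treated as an extra option rather than as a regular connection attribute.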
If the Airflow connection attribute is not set then it is not passed to the Client at all. In that case the default value of the corresponding clickhouse_driver.Connection argument is used (e.g. user defaults to 'default').
This means that Airflow ClickHouse Plugin does not itself define any default values for the ClickHouse connection. You may fully rely on default values of the clickhouse-driver version you use. The only exception is host: if the attribute of Airflow connection is not set then 'localhost' is used.
By default, the plugin uses connection_id='clickhouse_default'.
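The mapping and default rules above can be summarised in a short sketch. It is an illustration under assumptions, not the plugin's source: the connection is modelled as a plain dict, and build_client_args is a hypothetical helper name.

```python
import json

# Airflow Connection attribute -> Client.__init__ argument (host is handled separately)
ATTRIBUTE_TO_ARGUMENT = {
    "port": "port",
    "schema": "database",
    "login": "user",
    "password": "password",
}


def build_client_args(conn, database=None):
    """Return (host, kwargs) that would be passed to the client constructor."""
    kwargs = {}
    for attribute, argument in ATTRIBUTE_TO_ARGUMENT.items():
        value = conn.get(attribute)
        if value:  # unset attributes are skipped so the driver defaults apply
            kwargs[argument] = value
    if database:
        kwargs["database"] = database  # explicit argument overrides the schema
    if conn.get("extra"):
        kwargs.update(json.loads(conn["extra"]))  # JSON properties passed as-is
    host = conn.get("host") or "localhost"  # the only default set outside the driver
    return host, kwargs


host, kwargs = build_client_args(
    {"login": "reader", "schema": "monitor", "extra": '{"secure": true}'},
    database="alerts",
)
print(host, kwargs)
```

Here port and password are absent from the result, which is how the defaults of clickhouse-driver come into play.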
from airflow import DAG
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
with DAG(
        dag_id='update_income_aggregate',
        start_date=days_ago(2),
) as dag:
    ClickHouseOperator(
        task_id='update_income_aggregate',
        database='default',
        sql=(
            '''
                INSERT INTO aggregate
                SELECT eventDt, sum(price * qty) AS income FROM sales
                WHERE eventDt = '{{ ds }}' GROUP BY eventDt
            ''', '''
                OPTIMIZE TABLE aggregate ON CLUSTER {{ var.value.cluster_name }}
                PARTITION toDate('{{ execution_date.format('%Y-%m-01') }}')
            ''', '''
                SELECT sum(income) FROM aggregate
                WHERE eventDt BETWEEN
                    '{{ execution_date.start_of('month').to_date_string() }}'
                    AND '{{ execution_date.end_of('month').to_date_string() }}'
            ''',
            # result of the last query is pushed to XCom
        ),
        clickhouse_conn_id='clickhouse_test',
    ) >> PythonOperator(
        task_id='print_month_income',
        provide_context=True,
        python_callable=lambda task_instance, **_:
            # pulling XCom value and printing it
            print(task_instance.xcom_pull(task_ids='update_income_aggregate')),
    )
from airflow import DAG
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def mysql_to_clickhouse():
    mysql_hook = MySqlHook()
    ch_hook = ClickHouseHook()
    records = mysql_hook.get_records('SELECT * FROM some_mysql_table')
    ch_hook.run('INSERT INTO some_ch_table VALUES', records)

with DAG(
        dag_id='mysql_to_clickhouse',
        start_date=days_ago(2),
) as dag:
    dag >> PythonOperator(
        task_id='mysql_to_clickhouse',
        python_callable=mysql_to_clickhouse,
    )
Important note: don't try to insert values in the literal form ch_hook.run('INSERT INTO some_ch_table VALUES (1)'). clickhouse-driver requires values for an INSERT query to be provided via parameters due to specifics of the native ClickHouse protocol.
from airflow import DAG
from airflow_clickhouse_plugin.sensors.clickhouse_sql_sensor import ClickHouseSqlSensor
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator
from airflow.utils.dates import days_ago
with DAG(
        dag_id='listen_warnings',
        start_date=days_ago(2),
) as dag:
    dag >> ClickHouseSqlSensor(
        task_id='poke_events_count',
        database='monitor',
        sql="SELECT count() FROM warnings WHERE eventDate = '{{ ds }}'",
        success=lambda cnt: cnt > 10000,
    ) >> ClickHouseOperator(
        task_id='create_alert',
        database='alerts',
        sql='''
            INSERT INTO events SELECT eventDate, count()
            FROM monitor.warnings WHERE eventDate = '{{ ds }}'
        ''',
    )
From the root project directory:
Using make:
make unit
Using python:
python -m unittest discover -s tests
Integration tests require access to a ClickHouse server. Tests use the connection URI defined via the environment variable AIRFLOW_CONN_CLICKHOUSE_DEFAULT with clickhouse://localhost as default.
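The fallback described above follows the usual environment-variable-with-default pattern. A minimal sketch, assuming a hypothetical helper name and not reproducing the project's actual test setup:

```python
import os

DEFAULT_URI = "clickhouse://localhost"


def clickhouse_test_uri(environ=os.environ):
    """Return the connection URI for integration tests (hypothetical helper)."""
    return environ.get("AIRFLOW_CONN_CLICKHOUSE_DEFAULT", DEFAULT_URI)


# With the variable unset, the default applies; otherwise the variable wins.
print(clickhouse_test_uri({}))
print(clickhouse_test_uri({"AIRFLOW_CONN_CLICKHOUSE_DEFAULT": "clickhouse://ch-test:9000"}))
```

Exporting AIRFLOW_CONN_CLICKHOUSE_DEFAULT before running the tests is therefore enough to point them at a different server.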
You can run a ClickHouse server in a local Docker container using the following command:
Using make:
make run-clickhouse
Using shell:
docker run -p 9000:9000 --ulimit nofile=262144:262144 -it clickhouse/clickhouse-server
And then run from the project root:
Using make:
make integration
Using python:
python3 -m unittest discover -s tests/integration
From the root project directory:
Using make:
make tests
Using python:
python3 -m unittest discover -s tests
GitHub Action is set up for this project.
Run ClickHouse server inside Docker:
Using shell:
docker exec -it $(docker run --rm -d clickhouse/clickhouse-server) bash
Using make:
make run-clickhouse-dind
The above command will open bash inside the container.
Install dependencies into the container and run tests (execute inside the container):
Using python:
apt-get update
apt-get install -y python3.10 python3-pip git make
git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git
cd airflow-clickhouse-plugin
python3.10 -m pip install -r requirements.txt
python3.10 -m unittest discover -s tests
Using make:
apt-get update
apt-get install -y python3.10 python3-pip git make
git clone https://github.com/whisklabs/airflow-clickhouse-plugin.git
cd airflow-clickhouse-plugin
make tests
- Created by Anton Bryzgalov, @bryzgaloff, originally at Whisk, Samsung
- Inspired by Viktor Taranenko, @viktortnk (Whisk, Samsung)
Community contributors:
- Danila Ganchar, @d-ganchar
- Mikhail, @glader
- Alexander Chashnikov, @ne1r0n
- Simone Brundu, @saimon46
- @gkarg
- Stanislav Morozov, @r3b-fish
- Sergey Bychkov, @SergeyBychkov
- @was-av