Configurations

Catalog Configurations

Single Instance

Suppose you have one ClickHouse instance installed on 10.0.0.1 that exposes an HTTP endpoint on port 8123.

Edit $SPARK_HOME/conf/spark-defaults.conf.

####################################################################################
## register a catalog named "clickhouse"
####################################################################################
spark.sql.catalog.clickhouse                      xenon.clickhouse.ClickHouseCatalog

####################################################################################
## basic configurations for "clickhouse" catalog
####################################################################################
spark.sql.catalog.clickhouse.host                 10.0.0.1
spark.sql.catalog.clickhouse.protocol             http
spark.sql.catalog.clickhouse.http_port            8123
spark.sql.catalog.clickhouse.user                 default
spark.sql.catalog.clickhouse.password
spark.sql.catalog.clickhouse.database             default

##############################################################################################
## custom options of clickhouse-client for "clickhouse" catalog
##############################################################################################
spark.sql.catalog.clickhouse.option.ssl                 false
spark.sql.catalog.clickhouse.option.async               false
spark.sql.catalog.clickhouse.option.client_name         spark
spark.sql.catalog.clickhouse.option.custom_http_params  async_insert=1,wait_for_async_insert=1

Then you can access the ClickHouse table <ck_db>.<ck_table> from Spark SQL as clickhouse.<ck_db>.<ck_table>.
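
For example, a Spark SQL session could query the catalog like this. This is a minimal sketch; the database prod and table events are hypothetical names, not part of the connector.

-- list databases exposed through the "clickhouse" catalog
SHOW NAMESPACES IN clickhouse;

-- query a hypothetical table prod.events on that instance
SELECT * FROM clickhouse.prod.events LIMIT 10;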

Cluster

For a ClickHouse cluster, assign a unique catalog name to each instance.

Suppose you have two ClickHouse instances: clickhouse1, installed on 10.0.0.1 and exposing an HTTPS endpoint on port 8443, and clickhouse2, installed on 10.0.0.2 and exposing an HTTPS endpoint on port 8443.

Edit $SPARK_HOME/conf/spark-defaults.conf.

spark.sql.catalog.clickhouse1                xenon.clickhouse.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                xenon.clickhouse.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

Then you can access the clickhouse1 table <ck_db>.<ck_table> from Spark SQL as clickhouse1.<ck_db>.<ck_table>, and the clickhouse2 table <ck_db>.<ck_table> as clickhouse2.<ck_db>.<ck_table>.
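
Because both catalogs live in the same Spark session, a single query can span instances. A minimal sketch, assuming a hypothetical table prod.events exists on both instances:

-- copy rows from the instance behind clickhouse1 to the one behind clickhouse2
INSERT INTO clickhouse2.prod.events
SELECT * FROM clickhouse1.prod.events;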

SQL Configurations

SQL configurations can be overridden at runtime with SET <key>=<value>.
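
For example, to switch the read serialization format for the current session (the key and supported values come from the table below):

-- override a SQL configuration at runtime
SET spark.clickhouse.read.format=binary;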

| Key | Default | Description | Since |
| --- | --- | --- | --- |
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which are currently not supported by Spark. If true, ignore the unsupported expressions; otherwise fail fast with an exception. Note: when spark.clickhouse.write.distributed.convertLocal is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read its local tables instead of the Distributed table itself. If true, ignore spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.read.format | json | Serialization format for reading. Supported formats: json, binary. | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | If true, construct the input partition filter using the virtual column _partition_id instead of partition values. There are known bugs when assembling SQL predicates from partition values. This feature requires ClickHouse Server v21.6+. | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | If true, mark all fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note: this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | The number of records per batch when writing to ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | When writing a Distributed table, write to its local tables instead of the Distributed table itself. If true, ignore spark.clickhouse.write.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing a Distributed table. | 0.1.0 |
| spark.clickhouse.write.format | arrow | Serialization format for writing. Supported formats: json, arrow. | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | If true, do a local sort by sort keys before writing. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | (none) | If true, do a local sort by partition before writing. If not set, it falls back to the value of spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | The maximum number of retries for a single batch write that failed with retryable codes. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to meet the distribution of the ClickHouse table before writing. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to meet the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note: this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0 |
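
As a usage sketch, the write-side settings above apply to subsequent INSERT statements in the same session. The target table clickhouse.prod.events and the source view staged_events are hypothetical names for illustration.

-- tune the write path for this session, then write
SET spark.clickhouse.write.batchSize=20000;
SET spark.clickhouse.write.repartitionByPartition=true;
INSERT INTO clickhouse.prod.events SELECT * FROM staged_events;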