# Configurations

## Catalog Configurations

### Single Instance

Suppose you have one ClickHouse instance installed on `10.0.0.1` that exposes an HTTP endpoint on port `8123`.

Edit `$SPARK_HOME/conf/spark-defaults.conf`:
```properties
####################################################################################
## register a catalog named "clickhouse"
####################################################################################
spark.sql.catalog.clickhouse                          xenon.clickhouse.ClickHouseCatalog

####################################################################################
## basic configurations for "clickhouse" catalog
####################################################################################
spark.sql.catalog.clickhouse.host                     10.0.0.1
spark.sql.catalog.clickhouse.protocol                 http
spark.sql.catalog.clickhouse.http_port                8123
spark.sql.catalog.clickhouse.user                     default
spark.sql.catalog.clickhouse.password
spark.sql.catalog.clickhouse.database                 default

####################################################################################
## custom options of clickhouse-client for "clickhouse" catalog
####################################################################################
spark.sql.catalog.clickhouse.option.ssl               false
spark.sql.catalog.clickhouse.option.async             false
spark.sql.catalog.clickhouse.option.client_name       spark
spark.sql.catalog.clickhouse.option.custom_http_params async_insert=1,wait_for_async_insert=1
```
Then you can access the ClickHouse table `<ck_db>.<ck_table>` from Spark SQL as `clickhouse.<ck_db>.<ck_table>`.
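For example, a session using the registered catalog might look like the following. The table name `default.events` is a hypothetical example, not part of the configuration above:

```sql
-- list databases exposed by the "clickhouse" catalog
SHOW DATABASES FROM clickhouse;

-- query a ClickHouse table through the catalog
-- (default.events is a hypothetical table name)
SELECT * FROM clickhouse.default.events LIMIT 10;
```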
### Cluster

For a ClickHouse cluster, register a catalog with a unique name for each instance.

Suppose you have two ClickHouse instances: one installed on `10.0.0.1`, exposing an HTTPS endpoint on port `8443`, named `clickhouse1`; and another installed on `10.0.0.2`, exposing an HTTPS endpoint on port `8443`, named `clickhouse2`.

Edit `$SPARK_HOME/conf/spark-defaults.conf`:
```properties
spark.sql.catalog.clickhouse1                xenon.clickhouse.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                xenon.clickhouse.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true
```
Then you can access the `clickhouse1` table `<ck_db>.<ck_table>` from Spark SQL as `clickhouse1.<ck_db>.<ck_table>`, and the `clickhouse2` table `<ck_db>.<ck_table>` as `clickhouse2.<ck_db>.<ck_table>`.
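Because both catalogs are available in the same Spark session, you can read from one instance and write to the other in a single statement. The table name `default.events` below is a hypothetical example:

```sql
-- copy data from one ClickHouse instance to the other via Spark
-- (default.events is a hypothetical table name)
INSERT INTO clickhouse2.default.events
SELECT * FROM clickhouse1.default.events;
```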
## SQL Configurations

SQL configurations can be overridden at runtime with `SET <key>=<value>`.
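For example, to adjust the read format and write batch size for the current session only (both keys are listed in the table below):

```sql
-- override SQL configurations for the current session
SET spark.clickhouse.read.format=binary;
SET spark.clickhouse.write.batchSize=20000;
```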
| Key | Default | Description | Since |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. `cityHash64(col_1, col_2)`, which are currently not supported by Spark. If `true`, ignore the unsupported expressions, otherwise fail fast with an exception. Note: when `spark.clickhouse.write.distributed.convertLocal` is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read its local tables instead of the table itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`. | 0.1.0 |
| spark.clickhouse.read.format | json | Serialization format for reading. Supported formats: json, binary. | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | If `true`, construct the input partition filter by the virtual column `_partition_id` instead of the partition value. There are known bugs when assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+. | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | If `true`, mark all fields of the query schema as nullable when executing `CREATE/REPLACE TABLE ... AS SELECT ...` to create the table. Note: this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as `true`. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | The number of records per batch when writing to ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | When writing a Distributed table, write to its local tables instead of the table itself. If `true`, ignore `spark.clickhouse.write.distributed.useClusterNodes`. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing a Distributed table. | 0.1.0 |
| spark.clickhouse.write.format | arrow | Serialization format for writing. Supported formats: json, arrow. | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | If `true`, do a local sort by sort keys before writing. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | (not set) | If `true`, do a local sort by partition before writing. If not set, it equals `spark.clickhouse.write.repartitionByPartition`. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | The maximum number of retries for a single batch write that failed with retryable error codes. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to match the distribution of the ClickHouse table before writing. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to match the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note: this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as `true`. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0 |