Play with Spark Shell
Launch Spark Shell
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-3.4_2.12-0.7.3.jar,/path/clickhouse-jdbc-0.4.6-all.jar
The following argument
--jars /path/clickhouse-spark-runtime-3.4_2.12-0.7.3.jar,/path/clickhouse-jdbc-0.4.6-all.jar
can be replaced by
--repositories https://{maven-central-mirror or private-nexus-repo} \
--packages com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3,com.clickhouse:clickhouse-jdbc:0.4.6:all
to avoid copying the jars to your Spark client node.
Operations
Basic operations, such as creating a database, creating a table, writing to a table, and reading from a table.
scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
| default|
| system|
| test_db|
+---------+
scala> spark.sql("""
| CREATE TABLE test_db.tbl (
| create_time TIMESTAMP NOT NULL,
| m INT NOT NULL COMMENT 'part key',
| id BIGINT NOT NULL COMMENT 'sort key',
| value STRING
| ) USING ClickHouse
| PARTITIONED BY (m)
| TBLPROPERTIES (
| engine = 'MergeTree()',
| order_by = 'id',
| settings.index_granularity = 8192
| )
| """)
res2: org.apache.spark.sql.DataFrame = []
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.createDataFrame(Seq(
("2021-01-01 10:10:10", 1L, "1"),
("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
.withColumn("create_time", to_timestamp($"create_time"))
.withColumn("m", month($"create_time"))
.select($"create_time", $"m", $"id", $"value")
.writeTo("test_db.tbl")
.append
// Exiting paste mode, now interpreting.
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10| 1| 1| 1|
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
Execute ClickHouse native SQL.
scala> val options = Map(
| "host" -> "clickhouse",
| "protocol" -> "http",
| "http_port" -> "8123",
| "user" -> "default",
| "password" -> ""
| )
scala> val sql = """
| |CREATE TABLE test_db.person (
| | id Int64,
| | name String,
| | age Nullable(Int32)
| |)
| |ENGINE = MergeTree()
| |ORDER BY id
| """.stripMargin
scala> spark.executeCommand("xenon.clickhouse.ClickHouseCommandRunner", sql, options)
scala> spark.sql("show tables in clickhouse.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test_db| person| false|
+---------+---------+-----------+
scala> spark.table("clickhouse.test_db.person").printSchema
root
|-- id: long (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = true)