# clickhouse_sinker
clickhouse_sinker is a sinker program that transfers Kafka messages into ClickHouse.
Refer to the design document for how it works.
# Features
- Uses the native ClickHouse client-server TCP protocol, with higher performance than HTTP.
- Easy to use and deploy: you don't need to write any code, just prepare a configuration file.
- Support multiple parsers: fastjson(recommended), gjson, csv.
- Support multiple Kafka security mechanisms: SSL, SASL/PLAIN, SASL/SCRAM, SASL/GSSAPI and combinations of them.
- Bulk insert (configured by `bufferSize` and `flushInterval`).
- Powered by Franz-go, which is the fastest and most CPU- and memory-efficient Kafka client in Go.
- Parse messages concurrently.
- Write batches concurrently.
- Every batch is routed to a determined ClickHouse shard. The sinker exits if the write-retry loop fails.
- Custom sharding policy (configured by `shardingKey` and `shardingPolicy`).
- Tolerates replica single-point-of-failure.
- At-least-once delivery guarantee.
- Config management with local file or Nacos.
- One clickhouse_sinker instance assigns tasks to all instances, balanced by message lag (configured by `nacos-service-name`).
# Supported data types
- [x] UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
- [x] Float32, Float64
- [x] Decimal, Decimal32, Decimal64, Decimal128, Decimal256
- [x] String, FixedString, LowCardinality(String)
- [x] Date, DateTime, DateTime64. It is assumed that all values of a given Kafka message field share the same layout, and that the layouts of different fields are unrelated. The layout is detected automatically from a list of known date layouts until the first successful detection, and that layout is then reused forever (see the sketch after this list).
- [x] UUID
- [x] Enum
- [x] Array(T), where T is one of above basic types
- [x] Nullable(T), where T is one of above basic types
- [x] Map
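To illustrate the layout auto-detection mentioned in the Date/DateTime bullet above, here is a minimal Go sketch. The candidate layout list and function name are illustrative assumptions, not the sinker's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// candidateLayouts is an illustrative list of date layouts; the real list
// lives in the sinker's source.
var candidateLayouts = []string{
	time.RFC3339,          // "2006-01-02T15:04:05Z07:00"
	"2006-01-02 15:04:05", // common DateTime layout
	"2006-01-02",          // Date only
}

// detectLayout tries each candidate layout until one parses successfully.
// The caller caches the detected layout for the field and reuses it forever.
func detectLayout(value string) (string, bool) {
	for _, layout := range candidateLayouts {
		if _, err := time.Parse(layout, value); err == nil {
			return layout, true // first successful detection wins
		}
	}
	return "", false
}

func main() {
	layout, ok := detectLayout("2021-06-01 12:30:45")
	fmt.Println(layout, ok) // "2006-01-02 15:04:05" true
}
```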
Note:
- A message is ignored if it is invalid JSON, or if a CSV value doesn't match the format. This is counted by `ParseMsgsErrorTotal`.
- If a message field type is incompatible with the type `T` declared in ClickHouse, or the field value fails to parse, the default value of `T` (see the following table) is filled.
- If a message field type is compatible with the type `T` declared in ClickHouse, but the field value overflows, the nearer border of `T` is filled.
| ClickHouse data type | default value | compatible JSON data type | valid range |
| --- | --- | --- | --- |
| Bool | false | Bool | false, true |
| Int8, Int16, ... | 0 | Bool, Number | Int8 [-128, 127], ... |
| Float32, Float64 | 0.0 | Number | Float32 [-MaxFloat32, MaxFloat32], ... |
| Decimal, ... | 0.0 | Number | decimal value ranges |
| String, ... | "" | Bool, Number, String, Object, Array | N/A |
| Date, DateTime, ... | EPOCH | Number, String | [EPOCH, MaxUint32 seconds since epoch) |
| UUID | "00000000-0000-0000-0000-000000000000" | String | N/A |
| Enum | N/A | String | N/A |
| Nullable(T) | NULL | (same as T) | (same as T) |
| Array(T) | [] | (same as T) | (same as T) |
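As a concrete illustration of the default-fill and border-clamp rules in the note above, here is a hedged Go sketch. The function names are illustrative, not the sinker's API:

```go
package main

import (
	"fmt"
	"math"
)

// clampInt8 fills the nearer border of Int8 when a compatible numeric value overflows.
func clampInt8(v int64) int8 {
	if v > math.MaxInt8 {
		return math.MaxInt8
	}
	if v < math.MinInt8 {
		return math.MinInt8
	}
	return int8(v)
}

// int8OrDefault fills the default value 0 when the JSON value is incompatible
// (e.g. a string or object where a number is expected).
func int8OrDefault(v interface{}) int8 {
	switch x := v.(type) {
	case bool:
		if x {
			return 1
		}
		return 0
	case float64: // encoding/json decodes JSON numbers into float64
		return clampInt8(int64(x))
	default:
		return 0 // incompatible type: fill the default value of Int8
	}
}

func main() {
	fmt.Println(int8OrDefault(float64(300))) // 127: nearer border of Int8
	fmt.Println(int8OrDefault("oops"))       // 0: default value of Int8
}
```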
# Benchmark
# clickhouse_sinker
- ClickHouse cluster: 3 shards, 2 physical hosts in each shard. Each host contains 48 cpu, 256 GB RAM, 12TB HDD RAID5.
- ZooKeeper cluster: on three hosts of ClickHouse cluster.
- Kafka cluster: 2 nodes on three hosts of the ClickHouse cluster, sharing the same ZooKeeper cluster with ClickHouse.
- Kafka topic apache_access_log1: 1 partition, replication factor 1
- Kafka topic apache_access_log2: 2 partitions, replication factor 1
- Kafka topic apache_access_log4: 4 partitions, replication factor 1
- JSON messages are generated via kafka_gen_log (https://github.com/housepower/clickhouse_sinker/blob/master/cmd/kafka_gen_log). The average message length is 754 bytes.
| config | throughput (rows/s) | writer total cost | clickhouse cost per node |
| --- | --- | --- | --- |
| 1 kafka partition, 1 sinker | 142 K | 11.0 cpu, 8 GB | 0.3 cpu |
| 2 kafka partitions, 1 sinker | 159 K | 14.0 cpu, 14 GB | 0.7 cpu |
| 4 kafka partitions, 1 sinker | 25~127 K | 2~22 cpu, 16 GB | 1 cpu |
| 2 kafka partitions, 2 sinkers | 275 K | 22 cpu, 8 GB | 1.3 cpu |
| 4 kafka partitions, 2 sinkers | 301 K | 25 cpu, 18 GB | 1.5 cpu |
# Flink pipeline
Here's a Flink pipeline which moves data from Kafka to ClickHouse. The CPU hotspots of the Flink pipeline are JSON decoding and Row.setField.
Kafka Source -> JSON decode -> DateTime format conversion -> Integer type conversion -> JDBCSinkJob
| config | throughput (rows/s) | writer total cost | clickhouse cost per node |
| --- | --- | --- | --- |
| 1 kafka partition, pipeline parallelism: 20 | 44.7 K | 13.8 cpu, 20 GB | 1.1 cpu |
# Conclusion
- clickhouse_sinker is 3x as fast as the Flink pipeline, and incurs much less connection and CPU overhead on clickhouse-server.
- clickhouse_sinker retries other replicas on write failures.
- clickhouse_sinker gets the table schema from ClickHouse. The Flink pipeline needs manual configuration of all fields.
- clickhouse_sinker detects the DateTime format. The Flink pipeline needs dedicated steps for format and type conversion.
# Configuration
Refer to how the integration test uses the example config. Also refer to the code for all config items.
# Kafka Encryption
clickhouse_sinker supports the following encryption mechanisms:
- No encryption
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9092",
"@version": "Required if you use sarama. It's the the Kafka server version.",
"version": "2.5.0"
}
- Encryption using SSL
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9093",
"version": "2.5.0",
"tls": {
"enable": true,
"@trustStoreLocation": "ssl.truststore.location which kafka-console-consumer.sh uses",
"trustStoreLocation": "/etc/security/kafka.client.truststore.jks",
"@trustStorePassword": "ssl.truststore.password which kafka-console-consumer.sh uses",
"trustStorePassword": "123456",
"@endpIdentAlgo": "ssl.endpoint.identification.algorithm which kafka-console-consumer.sh uses",
"endpIdentAlgo": ""
}
}
Or if you have extracted certificates from JKS, use the following config:
"kafka": {
"brokers": "192.168.31.64:9093",
"version": "2.5.0",
"tls": {
"enable": true,
"@caCertFiles": "Required. It's the CA certificate with which Kafka brokers certs be signed. This cert is added to kafka.client.truststore.jks which kafka-console-consumer.sh uses",
"caCertFiles": "/etc/security/ca-cert",
"@endpIdentAlgo": "ssl.endpoint.identification.algorithm which kafka-console-consumer.sh uses",
"endpIdentAlgo": ""
}
}
FYI, kafka-console-consumer.sh works with the following setup:
$ cat config/client_SSL_NOAUTH.properties
security.protocol=SSL
ssl.truststore.location=/etc/security/kafka.client.truststore.jks
ssl.truststore.password=123456
ssl.endpoint.identification.algorithm=
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9094 --topic sunshine --group test-consumer-group --from-beginning --consumer.config config/client_SSL_NOAUTH.properties
Please follow the Kafka SSL setup guide. Use `-keyalg RSA` when you create the broker keystore, otherwise there will be no cipher suites in common between the keystore and those Go supports. See this for reference.
# Kafka Authentication
clickhouse_sinker supports the following authentication mechanisms:
- No authentication
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9092",
"@version": "Required if you use sarama. It's the the Kafka server version.",
"version": "2.5.0"
}
- SASL/PLAIN
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9094",
"version": "2.5.0",
"sasl": {
"enable": true,
"mechanism": "PLAIN",
"username": "alice",
"password": "alice-secret"
}
}
FYI. Java clients work with the following setup:
$ cat config/client_PLAINTEXT_PLAIN.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
$ bin/kafka-console-producer.sh --broker-list 192.168.31.64:9094 --topic sunshine --producer.config config/client_PLAINTEXT_PLAIN.properties
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9094 --topic sunshine --group test-consumer-group --from-beginning --consumer.config config/client_PLAINTEXT_PLAIN.properties
- SASL/SCRAM
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9094",
"version": "2.5.0",
"sasl": {
"enable": true,
"@mechanism": "SCRAM-SHA-256 or SCRAM-SHA-512",
"mechanism": "SCRAM-SHA-256",
"username": "alice",
"password": "alice-secret"
}
}
FYI. Java clients work with the following setup:
$ cat config/client_PLAINTEXT_SCRAM.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
$ bin/kafka-console-producer.sh --broker-list 192.168.31.64:9094 --topic sunshine --producer.config config/client_PLAINTEXT_SCRAM.properties
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9094 --topic sunshine --group test-consumer-group --from-beginning --consumer.config config/client_PLAINTEXT_SCRAM.properties
- SASL/GSSAPI(Kerberos)
An example kafka config:
"kafka": {
"brokers": "192.168.31.64:9094",
"version": "2.5.0",
"sasl": {
"enable": true,
"mechanism": "GSSAPI",
"gssapi": {
"@authtype": "1 - Username and password, 2 - Keytab",
"authtype": 2,
"keytabpath": "/etc/security/mmmtest.keytab",
"kerberosconfigpath": "/etc/krb5.conf",
"servicename": "kafka",
"@username": "`principal` consists of `username` `@` `realm`",
"username": "mmm",
"realm": "ALANWANG.COM"
}
}
}
FYI. Java clients work with the following setup:
$ cat config/client_PLAINTEXT_GSSAPI.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab="/etc/security/mmmtest.keytab" principal="mmm@ALANWANG.COM";
$ bin/kafka-console-producer.sh --broker-list 192.168.31.64:9094 --topic sunshine --producer.config config/client_PLAINTEXT_GSSAPI.properties
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.64:9094 --topic sunshine --group test-consumer-group --from-beginning --consumer.config config/client_PLAINTEXT_GSSAPI.properties
Kerberos setup is complex. Please first ensure that kafka-console-consumer.sh Kerberos keytab authentication works by STRICTLY FOLLOWING this article, then test clickhouse_sinker Kerberos authentication on the SAME machine on which kafka-console-consumer.sh runs. I tested sarama Kerberos authentication against Kafka 2.2.1; I'm not sure whether other Kafka versions work.
# Sharding Policy
Every message is routed to a determined ClickHouse shard.
By default, the shard number is calculated as `(kafka_offset/roundup(buffer_size))%clickhouse_shards`, where `roundup()` rounds an unsigned integer up to the nearest 2^n.
This default expression can be customized with `shardingKey` and `shardingPolicy`. The `shardingKey` value is a column name. The `shardingPolicy` value can be:
- `stripe,<size>`. This requires `shardingKey` to be a numeric-like (bool, int, float, date, etc.) column. The expression is `(uint64(shardingKey)/stripe_size)%clickhouse_shards`.
- `hash`. This requires `shardingKey` to be a string-like column. The hash function used internally is xxHash64. The expression is `xxhash64(string(shardingKey))%clickhouse_shards`.
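The expressions above can be sketched in Go as follows. `roundup` and the policy functions are illustrative, and the xxHash64 implementation is assumed to be github.com/cespare/xxhash/v2, which may not be the exact library the sinker uses:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// roundup rounds an unsigned integer up to the nearest power of two.
func roundup(n uint64) uint64 {
	p := uint64(1)
	for p < n {
		p <<= 1
	}
	return p
}

// defaultShard implements (kafka_offset / roundup(buffer_size)) % clickhouse_shards.
func defaultShard(offset, bufferSize, shards uint64) uint64 {
	return (offset / roundup(bufferSize)) % shards
}

// stripeShard implements (uint64(shardingKey) / stripe_size) % clickhouse_shards
// for a numeric-like sharding key.
func stripeShard(key, stripeSize, shards uint64) uint64 {
	return (key / stripeSize) % shards
}

// hashShard implements xxhash64(string(shardingKey)) % clickhouse_shards
// for a string-like sharding key.
func hashShard(key string, shards uint64) uint64 {
	return xxhash.Sum64String(key) % shards
}

func main() {
	fmt.Println(defaultShard(100000, 50000, 3)) // bufferSize 50000 rounds up to 65536
	fmt.Println(stripeShard(12345, 1000, 3))
	fmt.Println(hashShard("user-42", 3))
}
```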
# Configuration Management
The precedence of config items:
- CLI parameters > env variables
- Nacos > Local Config File
# Nacos
Sinker is able to register with Nacos, and to get and apply config changes dynamically without restarting the whole process. Controlled by:
- CLI parameters: `nacos-addr`, `nacos-username`, `nacos-password`, `nacos-namespace-id`, `nacos-group`, `nacos-dataid`
- env variables: `NACOS_ADDR`, `NACOS_USERNAME`, `NACOS_PASSWORD`, `NACOS_NAMESPACE_ID`, `NACOS_GROUP`, `NACOS_DATAID`
# Local Config File
Currently sinker is able to parse the local config file at startup, but it is unable to detect file changes. Controlled by:
- CLI parameter: `local-cfg-file`
- env variable: `LOCAL_CFG_FILE`
# Prometheus Metrics
All metrics are defined in `statistics.go`. You can create a Grafana dashboard for clickhouse_sinker by importing the template `clickhouse_sinker-dashboard.json`.
- Pull with Prometheus
Metrics are exposed at `http://ip:port/metrics`. The IP is the outbound IP of this machine. The port comes from the CLI option `--http-port` or the env variable `HTTP_PORT`.
Sinker registers with Nacos if the CLI option `--consul-cfg-enable` or the env variable `CONSUL_REGISTER_ENABLE` is present. However, Prometheus is unable to obtain a dynamic service list from the Nacos server.
- Push to Prometheus
If the CLI option `--metric-push-gateway-addrs` or the env variable `METRIC_PUSH_GATEWAY_ADDRS` (a list of comma-separated URLs) is present, metrics are pushed to one of the given URLs regularly.
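For reference, pushing a metric to a Pushgateway from Go looks roughly like the following sketch, using prometheus/client_golang; the counter name, job name, and URL are placeholders, not the sinker's actual metrics or code:

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"
)

func main() {
	// A toy counter standing in for the sinker's real metrics in statistics.go.
	parsed := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_parsed_msgs_total",
		Help: "Messages parsed (illustrative only).",
	})
	parsed.Add(42)

	// Push to one of the configured gateway URLs.
	if err := push.New("http://127.0.0.1:9091", "clickhouse_sinker_example").
		Collector(parsed).
		Push(); err != nil {
		log.Fatal(err)
	}
}
```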
# Extending
There are several abstract interfaces which you can implement to support more message formats, message queues, and config management mechanisms.
type Parser interface {
Parse(bs []byte) model.Metric
}
// RemoteConfManager can be implemented by many backends: Nacos, Consul, etcd, ZooKeeper...
type RemoteConfManager interface {
Init(properties map[string]interface{}) error
GetConfig() (conf *Config, err error)
// PublishConfig publishes the config. The manager shall not reference the passed Config object after the call.
PublishConfig(conf *Config) (err error)
}
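For example, a trivial in-memory `RemoteConfManager` might look like the sketch below. It assumes the sinker's existing `Config` type in the same package, and it is only a starting point, not production code:

```go
package config

import "sync"

// MemConfManager is an illustrative RemoteConfManager that just keeps the
// config in memory; a real backend (Nacos, Consul, etcd, ZooKeeper...) would
// read from and write to the remote store instead.
type MemConfManager struct {
	mu   sync.RWMutex
	conf *Config // Config is the sinker's existing config type
}

func (m *MemConfManager) Init(properties map[string]interface{}) error {
	return nil // nothing to connect to
}

func (m *MemConfManager) GetConfig() (*Config, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.conf, nil
}

// PublishConfig stores a copy; per the interface contract the manager shall
// not reference the passed Config after the call, so a real implementation
// would make a deep copy here.
func (m *MemConfManager) PublishConfig(conf *Config) error {
	stored := *conf // shallow copy to avoid referencing the caller's object
	m.mu.Lock()
	m.conf = &stored
	m.mu.Unlock()
	return nil
}
```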
# Why not the Kafka Engine built into ClickHouse?
- My experience indicates that `Kafka Engine` is complicated, buggy, and hard to debug.
- `Kafka Engine` runs inside the database process, which lowers database stability. In contrast, Vertica's official Kafka importer is separate from the database server.
- `Kafka Engine` doesn't support custom sharding policies.
- Neither `Kafka Engine` nor clickhouse_sinker supports exactly-once delivery.
# Kafka Compatibility
Kafka release history is available here. Since 0.10.0.0, the Kafka broker exposes the versions of the various APIs it supports.
# Franz-go Kafka client
- Franz negotiates its protocol version.
- Franz supports Kerberos authentication.
- Franz supports generation cleanup callback.
- Franz beats Sarama and kafka-go in benchmark comparisons.
- The Franz project is young but very active.