Перейти к основному содержимому
Перейти к основному содержимому

Spark Connector

Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое шардирование и сокращение предикатов, для улучшения производительности запросов и обработки данных. Коннектор основан на официальном JDBC коннекторе ClickHouse и управляет своим собственным каталогом.

До версии Spark 3.0 в Spark не существовало концепции встроенного каталога, поэтому пользователи обычно полагались на внешние каталоги, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователи должны были регистрировать свои таблицы источников данных вручную перед обращением к ним в Spark. Однако с тех пор, как Spark 3.0 представил концепцию каталога, Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталога.

Дефолтный каталог Spark — это spark_catalog, а таблицы идентифицируются по формату {catalog name}.{database}.{table}. С новой функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.

Requirements

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3 или 3.4 или 3.5

Compatibility Matrix

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не требуется
0.3.0Spark 3.2, 3.3Не требуется
0.2.1Spark 3.2Не требуется
0.1.2Spark 3.2Не требуется

Installation & Setup

Для интеграции ClickHouse с Spark существует несколько вариантов установки, подходящих для различных конфигураций проектов. Вы можете добавить коннектор ClickHouse Spark в качестве зависимости непосредственно в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Кроме того, вы можете поместить необходимые JAR-файлы в папку $SPARK_HOME/jars/, или передать их непосредственно как опцию Spark с помощью флага --jars в команде spark-submit. Оба подхода гарантируют, что коннектор ClickHouse будет доступен в вашей среде Spark.

Import as a Dependency

Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.

Download The Library

Шаблон имени двоичного JAR:

Вы можете найти все доступные выпущенные JAR-файлы в Maven Central Repository и все ежедневные сборки SNAPSHOT JAR-файлов в Sonatype OSS Snapshots Repository.

к сведению

Важно включить clickhouse-jdbc JAR с классом "all", поскольку коннектор зависит от clickhouse-http и clickhouse-client, которые оба входят в clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http отдельно, если вы предпочитаете не использовать полный пакет JDBC.

В любом случае убедитесь, что версии пакетов совместимы в соответствии с таблицей совместимости.

Register The Catalog (required)

Чтобы получить доступ к вашим таблицам ClickHouse, вы должны настроить новый каталог Spark с помощью следующих конфигураций:

СвойствоЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AДа
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostНет
spark.sql.catalog.<catalog_name>.protocolhttphttpНет
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123Нет
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultНет
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)Нет
spark.sql.catalog.<catalog_name>.database<database>defaultНет
spark.<catalog_name>.write.formatjsonarrowНет

Эти настройки могут быть установлены одним из следующих способов:

  • Изменить/создать spark-defaults.conf.
  • Передать конфигурацию в вашу команду spark-submit (или в ваши команды spark-shell/spark-sql CLI).
  • Добавить конфигурацию при инициализации вашего контекста.
к сведению

При работе с кластером ClickHouse вам необходимо задать уникальное имя каталога для каждого экземпляра. Например:

Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table> из Spark SQL по clickhouse1.<ck_db>.<ck_table>, а к таблице clickhouse2 <ck_db>.<ck_table> по clickhouse2.<ck_db>.<ck_table>.

Read Data

Write Data

DDL Operations

Вы можете выполнять DDL операции на вашем экземпляре ClickHouse, используя Spark SQL, все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет вам писать запросы точно так же, как вы бы делали это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие — без изменений, например:

Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете запускать в своем приложении с использованием любого API—Java, Scala, PySpark или оболочки.

Конфигурации

Следующие конфигурации доступны для настройки в коннекторе:


КлючПо умолчаниюОписаниеС версии
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений разделов, например, cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, игнорировать неподдерживаемые выражения, иначе быстро завершать с исключением. Обратите внимание, что если spark.clickhouse.write.distributed.convertLocal включен, игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для декомпрессии данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении распределенной таблицы читать локальную таблицу вместо самой себя. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧтение типа FixedString ClickHouse как указанного типа данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключение фильтра времени выполнения для чтения.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли true, конструкция фильтра ввода раздела осуществляется по виртуальному столбцу _partition_id, вместо значения раздела. Известны проблемы с составлением SQL предикатов по значению раздела. Эта функция требует ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, пометить все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, эта конфигурация требует SPARK-43390 (доступно в Spark 3.5), без этого патча она всегда работает как true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей на партию при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в распределенную таблицу запись локальной таблицы вместо самой себя. Если true, игнорировать spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗапись на все узлы кластера при записи в распределенную таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации для записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли true, проводить локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение spark.clickhouse.write.repartitionByPartitionЕсли true, проводить локальную сортировку по разделу перед записью. Если не установлено, это равно spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество повторных попыток записи для одной неудачной записи партии с кодами, позволяющими повторный запрос.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueСледует ли перераспределять данные по ключам разделов ClickHouse для достижения распределения таблицы ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перераспределение данных для достижения распределения таблицы ClickHouse требуется перед записью, используйте эту конфигурацию для указания количества перераспределений, значение меньше 1 означает отсутствие требований.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по разделам, чтобы соответствовать необходимому распределению перед передачей записей в таблицу источника данных на запись. В противном случае Spark может применить определенные оптимизации для ускорения запроса, но нарушить требование распределения. Обратите внимание, эта конфигурация требует SPARK-37523 (доступно в Spark 3.4), без этого патча она всегда работает как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок, позволяющие повторный запрос, возвращаемые сервером ClickHouse при неудачной записи.0.1.0

Поддерживаемые типы данных

В этом разделе описывается сопоставление типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые справочные данные для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяЯвляется примитивнымПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаКонтролируется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элемента массива также конвертируется
MapMapTypeНетКлючи ограничены типом StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется конкретный тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяЯвляется примитивнымПримечания
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (list, tuple, or array)ArrayНетТип элемента массива также конвертируется
MapTypeMapНетКлючи ограничены типом StringType
Object
Nested

Участие и поддержка

Если вы хотите внести свой вклад в проект или сообщить о каких-либо проблемах, мы приветствуем вашу помощь! Посетите наш репозиторий GitHub, чтобы открыть проблему, предложить улучшения или отправить запрос на слияние. Ваши вложения приветствуются! Пожалуйста, ознакомьтесь с руководством по участию в репозитории перед началом. Спасибо за помощь в улучшении нашего коннектора ClickHouse Spark!