Перейти к основному содержимому
Перейти к основному содержимому

ClickHouse Kafka Connect Sink

примечание

Если вам нужна помощь, пожалуйста, сообщите об ошибке в репозитории или задайте вопрос в публичном Slack ClickHouse.

ClickHouse Kafka Connect Sink — это коннектор Kafka, который передает данные из темы Kafka в таблицу ClickHouse.

Лицензия

Коннектор Kafka Sink распространяется под Лицензией Apache 2.0

Требования к окружению

В окружении должна быть установлена платформа Kafka Connect версии v2.7 или более поздней.

Матрица совместимости версий

Version ClickHouse Kafka ConnectVersion ClickHouseKafka ConnectПлатформа Confluent
1.0.0> 23.3> 2.7> 6.1

Основные функции

  • Поставляется с заранее определенными семантиками exactly-once. Используется новая функция ядра ClickHouse под названием KeeperMap (используется как хранилище состояний конектора) и позволяет создать минималистичную архитектуру.
  • Поддержка сторонних хранилищ состояний: в данный момент по умолчанию используется память, но также может использоваться KeeperMap (Redis будет добавлен скоро).
  • Интеграция на уровне ядра: разработано, поддерживается и обслуживается ClickHouse.
  • Непрерывное тестирование с использованием ClickHouse Cloud.
  • Вставка данных с объявленной схемой и без схемы.
  • Поддержка всех типов данных ClickHouse.

Инструкции по установке

Соберите ваши данные подключения

Чтобы подключиться к ClickHouse с помощью HTTP(S), вам нужна следующая информация:

  • ХОСТ и ПОРТ: как правило, порт составляет 8443 при использовании TLS или 8123 при отсутствии TLS.

  • НАЗВАНИЕ БАЗЫ ДАННЫХ: по умолчанию существует база данных с именем default, используйте имя базы данных, к которой вы хотите подключиться.

  • ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя равно default. Используйте имя пользователя, соответствующее вашему случаю.

Сведения о вашем ClickHouse Cloud-сервисе доступны в консоли ClickHouse Cloud. Выберите сервис, к которому вы будете подключаться, и нажмите Подключиться:

Выберите HTTPS, и детали доступны в примере команды curl.

Если вы используете самоуправляемый ClickHouse, детали подключения устанавливаются вашим администратором ClickHouse.

Общие инструкции по установке

Коннектор распространяется как единый JAR-файл, содержащий все классы, необходимые для работы плагина.

Чтобы установить плагин, выполните следующие шаги:

  • Скачайте zip-архив, содержащий файл Connector JAR, со страницы Releases репозитория ClickHouse Kafka Connect Sink.
  • Извлеките содержимое ZIP-файла и скопируйте его в нужное место.
  • Добавьте путь с директорией плагина в конфигурацию plugin.path в вашем свойствах Connect, чтобы позволить платформе Confluent обнаружить плагин.
  • Укажите имя темы, имя хоста экземпляра ClickHouse и пароль в конфигурации.
  • Перезапустите платформу Confluent.
  • Если вы используете платформу Confluent, войдите в интерфейс управления Confluent Control Center, чтобы убедиться, что ClickHouse Sink доступен в списке доступных коннекторов.

Параметры конфигурации

Чтобы подключить ClickHouse Sink к серверу ClickHouse, вам нужно предоставить:

  • данные подключения: hostname (обязательно) и port (необязательно)
  • учетные данные пользователя: password (обязательно) и username (необязательно)
  • класс коннектора: com.clickhouse.kafka.connect.ClickHouseSinkConnector (обязательно)
  • topics или topics.regex: Kafka-темы для опроса - имена тем должны соответствовать именам таблиц (обязательно)
  • преобразователи ключей и значений: устанавливаются в зависимости от типа данных в вашей теме. Обязательны, если не определены в конфигурации рабочего процесса.

Полная таблица параметров конфигурации:

Название свойстваОписаниеЗначение по умолчанию
hostname (Обязательно)Имя хоста или IP-адрес сервераN/A
portПорт ClickHouse - по умолчанию 8443 (для HTTPS в облаке), но для HTTP (по умолчанию для самоуправляемых) он должен быть 81238443
sslВключить ssl-соединение к ClickHousetrue
jdbcConnectionPropertiesСвойства подключения при подключении к ClickHouse. Должны начинаться с ? и соединяться с помощью & между param=value""
usernameИмя пользователя базы данных ClickHousedefault
password (Обязательно)Пароль базы данных ClickHouseN/A
databaseИмя базы данных ClickHousedefault
connector.class (Обязательно)Класс коннектора (явно задайте и сохраните как значение по умолчанию)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxКоличество задач коннектора"1"
errors.retry.timeoutТаймаут повторной попытки ClickHouse JDBC"60"
exactlyOnceВключено Exactly Once"false"
topics (Обязательно)Kafka-темы для опроса - имена тем должны соответствовать именам таблиц""
key.converter (Обязательно* - см. Описание)Установите в зависимости от типов ваших ключей. Обязательно здесь, если вы передаете ключи (и не определены в конфигурации рабочего процесса)."org.apache.kafka.connect.storage.StringConverter"
value.converter (Обязательно* - см. Описание)Установите в зависимости от типа данных в вашей теме. Поддерживаются: - JSON, String, Avro или Protobuf форматы. Обязательно здесь, если не определены в конфигурации рабочего процесса."org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableПоддержка схем преобразователя значения коннектора"false"
errors.toleranceОшибки в соединителе допускаются. Поддерживаемые: none, all"none"
errors.deadletterqueue.topic.nameЕсли установлено (с errors.tolerance=all), будет использоваться DLQ для неудавшихся партий (см. Устранение неполадок)""
errors.deadletterqueue.context.headers.enableДобавляет дополнительные заголовки для DLQ""
clickhouseSettingsСписок настроек ClickHouse через запятую (например, "insert_quorum=2 и т.д.")""
topic2TableMapСписок через запятую, который отображает имена тем на имена таблиц (например, "topic1=table1, topic2=table2 и т.д.")""
tableRefreshIntervalВремя (в секундах) для обновления кэша определения таблицы0
keeperOnClusterПозволяет настроить параметр ON CLUSTER для самоуправляемых экземпляров (например, ON CLUSTER clusterNameInConfigFileDefinition) для таблицы connect_state exactly-once (см. Распределенные DDL Запросы""
bypassRowBinaryПозволяет отключить использование RowBinary и RowBinaryWithDefaults для данных на основе схемы (Avro, Protobuf и т.д.) - должно использоваться только когда данные будут содержать отсутствующие столбцы, и Nullable/Default неприемлемы"false"
dateTimeFormatsФорматы даты и времени для разбора полей схемы DateTime64, разделенные ; (например, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss).""
tolerateStateMismatchПозволяет коннектору удалять записи "раньше", чем текущий смещение, сохраненное AFTER_PROCESSING (например, если смещение 5 отправлено, а смещение 250 - последняя зарегистрированная смещение)"false"

Целевые таблицы

ClickHouse Connect Sink считывает сообщения из тем Kafka и записывает их в соответствующие таблицы. ClickHouse Connect Sink записывает данные в существующие таблицы. Пожалуйста, убедитесь, что целевая таблица с соответствующей схемой создана в ClickHouse перед тем, как начать вставлять в нее данные.

Каждая тема требует выделенной целевой таблицы в ClickHouse. Имя целевой таблицы должно соответствовать имени исходной темы.

Предварительная обработка

Если вам нужно преобразовать исходящие сообщения перед их отправкой в ClickHouse Kafka Connect Sink, используйте Kafka Connect Transformations.

Поддерживаемые типы данных

С объявленной схемой:

Тип Kafka ConnectТип ClickHouseПоддерживаетсяПримитив
STRINGStringДа
INT8Int8Да
INT16Int16Да
INT32Int32Да
INT64Int64Да
FLOAT32Float32Да
FLOAT64Float64Да
BOOLEANBooleanДа
ARRAYArray(T)Нет
MAPMap(Primitive, T)Нет
STRUCTVariant(T1, T2, ...)Нет
STRUCTTuple(a T1, b T2, ...)Нет
STRUCTNested(a T1, b T2, ...)Нет
BYTESStringНет
org.apache.kafka.connect.data.TimeInt64 / DateTime64Нет
org.apache.kafka.connect.data.TimestampInt32 / Date32Нет
org.apache.kafka.connect.data.DecimalDecimalНет

Без объявленной схемы:

Запись преобразуется в JSON и отправляется в ClickHouse в виде значения в формате JSONEachRow.

Рецепты конфигурации

Это некоторые распространенные рецепты конфигурации, чтобы вы могли начать быстро.

Базовая конфигурация

Самая простая конфигурация, чтобы начать - она предполагает, что вы запускаете Kafka Connect в распределенном режиме и у вас работает сервер ClickHouse на localhost:8443 с включенным SSL, данные представлены в безсхемном JSON.

Базовая конфигурация с несколькими темами

Коннектор может потреблять данные из нескольких тем.

Базовая конфигурация с DLQ

Использование с разными форматами данных

Поддержка схемы Avro
Поддержка схемы Protobuf

Обратите внимание: если вы столкнетесь с проблемами с отсутствующими классами, не в каждой среде есть преобразователь protobuf, и вам может понадобиться альтернативная версия JAR с включенными зависимостями.

Поддержка схемы JSON
Поддержка строк

Коннектор поддерживает преобразователь строк в разных форматах ClickHouse: JSON, CSV и TSV.

Логирование

Логирование обеспечивается автоматически платформой Kafka Connect. Место назначения логирования и формат могут быть настроены через файл конфигурации Kafka Connect.

Если вы используете платформу Confluent, журналы можно просмотреть, выполнив команду CLI:

Для получения дополнительных сведений ознакомьтесь с официальным руководством.

Мониторинг

ClickHouse Kafka Connect сообщает метрики выполнения через Java Management Extensions (JMX). JMX включен в коннектор Kafka по умолчанию.

ClickHouse Connect MBeanName:

ClickHouse Kafka Connect сообщает следующие метрики:

НазваниеТипОписание
receivedRecordslongОбщее количество полученных записей.
recordProcessingTimelongОбщее время в наносекундах, затраченное на группировку и преобразование записей в единую структуру.
taskProcessingTimelongОбщее время в наносекундах, затраченное на обработку и вставку данных в ClickHouse.

Ограничения

  • Удаления не поддерживаются.
  • Размер партии наследуется от свойств потребителя Kafka.
  • При использовании KeeperMap для exactly-once и изменении или возвращении смещения, вам необходимо удалить содержимое из KeeperMap для этой конкретной темы. (См. ниже руководство по устранению неполадок для получения дополнительных сведений)

Настройка производительности

Если вы когда-либо думали: "Я хотел бы настроить размер партии для коннектора sink", то этот раздел для вас.

Fetch Connect против Poll Connector

Kafka Connect (рамка, на которой основан наш коннектор sink) будет извлекать сообщения из тем Kafka в фоновом режиме (независимо от коннектора).

Вы можете контролировать этот процесс, используя fetch.min.bytes и fetch.max.bytes - в то время как fetch.min.bytes определяет минимальное количество, необходимое, прежде чем платформа передаст значения коннектору (в пределах временного ограничения, установленного fetch.max.wait.ms), fetch.max.bytes устанавливает верхний предел размера. Если вы хотите передать более крупные партии к коннектору, одним из вариантов может быть увеличение минимальной выборки или максимального ожидания для формирования более крупных пакетов данных.

Эти извлеченные данные затем обрабатываются клиентом коннектора, опрашивающим сообщения, где количество для каждого опроса контролируется max.poll.records - обратите внимание, что получение независимо от опроса!

При настройке этих параметров пользователи должны стремиться так, чтобы их размер выборки производил несколько партий max.poll.records (и помнить, что настройки fetch.min.bytes и fetch.max.bytes представляют сжатые данные) - таким образом, каждая задача коннектора вставляет как можно более крупную партию.

ClickHouse оптимизирован для больших партий, даже с небольшой задержкой, а не частых, но меньших партий - чем больше партия, тем лучше.

Более подробную информацию можно найти в документации Confluent или в документации Kafka.

Несколько тем с высокой пропускной способностью

Если ваш коннектор настроен на подписку на несколько тем, вы используете topic2TableMap для сопоставления тем с таблицами, и вы сталкиваетесь с узким местом при вставке, что приводит к задержке потребителя, рассмотрите возможность создания одного коннектора на тему. Основная причина, по которой это происходит, заключается в том, что в данный момент партии вставляются в каждую таблицу поочередно.

Создание одного коннектора на тему — это обходной путь, который обеспечивает максимальную скорость вставки.

Устранение неполадок

"Несоответствие состояния для темы [someTopic] раздела [0]"

Это происходит, когда смещение, хранимое в KeeperMap, отличается от смещения, хранимого в Kafka, обычно когда тема была удалена или смещение было вручную отрегулировано. Чтобы исправить это, вам нужно удалить старые значения, хранящиеся для данной темы + раздела.

ПРИМЕЧАНИЕ: Этот коррекционный процесс может иметь последствия для exactly-once.

"Какие ошибки коннектор будет повторять?"

В данный момент основное внимание уделяется выявлению ошибок, которые являются временными и могут быть повторены, включая:

  • ClickHouseException - это общее исключение, которое может быть выброшено ClickHouse. Оно обычно выбрасывается, когда сервер перегружен, и следующие коды ошибок считаются особенно временными:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException - выбрасывается, когда время ожидания сокета истекает.
  • UnknownHostException - выбрасывается, когда хост не может быть разрешен.
  • IOException - выбрасывается, когда возникает проблема с сетью.

"Все мои данные пустые/нули"

Скорее всего, поля в ваших данных не соответствуют полям в таблице - это особенно часто встречается с CDC (и форматом Debezium). Одно из распространенных решений - добавить преобразование flatten к вашей конфигурации коннектора:

Это преобразует ваши данные из вложенного JSON в плоский JSON (используя _ в качестве разделителя). Поля в таблице будут иметь формат "field1_field2_field3" (т.е. "before_id", "after_id" и т.д.).

"Я хочу использовать свои ключи Kafka в ClickHouse"

Ключи Kafka по умолчанию не сохраняются в поле значения, но вы можете использовать преобразование KeyToValue, чтобы переместить ключ в поле значения (под новым именем поля _key):