Apache Kafka® – это распределенная платформа потоковой передачи. Ранее мы обсуждали, как реализовать потребителей и производителей Kafka с помощью Spring.
В этом руководстве узнаем, как использовать коннекторы Kafka. Мы рассмотрим:
- различные типы коннекторов Kafka;
- функции и режимы Kafka Connect;
- конфигурацию коннекторов с использованием файлов свойств, а также REST API.
Основы Kafka Connect и коннекторы Kafka
Kafka Connect – это платформа для подключения Kafka к внешним системам, таким как базы данных, хранилища ключей и значений, поисковые индексы и файловые системы, с помощью так называемых коннекторов.
Коннекторы Kafka – это готовые к использованию компоненты, которые помогут импортировать данные из внешних систем в топики Kafka и экспортировать данные из топиков Kafka во внешние системы. Можно использовать существующие реализации коннекторов для общих источников данных и потребителей или реализовать собственные коннекторы.
Исходный коннектор собирает данные из системы. Исходные системы могут быть целыми базами данных, таблицами потоков или брокерами сообщений. Исходный коннектор также может собирать метрики с серверов приложений в топики Kafka, делая данные доступными для потоковой обработки с малой задержкой.
Коннектор потребителя доставляет данные из разделов Kafka в другие системы, которые могут быть индексами, такими как Elasticsearch, пакетными системами, такими как Hadoop, или любой другой базой данных.
Некоторые коннекторы поддерживаются сообществом, а другие поддерживаются Confluent или его партнерами. Действительно, можно найти коннекторы для большинства популярных систем, таких как S3, JDBC и Cassandra, и это лишь некоторые из них.
Функции
Возможности Kafka Connect включают в себя:
- фреймворк для подключения внешних систем к Kafka – упрощает разработку, развертывание и управление коннекторами;
- распределенный и автономный режимы – помогает развертывать большие кластеры, используя распределенный характер Kafka, а также настройки для разработки, тестирования и небольших производственных развертываний;
- интерфейс REST – можно управлять коннекторами с помощью REST API;
- автоматическое управление смещением – Kafka Connect помогает обрабатывать процесс фиксации смещения, что избавляет от необходимости вручную реализовывать эту подверженную ошибкам часть разработки коннектора;
- распределенная и масштабируемая по умолчанию – Kafka Connect использует существующий протокол управления группами; можно добавить больше обработчиков для масштабирования кластера Kafka Connect;
- потоковая и пакетная интеграция: Kafka Connect – идеальное решение для объединения систем потоковой и пакетной передачи данных в сочетании с существующими возможностями Kafka;
- преобразования – они позволяют вносить простые и легкие изменения в отдельные сообщения.
Настройка
Вместо использования простого дистрибутива Kafka загрузим Confluent Platform, дистрибутив Kafka, предоставленный Confluent, Inc., компанией, стоящей за Kafka. Confluent Platform поставляется с дополнительными инструментами и клиентами по сравнению с простой Kafka, а также дополнительными готовыми коннекторами.
В нашем случае достаточно версии с открытым исходным кодом, которую можно найти на сайте Confluent.
Быстрый старт Kafka Connect
Для начала обсудим принцип Kafka Connect, используя его самые основные коннекторы, которые являются коннектором источника файла и коннектором потребителя файла.
Удобно, что Confluent Platform поставляется с обоими этими коннекторами, а также с эталонными конфигурациями.
Конфигурация исходного коннектора
Для исходного коннектора эталонная конфигурация доступна по адресу $CONFLUENT_HOME/etc/kafka/connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
Эта конфигурация имеет некоторые свойства, общие для всех исходных коннекторов:
- name – указанное пользователем имя экземпляра коннектора;
- connector.class указывает класс реализации, в основном вид коннектора;
- tasks.max указывает, сколько экземпляров исходного коннектора должно работать параллельно;
- topic определяет топик, в который коннектор должен отправлять выходные данные.
В этом случае есть атрибут, специфичный для коннектора:
- file определяет файл, из которого коннектор должен считывать ввод.
Чтобы это работало, создадим базовый файл с некоторым содержимым:
echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt
Обратите внимание, что рабочий каталог – $CONFLUENT_HOME.
Конфигурация коннектора потребителя
Для коннектора потребителя будем использовать эталонную конфигурацию в $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Логически он содержит точно такие же параметры, хотя на этот раз connector.class определяет реализацию коннектора потребителя, а file – это место, куда коннектор должен записывать содержимое.
Конфигурация обработчика
Наконец, нужно настроить обработчик Connect, который объединит два коннектора и выполнит работу по чтению из коннектора-источника и записи в коннектор-потребитель.
Для этого можно использовать $CONFLUENT_HOME/etc/kafka/connect-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java
Обратите внимание, что plugin.path может содержать список путей, по которым доступны реализации коннектора.
Поскольку будем использовать коннекторы в комплекте с Kafka, можно установить для plugin.path значение $CONFLUENT_HOME/share/java. При работе с Windows может потребоваться указать здесь абсолютный путь.
Для остальных параметров можно оставить значения по умолчанию:
- bootstrap.servers содержит адреса брокеров Kafka;
- key.converter и value.converter определяют классы конвертеров, которые сериализуют и десериализуют данные по мере их поступления из источника в Kafka, а затем из Kafka в потребитель;
- key.converter.schemas.enable и value.converter.schemas.enable – это настройки, специфичные для конвертера;
- offset.storage.file.filename – самый важный параметр при работе Connect в автономном режиме: он определяет, где Connect должен хранить данные смещения;
- offset.flush.interval.ms определяет интервал, с которым обработчик пытается зафиксировать смещения для задач.
Список параметров довольно большой, поэтому ознакомьтесь с официальной документацией для получения полной информации.
Kafka Connect в автономном режиме
Теперь можно начать первую настройку коннектора:
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
Во-первых, можно проверить содержимое топика с помощью командной строки:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Как видим, исходный коннектор взял данные из файла test.txt, преобразовал их в JSON и отправил в Kafka:
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
Если посмотрим на папку $CONFLUENT_HOME, то увидим, что здесь был создан файл test.sink.txt:
cat $CONFLUENT_HOME/test.sink.txt
foo
bar
Поскольку коннектор потребителя извлекает значение из атрибута полезной нагрузки и записывает его в файл назначения, данные в test.sink.txt содержат содержимое исходного файла test.txt.
Теперь добавим больше строк в test.txt.
Когда это делаем, видим, что коннектор источника автоматически обнаруживает эти изменения.
Нужно только убедиться, что в конце вставлена новая строка, иначе исходный коннектор не будет учитывать последнюю строку.
На этом этапе остановим процесс Connect, так как запустим Connect в распределенном режиме несколькими строками.
REST API Connect
До сих пор выполняли все настройки, передавая файлы свойств через командную строку. Однако, поскольку Connect предназначен для работы в качестве сервиса, также доступен REST API.
По умолчанию он доступен по адресу http://localhost:8083. Несколько конечных точек:
- GET/connectors – возвращает список всех используемых коннекторов;
- GET/connectors/{name} – возвращает сведения о конкретном коннекторе;
- POST/connectors – создает новый коннектор; тело запроса должно быть объектом JSON, содержащим поле строкового имени и поле конфигурации объекта с параметрами конфигурации коннектора;
- GET/connectors/{name}/status – возвращает текущий статус коннектора, в том числе если он запущен, при сбое или приостановке, к какому рабочему процессу он назначен, информацию об ошибке, если сбой, и состояние всех его задач;
- DELETE/connectors/{name} – удаляет коннектор, изящно останавливая все задачи и удаляя его конфигурацию;
- GET/connector-plugins – возвращает список плагинов коннекторов, установленных в кластере Kafka Connect.
Официальная документация предоставляет список со всеми конечными точками.
В следующем разделе будем использовать REST API для создания новых коннекторов.
Kafka Connect в распределенном режиме
Автономный режим идеально подходит для разработки и тестирования, а также для небольших установок. Однако, если хотим в полной мере использовать распределенную природу Kafka, необходимо запустить Connect в распределенном режиме.
При этом настройки коннектора и метаданные сохраняются в топиках Kafka, а не в файловой системе. В результате рабочие узлы действительно не имеют состояния.
Запуск коннектора
Эталонную конфигурацию для распределенного режима можно найти по адресу $CONFLUENT_HOME/etc/kafka/connect-distributed.properties.
Параметры в основном такие же, как и для автономного режима. Отличий всего несколько:
- group.id определяет имя кластерной группы Connect (значение должно отличаться от любого идентификатора группы потребителей);
- offset.storage.topic, config.storage.topic и status.storage.topic определяют топики для этих настроек (для каждого топика можно определить коэффициент репликации).
Официальная документация предоставляет список со всеми параметрами.
Можно запустить Connect в распределенном режиме следующим образом:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
Добавление коннекторов с помощью REST API
Теперь, по сравнению с автономной командой запуска, мы не передавали никаких конфигураций коннектора в качестве аргументов. Вместо этого необходимо создать коннекторы с помощью REST API.
Чтобы настроить предыдущий пример, необходимо отправить два запроса POST на адрес http://localhost:8083/connectors, содержащие следующие структуры JSON.
Во-первых, нужно создать тело для исходного коннектора POST в виде файла JSON. Назовем его connect-file-source.json:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
Обратите внимание, как это выглядит очень похоже на эталонный файл конфигурации, который использовали в первый раз.
Затем используем POST:
curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
Затем сделаем то же самое для коннектора потребителя, вызвав файл connect-file-sink.json:
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
И выполним POST, как раньше:
curl -d @$CONFLUENT_HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
При необходимости можно убедиться, что эта настройка работает правильно:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
Если посмотрим на папку $CONFLUENT_HOME, то увидим, что здесь был создан файл test-distributed.sink.txt:
cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar
После того, как протестировали распределенную установку, очистим ее, удалив два коннектора:
curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink
Преобразование данных
Поддерживаемые преобразования
Преобразования позволяют вносить простые и легкие изменения в отдельные сообщения.
Kafka Connect поддерживает следующие встроенные преобразования:
- InsertField – добавить поле, используя либо статические данные, либо метаданные записи; ReplaceField – фильтровать или переименовывать поля;
- MaskField – заменить поле допустимым значением null для типа (например, нулем или пустой строкой);
- HoistField – обернуть все событие как одно поле внутри структуры или мапы;
- ExtractField – извлечь определенное поле из структуры и мапы и включить в результаты только этого поля;
- SetSchemaMetadata – изменить имя или версию схемы;
- TimestampRouter – изменить топик записи на основе исходного топика и временной метки;
- RegexRouter – изменить топик записи на основе исходного топика, строки замены и регулярного выражения.
Преобразование настраивается с использованием следующих параметров:
- transforms – разделенный запятыми список псевдонимов для преобразований;
- transforms.$alias.type – имя класса для преобразования;
- transforms.$alias.$transformationSpecificConfig – конфигурация для соответствующего преобразования.
Применение преобразователя
Чтобы протестировать некоторые функции преобразования, настроим следующие два преобразования:
- обернем все сообщение как структуру JSON;
- добавим поле в эту структуру.
Прежде чем применять преобразования, необходимо настроить Connect для использования JSON без схемы, изменив connect-distributed.properties:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
После этого нужно перезапустить Connect в распределенном режиме:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
После этого нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь назовем его connect-file-source-transform.json.
Помимо уже известных параметров, добавим несколько строк для двух необходимых преобразований:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
После этого выполним POST:
curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
Напишем несколько строк в test-transformation.txt:
Foo
Bar
Если теперь проверим топик connect-transformation, то должны получить следующие строки:
{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}
Использование готовых коннекторов
После использования этих простых коннекторов посмотрим на более продвинутые, готовые к использованию коннекторы и способы их установки.
Где найти коннекторы
Готовые коннекторы доступны из разных источников:
- несколько коннекторов связаны с простым Apache Kafka (источник и потребитель для файлов и консоли);
- еще несколько коннекторов входят в состав Confluent Platform (ElasticSearch, HDFS, JDBC и AWS S3).
- также проверьте Confluent Hub, который является своего рода магазином приложений для коннекторов Kafka. Количество предлагаемых коннекторов постоянно растет:
- коннекторы Confluent (разработаны, протестированы, задокументированы и полностью поддерживаются Confluent);
- сертифицированные коннекторы (реализованы сторонней организацией и сертифицированы Confluent);
- коннекторы, разработанные и поддерживаемые сообществом.
- кроме того, Confluent также предоставляет страницу коннекторов;
- и, наконец, есть производители, которые предоставляют коннекторы как часть своего продукта; например, Landoop предоставляет потоковую библиотеку под названием Lenses, которая также содержит набор из примерно 25 коннекторов с открытым исходным кодом (многие из них также перечислены в других местах).
Установка коннекторов из Confluent Hub
Корпоративная версия Confluent предоставляет сценарий для установки коннекторов и других компонентов из Confluent Hub (сценарий не включен в версию с открытым исходным кодом). Если используем корпоративную версию, то можно установить коннектор с помощью следующей команды:
$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview
Установка коннекторов вручную
Если нужен коннектор, которого нет в Confluent Hub, или если есть версия Confluent с открытым исходным кодом, можно установить необходимые коннекторы вручную. Для этого нужно скачать и разархивировать коннектор, а также переместить включенные библиотеки в папку, указанную как plugin.path.
Для каждого коннектора в архиве должны быть две интересующие нас папки:
- в папке lib находится jar коннектора, например, kafka-connect-mqtt-1.0.0-preview.jar, а также еще несколько *.jar, необходимых коннектору;
- папка etc содержит один или несколько эталонных файлов конфигурации.
Необходимо переместить папку lib в $CONFLUENT_HOME/share/java или в любое другое место, которое указали как plugin.path в connect-standalone.properties и connect-distributed.properties. При этом также может иметь смысл переименовать папку во что-то осмысленное.
Можно использовать файлы конфигурации из etc, либо ссылаясь на них при запуске в автономном режиме, либо можно взять свойства и создать из них файл JSON.
Заключение
В этом руководстве рассмотрели, как установить и использовать Kafka Connect. Рассмотрели типы коннекторов, как источники, так и потребители. Рассмотрели некоторые функции и режимы, в которых может работать Connect. Затем рассмотрели преобразователи. И, наконец, узнали, где взять и как установить пользовательские коннекторы.
Файлы конфигурации можно найти на GitHub.