Перейти к содержанию

Введение в коннекторы Kafka

Apache Kafka® – это распределенная платформа потоковой передачи. Ранее мы обсуждали, как реализовать потребителей и производителей Kafka с помощью Spring.

В этом руководстве узнаем, как использовать коннекторы Kafka. Мы рассмотрим:

  • различные типы коннекторов Kafka;
  • функции и режимы Kafka Connect;
  • конфигурацию коннекторов с использованием файлов свойств, а также REST API.

Основы Kafka Connect и коннекторы Kafka

Kafka Connect – это платформа для подключения Kafka к внешним системам, таким как базы данных, хранилища ключей и значений, поисковые индексы и файловые системы, с помощью так называемых коннекторов.

Коннекторы Kafka – это готовые к использованию компоненты, которые помогут импортировать данные из внешних систем в топики Kafka и экспортировать данные из топиков Kafka во внешние системы. Можно использовать существующие реализации коннекторов для общих источников данных и потребителей или реализовать собственные коннекторы.

Исходный коннектор собирает данные из системы. Исходные системы могут быть целыми базами данных, таблицами потоков или брокерами сообщений. Исходный коннектор также может собирать метрики с серверов приложений в топики Kafka, делая данные доступными для потоковой обработки с малой задержкой.

Коннектор потребителя доставляет данные из разделов Kafka в другие системы, которые могут быть индексами, такими как Elasticsearch, пакетными системами, такими как Hadoop, или любой другой базой данных.

Некоторые коннекторы поддерживаются сообществом, а другие поддерживаются Confluent или его партнерами. Действительно, можно найти коннекторы для большинства популярных систем, таких как S3, JDBC и Cassandra, и это лишь некоторые из них.

Функции

Возможности Kafka Connect включают в себя:

  • фреймворк для подключения внешних систем к Kafka – упрощает разработку, развертывание и управление коннекторами;
  • распределенный и автономный режимы – помогает развертывать большие кластеры, используя распределенный характер Kafka, а также настройки для разработки, тестирования и небольших производственных развертываний;
  • интерфейс REST – можно управлять коннекторами с помощью REST API;
  • автоматическое управление смещением – Kafka Connect помогает обрабатывать процесс фиксации смещения, что избавляет от необходимости вручную реализовывать эту подверженную ошибкам часть разработки коннектора;
  • распределенная и масштабируемая по умолчанию – Kafka Connect использует существующий протокол управления группами; можно добавить больше обработчиков для масштабирования кластера Kafka Connect;
  • потоковая и пакетная интеграция: Kafka Connect – идеальное решение для объединения систем потоковой и пакетной передачи данных в сочетании с существующими возможностями Kafka;
  • преобразования – они позволяют вносить простые и легкие изменения в отдельные сообщения.

Настройка

Вместо использования простого дистрибутива Kafka загрузим Confluent Platform, дистрибутив Kafka, предоставленный Confluent, Inc., компанией, стоящей за Kafka. Confluent Platform поставляется с дополнительными инструментами и клиентами по сравнению с простой Kafka, а также дополнительными готовыми коннекторами.

В нашем случае достаточно версии с открытым исходным кодом, которую можно найти на сайте Confluent.

Быстрый старт Kafka Connect

Для начала обсудим принцип Kafka Connect, используя его самые основные коннекторы, которые являются коннектором источника файла и коннектором потребителя файла.

Удобно, что Confluent Platform поставляется с обоими этими коннекторами, а также с эталонными конфигурациями.

Конфигурация исходного коннектора

Для исходного коннектора эталонная конфигурация доступна по адресу $CONFLUENT_HOME/etc/kafka/connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

Эта конфигурация имеет некоторые свойства, общие для всех исходных коннекторов:

  • name – указанное пользователем имя экземпляра коннектора;
  • connector.class указывает класс реализации, в основном вид коннектора;
  • tasks.max указывает, сколько экземпляров исходного коннектора должно работать параллельно;
  • topic определяет топик, в который коннектор должен отправлять выходные данные.

В этом случае есть атрибут, специфичный для коннектора:

  • file определяет файл, из которого коннектор должен считывать ввод.

Чтобы это работало, создадим базовый файл с некоторым содержимым:

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

Обратите внимание, что рабочий каталог – $CONFLUENT_HOME.

Конфигурация коннектора потребителя

Для коннектора потребителя будем использовать эталонную конфигурацию в $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Логически он содержит точно такие же параметры, хотя на этот раз connector.class определяет реализацию коннектора потребителя, а file – это место, куда коннектор должен записывать содержимое.

Конфигурация обработчика

Наконец, нужно настроить обработчик Connect, который объединит два коннектора и выполнит работу по чтению из коннектора-источника и записи в коннектор-потребитель.

Для этого можно использовать $CONFLUENT_HOME/etc/kafka/connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

Обратите внимание, что plugin.path может содержать список путей, по которым доступны реализации коннектора.

Поскольку будем использовать коннекторы в комплекте с Kafka, можно установить для plugin.path значение $CONFLUENT_HOME/share/java. При работе с Windows может потребоваться указать здесь абсолютный путь.

Для остальных параметров можно оставить значения по умолчанию:

  • bootstrap.servers содержит адреса брокеров Kafka;
  • key.converter и value.converter определяют классы конвертеров, которые сериализуют и десериализуют данные по мере их поступления из источника в Kafka, а затем из Kafka в потребитель;
  • key.converter.schemas.enable и value.converter.schemas.enable – это настройки, специфичные для конвертера;
  • offset.storage.file.filename – самый важный параметр при работе Connect в автономном режиме: он определяет, где Connect должен хранить данные смещения;
  • offset.flush.interval.ms определяет интервал, с которым обработчик пытается зафиксировать смещения для задач.

Список параметров довольно большой, поэтому ознакомьтесь с официальной документацией для получения полной информации.

Kafka Connect в автономном режиме

Теперь можно начать первую настройку коннектора:

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

Во-первых, можно проверить содержимое топика с помощью командной строки:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Как видим, исходный коннектор взял данные из файла test.txt, преобразовал их в JSON и отправил в Kafka:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

Если посмотрим на папку $CONFLUENT_HOME, то увидим, что здесь был создан файл test.sink.txt:

cat $CONFLUENT_HOME/test.sink.txt
foo
bar

Поскольку коннектор потребителя извлекает значение из атрибута полезной нагрузки и записывает его в файл назначения, данные в test.sink.txt содержат содержимое исходного файла test.txt.

Теперь добавим больше строк в test.txt.

Когда это делаем, видим, что коннектор источника автоматически обнаруживает эти изменения.

Нужно только убедиться, что в конце вставлена новая строка, иначе исходный коннектор не будет учитывать последнюю строку.

На этом этапе остановим процесс Connect, так как запустим Connect в распределенном режиме несколькими строками.

REST API Connect

До сих пор выполняли все настройки, передавая файлы свойств через командную строку. Однако, поскольку Connect предназначен для работы в качестве сервиса, также доступен REST API.

По умолчанию он доступен по адресу http://localhost:8083. Несколько конечных точек:

  • GET/connectors – возвращает список всех используемых коннекторов;
  • GET/connectors/{name} – возвращает сведения о конкретном коннекторе;
  • POST/connectors – создает новый коннектор; тело запроса должно быть объектом JSON, содержащим поле строкового имени и поле конфигурации объекта с параметрами конфигурации коннектора;
  • GET/connectors/{name}/status – возвращает текущий статус коннектора, в том числе если он запущен, при сбое или приостановке, к какому рабочему процессу он назначен, информацию об ошибке, если сбой, и состояние всех его задач;
  • DELETE/connectors/{name} – удаляет коннектор, изящно останавливая все задачи и удаляя его конфигурацию;
  • GET/connector-plugins – возвращает список плагинов коннекторов, установленных в кластере Kafka Connect.

Официальная документация предоставляет список со всеми конечными точками.

В следующем разделе будем использовать REST API для создания новых коннекторов.

Kafka Connect в распределенном режиме

Автономный режим идеально подходит для разработки и тестирования, а также для небольших установок. Однако, если хотим в полной мере использовать распределенную природу Kafka, необходимо запустить Connect в распределенном режиме.

При этом настройки коннектора и метаданные сохраняются в топиках Kafka, а не в файловой системе. В результате рабочие узлы действительно не имеют состояния.

Запуск коннектора

Эталонную конфигурацию для распределенного режима можно найти по адресу $CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Параметры в основном такие же, как и для автономного режима. Отличий всего несколько:

  • group.id определяет имя кластерной группы Connect (значение должно отличаться от любого идентификатора группы потребителей);
  • offset.storage.topic, config.storage.topic и status.storage.topic определяют топики для этих настроек (для каждого топика можно определить коэффициент репликации).

Официальная документация предоставляет список со всеми параметрами.

Можно запустить Connect в распределенном режиме следующим образом:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Добавление коннекторов с помощью REST API

Теперь, по сравнению с автономной командой запуска, мы не передавали никаких конфигураций коннектора в качестве аргументов. Вместо этого необходимо создать коннекторы с помощью REST API.

Чтобы настроить предыдущий пример, необходимо отправить два запроса POST на адрес http://localhost:8083/connectors, содержащие следующие структуры JSON.

Во-первых, нужно создать тело для исходного коннектора POST в виде файла JSON. Назовем его connect-file-source.json:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-distributed.txt",
        "topic": "connect-distributed"
    }
}

Обратите внимание, как это выглядит очень похоже на эталонный файл конфигурации, который использовали в первый раз.

Затем используем POST:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

Затем сделаем то же самое для коннектора потребителя, вызвав файл connect-file-sink.json:

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-distributed.sink.txt",
        "topics": "connect-distributed"
    }
}

И выполним POST, как раньше:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

При необходимости можно убедиться, что эта настройка работает правильно:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

Если посмотрим на папку $CONFLUENT_HOME, то увидим, что здесь был создан файл test-distributed.sink.txt:

cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar

После того, как протестировали распределенную установку, очистим ее, удалив два коннектора:

curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

Преобразование данных

Поддерживаемые преобразования

Преобразования позволяют вносить простые и легкие изменения в отдельные сообщения.

Kafka Connect поддерживает следующие встроенные преобразования:

  • InsertField – добавить поле, используя либо статические данные, либо метаданные записи; ReplaceField – фильтровать или переименовывать поля;
  • MaskField – заменить поле допустимым значением null для типа (например, нулем или пустой строкой);
  • HoistField – обернуть все событие как одно поле внутри структуры или мапы;
  • ExtractField – извлечь определенное поле из структуры и мапы и включить в результаты только этого поля;
  • SetSchemaMetadata – изменить имя или версию схемы;
  • TimestampRouter – изменить топик записи на основе исходного топика и временной метки;
  • RegexRouter – изменить топик записи на основе исходного топика, строки замены и регулярного выражения.

Преобразование настраивается с использованием следующих параметров:

  • transforms – разделенный запятыми список псевдонимов для преобразований;
  • transforms.$alias.type – имя класса для преобразования;
  • transforms.$alias.$transformationSpecificConfig – конфигурация для соответствующего преобразования.

Применение преобразователя

Чтобы протестировать некоторые функции преобразования, настроим следующие два преобразования:

  • обернем все сообщение как структуру JSON;
  • добавим поле в эту структуру.

Прежде чем применять преобразования, необходимо настроить Connect для использования JSON без схемы, изменив connect-distributed.properties:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

После этого нужно перезапустить Connect в распределенном режиме:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

После этого нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь назовем его connect-file-source-transform.json.

Помимо уже известных параметров, добавим несколько строк для двух необходимых преобразований:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

После этого выполним POST:

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

Напишем несколько строк в test-transformation.txt:

Foo
Bar

Если теперь проверим топик connect-transformation, то должны получить следующие строки:

{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}

Использование готовых коннекторов

После использования этих простых коннекторов посмотрим на более продвинутые, готовые к использованию коннекторы и способы их установки.

Где найти коннекторы

Готовые коннекторы доступны из разных источников:

  • несколько коннекторов связаны с простым Apache Kafka (источник и потребитель для файлов и консоли);
  • еще несколько коннекторов входят в состав Confluent Platform (ElasticSearch, HDFS, JDBC и AWS S3).
  • также проверьте Confluent Hub, который является своего рода магазином приложений для коннекторов Kafka. Количество предлагаемых коннекторов постоянно растет:
    • коннекторы Confluent (разработаны, протестированы, задокументированы и полностью поддерживаются Confluent);
    • сертифицированные коннекторы (реализованы сторонней организацией и сертифицированы Confluent);
    • коннекторы, разработанные и поддерживаемые сообществом.
  • кроме того, Confluent также предоставляет страницу коннекторов;
  • и, наконец, есть производители, которые предоставляют коннекторы как часть своего продукта; например, Landoop предоставляет потоковую библиотеку под названием Lenses, которая также содержит набор из примерно 25 коннекторов с открытым исходным кодом (многие из них также перечислены в других местах).

Установка коннекторов из Confluent Hub

Корпоративная версия Confluent предоставляет сценарий для установки коннекторов и других компонентов из Confluent Hub (сценарий не включен в версию с открытым исходным кодом). Если используем корпоративную версию, то можно установить коннектор с помощью следующей команды:

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

Установка коннекторов вручную

Если нужен коннектор, которого нет в Confluent Hub, или если есть версия Confluent с открытым исходным кодом, можно установить необходимые коннекторы вручную. Для этого нужно скачать и разархивировать коннектор, а также переместить включенные библиотеки в папку, указанную как plugin.path.

Для каждого коннектора в архиве должны быть две интересующие нас папки:

  • в папке lib находится jar коннектора, например, kafka-connect-mqtt-1.0.0-preview.jar, а также еще несколько *.jar, необходимых коннектору;
  • папка etc содержит один или несколько эталонных файлов конфигурации.

Необходимо переместить папку lib в $CONFLUENT_HOME/share/java или в любое другое место, которое указали как plugin.path в connect-standalone.properties и connect-distributed.properties. При этом также может иметь смысл переименовать папку во что-то осмысленное.

Можно использовать файлы конфигурации из etc, либо ссылаясь на них при запуске в автономном режиме, либо можно взять свойства и создать из них файл JSON.

Заключение

В этом руководстве рассмотрели, как установить и использовать Kafka Connect. Рассмотрели типы коннекторов, как источники, так и потребители. Рассмотрели некоторые функции и режимы, в которых может работать Connect. Затем рассмотрели преобразователи. И, наконец, узнали, где взять и как установить пользовательские коннекторы.

Файлы конфигурации можно найти на GitHub.

Оригинал