Перейти к содержанию

Apache Kafka в Spring Framework

Apache Kafka – это распределенная и отказоустойчивая система обработки потоков.

В этом руководстве рассмотрим поддержку Spring для Kafka и уровень абстракции, который он обеспечивает по сравнению с собственными клиентскими Kafka API для Java.

Spring Kafka предлагает простую и типичную модель программирования шаблонов Spring с KafkaTemplate и управляемыми сообщениями POJO через аннотацию @KafkaListener.

Установка и настройка

Чтобы загрузить и установить Kafka, обратитесь к официальному руководству. Также нужно добавить зависимость spring-kafka в pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

Последнюю версию этого артефакта можно найти здесь.

Разберем пример приложения на базе Spring Boot.

В этой статье предполагается, что сервер запущен с конфигурацией по умолчанию и порты сервера не изменены.

Настройка топиков

Раньше для создания топиков в Kafka запускали инструменты командной строки :

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

Но с введением AdminClient в Kafka теперь можно создавать топики программно.

Для этого необходимо добавить bean-компонент Spring KafkaAdmin, который будет автоматически добавлять топики для всех bean-компонентов типа NewTopic:

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("javamaster", 1, (short) 1);
    }
}

Создание сообщений

Чтобы создавать сообщения, сначала нужно настроить ProducerFactory. Это задает стратегию создания экземпляров Kafka Producer.

Затем нужен KafkaTemplate, который обертывает экземпляр Producer и предоставляет удобные методы для отправки сообщений в топики Kafka.

Экземпляры Producer являются потокобезопасными. Таким образом, использование одного экземпляра в контексте приложения даст более высокую производительность. Следовательно, экземпляры KakfaTemplate также являются потокобезопасными, и рекомендуется использовать один экземпляр.

Конфигурация производителя

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Публикация сообщений

Можно отправлять сообщения с помощью класса KafkaTemplate:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

Send API возвращает объект ListenableFuture. Если хотим заблокировать отправляющий поток и получить результат об отправленном сообщении, можно вызвать get API объекта ListenableFuture. Поток будет ждать результата, но это замедлит производителя.

Kafka – это платформа для быстрой обработки потоков. Поэтому лучше обрабатывать результаты асинхронно, чтобы последующие сообщения не ждали результата предыдущего сообщения. Можно сделать это с помощью обратного вызова:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

Получение сообщений

Конфигурация потребителя

Для получения сообщений нужно настроить ConsumerFactory и KafkaListenerContainerFactory. Как только эти бины станут доступны в фабрике бинов Spring, потребители на основе POJO можно настроить с помощью аннотации @KafkaListener.

В классе конфигурации требуется аннотация @EnableKafka, чтобы включить обнаружение аннотации @KafkaListener в bean-компонентах, управляемых Spring:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Получение сообщений

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Можно реализовать несколько слушателей для топика, каждый с другим идентификатором группы. Кроме того, один потребитель может прослушивать сообщения из разных тем:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring также поддерживает извлечение одного или нескольких заголовков сообщений с помощью аннотации @Header в слушателе:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

Получение сообщений из определенного раздела

Обратите внимание, что мы создали топик javamaster только с одним разделом. Однако для топика с несколькими разделами @KafkaListener может явно подписаться на конкретный раздел топика с начальным смещением:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

Поскольку в этом прослушивателе для InitialOffset установлено значение 0, все ранее использованные сообщения из разделов 0 и 3 будут повторно использоваться каждый раз при инициализации этого прослушивателя.

Если не нужно устанавливать смещение, можно использовать свойство partitions аннотации @TopicPartition, чтобы установить только разделы без смещения:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

Добавление фильтра сообщений для слушателей

Можно настроить прослушиватели для получения определенных типов сообщений, добавив собственный фильтр. Это можно сделать, установив для RecordFilterStrategy значение KafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

Затем можно настроить прослушиватель для использования этой фабрики контейнеров:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

В этом прослушивателе все сообщения, соответствующие фильтру, будут отброшены.

Пользовательские конвертеры сообщений

До сих пор мы рассматривали только отправку и получение строк в виде сообщений. Однако можно отправлять и получать пользовательские объекты Java. Для этого требуется настроить соответствующий сериализатор в ProducerFactory и десериализатор в ConsumerFactory.

Посмотрим на простой класс бина, который будем отправлять в виде сообщений:

public class Greeting {

    private String msg;
    private String name;

    // стандартные геттеры, сеттеры и конструктор
}

Создание пользовательских сообщений

В этом примере будем использовать JsonSerializer.

Посмотрим на код для ProducerFactory и KafkaTemplate:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

Можно использовать этот новый KafkaTemplate для отправки приветственного сообщения:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Получение пользовательских сообщений

Точно так же изменим ConsumerFactory и KafkaListenerContainerFactory, чтобы правильно десериализовать приветственное сообщение:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

JSON сериализатор и десериализатор spring-kafka использует библиотеку Jackson, которая также является дополнительной зависимостью Maven для проекта spring-kafka.

Добавим его в pom.xml:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

Вместо использования последней версии Jackson рекомендуется использовать версию, добавленную в pom.xml spring-kafka.

Наконец, нужно написать прослушиватель для получения приветственных сообщений:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // обработка приветственного сообщения
}

Заключение

В этой статье рассмотрели основы поддержки Spring для Apache Kafka. Мы кратко рассмотрели классы, используемые для отправки и получения сообщений.

Полный исходный код этой статьи можно найти на GitHub.

Перед запуском кода убедитесь, что сервер Kafka запущен и топики созданы вручную.

Оригинал