Покроковий посібник із налаштування та запуску

Сучасні комп’ютерні системи щодня створюють мільйони записів даних. До них належать ваші фінансові операції, розміщення замовлення або дані з датчика автомобіля. Щоб обробляти ці потокові події даних у режимі реального часу та надійно переміщувати записи подій між різними корпоративними системами, вам потрібно Апач Кафка.

Apache Kafka — це рішення для потокової передачі даних із відкритим кодом, яке обробляє понад 1 мільйон записів за секунду. Окрім цієї високої пропускної здатності, Apache Kafka забезпечує високу масштабованість і доступність, низьку затримку та постійне зберігання.

Такі компанії, як LinkedIn, Uber і Netflix, покладаються на Apache Kafka для обробки в реальному часі та потокового передавання даних. Найпростіший спосіб розпочати роботу з Apache Kafka — запустити його на вашій локальній машині. Це дозволяє не тільки бачити сервер Apache Kafka у дії, але й дозволяє створювати та споживати повідомлення.

Маючи практичний досвід запуску сервера, створення тем і написання коду Java за допомогою клієнта Kafka, ви будете готові використовувати Apache Kafka для задоволення всіх своїх потреб у конвеєрі даних.

Як завантажити Apache Kafka на локальну машину

Ви можете завантажити останню версію Apache Kafka з офіційне посилання. Завантажений вміст буде стиснутий у форматі .tgz. Після завантаження вам доведеться розпакувати його.

Якщо у вас Linux, відкрийте свій термінал. Далі перейдіть до місця, куди ви завантажили стиснуту версію Apache Kafka. Виконайте таку команду:

tar -xzvf kafka_2.13-3.5.0.tgz

Після виконання команди ви побачите новий каталог під назвою kafka_2.13-3.5.0. Переміщайтеся в папці за допомогою:

cd kafka_2.13-3.5.0

Тепер ви можете переглянути вміст цього каталогу за допомогою команди ls.

Для користувачів Windows ви можете виконати ті самі дії. Якщо ви не можете знайти команду tar, ви можете скористатися інструментом третьої сторони, наприклад WinZip, щоб відкрити архів.

Як запустити Apache Kafka на вашій локальній машині

Після того, як ви завантажили та розпакували Apache Kafka, настав час розпочати його запуск. Він не має інсталяторів. Ви можете почати використовувати його безпосередньо через командний рядок або вікно терміналу.

Перш ніж почати роботу з Apache Kafka, переконайтеся, що у вашій системі встановлено Java 8+. Apache Kafka вимагає запущеної інсталяції Java.

#1. Запустіть сервер Apache Zookeeper

Першим кроком є ​​запуск Apache Zookeeper. Ви отримуєте його попередньо завантаженим як частину архіву. Це служба, яка відповідає за підтримку конфігурацій і забезпечення синхронізації для інших служб.

Коли ви перебуваєте в каталозі, куди ви видобули вміст архіву, виконайте таку команду:

Для користувачів Linux:

bin/zookeeper-server-start.sh config/zookeeper.properties

Для користувачів Windows:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Файл zookeeper.properties містить конфігурації для запуску сервера Apache Zookeeper. Ви можете налаштувати такі властивості, як локальний каталог, де зберігатимуться дані, і порт, на якому працюватиме сервер.

#2. Запустіть сервер Apache Kafka

Тепер, коли сервер Apache Zookeeper запущено, настав час запустити сервер Apache Kafka.

Відкрийте новий термінал або вікно командного рядка та перейдіть до каталогу, де знаходяться витягнуті файли. Тоді ви можете запустити сервер Apache Kafka за допомогою команди нижче:

Для користувачів Linux:

bin/kafka-server-start.sh config/server.properties

Для користувачів Windows:

bin/windows/kafka-server-start.bat config/server.properties

У вас працює сервер Apache Kafka. Якщо ви бажаєте змінити стандартну конфігурацію, це можна зробити, змінивши файл server.properties. Різні значення присутні в офіційна документація.

Як використовувати Apache Kafka на локальній машині

Тепер ви готові почати використовувати Apache Kafka на локальній машині для створення та споживання повідомлень. Оскільки сервери Apache Zookeeper і Apache Kafka запущені та працюють, давайте подивимося, як ви можете створити свою першу тему, створити своє перше повідомлення та використати те саме.

  12 найкращих програм для керування проектами для малого та середнього бізнесу

Які кроки потрібно виконати, щоб створити тему в Apache Kafka?

Перш ніж створити свою першу тему, давайте розберемося, що таке тема. В Apache Kafka тема — це логічне сховище даних, яке допомагає потоково передавати дані. Уявіть це як канал, через який дані транспортуються від одного компонента до іншого.

Тема підтримує кілька виробників і кількох споживачів – більше ніж одна система може писати та читати з теми. На відміну від інших систем обміну повідомленнями, будь-яке повідомлення з теми можна використовувати кілька разів. Крім того, ви також можете вказати термін зберігання ваших повідомлень.

Розглянемо приклад системи (продюсера), яка виробляє дані для банківських операцій. А інша система (споживач) споживає ці дані та надсилає сповіщення про програму користувачеві. Щоб полегшити це, необхідна тема.

Відкрийте новий термінал або вікно командного рядка та перейдіть до каталогу, куди ви розпакували архів. Наступна команда створить тему під назвою транзакції:

Для користувачів Linux:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

Для користувачів Windows:

bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Тепер ви створили свою першу тему, і ви готові почати створювати та споживати повідомлення.

Як створити повідомлення для Apache Kafka?

З готовою темою Apache Kafka ви можете створювати своє перше повідомлення. Відкрийте новий термінал або вікно командного рядка або скористайтеся тим самим, що використовували для створення теми. Далі переконайтеся, що ви знаходитесь у правильному каталозі, куди ви видобули вміст архіву. Ви можете використовувати командний рядок, щоб створити своє повідомлення на тему за допомогою такої команди:

Для користувачів Linux:

bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

Для користувачів Windows:

bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

Після виконання команди ви побачите, що ваш термінал або вікно командного рядка очікує введення. Напишіть своє перше повідомлення та натисніть Enter.

> This is a transactional record for $100

Ви створили своє перше повідомлення для Apache Kafka на локальній машині. Згодом ви готові використовувати це повідомлення.

Як споживати повідомлення від Apache Kafka?

За умови, що вашу тему було створено, і ви створили повідомлення до вашої теми Kafka, тепер ви можете використовувати це повідомлення.

Apache Kafka дозволяє приєднати кілька споживачів до однієї теми. Кожен споживач може бути частиною групи споживачів – логічного ідентифікатора. Наприклад, якщо у вас є дві служби, яким потрібно споживати однакові дані, вони можуть мати різні групи споживачів.

Однак, якщо у вас є два екземпляри однієї служби, вам слід уникати використання та обробки одного й того самого повідомлення двічі. У цьому випадку в обох буде одна група споживачів.

У вікні терміналу або командного рядка переконайтеся, що ви перебуваєте у потрібному каталозі. Використовуйте таку команду, щоб запустити споживача:

Для користувачів Linux:

bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Для користувачів Windows:

bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Ви побачите, що повідомлення, яке ви створили раніше, з’явиться на вашому терміналі. Тепер ви використали Apache Kafka, щоб отримати своє перше повідомлення.

Команда kafka-console-consumer приймає багато аргументів. Давайте подивимося, що означає кожен з них:

  • –topic згадує тему, з якої ви будете споживати
  • –from-beginning повідомляє споживачеві консолі починати читати повідомлення відразу з першого повідомлення
  • Ваш сервер Apache Kafka згадується за допомогою параметра –bootstrap-server
  • Крім того, ви можете згадати групу споживачів, передавши параметр –group
  • Якщо параметр групи споживачів відсутній, він генерується автоматично
  8 найпоширеніших шахрайств з криптовалютою та як їх уникнути

Коли консольний споживач працює, ви можете спробувати створити нові повідомлення. Ви побачите, що всі вони використані та з’являться у вашому терміналі.

Тепер, коли ви створили свою тему та успішно створили та спожили повідомлення, давайте інтегруємо це з програмою Java.

Як створити виробника та споживача Apache Kafka за допомогою Java

Перш ніж почати, переконайтеся, що на вашій локальній машині встановлено Java 8+. Apache Kafka надає власну клієнтську бібліотеку, яка дозволяє безперешкодно підключатися. Якщо ви використовуєте Maven для керування своїми залежностями, додайте наступну залежність до свого pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Ви також можете завантажити бібліотеку з Репозиторій Maven і додайте його до свого шляху до класів Java.

Коли бібліотека створена, відкрийте будь-який редактор коду. Давайте подивимося, як ви можете запустити свого виробника та споживача за допомогою Java.

Створити Apache Kafka Java producer

Завдяки бібліотеці kafka-clients ви готові розпочати створення свого продюсера Kafka.

Давайте створимо клас під назвою SimpleProducer.java. Це відповідатиме за створення повідомлень на тему, яку ви створили раніше. У цьому класі ви створите екземпляр org.apache.kafka.clients.producer.KafkaProducer. Згодом ви будете використовувати цього виробника для надсилання повідомлень.

Для створення виробника Kafka вам потрібен хост і порт вашого сервера Apache Kafka. Оскільки ви запускаєте його на локальній машині, хостом буде localhost. Враховуючи, що ви не змінили властивості за замовчуванням під час запуску сервера, порт буде 9092. Розгляньте наведений нижче код, який допоможе вам створити виробника:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

Ви помітите, що встановлюються три властивості. Давайте швидко пройдемося по кожному з них:

  • BOOTSTRAP_SERVERS_CONFIG дозволяє визначити, де запущено сервер Apache Kafka
  • KEY_SERIALIZER_CLASS_CONFIG повідомляє виробнику, який формат використовувати для надсилання ключів повідомлень.
  • Формат для надсилання фактичного повідомлення визначається за допомогою властивості VALUE_SERIALIZER_CLASS_CONFIG.

Оскільки ви надсилатимете текстові повідомлення, обидві властивості налаштовано на використання StringSerializer.class.

Щоб фактично надіслати повідомлення до вашої теми, вам потрібно використати метод producer.send(), який приймає ProducerRecord. Наступний код дає вам метод, який надсилає повідомлення до теми та друкує відповідь разом із зміщенням повідомлення.

public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

З усім кодом на місці ви можете надсилати повідомлення у свою тему. Ви можете використовувати основний метод, щоб перевірити це, як представлено в коді нижче:

package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

У цьому коді ви створюєте SimpleProducer, який підключається до вашого сервера Apache Kafka на вашій локальній машині. Він внутрішньо використовує KafkaProducer для створення текстових повідомлень на вашу тему.

Створення споживача Java Apache Kafka

Настав час зробити споживачем Apache Kafka за допомогою клієнта Java. Створіть клас під назвою SimpleConsumer.java. Далі ви створите конструктор для цього класу, який ініціалізує org.apache.kafka.clients.consumer.KafkaConsumer. Для створення споживача вам потрібен хост і порт, на якому працює сервер Apache Kafka. Крім того, вам потрібна група споживачів, а також тема, з якої ви хочете споживати. Використовуйте наведений нижче фрагмент коду:

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

Подібно до Kafka Producer, Kafka Consumer також приймає об’єкт Properties. Давайте розглянемо всі різні властивості набору:

  • BOOTSTRAP_SERVERS_CONFIG повідомляє споживачеві, де запущено сервер Apache Kafka
  • Група споживачів згадується за допомогою GROUP_ID_CONFIG
  • Коли споживач починає споживати, AUTO_OFFSET_RESET_CONFIG дозволяє згадати, як далеко назад ви хочете почати споживати повідомлення від
  • KEY_DESERIALIZER_CLASS_CONFIG повідомляє споживачеві тип ключа повідомлення
  • VALUE_DESERIALIZER_CLASS_CONFIG повідомляє тип споживача фактичного повідомлення
  Як змінити логотип плану в Microsoft Planner

Оскільки у вашому випадку ви будете отримувати текстові повідомлення, властивості десеріалізатора встановлені на StringDeserializer.class.

Тепер ви будете споживати повідомлення з вашої теми. Для того, щоб усе було просто, коли повідомлення буде спожито, ви роздрукуєте його на консолі. Давайте подивимося, як ви можете досягти цього за допомогою коду нижче:

private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

Цей код продовжить опитувати тему. Коли ви отримаєте будь-який запис споживача, повідомлення буде надруковано. Перевірте свого споживача в дії за допомогою основного методу. Ви запустите програму Java, яка буде продовжувати використовувати тему та друкувати повідомлення. Зупиніть програму Java, щоб припинити роботу споживача.

package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

Коли ви запускаєте код, ви помітите, що він не тільки споживає повідомлення, створені вашим продюсером Java, але також ті, які ви створили через Console Producer. Це пояснюється тим, що властивість AUTO_OFFSET_RESET_CONFIG встановлено на найранішу.

Коли SimpleConsumer працює, ви можете використовувати консольний продюсер або Java-додаток SimpleProducer для створення подальших повідомлень до теми. Ви побачите, як вони споживаються та друкуються на консолі.

Задовольніть усі ваші потреби в конвеєрі даних за допомогою Apache Kafka

Apache Kafka дозволяє з легкістю впоратися з усіма вимогами до каналу даних. Встановивши Apache Kafka на вашій локальній машині, ви можете досліджувати всі різноманітні функції, які надає Kafka. Крім того, офіційний клієнт Java дозволяє вам ефективно писати, підключатися та спілкуватися з сервером Apache Kafka.

Будучи універсальною, масштабованою та високопродуктивною системою потокового передавання даних, Apache Kafka може справді змінити правила гри. Ви можете використовувати його для локального розвитку або навіть інтегрувати у свої виробничі системи. Так само, як його легко налаштувати локально, налаштування Apache Kafka для великих програм не становить великого завдання.

Якщо ви шукаєте платформи потокової передачі даних, ви можете переглянути найкращі платформи потокової передачі даних для аналізу та обробки в реальному часі.