Сучасні інформаційні системи щоденно генерують величезну кількість даних. Це можуть бути ваші банківські транзакції, замовлення в онлайн-магазині або інформація з датчиків вашого автомобіля. Для обробки цих безперервних потоків даних у режимі реального часу, а також для надійного обміну записами між різними корпоративними платформами, вам необхідний Apache Kafka.
Apache Kafka – це програмне забезпечення з відкритим кодом для роботи з потоковими даними, здатне обробляти понад мільйон записів щосекунди. Окрім високої продуктивності, Kafka вирізняється масштабованістю, відмовостійкістю, мінімальною затримкою та надійним зберіганням інформації.
Такі гіганти, як LinkedIn, Uber та Netflix, використовують Apache Kafka для обробки даних у реальному часі та потокової передачі. Найпростіший спосіб почати роботу з Kafka – це запустити її на вашому персональному комп’ютері. Це дає змогу не тільки побачити, як працює сервер Apache Kafka, а й створювати та отримувати повідомлення.
Отримавши практичний досвід запуску сервера, створення тем та написання Java-коду з використанням клієнта Kafka, ви будете готові використовувати Apache Kafka для будь-яких потреб у обробці даних.
Як завантажити Apache Kafka на персональний комп’ютер
Останню версію Apache Kafka можна завантажити з офіційного вебсайту. Завантажений архів буде у форматі .tgz. Після завантаження його необхідно розпакувати.
Якщо ви користуєтеся Linux, відкрийте термінал та перейдіть до папки, в яку ви завантажили архів Apache Kafka. Далі виконайте наступну команду:
tar -xzvf kafka_2.13-3.5.0.tgz
Після виконання цієї команди у вас з’явиться нова папка з назвою kafka_2.13-3.5.0. Перейдіть до неї за допомогою команди:
cd kafka_2.13-3.5.0
Тепер ви можете переглянути вміст цієї папки, використовуючи команду ls.
Користувачі Windows можуть виконати аналогічні дії. Якщо у вас немає команди tar, ви можете скористатися стороннім архіватором, наприклад, WinZip, щоб відкрити архів.
Як запустити Apache Kafka на персональному комп’ютері
Після завантаження та розпакування Apache Kafka, настав час запустити її. Вона не потребує встановлення. Ви можете використовувати її одразу через командний рядок або термінал.
Перед початком роботи з Apache Kafka, переконайтеся, що у вашій системі встановлена Java 8 або новіша версія. Apache Kafka потребує запущеної Java.
#1. Запуск сервера Apache Zookeeper
Першим кроком є запуск Apache Zookeeper. Він поставляється разом з архівом. Це сервіс, який відповідає за підтримку конфігурацій та синхронізацію для інших служб.
Знаходячись у каталозі, де ви розпакували архів, виконайте наступну команду:
Для користувачів Linux:
bin/zookeeper-server-start.sh config/zookeeper.properties
Для користувачів Windows:
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
Файл zookeeper.properties містить налаштування для запуску сервера Apache Zookeeper. Ви можете налаштувати такі параметри, як локальна папка для збереження даних та порт, на якому працюватиме сервер.
#2. Запуск сервера Apache Kafka
Після запуску сервера Apache Zookeeper настав час запустити сервер Apache Kafka.
Відкрийте новий термінал або командний рядок та перейдіть до каталогу, де знаходяться файли. Після цього ви можете запустити сервер Apache Kafka за допомогою наступної команди:
Для користувачів Linux:
bin/kafka-server-start.sh config/server.properties
Для користувачів Windows:
bin/windows/kafka-server-start.bat config/server.properties
Тепер сервер Apache Kafka працює. Якщо ви бажаєте змінити стандартні налаштування, це можна зробити, редагуючи файл server.properties. Різні параметри описані в офіційній документації.
Як використовувати Apache Kafka на персональному комп’ютері
Тепер ви готові використовувати Apache Kafka на своєму комп’ютері для створення та отримання повідомлень. Оскільки сервери Apache Zookeeper та Apache Kafka запущені, давайте подивимося, як можна створити свою першу тему, написати та отримати повідомлення.
Які кроки потрібно виконати для створення теми в Apache Kafka?
Перш ніж створювати тему, давайте з’ясуємо, що вона собою представляє. В Apache Kafka тема – це логічне сховище даних, яке допомагає передавати дані в потоковому режимі. Уявіть її як канал, через який дані переміщуються від однієї частини системи до іншої.
Тема підтримує кілька джерел та кілька споживачів – більше ніж одна система може відправляти та отримувати дані з теми. На відміну від інших систем обміну повідомленнями, будь-яке повідомлення в темі може бути використане кілька разів. Крім того, ви можете вказати термін зберігання повідомлень.
Розглянемо приклад системи (виробника), яка генерує дані про банківські транзакції. Інша система (споживач) обробляє ці дані та відправляє сповіщення користувачам. Для полегшення цього процесу потрібна тема.
Відкрийте новий термінал або командний рядок та перейдіть до каталогу, де ви розпакували архів. Наступна команда створить тему з назвою transactions:
Для користувачів Linux:
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
Для користувачів Windows:
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
Тепер ви створили свою першу тему та готові відправляти та отримувати повідомлення.
Як відправити повідомлення до Apache Kafka?
Маючи готову тему, ви можете відправити своє перше повідомлення. Відкрийте новий термінал або командний рядок, або використайте той, що використовували для створення теми. Переконайтеся, що ви знаходитесь у правильному каталозі, де розпакували архів. Використовуйте командний рядок, щоб створити повідомлення, використовуючи наступну команду:
Для користувачів Linux:
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Для користувачів Windows:
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
Після виконання команди, термінал або командний рядок буде очікувати введення. Напишіть своє перше повідомлення та натисніть Enter.
> This is a transactional record for $100
Ви відправили своє перше повідомлення до Apache Kafka на вашому комп’ютері. Тепер ви готові його отримати.
Як отримати повідомлення з Apache Kafka?
Маючи створену тему та відправлене до неї повідомлення, тепер ви можете його отримати.
Apache Kafka дозволяє підключити кілька отримувачів до однієї теми. Кожен отримувач може бути частиною групи отримувачів – логічного ідентифікатора. Наприклад, якщо у вас є дві служби, яким потрібні одні й ті самі дані, вони можуть мати різні групи отримувачів.
Однак, якщо у вас є два екземпляри однієї служби, вам слід уникати повторного отримання одного й того самого повідомлення. У цьому випадку, вони матимуть спільну групу отримувачів.
У вікні термінала або командного рядка, переконайтеся, що ви знаходитесь у потрібному каталозі. Використайте наступну команду для запуску отримувача:
Для користувачів Linux:
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Для користувачів Windows:
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Ви побачите, що повідомлення, яке ви відправили раніше, з’явиться у вашому терміналі. Ви успішно отримали своє перше повідомлення за допомогою Apache Kafka.
Команда kafka-console-consumer приймає багато аргументів. Давайте подивимося, що означає кожен з них:
- –topic вказує тему, з якої ви будете отримувати повідомлення
- –from-beginning повідомляє консольному отримувачу, що потрібно починати читати повідомлення з початку
- Ваш сервер Apache Kafka вказується за допомогою параметра –bootstrap-server
- Крім того, ви можете вказати групу отримувачів за допомогою параметра –group
- Якщо параметр групи отримувачів не вказано, вона буде згенерована автоматично
Коли консольний отримувач запущено, ви можете відправляти нові повідомлення. Ви побачите, що всі вони отримані та з’являються у вашому терміналі.
Тепер, коли ви створили свою тему, успішно відправили та отримали повідомлення, давайте інтегруємо це з Java-програмою.
Як створити відправника та отримувача Apache Kafka за допомогою Java
Перш ніж почати, переконайтеся, що на вашому комп’ютері встановлена Java 8 або новіша версія. Apache Kafka надає власну клієнтську бібліотеку, яка дозволяє легко підключатися до неї. Якщо ви використовуєте Maven для управління залежностями, додайте наступну залежність до вашого pom.xml:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.0</version> </dependency>
Ви також можете завантажити бібліотеку з Maven Repository та додати її до шляху класів Java.
Після підключення бібліотеки, відкрийте будь-який редактор коду. Давайте подивимося, як запустити відправника та отримувача за допомогою Java.
Створення відправника Apache Kafka Java
Завдяки бібліотеці kafka-clients, ви готові почати створення свого відправника Kafka.
Створімо клас з назвою SimpleProducer.java. Він буде відповідати за відправку повідомлень до теми, яку ви створили раніше. У цьому класі ви створите екземпляр org.apache.kafka.clients.producer.KafkaProducer. Після цього ви будете використовувати цього відправника для надсилання повідомлень.
Для створення відправника Kafka вам потрібна адреса та порт вашого сервера Apache Kafka. Оскільки ви запускаєте його на локальному комп’ютері, адресою буде localhost. Враховуючи, що ви не змінили параметри за замовчуванням, порт буде 9092. Розгляньте наведений нижче код, який допоможе вам створити відправника:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } }
Ви помітите, що встановлюються три параметри. Давайте швидко розглянемо кожен з них:
- BOOTSTRAP_SERVERS_CONFIG дозволяє визначити, де запущено сервер Apache Kafka
- KEY_SERIALIZER_CLASS_CONFIG повідомляє відправнику, який формат використовувати для відправки ключів повідомлень
- Формат для відправки самого повідомлення визначається за допомогою параметра VALUE_SERIALIZER_CLASS_CONFIG
Оскільки ви будете відправляти текстові повідомлення, обидва параметри налаштовано на використання StringSerializer.class.
Щоб фактично відправити повідомлення до вашої теми, вам потрібно використовувати метод producer.send(), який приймає ProducerRecord. Наступний код показує метод, який відправляє повідомлення до теми та виводить відповідь разом зі зсувом повідомлення.
public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); }
Маючи весь код, ви можете відправляти повідомлення до вашої теми. Ви можете використовувати основний метод для перевірки, як показано в коді нижче:
package org.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class SimpleProducer { private final KafkaProducer<String, String> producer; public SimpleProducer(String host, String port) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); this.producer = new KafkaProducer<>(properties); } public void produce(String topic, String message) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); final Future<RecordMetadata> send = this.producer.send(record); final RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata); } public static void main(String[] args) throws Exception{ SimpleProducer producer = new SimpleProducer("localhost", "9092"); producer.produce("transactions", "This is a transactional record of $200"); } }
У цьому коді ви створюєте SimpleProducer, який підключається до вашого сервера Apache Kafka на вашому комп’ютері. Він використовує KafkaProducer для відправки текстових повідомлень до вашої теми.
Створення отримувача Apache Kafka Java
Настав час створити отримувача Apache Kafka за допомогою Java. Створіть клас з назвою SimpleConsumer.java. Далі ви створите конструктор для цього класу, який ініціалізує org.apache.kafka.clients.consumer.KafkaConsumer. Для створення отримувача, вам потрібна адреса та порт сервера Apache Kafka. Крім того, вам потрібна група отримувачів та тема, з якої ви хочете отримувати повідомлення. Використовуйте наступний фрагмент коду:
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } }
Аналогічно до Kafka Producer, Kafka Consumer також приймає об’єкт Properties. Розглянемо всі різні властивості:
- BOOTSTRAP_SERVERS_CONFIG повідомляє отримувачу, де запущено сервер Apache Kafka
- Група отримувачів вказується за допомогою GROUP_ID_CONFIG
- Коли отримувач починає отримувати повідомлення, AUTO_OFFSET_RESET_CONFIG дозволяє вказати, з якого моменту потрібно починати читання повідомлень
- KEY_DESERIALIZER_CLASS_CONFIG повідомляє отримувачу тип ключа повідомлення
- VALUE_DESERIALIZER_CLASS_CONFIG повідомляє тип фактичного повідомлення
Оскільки у вашому випадку ви будете отримувати текстові повідомлення, властивості десеріалізатора встановлені на StringDeserializer.class.
Тепер ви будете отримувати повідомлення з вашої теми. Щоб все було просто, отримане повідомлення буде виведено на консоль. Розглянемо, як це можна зробити за допомогою наступного коду:
private boolean keepConsuming = true; public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } }
Цей код буде постійно опитувати тему. Коли ви отримаєте будь-який запис отримувача, повідомлення буде виведено. Перевірте свого отримувача за допомогою основного методу. Ви запустите Java-програму, яка буде постійно отримувати повідомлення з теми та виводити їх. Зупиніть Java-програму, щоб припинити роботу отримувача.
package org.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; public class SimpleConsumer { private static final String OFFSET_RESET = "earliest"; private final KafkaConsumer<String, String> consumer; private boolean keepConsuming = true; public SimpleConsumer(String host, String port, String consumerGroupId, String topic) { String server = host + ":" + port; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<>(properties); this.consumer.subscribe(List.of(topic)); } public void consume() { while (keepConsuming) { final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L)); if (consumerRecords != null && !consumerRecords.isEmpty()) { consumerRecords.iterator().forEachRemaining(consumerRecord -> { System.out.println(consumerRecord.value()); }); } } } public static void main(String[] args) { SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions"); simpleConsumer.consume(); } }
Коли ви запустите код, ви помітите, що він отримує не тільки повідомлення, відправлені вашим Java-відправником, а й ті, які ви відправляли за допомогою консольного відправника. Це пояснюється тим, що параметр AUTO_OFFSET_RESET_CONFIG встановлено на значення “найраніші”.
Коли SimpleConsumer працює, ви можете використовувати консольний відправник або Java-додаток SimpleProducer для відправлення нових повідомлень до теми. Ви побачите, як вони отримуються та виводяться на консоль.
Задовольніть усі ваші потреби в обробці даних за допомогою Apache Kafka
Apache Kafka дозволяє з легкістю впоратися з усіма вимогами до обробки даних. Встановивши Apache Kafka на вашому комп’ютері, ви можете досліджувати всі різноманітні можливості, які вона надає. Крім того, офіційний клієнт Java дозволяє вам ефективно писати, підключатися та взаємодіяти з сервером Apache Kafka.
Будучи універсальною, масштабованою та високопродуктивною системою потокової передачі даних, Apache Kafka може змінити правила гри. Ви можете використовувати її для локальної розробки або інтегрувати у свої виробничі системи. Так само, як її легко налаштувати локально, налаштування Apache Kafka для великих застосувань не становить великої проблеми.
Якщо ви шукаєте платформи потокової передачі даних, можете переглянути найкращі платформи для аналізу та обробки даних в реальному часі.