Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python

RedDeveloper
22.04.2023 12:00
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python

Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном времени и потоковых приложений. В этой статье мы рассмотрим шаги по установке и настройке Kafka на локальной машине, а также создадим пример производителя и потребителя с помощью Python.

Установка Apache Kafka

Чтобы установить Apache Kafka, выполните следующие шаги:

  1. Скачайте последнюю версию бинарного дистрибутива Kafka с официального сайта [https://kafka.apache.org/downloads].
  2. Распакуйте скачанный архив в выбранную вами директорию.
  3. Перейдите в каталог Kafka и откройте папку config. (Вы можете не вносить никаких изменений в файлы config)
  4. Откройте файл zookeeper.properties и измените свойство dataDir на место, где ZooKeeper может хранить свои данные.
  5. Сохраните изменения и закройте файл.
  6. Откройте файл server.properties и измените свойство log.dirs на место, где Kafka может хранить свои файлы журналов.
  7. Сохраните изменения и закройте файл.

Теперь у вас в системе установлен apache kafka. Теперь мы можем приступить к настройке окружения. Для этого,

  1. перейдите в командную строку в папке kafka и запустите сервер zookeeper с помощью этой команды: bin\windows\zookeeper-server-start.bat config\zookeeper.properties
  2. в той же папке откройте другой терминал и запустите сервер kafka следующим образом: bin\windows\kafka-server-start.bat config\server.properties
  3. Теперь для третьего этапа нам нужно создать тему, в которую производитель может отправлять сообщения, а получатель - получать их следующим образом: bin\windows\kafka-topics.bat - create - topic bunny - bootstrap-server localhost:9092 - replication-factor 1 - partitions 1 (в реальной жизни фактор репликации должен быть больше 3, чтобы избежать потери данных)

Теперь откройте любую IDE по вашему выбору и откройте два блокнота для тестирования производителя и потребителя:

Вот пример производителя:

From kafka import KafkaProducer
Импортировать json
Producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
Data = pd.read_csv('/dbfs/FileStore/tables/detail_table.csv')
For i in range(data.shape[0]):
Dict1 = data[['batch_id','batch_name','src_system_id','src_system','src_country','src_db_name','schema_name','table_name']].iloc[[i]].to_dict(orient='records')
Producer.send('test', value = dict1)
Sleep(5)

Теперь потребитель, который будет читать эти данные, будет выглядеть примерно так:

From kafka import KafkaConsumer
Импортировать json
Consumer = KafkaConsumer('bunny', bootstrap_servers=["localhost:9092"], value_deserializer=lambda x: json.loads(x.decode('utf-8')))
Для c в consumer:
Data = pd.json_normalize(c.value)
Spark_df = spark.createDataFrame(data)
Spark_df.write.mode('append').format('parquet').saveAsTable('kafka_sample.detail_table')
Print("row added")

Потребитель по умолчанию будет выбирать все значения, отправленные в данную тему с самого начала.

В этом коде мы создаем объект Kafka consumer с параметром bootstrap_servers, установленным на localhost:9092, который является адресом и портом по умолчанию брокера Kafka, запущенного на нашей локальной машине. Мы также задаем функцию value_deserializer, которая десериализует значение нашего сообщения из JSON.

Затем мы используем цикл for для чтения сообщений из темы bunny и выводим значение каждого сообщения на консоль.

Apache Kafka - это универсальная потоковая платформа, которая может использоваться в широком спектре приложений. Вот некоторые возможные варианты использования Apache Kafka:

  1. Обработка данных в режиме реального времени: Kafka можно использовать для обработки и анализа больших объемов данных в режиме реального времени, что позволяет организациям быстро принимать решения на основе самой актуальной информации.
  2. Архитектуры, управляемые событиями: Kafka может использоваться в качестве центрального узла для архитектуры, управляемой событиями, позволяя различным сервисам и приложениям взаимодействовать друг с другом через общую систему обмена сообщениями.
  3. Обработка данных IoT: Kafka может использоваться для сбора и обработки больших объемов данных с устройств IoT в режиме реального времени, позволяя организациям отслеживать и анализировать данные датчиков из различных источников.
  4. Агрегация и анализ журналов: Kafka можно использовать для сбора и хранения данных журналов из различных источников, что позволяет организациям анализировать и устранять неполадки во всей инфраструктуре.
  5. Отправка сообщений и уведомлений: Kafka можно использовать для отправки сообщений и уведомлений пользователям в режиме реального времени, что позволяет организациям своевременно предоставлять обновления и оповещения своим клиентам и сотрудникам.

В целом, Kafka - это мощный инструмент для построения конвейеров данных в реальном времени и приложений потоковой обработки данных, который можно использовать в самых разных отраслях и случаях.

Заключение

В этой статье мы рассмотрели шаги по установке и настройке Apache Kafka на вашей локальной машине, а также создали пример производителя и потребителя с помощью Python. С помощью Kafka вы можете создавать конвейеры данных в реальном времени и потоковые приложения, а этот пример кода может послужить отправной точкой для ваших собственных проектов Kafka.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?

20.08.2023 18:21

Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".

Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией

20.08.2023 17:46

В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.

Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox

19.08.2023 18:39

Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.

Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest

19.08.2023 17:22

В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!

Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️

18.08.2023 20:33

Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL

14.08.2023 14:49

Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.