Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном времени и потоковых приложений. В этой статье мы рассмотрим шаги по установке и настройке Kafka на локальной машине, а также создадим пример производителя и потребителя с помощью Python.
Чтобы установить Apache Kafka, выполните следующие шаги:
Теперь у вас в системе установлен apache kafka. Теперь мы можем приступить к настройке окружения. Для этого,
Теперь откройте любую IDE по вашему выбору и откройте два блокнота для тестирования производителя и потребителя:
Вот пример производителя:
From kafka import KafkaProducer
Импортировать json
Producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
Data = pd.read_csv('/dbfs/FileStore/tables/detail_table.csv')
For i in range(data.shape[0]):
Dict1 = data[['batch_id','batch_name','src_system_id','src_system','src_country','src_db_name','schema_name','table_name']].iloc[[i]].to_dict(orient='records')
Producer.send('test', value = dict1)
Sleep(5)
Теперь потребитель, который будет читать эти данные, будет выглядеть примерно так:
From kafka import KafkaConsumer
Импортировать json
Consumer = KafkaConsumer('bunny', bootstrap_servers=["localhost:9092"], value_deserializer=lambda x: json.loads(x.decode('utf-8')))
Для c в consumer:
Data = pd.json_normalize(c.value)
Spark_df = spark.createDataFrame(data)
Spark_df.write.mode('append').format('parquet').saveAsTable('kafka_sample.detail_table')
Print("row added")
Потребитель по умолчанию будет выбирать все значения, отправленные в данную тему с самого начала.
В этом коде мы создаем объект Kafka consumer с параметром bootstrap_servers, установленным на localhost:9092, который является адресом и портом по умолчанию брокера Kafka, запущенного на нашей локальной машине. Мы также задаем функцию value_deserializer, которая десериализует значение нашего сообщения из JSON.
Затем мы используем цикл for для чтения сообщений из темы bunny и выводим значение каждого сообщения на консоль.
Apache Kafka - это универсальная потоковая платформа, которая может использоваться в широком спектре приложений. Вот некоторые возможные варианты использования Apache Kafka:
В целом, Kafka - это мощный инструмент для построения конвейеров данных в реальном времени и приложений потоковой обработки данных, который можно использовать в самых разных отраслях и случаях.
В этой статье мы рассмотрели шаги по установке и настройке Apache Kafka на вашей локальной машине, а также создали пример производителя и потребителя с помощью Python. С помощью Kafka вы можете создавать конвейеры данных в реальном времени и потоковые приложения, а этот пример кода может послужить отправной точкой для ваших собственных проектов Kafka.
20.08.2023 18:21
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".
20.08.2023 17:46
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
19.08.2023 18:39
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.
19.08.2023 17:22
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!
18.08.2023 20:33
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.
14.08.2023 14:49
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.