Используя PySpark, реализована потоковая передача искр + интеграция Kafka. При каждом запуске он дает смещение от 0.
Нужно решить 2 вопроса:
Помогите в решении этого
# create spark session
spark = SparkSession.builder \
.appName(appName) \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
.getOrCreate()
# Define schema for data in value field
schema = StructType([
StructField("col1", StringType()),
StructField("col2", StringType()),
StructField("col3" , TimestampType()),
StructField("col4" , DoubleType())
])
# Spark streaming
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", broker) \
.option("subscribe", topic) \
.option("kafka.group.id", appName) \
.option("enable.auto.commit", True) \
.load()
value_df = df.select(col("topic"), col("partition"), col("offset"), from_json(col("value").cast("STRING"), schema).alias("values"))
это было полезно, помогло решить проблему. Спасибо





spark.read() — пакетная обработка. spark.readStream() — структурированная потоковая передача.
- Чтение стрима за последние 15 минут
- Чтение последнего зафиксированного смещения для каждого раздела.
Одна основная вещь, которую нужно понять, это то, что (как и в Kafka) обе эти вещи (контрольная точка и частота) указаны для каждого потребителя. Итак, вы указываете это, когда writeStream, а не когда readStream.
Думайте о «потоковой передаче Dataframe, возвращаемой spark.readStream()» как о теме Кафки. Который, в свою очередь, может иметь несколько потребителей, и каждый потребитель может определять свою собственную контрольную точку, частоту и т. д.
Аналогичным образом вы можете иметь несколько потребителей для одного потокового Dataframe и указать для каждого разные контрольные точки и частоту. Например.
# Create streaming dataframe
sdf = spark.readStream.format('kafka').option(...)
# start() DataStreamWriter to get create streaming queries
sq1 = sdf.writeStream.parquet(...) \
.trigger(processingTime='1 seconds').option('checkpointLocation', 'loc1').start()
sq2 = sdf.writeStream.csv(...) \
.trigger(processingTime='5 seconds').option('checkpointLocation', 'loc2').start()
spark.streams.awaitTermination()
Чтение стрима за последние 15 минут
Это контролируется с помощью DataStreamWriter.trigger()
Этого можно добиться двумя способами:
.trigger(availableNow=True). Таким образом, ваш компьютер может запускаться каждые 15 минут, запускать задание и затем выключаться. Кластер блоков данных, например. запуск занимает 4-6 минут..trigger(processingTime='15 minutes'). Если вам нужны более низкие задержки и/или постоянно работающий общий кластер, вы можете сделать это, в противном случае нет смысла делать это с частотой 15 минут.Чтение последнего зафиксированного смещения для каждого раздела.
Это контролируется через DataStreamWriter.option('checkpointLocation', '/path/to/checkpoint/folder/'). Например. на S3, локальный диск, ADLS,...
Проверьте этот ответ