Приложение Spark + Kafka, получающее «CassandraCatalogException: попытка записи в таблицу C *, но отсутствуют столбцы первичного ключа: [col1, col2, col3]»

Запустить окружение

kafka ----ReadStream----> local ----WriteStream----> cassandra \

исходник место на локалке и кафке, локал, writeStream это разные IP\

Столбцы таблицы:

col1 | col2 | col3 | col4 | col5 | col6 | col7

df.printSchema

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

Извините, я пытаюсь решить в одиночку, но не могу найти решение.

Выполнить код

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,
com.datastax.spark:spark-cassandra-connector_2.12:3.2.0,
com.github.jnr:jnr-posix:3.1.15
--conf com.datastax.spark:spark.cassandra.connectiohost{cassandraIP},
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions test.py

Исходный код:

from pyspark.sql import SparkSession

# Spark Bridge local to spark_master == Connect master
spark = SparkSession.builder \
    .master("spark://{SparkMasterIP}:7077") \
    .appName("Spark_Streaming+kafka+cassandra") \
    .config('spark.cassandra.connection.host', '{cassandraIP}') \
    .config('spark.cassandra.connection.port', '9042') \
    .getOrCreate()

# Read Stream From {Topic} at BootStrap
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "{KafkaIP}:9092") \
    .option('startingOffsets','earliest') \
    .option('failOnDataLoss','False') \
    .option("subscribe", "{Topic}") \
    .load() \

df.printSchema()

# write Stream at cassandra
ds = df.writeStream \
    .trigger(processingTime='15 seconds') \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation","{checkpoint}") \
    .options(table='{table}',keyspace = "{key}") \
    .outputMode('update') \
    .start()

ds.awaitTermination()

Я получаю эту ошибку:

com.datastax.spark.connector.datasource.CassandraCatalogException: попытка записи в таблицу C* отсутствует столбцы первичного ключа: [col1,col2,col3]

в com.datastax.spark.connector.datasource.CassandraWriteBuilder.(CassandraWriteBuilder.scala:44)
на com.datastax.spark.connector.datasource.CassandraTable.newWriteBuilder(CassandraTable.scala:69)
в org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:590)
в org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:140)
в org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:59)
в org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:295)
в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
в org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
на org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStr
в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)

Traceback (последний последний вызов):

Файл "/home/test.py", строка 33, в
ds.awaitTermination()

Файл "/venv/lib64/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming.py", строка 101, в awaitTe

Файл "/venv/lib64/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", строка 1322, in
Файл "/home/jeju/venv/lib64/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", строка 117, в деко pyspark.sql.utils.StreamingQueryException: попытка записи в таблицу C* отсутствует
столбцы первичного ключа: [col1,col2,col3]
=== Потоковый запрос ===
Идентификатор: [id = d7da05f9-29a2-4597-a2c9-86a4ebfa65f2, runId = eea59c10-30fa-4939-8a30-03bd7c96b3f2]
Текущие зафиксированные смещения: {}
Текущие доступные смещения: {}

Что выводит df.printSchema()? Данные, полученные от kafka, могут нуждаться в разборе.

ozlemg 01.12.2022 09:44

@ozlemg так..... kafka ---readStream---> parse ----> parseData --writeStream-->cassandra права??

hi-inbeom 01.12.2022 09:51

Да, ошибка возникает из-за отсутствия декодирования данных в Kafka. Когда вы читаете из Kafka, данные поступают в виде двоичных двоичных объектов, которые необходимо анализировать в соответствии с форматом, в котором вы кодируете данные.

Alex Ott 01.12.2022 10:19

@AlexOtt спасибо, когда я нашел искровой поток вашего комментария, и ответ был полезен, большое спасибо

hi-inbeom 01.12.2022 11:12
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
4
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ошибка говорит о том, что столбцы первичного ключа: [col1,col2,col3] отсутствуют. Так что df не имеет этих столбцов. У вас уже есть df.printSchema(). Вы сами видите, что это так. df, прочитанный из Kafka, имеет фиксированную схему, и вы можете извлекать свои данные, анализируя столбцы ключа и значения. В моем случае отправленные данные были в столбце значений (если вам нужно, вы также можете добавить ключевой столбец) и отформатированы json. Поэтому я мог прочитать его по следующему коду:

dfPerson = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "x.x.x.x") \
.option("subscribe", TOPIC) \
.option("startingOffsets", "latest") \
.load()\
.select(from_json(col("value").cast("string"), schema).alias("data"))\
.select("data.*")

Надеюсь, это поможет.

Другие вопросы по теме