Исключение абстрактного метода при подключении к таблице API Cassandra Azure Cosmos DB с помощью задания Azure Databricks

Я пытаюсь записать данные в таблицу Cassandra (cosmos DB) через задание Azure DBR (искровая потоковая передача). Получение ниже исключения:

Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract`

`Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract`

Что я сделал, чтобы попасть сюда:

  • создал учетную запись Cosmos DB
  • созданное пространство ключей cassandra
  • создал таблицу кассандры
  • создал задание DBR
  • добавлен com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 в кластер заданий
  • добавлен com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 в кластер заданий

Что я пробовал:

разные версии соединителей или вспомогательных библиотек azure Cosmos db, но некоторые или другие ошибки ClassNotFoundExceptions или MethodNotFound

Фрагмент кода:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime

def writeDelta(spark:SparkSession,dataFrame: DataFrame,sourceName: String,checkpointLocation: String,dataPath: String,loadType: String,log: Logger): Boolean = {
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") //Increase this number as needed
    spark.conf.set("spark.cassandra.output.ignoreNulls","true")
    spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
    spark.conf.set("spark.cassandra.connection.port", "10350")
    spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
    // spark.cassandra.auth.username and password are set in cluster conf
    
    val write=dataFrame.writeStream.
              format("org.apache.spark.sql.cassandra").
              options(Map( "table" -> "****", "keyspace" -> "****")).
              foreachBatch(upsertToDelta _).
              outputMode("update").
              option("mergeSchema", "true").
              option("mode","PERMISSIVE").
              option("checkpointLocation", checkpointLocation).
              start()
            write.awaitTermination()
}

  def upsertToDelta(newBatch: DataFrame, batchId: Long) {

    try {
      val spark = SparkSession.active
      println(LocalDateTime.now())
      println("BATCH ID = "+batchId+" REC COUNT = "+newBatch.count())
      newBatch.persist()
      val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
      val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")
    
      deDup.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map( "table" -> "****", "keyspace" -> "****"))
        .mode("append")
        .save()

      newBatch.unpersist()
    } catch {
      case e: Exception =>
        throw e
    }
  }

#############################

После реализации решения, предложенного @theo-van-kraay, появляется ошибка в журналах исполнителя (задание продолжает работать даже после этой ошибки)

23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    ...
    ...

23/02/13 07:28:56 ERROR Utils: Aborting task

Внесите изменения, чтобы отобразить все необходимые сведения, включая код, в блокноте. Прямо сейчас вы не предоставили ничего, кроме вывода ошибок.

David Makogon 10.02.2023 15:49

@DavidMakogon добавил фрагмент кода, используемый для записи кадра данных в базу данных Cosmos.

SSakya 13.02.2023 10:18
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
2
91
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вы можете удалить:

com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 

Он не требуется для Spark 3 Cassandra Connector и был создан только для Spark 2. Также удалите ссылки на него в коде.

Получение новой ошибки после реализации вашего предложения. Подробности смотрите в обновленном вопросе.

SSakya 13.02.2023 10:00

Ошибка «Невозможно получить метаданные маркера» — это известная проблема, которая в определенных сценариях влияет на Spark 3 (драйвер Java 4) и API Cosmos DB для Apache Cassandra. Это было недавно исправлено, но все еще находится в процессе развертывания в сервисе. Если решение является срочным, вы можете подать запрос в службу поддержки в Azure, и мы можем ускорить его, включив исправление явно в вашей учетной записи, пока оно не будет полностью развернуто. Не стесняйтесь упоминать этот вопрос о переполнении стека при обращении в службу поддержки, чтобы инженер, который его обрабатывает, имел контекст.

Другие вопросы по теме

Похожие вопросы