Как я могу разобрать поток avro byteString в потоке akka

У меня есть файлы Avro в ведре s3, и я пытаюсь выполнить потоковую передачу и анализ в класс case. У меня есть схема для разбора, но я не знаю, как с ней поступить.

Я использую s3.download для загрузки и потоковой передачи файла из корзины s3, а затем конвертирую его в utf8string.

Пожалуйста, помогите, как я могу проанализировать схему, которую мы имеем, учитывая входной поток, который я получаю.

Не могли бы вы добавить код, объясняющий, в чем именно заключается ваша проблема? Как это связано с аккой?

Tomer Shetah 27.12.2020 08:10
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
1
296
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я отвечу на этот вопрос, основываясь на том, что вы просили использовать схему для (де)сериализации сообщений с помощью Avro.

У меня есть схема для разбора, но я не знаю, как с ней поступить.

и предположим, что вы уже загружаете сообщения из s3.buckets. Затем я воспользуюсь своим примером сохранения сообщений в PostgreSQL просто для рабочего примера. Но вы можете предположить, что ваше соединение s3.bucket.

Я использую библиотеку com.sksamuel.avro4s для создания своего сериализатора Avro. Вот необходимые библиотеки для добавления в build.sbt:

  val akkaVersion = "2.6.10"
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
  "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
  "org.postgresql" % "postgresql" % "42.2.2",
  "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0",

Затем вы создаете свой сериализатор, в моем случае это MyFirstAvroSerializer, расширяющий akka.serialization.Serializer. У него есть схема, которая в моем случае является классом case CompanyRegistry. По сути, вам нужно реализовать методы identifier, которые должны иметь уникальный идентификатор, toBinary и fromBinary для преобразования сообщений и includeManifest, который является ложным, потому что мне не нужен манифест.

import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

class MyFirstAvroSerializer extends Serializer {
  val schema = AvroSchema[CompanyRegistry]
  override def identifier: Int = 454874
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      val baos = new ByteArrayOutputStream()
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build() // schema
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro")
  }
  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }
  override def includeManifest: Boolean = false
}

Затем вам нужно настроить проект для вызова этого сериализатора при обмене сообщениями akka между участниками. Настройте его на application.conf, добавив конкретную конфигурацию. В моем случае это avroSerializable. Вы устанавливаете MyFirstAvroSerializer в области serializers и классы case в области serialization-bindings. Я также настроил Akka-remote, но вы можете его игнорировать.

avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}

Как я уже сказал в начале, я использую PostgreSQL. Но в вашем случае это будет конфигурация хранилища ведра s3. Я уйду отсюда только для полноты картины и потому, что я вызываю эту конфигурацию при создании системы акторов.

postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on
  # create JDBC configuration to Akka persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }
  # dbinding the JDBC plugins with the configureation created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}

Теперь пришло время создать систему акторов и простого актора SimplePersistentActor и отправить сообщение по сети. SimplePersistentActor — это просто очень простой актор, который принимает сообщения, которые я отправляю, ничего особенного.

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem", config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",
      Seq(
        BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
        BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
      ),
      "ads",
      523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}

Другие вопросы по теме