Извлечь значение из столбца ArrayType в Scala и преобразовать его в длинный

У меня есть DataFrame, который состоит из столбца ArrayType, и массив может иметь разную длину в каждой строке данных. Ниже я привел пример кода, который может создать некоторые фиктивные данные с аналогичной структурой.

Вы увидите, что для одной транзакции у меня есть идентификатор транзакции, а также некоторые дополнительные данные, каждая из которых хранится в «сегменте». Здесь мы видим один сегмент, в котором хранится информация о клиенте (всегда массив длины два), и у нас есть дополнительный сегмент для каждой покупки товара. Сама информация о купленном товаре представляет собой массив различной длины; первые два элемента массива всегда будут идентификатором и названием купленного товара; дополнительные элементы массива могут существовать для цвета и т. д., но мы можем игнорировать их в этом случае использования.

val dfschema = new StructType()
  .add("transaction",
    new StructType()
      .add(
        "transaction_id",
        StringType
      )
      .add(
        "segments",
        ArrayType(
          new StructType()
            .add("segment_id",StringType)
            .add("segment_fields",ArrayType(
              StringType,
              false
            )
          ), false
        )
      )
    )


val mockdata = Seq(
  Row(
    Row(
      "2e6d57769e49ae8cb0c4105548c4389d",
      List(
        Row(
          "CustomerInformation",
          List(
            "SomeCustomerName",
            "SomeCustomerEmail"
          )
        ),
        Row(
          "ItemPurchased",
          List(
            "SomeItemID",
            "SomeItemName"
          )
        ),
        Row(
          "ItemPurchased",
          List(
            "AnotherItemID",
            "AnotherItemName",
            "ItemColor"
          )
        ),
        Row(
          "ItemPurchased",
          List(
            "YetAnotherItemID",
            "YetAnotherItemName",
            "ItemColor"
          )
        )
      )
    )
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(mockdata),
  dfschema)

Чего я хочу добиться, так это преобразовать приведенное выше в другой фрейм данных с двумя столбцами: один для имени клиента и один для имени элемента. Для приведенного выше примера он хотел бы:

Имя Клиентаназвание предмета
SomeCustomerNameSomeItemName
SomeCustomerNameДругое имя элемента
SomeCustomerNameещеназваниеэлемента

Однако я не хочу жестко кодировать поля данных, которые я извлекаю; вместо этого я хочу написать пару функций, которые вы могли бы запускать как часть команды select, например:

df(
  select(
    get_single_subsegment("CustomerInformation", 0),
    get_repeated_subsements("ItemPurchased", 1)
  )
)

Таким образом, если я решу получить адрес электронной почты клиента вместо имени, мне просто нужно изменить изменение с 0 на 1 в приведенном выше примере. И я даже могу передать номер индекса как переменную.

Можно ли это сделать?

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
47
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Начиная со Spark 3.0, вы можете использовать встроенные функции spark для определения двух ваших функций get_single_subsegment и get_repeated_subsegments.

Для get_single_subsegment вы можете сначала отфильтровать массив сегментов по segment_id с помощью filter, затем получить первый элемент этого отфильтрованного массива с помощью getItem, а затем получить элемент с нужным индексом в этом объекте сегмента с помощью getField и getItem:

import org.apache.spark.sql.functions.{col, filter}
import org.apache.spark.sql.Column

def get_single_subsegment(segmentId: String, index: Int): Column = {
  filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
    .getItem(0)
    .getField("segment_fields")
    .getItem(index)
}

Для get_repeated_subsegments вы сначала фильтруете, как в get_single_subsegment, но затем используете transform для извлечения индекса полей правого сегмента для каждого элемента отфильтрованного массива, а затем explode этого массива, чтобы иметь одну строку за элементом отфильтрованного массива:

import org.apache.spark.sql.functions.{col, explode, filter, transform}
import org.apache.spark.sql.Column

def get_repeated_subsegments(segmentId: String, index: Int): Column = {
  explode(
    transform(
      filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
        .getField("segment_fields"),
      c => c.getItem(index)
    )
  )
}

Если мы применим две функции, определенные выше, к вашему примеру, мы получим следующий результат:

df.select(
  get_single_subsegment("CustomerInformation", 0).as("customer_name"),
  get_repeated_subsegments("ItemPurchased", 1).as("item_name")
).show(false)

// +----------------+------------------+
// |customer_name   |item_name         |
// +----------------+------------------+
// |SomeCustomerName|SomeItemName      |
// |SomeCustomerName|AnotherItemName   |
// |SomeCustomerName|YetAnotherItemName|
// +----------------+------------------+

Бонус - извлечь несколько столбцов

Если, как вы прокомментировали, вы хотите извлечь несколько столбцов с помощью get_repeated_subsegments, вам нужно изменить get_repeated_subsegments, чтобы не выполнять explode в нем, а когда вы выполняете выбор. Затем вы можете добавить несколько столбцов, используя arrays_zip для массивов, полученных путем применения get_repeated_subsegments, следующим образом:

import org.apache.spark.sql.functions.{arrays_zip, col, explode, filter, transform}
import org.apache.spark.sql.Column

def get_repeated_subsegments(segmentId: String, index: Int): Column = {
  transform(
    filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
      .getField("segment_fields"),
    c => c.getItem(index)
  )
}

df.select(
    get_single_subsegment("CustomerInformation", 0).as("customer_name"),
    explode(
      arrays_zip(
        get_repeated_subsegments("ItemPurchased", 0).as("item_id"),
        get_repeated_subsegments("ItemPurchased", 1).as("item_name")
      )
    ).alias("items")
  )
  .select("customer_name", "items.*")
  .show(false)

// +----------------+----------------+------------------+
// |customer_name   |item_id         |item_name         |
// +----------------+----------------+------------------+
// |SomeCustomerName|SomeItemID      |SomeItemName      |
// |SomeCustomerName|AnotherItemID   |AnotherItemName   |
// |SomeCustomerName|YetAnotherItemID|YetAnotherItemName|
// +----------------+----------------+------------------+

Это превосходно, большое спасибо! Один дополнительный вопрос: что, если я хочу извлечь как идентификатор элемента, так и его имя, и получить вывод с тремя столбцами (customer_name, item_id, item_name)? Я дважды пытался вызвать get_repeated_subsegments, но Scala это не понравилось!

Merik 15.05.2022 03:28

Обновлен ответ с решением для извлечения двух столбцов вместо одного с помощью get_repeated_subsegments

Vincent Doba 15.05.2022 23:54

Спасибо еще раз! Можете ли вы указать мне документацию о функции arrays_zip где-нибудь в Интернете? Я не могу найти его на spark.apache.org/docs/latest/api/scala/org/apache/spark/…, и хотя я нахожу некоторые документы для pyspark, я не нахожу ничего конкретно для Scala.

Merik 16.05.2022 01:03

Вот документация для arrays_zip

Vincent Doba 16.05.2022 08:43

Другие вопросы по теме