Я работаю в Databricks, пытаясь передать данные из блокнота R в Apache Spark, где один столбец представляет собой вложенные данные. Вот рабочий пример без вложенности:
library(SparkR)
sparkR.session()
d1 = data.frame(id = 1:3, name = c('x', 'y', 'z'))
# temp view
SparkR::dropTempView('temp1') # drop if it already exists
SparkR::createOrReplaceTempView(SparkR::as.DataFrame(d1), 'temp1')
my_schema1 = structType(structField("id", "double"), structField("name", "string"))
SparkR::createTable('hive_metastore.my_project.test1', schema = my_schema1)
# append data to spark table
SparkR::sql('INSERT INTO hive_metastore.my_project.test1 TABLE temp1;')
# test
SparkR::sql('SELECT * FROM hive_metastore.my_project.test1') |> SparkR::showDF()
+---+----+
| id|name|
+---+----+
|1.0| x|
|2.0| y|
|3.0| z|
+---+----+
Теперь пример вложенных данных:
# 2 functions to generate equivalent nested data structures
sub_fn1 = function(x) data.frame(key = base::sample(LETTERS, x), val = rnorm(x))
sub_fn2 = function(x) purrr::map2(base::sample(LETTERS, x), rnorm(x), ~ list(key = .x, val = .y))
d2 = dplyr::tibble(
id = 1:3, name = c('x', 'y', 'z'),
data1 = purrr::map(c(3, 5, 4), sub_fn1),
data2 = purrr::map(c(3, 5, 4), sub_fn2)
) |> as.data.frame()
dplyr::glimpse(d2)
Rows: 3
Columns: 4
$ id <int> 1, 2, 3
$ name <chr> "x", "y", "z"
$ data1 <list> [<data.frame[3 x 2]>], [<data.frame[5 x 2]>], [<data.frame[4 x 2…
$ data2 <list> [["I", 0.6562561], ["N", -0.5147073], ["M", -0.4036189]], [["M",…
Я не могу создать действительную схему для отражения любого из этих полей данных, поэтому не могу указать таблицу Spark для добавления. Например:
my_schema2 = structType(
structField("id", "double"),
structField("name", "string"),
structField("data2", "array")
)
Error in checkType(type) : Unsupported type for SparkDataframe: array
Поддерживаются ли эти примеры вложения? Очень благодарен за помощь в выяснении того, как пройти шаг «INSERT INTO» с примером вложенного набора данных.
Да, я пытался это сделать, но пока безуспешно, но, возможно, вы правы.
Более простой способ — сохранить данные R невложенными, пусть ваш идентификатор и имя повторяются. Создайте Spark df из невложенного R df. Затем используйте SparkR::collect_list для создания массива при группировке по идентификатору и имени. Вы можете проверить функции map_ и array_ в документации Spark Sql, чтобы узнать, помогают ли они.
Спасибо, я протестирую этот подход, но оставлю вопрос открытым на случай, если решение возможно.
Я согласен с Вивеком Аталом. Вместо вложения попробуйте хранить данные более обычным способом и выполнять необходимые вычисления с помощью group_by
, mutate
, summarize
и т. д. во фрейме данных. Это более типичный рабочий процесс для Spark и таблиц базы данных в целом.
Я тоже хотел бы знать, поддерживается ли это, хотя мое тестирование показывает, что это не поддерживается напрямую/легко.
Альтернативным подходом может быть преобразование вложенных данных в какой-либо другой формат, например необработанный или JSON, перед сохранением, а затем обратный перевод при извлечении.
Вот пример использования JSON:
sub_fn1 = function(x) data.frame(key = base::sample(LETTERS, x), val = rnorm(x))
sub_fn2 = function(x) purrr::map2(base::sample(LETTERS, x), rnorm(x), ~ list(key = .x, val = .y))
# data1, data2 as data.frame
d2 = dplyr::tibble(
id = 1:3,
name = c('x', 'y', 'z'),
data1 = purrr::map(c(3, 5, 4), sub_fn1),
data2 = purrr::map(c(3, 5, 4), sub_fn2)
)
# translate data1, data2 to json for storage
d2 <- d2 |>
dplyr::mutate(
data1 = purrr::map_chr(data1, jsonlite::toJSON),
data2 = purrr::map_chr(data2, jsonlite::toJSON)
)
SparkR::dropTempView('tmp_v_1')
SparkR::createOrReplaceTempView(SparkR::as.DataFrame(d2), 'tmp_v_1')
my_schema2 = SparkR::structType(
SparkR::structField("id", "double"),
SparkR::structField("name", "string"),
SparkR::structField("data1", "string"),
SparkR::structField("data2", "string")
)
SparkR::sql("DROP TABLE IF EXISTS x.y.z;")
SparkR::createTable('x.y.z', schema = my_schema2)
SparkR::sql('INSERT INTO x.y.z TABLE tmp_v_1;')
# back translate
SparkR::sql('SELECT * FROM x.y.z') |>
SparkR::collect() |>
dplyr::mutate(
data1 = purrr::map(data1, \(x) x |> jsonlite::fromJSON() |> as.data.frame()),
data2 = purrr::map(data2, \(x) x |> jsonlite::fromJSON() |> as.data.frame())
) |>
dplyr::as_tibble()
#> # A tibble: 3 × 4
#> id name data1 data2
#> <dbl> <chr> <list> <list>
#> 1 1 x <df [3 × 2]> <df [3 × 2]>
#> 2 2 y <df [5 × 2]> <df [5 × 2]>
#> 3 3 z <df [4 × 2]> <df [4 × 2]>
Спасибо за это, я посмотрю, соответствует ли это моим требованиям. Даже если и нет, кажется, что это единственный подход, который поддерживает Spark.
Сам не пробовал, но withField , как описано здесь может быть одним из способов вложения вещей.