Cartesian product detected by Spark even though the join condition is non-trivial

I am using Spark 2.3.0, and I have two data frames.

The first one, df1, has the schema:

root
 |-- time: long (nullable = true)
 |-- channel: string (nullable = false)

The second one, df2, has the schema:

root
 |-- pprChannel: string (nullable = true)
 |-- ppr: integer (nullable = false)

I am now trying to run (with both frames registered as temp views):

spark.sql("select a.channel as channel, a.time as time, b.ppr as ppr from df1 a inner join df2 b on a.channel = b.pprChannel")

But I get Detected cartesian product for INNER join between logical plans.

When I try to recreate this in the Spark shell, both with sc.parallelize and with plain Seqs, it works.
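
For reference, the working shell repro looks roughly like this (a minimal sketch with made-up data; only the shapes match the schemas above):

// Spark shell, Spark 2.3.0; data is hypothetical, column names as in the schemas above
import spark.implicits._

val df1 = Seq((1L, "chA"), (2L, "chB")).toDF("time", "channel")
val df2 = sc.parallelize(Seq(("chA", 10), ("chB", 20))).toDF("pprChannel", "ppr")

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql("select a.channel as channel, a.time as time, b.ppr as ppr from df1 a inner join df2 b on a.channel = b.pprChannel").show()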

What could be wrong here?

Follow-up

Here is what I get when I use df1.join(df2, 'channel === 'pprChannel, "inner").explain(true):

== Parsed Logical Plan ==
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
:  +- Project [time#113L, unnamed AS channel#124]
:     +- Project [time#113L]
:        +- Project [channel#23, time#113L]
:           +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
:              +- Filter (clipDT#105L >= cast(50000000 as bigint))
:                 +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
:                    +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
:                       +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
:                          +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
:                             +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
:                                +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                   +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                      +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
:                                         +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                            +- Project [channel#23, t1#29L, pt1#82]
:                                               +- Project [channel#23, t1#29L, pt1#82]
:                                                  +- Filter pt1#82 IN (1,2)
:                                                     +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
:                                                        +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
:                                                           +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
:                                                              +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
:                                                                 +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                    +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                       +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                          +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
:                                                                             +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                +- Project [channel#23, t1#29L, dv1#58]
:                                                                                   +- Project [channel#23, t1#29L, dv1#58]
:                                                                                      +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
:                                                                                         +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
:                                                                                            +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
:                                                                                               +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
:                                                                                                  +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                     +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                        +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                           +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
:                                                                                                              +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                                 +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
:                                                                                                                    +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
:                                                                                                                       +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
:                                                                                                                          +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
:                                                                                                                             +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
:                                                                                                                                +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
:                                                                                                                                   +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
:                                                                                                                                      +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
      +- ExternalRDD [obj#132]

== Analyzed Logical Plan ==
time: bigint, channel: string, pprChannel: string, ppr: int
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
:  +- Project [time#113L, unnamed AS channel#124]
:     +- Project [time#113L]
:        +- Project [channel#23, time#113L]
:           +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
:              +- Filter (clipDT#105L >= cast(50000000 as bigint))
:                 +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
:                    +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
:                       +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
:                          +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
:                             +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
:                                +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                   +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                      +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
:                                         +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                            +- Project [channel#23, t1#29L, pt1#82]
:                                               +- Project [channel#23, t1#29L, pt1#82]
:                                                  +- Filter pt1#82 IN (1,2)
:                                                     +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
:                                                        +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
:                                                           +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
:                                                              +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
:                                                                 +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                    +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                       +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                          +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
:                                                                             +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                +- Project [channel#23, t1#29L, dv1#58]
:                                                                                   +- Project [channel#23, t1#29L, dv1#58]
:                                                                                      +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
:                                                                                         +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
:                                                                                            +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
:                                                                                               +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
:                                                                                                  +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                     +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                        +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                           +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
:                                                                                                              +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                                 +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
:                                                                                                                    +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
:                                                                                                                       +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
:                                                                                                                          +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
:                                                                                                                             +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
:                                                                                                                                +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
:                                                                                                                                   +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
:                                                                                                                                      +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
      +- ExternalRDD [obj#132]

== Optimized Logical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
   +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
      +- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
         +- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
            +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
               +- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
                  +- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
                     +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
                        +- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
                           +- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
                              +- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
      +- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
   +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
      +- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
         +- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
            +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
               +- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
                  +- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
                     +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
                        +- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
                           +- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
                              +- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
      +- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

Yes, df1 is the result of a fairly complex computation, which is why it is so big. df2 is a very small DF that always comes from a Map, with no more than 50–100 entries passed to Spark via sc.parallelize. So I could use crossJoin with a where clause as a workaround. But I want to understand why Spark thinks this is a cartesian product.
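
A sketch of that workaround (the where clause just restates the join condition):

// Workaround sketch: allow the cartesian product explicitly, then filter
val res = df1.crossJoin(df2).where('channel === 'pprChannel)

Alternatively, the check can be disabled globally:

spark.conf.set("spark.sql.crossJoin.enabled", "true")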

Follow-up 2

I am now using a different approach. Since the first DF is a huge DF resulting from a complex computation, and the second one always comes from a small map, I changed my algorithm to achieve this with plain map operations:

val bDF2Data = sc.broadcast(df2Data)
val res =
  df1.
    as[(Long, String)].
    mapPartitions { iter =>
      // Read the broadcast lookup map once per partition
      val df2Data = bDF2Data.value
      iter.
        flatMap {
          case (time, channel) =>
            // Emulates the inner join: emit a row only if the channel has a ppr entry
            df2Data.get(channel).map(ppr => (time, channel, ppr))
        }
    }.
    toDF("time", "channel", "ppr").
    // More operations ...
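
Here df2Data is the small channel-to-ppr map the second DF used to be built from; a hypothetical shape would be:

// Hypothetical example of the lookup map behind df2 (values made up)
val df2Data: Map[String, Int] = Map("chA" -> 1, "chB" -> 2)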

Maybe this helps: stackoverflow.com/questions/42477068/…

Shaido 10.09.2018 11:15

Unfortunately, I cannot tell what is wrong from the explain output. I have added it as an edit.

rabejens 10.09.2018 11:21