Я хочу преобразовать clientIPInt (который в формате Int, да!) в отдельный столбец после применения к нему определенных формул.
Sample Input: df_A
+----+------------------------+
|num |clientIPInt |
+----+------------------------+
|1275|200272593 |
|145 |200172593 |
|2678|200274543 |
|6578|200272593 |
|1001|200272593 |
+----+------------------------+
Output:
+----+------------------------++---------------+
|num |clientIPInt |ip64bigint |
+----+------------------------+----------------+
|1275|200272593 |3521834763 |
|145 |0 |0 |
|2678|200272593 |3521834763 |
|6578|200272593 |3521834763 |
|1001|200272593 |3521834763 |
+----+------------------------+----------------+
Я создал udf для преобразования. Ниже то, что я пробовал.
val b = df_A.withColumn("ip64bigint", ipToLong(df_A.col("clientIpInt")))
val ipToLong = udf{(ipInt: Int) =>
val i = {
if (ipInt <= 0) ipInt.toLong + 4294967296L
else ipInt.toLong
}
val b = ((i & 255L) * 16777216L) + ((i & 65280L) * 256L) + ((i & 16711680L) / 256L) + ((i / 16777216L) & 255L)
b
}
Однако этот udf не так эффективен.
Затем я попытался использовать функцию столбца, но приведенный ниже код не работает.
val d = df_A.withColumn("ip64bigint", newCol($"clientIpInt"))
def newCol(col: Column): Column = {
when(col <= 0, ((((col.toLong + + 4294967296L) & 255L) * 16777216L) + (((col.toLong + + 4294967296L) & 65280L) * 256L) + (((col.toLong + + 4294967296L) & 16711680L) / 256L) + (((col.toLong + + 4294967296L) / 16777216L) & 255L))).
otherwise(((col & 255L) * 16777216L) + ((col & 65280L) * 256L) + ((col & 16711680L) / 256L) + ((col / 16777216L) & 255L))
}
Я действительно не хочу преобразовывать фрейм данных df_A в набор данных [случайный класс столбцов], так как у меня более 140 столбцов в фрейме данных.
Любые идеи, что я делаю неправильно с функцией столбца или любым другим способом преобразования данных
Ниже приведено одно решение, которое работает:
пример кадра данных =>
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val data =
Seq(
Row(1275, 200272593),
Row(145, 0),
Row(2678, 200274543),
Row(6578, 200272593),
Row(1001, 200272593))
val dF = spark.createDataFrame(spark.sparkContext.parallelize(data),
StructType(List(StructField("num", IntegerType, nullable = true),
StructField("clientIPInt", IntegerType, nullable = true))))
+----+-----------+
| num|clientIPInt|
+----+-----------+
|1275| 200272593|
| 145| 0|
|2678| 200274543|
|6578| 200272593|
|1001| 200272593|
+----+-----------+
используя функции, предоставленные искрой =>
import spark.implicits._
import org.apache.spark.sql.functions._
dF.withColumn("i", when('clientIPInt <= 0, ('clientIPInt cast "long") + 4294967296L).otherwise('clientIPInt cast "long"))
.withColumn("ip64bigint", (('i.bitwiseAND(255L) * 16777216L) + ('i.bitwiseAND(65280L) * 256L) + ('i.bitwiseAND(16711680L) / 256L) + ('i / 16777216L).cast("long").bitwiseAND(255L)) cast "long")
.drop("i").show(false)
Выход =>
+----+-----------+----------+
|num |clientIPInt|ip64bigint|
+----+-----------+----------+
|1275|200272593 |3521834763|
|145 |0 |0 |
|2678|200274543 |1878191883|
|6578|200272593 |3521834763|
|1001|200272593 |3521834763|
+----+-----------+----------+
Есть идеи @Shaido?