У меня есть искровой фрейм данных с двумя столбцами (time_stamp и message), как показано ниже:
Пример искрового фрейма данных
message time_stamp
irrelevant_text Startstring [ID: 1AB] 2015-01-23 08:23:16
some irrelevant text 2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB] 2015-01-23 08:25:32
some irrelevant text 2015-01-23 08:27:18
contributor XYZ_ABCD 2015-01-23 08:27:54
some irrelevant text 2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB] 2015-01-23 08:30:47
some irrelevant text 2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC] 2015-01-23 10:05:16
some irrelevant text 2015-01-23 10:24:20
contributor LMN_EFG_X 2015-01-23 10:27:21
some irrelevant text 2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC] 2015-01-23 10:30:47
some irrelevant text 2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE] 2015-01-23 12:21:16
some irrelevant text 2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE] 2015-01-23 12:37:32
some irrelevant text 2015-01-23 12:45:18
contributor PQRS_STU_wtx 2015-01-23 12:47:05
some irrelevant text 2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE] 2015-01-23 12:59:47
Я пытаюсь извлечь участника, появившегося между Startstring и endstring, если между Startstring и endstring существует обязательная строка, и отбросить участников, если между Startstring и endstring нет обязательной строки. В одной дате может быть несколько таких экземпляров.
Ожидаемый результат:
time_stamp contributor
2015-01-23 08:27:54 XYZ_ABCD
2015-01-23 12:47:05 PQRS_STU_wtx
Для чтения текстового файла я использовал следующую команду.
df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "false").schema(schema).option("delimiter", "\t").load('{}'.format(fileName))
да, это отдельные строки кадра данных искры с заголовками столбцов как «сообщение» и «метка времени»
Есть ли другой способ чтения/группировки данных заранее? Как вы контролируете, чтобы начальная и конечная строки оказывались в одном и том же исполнителе?
Эта программа будет написана в Azure Databricks, и я думаю, что Azure Databricks справится с этим в конце своей работы. Так что нам не нужно об этом думать. Этот фрейм данных создается из текстового файла с разделителями табуляции. Для чтения текстового файла я использовал следующую команду. "df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "false").schema(schema).option("разделитель", "\t"). загрузить('{}'.format(fileName))"
Нет, блоки данных не справятся с этим. Например, все временные метки до полудня могут появиться на исполнителе 1, где у вас есть начальная строка в 11. Затем у исполнителя 2 есть соответствующая конечная строка в 1:00... Где-то вам нужно сгруппировать строки вместе, чтобы гарантировать, что данные не раскололся
для целей этого вопроса давайте предположим, что весь фрейм данных выполняется одним и тем же исполнителем.
Конечно. Есть ли у вас какие-либо попытки до сих пор? Или ждете, что кто-то напишет для вас?
Я фактически преобразовал весь фрейм данных в одну строку и применил регулярное выражение для извлечения. Он работает нормально, но иногда блок данных занимает много времени, если размер набора данных очень велик. У меня есть набор данных объемом около 1 ТБ. Я использовал следующий шаблон: «Startstring(?:(?!Startstring).)*?mandatorystring(?:(?!Startstring).)*?,['\s]*ID\s*:\s *([^',]*).*?endstring"
Верно... Если вы собираетесь это сделать, то вам вообще не следует использовать искру. С другой стороны, как я уже сказал, вам нужно сгруппировать линии, чтобы найти соответствующие концы стартов, что является одним из способов сделать это, но тогда у вас нет временных меток.
Давайте продолжим обсуждение в чате.
Мое немедленное предложение состояло бы в том, чтобы не использовать читатель CSV и вместо этого использовать читатель HadoopRDD, но вы должны написать код Java для этого. Тем не менее, это, вероятно, наиболее подходящий и быстрый метод для решения проблемы, поскольку он правильно разбивает данные на нужные вам фрагменты.
Отфильтруйте допустимые группы сообщений (содержащие «обязательные») и получите сообщения, содержащие «участник», из допустимых групп сообщений.
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'begin',
F.last(
F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp'))
).withColumn(
'end',
F.first(
F.when(F.col('message').rlike('Endstring'), F.col('time_stamp')), True
).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
'mandatory',
F.sum(
F.col('message').rlike('mandatory').cast('int')
).over(Window.partitionBy('begin', 'end'))
).filter(
"mandatory >= 1 and message rlike 'contributor'"
).select(
'time_stamp',
F.regexp_extract('message', 'contributor (\S+)', 1).alias('contributor')
)
df2.show()
+-------------------+------------+
| time_stamp| contributor|
+-------------------+------------+
|2015-01-23 08:27:54| XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
+-------------------+------------+
ПРИМЕЧАНИЕ. Это хорошее решение для небольшого объема данных, но его нельзя запускать на блоках данных, если вы используете его для больших данных. Он использует функцию Windows, которая ограничивает использование рабочего узла до 1, даже если у вас много рабочих узлов. Мне пришлось разработать собственное решение с использованием SQL, чтобы использовать параллелизм в блоках данных.
Используйте функции window
.
Попробуйте код ниже.
Импортировать необходимые библиотеки.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
Загрузка данных во фрейм данных.
df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
df \
.withColumn("subMessage", \
F.when(F.col("message").contains("Startstring"),F.lit("start"))\
.when(F.col("message").contains("mandatorystring"),F.lit("mandatory")) \
.when(F.col("message").contains("contributor"),F.regexp_replace(F.col("message"),"contributor ","")) \
.when(F.col("message").contains("endstring"),F.lit("end"))\
) \
.filter(F.col("subMessage").isNotNull()) \
.withColumn("iscontributor",((F.lead(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "end") & (F.lag(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "mandatory"))) \
.filter(F.col("iscontributor") == True) \
.show()
Окончательный вывод.
+--------------------+-------------------+------------+-------------+
| message| time_stamp| subMessage|iscontributor|
+--------------------+-------------------+------------+-------------+
|contributor XYZ_ABCD|2015-01-23 08:27:54| XYZ_ABCD| true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx| true|
+--------------------+-------------------+------------+-------------+
Являются ли эти строки отдельными строками фрейма данных?