Извлечь текст между двумя строками, если между этими двумя строками также присутствует третья строка — Pyspark

У меня есть искровой фрейм данных с двумя столбцами (time_stamp и message), как показано ниже:

Пример искрового фрейма данных

      message                                             time_stamp
irrelevant_text Startstring [ID: 1AB]                   2015-01-23 08:23:16
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text mandatorystring ID [1AB]                2015-01-23 08:25:32
some irrelevant text                                    2015-01-23 08:27:18
contributor XYZ_ABCD                                    2015-01-23 08:27:54
some irrelevant text                                    2015-01-23 08:28:36
irrelevant_text endstring [ID: 1AB]                     2015-01-23 08:30:47
some irrelevant text                                    2015-01-23 08:24:20
irrelevant_text Startstring [ID: 2BC]                   2015-01-23 10:05:16
some irrelevant text                                    2015-01-23 10:24:20
contributor LMN_EFG_X                                   2015-01-23 10:27:21
some irrelevant text                                    2015-01-23 10:28:34
irrelevant_text endstring [ID: 2BC]                     2015-01-23 10:30:47
some irrelevant text                                    2015-01-23 10:50:20
irrelevant_text Startstring [ID: 3DE]                   2015-01-23 12:21:16
some irrelevant text                                    2015-01-23 12:24:20
irrelevant_text mandatorystring ID [3DE]                2015-01-23 12:37:32
some irrelevant text                                    2015-01-23 12:45:18
contributor PQRS_STU_wtx                                2015-01-23 12:47:05
some irrelevant text                                    2015-01-23 12:48:33
irrelevant_text endstring [ID: 3DE]                     2015-01-23 12:59:47

Я пытаюсь извлечь участника, появившегося между Startstring и endstring, если между Startstring и endstring существует обязательная строка, и отбросить участников, если между Startstring и endstring нет обязательной строки. В одной дате может быть несколько таких экземпляров.

Ожидаемый результат:

time_stamp                 contributor
2015-01-23 08:27:54         XYZ_ABCD                                    
2015-01-23 12:47:05       PQRS_STU_wtx                                

Для чтения текстового файла я использовал следующую команду.

df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "false").schema(schema).option("delimiter", "\t").load('{}'.format(fileName))

Являются ли эти строки отдельными строками фрейма данных?

OneCricketeer 18.12.2020 06:27

да, это отдельные строки кадра данных искры с заголовками столбцов как «сообщение» и «метка времени»

Dataholic 18.12.2020 06:28

Есть ли другой способ чтения/группировки данных заранее? Как вы контролируете, чтобы начальная и конечная строки оказывались в одном и том же исполнителе?

OneCricketeer 18.12.2020 06:29

Эта программа будет написана в Azure Databricks, и я думаю, что Azure Databricks справится с этим в конце своей работы. Так что нам не нужно об этом думать. Этот фрейм данных создается из текстового файла с разделителями табуляции. Для чтения текстового файла я использовал следующую команду. "df = spark.read.format("com.databricks.spark.csv").option("inferS‌​chema", "false").schema(schema).option("разделитель", "\t"). загрузить('{}'.format(fileName))"

Dataholic 18.12.2020 06:35

Нет, блоки данных не справятся с этим. Например, все временные метки до полудня могут появиться на исполнителе 1, где у вас есть начальная строка в 11. Затем у исполнителя 2 есть соответствующая конечная строка в 1:00... Где-то вам нужно сгруппировать строки вместе, чтобы гарантировать, что данные не раскололся

OneCricketeer 18.12.2020 06:39

для целей этого вопроса давайте предположим, что весь фрейм данных выполняется одним и тем же исполнителем.

Dataholic 18.12.2020 06:41

Конечно. Есть ли у вас какие-либо попытки до сих пор? Или ждете, что кто-то напишет для вас?

OneCricketeer 18.12.2020 06:43

Я фактически преобразовал весь фрейм данных в одну строку и применил регулярное выражение для извлечения. Он работает нормально, но иногда блок данных занимает много времени, если размер набора данных очень велик. У меня есть набор данных объемом около 1 ТБ. Я использовал следующий шаблон: «Startstring(?:(?!Startstring).)*?mandatorystring(?:(?!Start‌​string).)*?,['\s]*ID‌​\s*:\s *([^',]*).*?en‌​dstring"

Dataholic 18.12.2020 06:46

Верно... Если вы собираетесь это сделать, то вам вообще не следует использовать искру. С другой стороны, как я уже сказал, вам нужно сгруппировать линии, чтобы найти соответствующие концы стартов, что является одним из способов сделать это, но тогда у вас нет временных меток.

OneCricketeer 18.12.2020 06:50

Давайте продолжим обсуждение в чате.

Dataholic 18.12.2020 06:52

Мое немедленное предложение состояло бы в том, чтобы не использовать читатель CSV и вместо этого использовать читатель HadoopRDD, но вы должны написать код Java для этого. Тем не менее, это, вероятно, наиболее подходящий и быстрый метод для решения проблемы, поскольку он правильно разбивает данные на нужные вам фрагменты.

OneCricketeer 18.12.2020 06:53
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
11
784
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Отфильтруйте допустимые группы сообщений (содержащие «обязательные») и получите сообщения, содержащие «участник», из допустимых групп сообщений.

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'begin',
    F.last(
        F.when(F.col('message').rlike('Startstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp'))
).withColumn(
    'end',
    F.first(
        F.when(F.col('message').rlike('Endstring'), F.col('time_stamp')), True
    ).over(Window.orderBy('time_stamp').rowsBetween(0, Window.unboundedFollowing))
).withColumn(
    'mandatory',
    F.sum(
        F.col('message').rlike('mandatory').cast('int')
    ).over(Window.partitionBy('begin', 'end'))
).filter(
    "mandatory >= 1 and message rlike 'contributor'"
).select(
    'time_stamp',
    F.regexp_extract('message', 'contributor (\S+)', 1).alias('contributor')
)

df2.show()
+-------------------+------------+
|         time_stamp| contributor|
+-------------------+------------+
|2015-01-23 08:27:54|    XYZ_ABCD|
|2015-01-23 12:47:05|PQRS_STU_wtx|
+-------------------+------------+

ПРИМЕЧАНИЕ. Это хорошее решение для небольшого объема данных, но его нельзя запускать на блоках данных, если вы используете его для больших данных. Он использует функцию Windows, которая ограничивает использование рабочего узла до 1, даже если у вас много рабочих узлов. Мне пришлось разработать собственное решение с использованием SQL, чтобы использовать параллелизм в блоках данных.

Dataholic 25.12.2020 06:26

Используйте функции window.

Попробуйте код ниже.

Импортировать необходимые библиотеки.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

Загрузка данных во фрейм данных.

df = spark.read.format("csv").option("header","true").load("/tmp/data/sample.csv")
df \
.withColumn("subMessage", \
    F.when(F.col("message").contains("Startstring"),F.lit("start"))\ 
    .when(F.col("message").contains("mandatorystring"),F.lit("mandatory")) \
    .when(F.col("message").contains("contributor"),F.regexp_replace(F.col("message"),"contributor ","")) \
    .when(F.col("message").contains("endstring"),F.lit("end"))\
) \
.filter(F.col("subMessage").isNotNull()) \
.withColumn("iscontributor",((F.lead(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "end") & (F.lag(F.col("subMessage"),1).over(Window.orderBy(F.lit(1))) == "mandatory"))) \
.filter(F.col("iscontributor") == True) \
.show()

Окончательный вывод.

+--------------------+-------------------+------------+-------------+
|             message|         time_stamp|  subMessage|iscontributor|
+--------------------+-------------------+------------+-------------+
|contributor XYZ_ABCD|2015-01-23 08:27:54|    XYZ_ABCD|         true|
|contributor PQRS_...|2015-01-23 12:47:05|PQRS_STU_wtx|         true|
+--------------------+-------------------+------------+-------------+

Другие вопросы по теме