Создание настраиваемой оконной функции в Apache Beam

У меня есть конвейер Beam, который начинается с чтения нескольких текстовых файлов, где каждая строка в файле представляет собой строку, которая позже вставляется в Bigtable. Сценарий требует подтверждения, что количество строк, извлеченных из каждого файла, и количество строк, позже вставленных в Bigtable, совпадают. Для этого я планирую разработать собственную стратегию работы с окнами, чтобы строки из одного файла назначались одному окну на основе имени файла в качестве ключа, который будет передан в функцию работы с окнами.

Есть ли какой-нибудь образец кода для создания пользовательских оконных функций?

Это потоковый конвейер? Если нет, вы можете сделать это с помощью GroupByKey.

Pablo 13.09.2018 00:40

@Pablo Спасибо за ответ. Однако, основываясь на моих ограниченных знаниях и просмотре документации, GroupByKey действует только как SQL GROUP BY и фактически не назначает Window. В моем сценарии строки уже сгруппированы вместе на основе имени файла в качестве контейнера, то есть ключа. Проблема здесь в том, чтобы иметь возможность вставлять строки (строки), принадлежащие одному и тому же файлу, как часть одного и того же окна, что, как я считаю, возможно, если строки будут сделаны частью одного и того же окна непосредственно перед вызовом CloudBigtableIO.writeToTable().

W Khattak 13.09.2018 08:27

Я не понимаю что ты имеешь в виду. Если вы уже сгруппировали строки по имени файла, зачем вам вставлять другие строки? Откуда берутся эти другие строки?

Pablo 13.09.2018 08:42

Извините за путаницу. Когда я говорю «строки уже сгруппированы вместе на основе имени файла», в основном я пытаюсь сказать, что я уже знаю, к какому файлу принадлежит конкретная строка. Не в этом проблема. Проблема заключается в вызове CloudBigtableIO.writeToTable(), который должен происходить для каждого окна (1 имя файла = 1 окно). К сожалению, GroupByKey не создает окна для каждой клавиши. Надеюсь его прояснит.

W Khattak 13.09.2018 11:35

Хммм в лучах окна используются для представления времени, а не других измерений. Если вы хотите, чтобы ваши элементы были окнами, вам нужно будет добавить временную метку и применить стратегию работы с окнами. Вы можете добавить собственную стратегию управления временными окнами, но, похоже, это не то, что вы ищете?

Pablo 13.09.2018 21:53
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
5
748
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хотя я изменил свою стратегию подтверждения вставленного количества строк, для всех, кто интересуется оконными элементами, считываемыми из пакетного источника, например. FileIO в пакетном задании, вот код для создания пользовательской стратегии работы с окнами:

public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{

private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

@Override
public IntervalWindow assignWindow(Instant timestamp) {
    Instant end = new Instant(timestamp.getMillis() + 1);
    IntervalWindow interval = new IntervalWindow(timestamp, end);
    LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
    return interval;
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return this.equals(other);
}

@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
    if (!this.isCompatible(other)) {
        throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
    }
  }

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}   

}

а затем его можно использовать в конвейере, как показано ниже:

p
 .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
 .apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
  .withAllowedLateness(Duration.standardMinutes(1))
  .discardingFiredPanes());

Имейте в виду, что вам нужно будет записать AssignTimestampFn(), чтобы каждое сообщение имело отметку времени.

Другие вопросы по теме