У меня есть конвейер Beam, который начинается с чтения нескольких текстовых файлов, где каждая строка в файле представляет собой строку, которая позже вставляется в Bigtable. Сценарий требует подтверждения, что количество строк, извлеченных из каждого файла, и количество строк, позже вставленных в Bigtable, совпадают. Для этого я планирую разработать собственную стратегию работы с окнами, чтобы строки из одного файла назначались одному окну на основе имени файла в качестве ключа, который будет передан в функцию работы с окнами.
Есть ли какой-нибудь образец кода для создания пользовательских оконных функций?
@Pablo Спасибо за ответ. Однако, основываясь на моих ограниченных знаниях и просмотре документации, GroupByKey
действует только как SQL GROUP BY
и фактически не назначает Window. В моем сценарии строки уже сгруппированы вместе на основе имени файла в качестве контейнера, то есть ключа. Проблема здесь в том, чтобы иметь возможность вставлять строки (строки), принадлежащие одному и тому же файлу, как часть одного и того же окна, что, как я считаю, возможно, если строки будут сделаны частью одного и того же окна непосредственно перед вызовом CloudBigtableIO.writeToTable()
.
Я не понимаю что ты имеешь в виду. Если вы уже сгруппировали строки по имени файла, зачем вам вставлять другие строки? Откуда берутся эти другие строки?
Извините за путаницу. Когда я говорю «строки уже сгруппированы вместе на основе имени файла», в основном я пытаюсь сказать, что я уже знаю, к какому файлу принадлежит конкретная строка. Не в этом проблема. Проблема заключается в вызове CloudBigtableIO.writeToTable()
, который должен происходить для каждого окна (1 имя файла = 1 окно). К сожалению, GroupByKey
не создает окна для каждой клавиши. Надеюсь его прояснит.
Хммм в лучах окна используются для представления времени, а не других измерений. Если вы хотите, чтобы ваши элементы были окнами, вам нужно будет добавить временную метку и применить стратегию работы с окнами. Вы можете добавить собственную стратегию управления временными окнами, но, похоже, это не то, что вы ищете?
Хотя я изменил свою стратегию подтверждения вставленного количества строк, для всех, кто интересуется оконными элементами, считываемыми из пакетного источника, например. FileIO
в пакетном задании, вот код для создания пользовательской стратегии работы с окнами:
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{
private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);
@Override
public IntervalWindow assignWindow(Instant timestamp) {
Instant end = new Instant(timestamp.getMillis() + 1);
IntervalWindow interval = new IntervalWindow(timestamp, end);
LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
return interval;
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return this.equals(other);
}
@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
if (!this.isCompatible(other)) {
throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
}
}
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
}
а затем его можно использовать в конвейере, как показано ниже:
p
.apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
.apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes());
Имейте в виду, что вам нужно будет записать AssignTimestampFn()
, чтобы каждое сообщение имело отметку времени.
Это потоковый конвейер? Если нет, вы можете сделать это с помощью GroupByKey.