Я относительно новичок в Flink, и сегодня я столкнулся с проблемой при использовании Flink SQL в сеансовом кластере Flink 1.11.3.
Я зарегистрировал исходную таблицу, в которой используется драйвер jdbc postgres. Я пытаюсь переместить некоторые данные из этой онлайн-базы данных в AWS S3 в формате паркета. Эта таблица имеет огромный размер (~ 43 ГБ). Задание не удалось примерно через 1 минуту, и диспетчер задач аварийно завершил работу без предупреждения. Но я думаю, что диспетчеру задач не хватило памяти.
Я обнаружил, что когда я это делаю tableEnv.executeSql("select ... from huge_table limit 1000")
, flink пытался просканировать всю исходную таблицу в память и только после этого планировал сделать лимит.
Поскольку меня интересуют данные только за последние несколько дней, есть ли способ ограничить количество строк, которые задание будет сканировать по отметке времени?
Вот минимальная настройка, которая может воспроизвести проблему (удалено много шума)
Код настройки окружения
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);
Исходная таблица DDL в Flink SQL
CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)
DDL таблицы приемника в Flink SQL
CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)
Вставить запрос в Flink SQL
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
Ваше замечание верно. Flink не поддерживает оптимизацию ограничения выталкивания вниз для соединителя JDBC, и для поддержки этой функции существует почти объединенный PR, который будет использоваться во Flink 1.13, и вы можете выбрать этот патч для своего кода, если вы срочно к этой функции.
1.JIRA: FLINK-19650 Поддержите уменьшение лимита для Jdbc
2.PR: https://github.com/apache/flink/pull/13800
Существует PR для поддержки пользовательского запроса при определении таблицы JDBC, но сообщество обеспокоено тем, что таблица, определенная таким образом, является представлением или таблицей (например, запрос: выберите a, b из A JOIN B на A.id = B.id ), попробую протолкнуть этот пиар. github.com/apache/flink/pull/13245
Спасибо, что указали мне правильное направление, сэр! тем не менее, этот PR добавил поддержку лимита, но на самом деле то, что я ищу, - это выбор диапазона по отметке времени. Вы случайно не знаете, как его можно сбросить?