Flink SQL: исходная таблица слишком велика, чтобы поместиться в память

Я относительно новичок в Flink, и сегодня я столкнулся с проблемой при использовании Flink SQL в сеансовом кластере Flink 1.11.3.

Проблема

Я зарегистрировал исходную таблицу, в которой используется драйвер jdbc postgres. Я пытаюсь переместить некоторые данные из этой онлайн-базы данных в AWS S3 в формате паркета. Эта таблица имеет огромный размер (~ 43 ГБ). Задание не удалось примерно через 1 минуту, и диспетчер задач аварийно завершил работу без предупреждения. Но я думаю, что диспетчеру задач не хватило памяти.

Мое наблюдение

Я обнаружил, что когда я это делаю tableEnv.executeSql("select ... from huge_table limit 1000"), flink пытался просканировать всю исходную таблицу в память и только после этого планировал сделать лимит.

Вопрос

Поскольку меня интересуют данные только за последние несколько дней, есть ли способ ограничить количество строк, которые задание будет сканировать по отметке времени?

Приложение

Вот минимальная настройка, которая может воспроизвести проблему (удалено много шума)

Код настройки окружения

var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

Исходная таблица DDL в Flink SQL

CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

DDL таблицы приемника в Flink SQL

CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector'='filesystem',
    'path'='s3a://s3/path/to/transactions',
    'format'='parquet'
)

Вставить запрос в Flink SQL

INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
ReactJs | Supabase | Добавление данных в базу данных
ReactJs | Supabase | Добавление данных в базу данных
Это и есть ваш редактор таблиц в supabase.👇
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
3
0
696
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Ваше замечание верно. Flink не поддерживает оптимизацию ограничения выталкивания вниз для соединителя JDBC, и для поддержки этой функции существует почти объединенный PR, который будет использоваться во Flink 1.13, и вы можете выбрать этот патч для своего кода, если вы срочно к этой функции.

1.JIRA: FLINK-19650 Поддержите уменьшение лимита для Jdbc

2.PR: https://github.com/apache/flink/pull/13800

Спасибо, что указали мне правильное направление, сэр! тем не менее, этот PR добавил поддержку лимита, но на самом деле то, что я ищу, - это выбор диапазона по отметке времени. Вы случайно не знаете, как его можно сбросить?

Patrick 23.12.2020 05:29

Существует PR для поддержки пользовательского запроса при определении таблицы JDBC, но сообщество обеспокоено тем, что таблица, определенная таким образом, является представлением или таблицей (например, запрос: выберите a, b из A JOIN B на A.id = B.id ), попробую протолкнуть этот пиар. github.com/apache/flink/pull/13245

leonard 23.12.2020 11:01

Другие вопросы по теме