Исключение при записи в Azure UltraSSD LRS для сохранения состояния флик-ключа с помощью Rocksdb

Я использую приложение Flink для хранения состояния ключа на ПВХ, установленном в модуле Kubernetes. Установленный ПВХ-диск — UltraSSD_LRS в Azure. Конфигурация задания flink приведена ниже:

  state.backend: rocksdb
  state.backend.rocksdb.localdir: "/data/flink/state/local
  state.checkpoints.dir: abfs://XXXXXXX/flink-checkpointing/

внутри кода я также устанавливаю путь к dbStorage, как показано:

EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
env.setStateBackend(embeddedRocksDb);
env.enableCheckpointing(5000);

Диск монтируется правильно, и данные также записываются, но через некоторое время приложение выдает ошибку Rockdb Exception.

Caused by: org.apache.flink.util.SerializedThrowable: org.rocksdb.RocksDBException: while link file to /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/chk-8.tmp/000014.sst: /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/db/000014.sst: Operation not supported
at org.rocksdb.Checkpoint.createCheckpoint(Native Method) ~[flink-dist-1.17.1.jar:1.17.1]
at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:170) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:156) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:76) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244) ~[flink-dist-1.17.1.jar:1.17.1]
... 23 more
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
0
81
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я не уверен, в чем проблема, но я бы начал с удаления всей конфигурации из кода. Это не обязательно, но было бы чище.

В частности, эти два элемента настраивают одно и то же, но не устанавливают одно и то же значение, поэтому это сбивает с толку:

state.backend.rocksdb.localdir: "/data/flink/state/local
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");

Другая часть конфигурации — включение контрольных точек, что можно сделать через

execution.checkpointing.interval: 5000

Еще один фактор, который следует учитывать, заключается в том, что EmbeddedRocksDBStateBackend не нужен постоянный том — он может использовать эфемерное хранилище, поскольку Flink для восстановления полагается на контрольные точки, а не на локальный диск.

Привет @david, мы использовали ПВХ, потому что диск эластичен и будет расширяться при увеличении размера состояния. хотел знать, каково будет поведение, если размер состояния станет больше, чем эфемерное хранилище?

Aman Vaishya 16.04.2024 11:18

Если размер состояния превысит размер хранилища, произойдет сбой. Вы можете избежать этого сбоя или восстановиться после него, выполнив масштабирование до большего количества узлов или перенастроив локальный каталог в этой точке.

David Anderson 16.04.2024 20:08

Другие вопросы по теме