Я использую приложение Flink для хранения состояния ключа на ПВХ, установленном в модуле Kubernetes. Установленный ПВХ-диск — UltraSSD_LRS в Azure. Конфигурация задания flink приведена ниже:
state.backend: rocksdb
state.backend.rocksdb.localdir: "/data/flink/state/local
state.checkpoints.dir: abfs://XXXXXXX/flink-checkpointing/
внутри кода я также устанавливаю путь к dbStorage, как показано:
EmbeddedRocksDBStateBackend embeddedRocksDb = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
env.setStateBackend(embeddedRocksDb);
env.enableCheckpointing(5000);
Диск монтируется правильно, и данные также записываются, но через некоторое время приложение выдает ошибку Rockdb Exception.
Caused by: org.apache.flink.util.SerializedThrowable: org.rocksdb.RocksDBException: while link file to /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/chk-8.tmp/000014.sst: /data/flink/state/job/job_cb48da41cb620a68170593dc09789f2b_op_KeyedCoProcessOperator_1c449adacf198fac8b664046293f1fdf__2_4__uuid_4c6051b6-b588-4bcb-a740-fe4ddb01beae/db/000014.sst: Operation not supported
at org.rocksdb.Checkpoint.createCheckpoint(Native Method) ~[flink-dist-1.17.1.jar:1.17.1]
at org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:51) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:170) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:156) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase.syncPrepareResources(RocksDBSnapshotStrategyBase.java:76) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:593) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:246) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:336) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:715) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:350) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244) ~[flink-dist-1.17.1.jar:1.17.1]
... 23 more
Я не уверен, в чем проблема, но я бы начал с удаления всей конфигурации из кода. Это не обязательно, но было бы чище.
В частности, эти два элемента настраивают одно и то же, но не устанавливают одно и то же значение, поэтому это сбивает с толку:
state.backend.rocksdb.localdir: "/data/flink/state/local
embeddedRocksDb.setDbStoragePath("file:///data/flink/state/job/");
Другая часть конфигурации — включение контрольных точек, что можно сделать через
execution.checkpointing.interval: 5000
Еще один фактор, который следует учитывать, заключается в том, что EmbeddedRocksDBStateBackend
не нужен постоянный том — он может использовать эфемерное хранилище, поскольку Flink для восстановления полагается на контрольные точки, а не на локальный диск.
Если размер состояния превысит размер хранилища, произойдет сбой. Вы можете избежать этого сбоя или восстановиться после него, выполнив масштабирование до большего количества узлов или перенастроив локальный каталог в этой точке.
Привет @david, мы использовали ПВХ, потому что диск эластичен и будет расширяться при увеличении размера состояния. хотел знать, каково будет поведение, если размер состояния станет больше, чем эфемерное хранилище?