Я запускаю Flink 1.15.2 и пытаюсь определить фабрику пользовательских параметров в RocksDB, чтобы отключить кеш блоков.
Следуя примеру из этого сообщения в блоге: https://shopify.engineering/optimizing-apache-flink-applications-tips
Однако мое приложение Flink отказывается запускаться после добавления OptionsFactory
в мой env. Ошибка, кажется, исходит из этой строки
https://github.com/facebook/rocksdb/blob/main/table/block_based/block_based_table_factory.cc#L599
Enable cache_index_and_filter_blocks, , but block cache is disabled
Я понятия не имею, что здесь пошло не так, что-то должно переопределять/игнорировать параметры моего столбца, но я не знаю что.
Буду признателен за любую оказанную помощь.
Конфиг тут:
class NoBlockCacheRocksDbOptionsFactory extends ConfigurableRocksDBOptionsFactory {
override def createDBOptions(currentOptions: DBOptions, handlesToClose: util.Collection[AutoCloseable]): DBOptions = {
currentOptions.setMaxBackgroundJobs(20) // state.backend.rocksdb.thread.num
currentOptions
}
override def createColumnOptions(
currentOptions: ColumnFamilyOptions,
handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
val blockBasedTableConfig = new BlockBasedTableConfig()
.setNoBlockCache(true)
.setBlockCache(null)
.setCacheIndexAndFilterBlocks(false)
.setCacheIndexAndFilterBlocksWithHighPriority(false)
.setPinL0FilterAndIndexBlocksInCache(false)
currentOptions.setTableFormatConfig(blockBasedTableConfig)
}
override def configure(configuration: ReadableConfig): RocksDBOptionsFactory = {
this
}
}
Ошибка и трассировка стека:
Caused by: org.apache.flink.util.SerializedThrowable: Enable cache_index_and_filter_blocks, , but block cache is disabled
at org.rocksdb.RocksDB.open(Native Method) ~[flink-dist-1.15.2.jar:1.15.2]
at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:113) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation.restore(RocksDBNoneRestoreOperation.java:62) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:315) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ~[flink-dist-1.15.2.jar:1.15.2]
... 11 more
попробуйте установить для конфигурации flink state.backend.rocksdb.memory.managed значение false вместо значения true по умолчанию. Этот параметр переопределяет конфигурации, установленные как часть фабрики параметров rockdb. Мне удалось запустить сообщение о вакансии, отключив изменение этого значения.
Спасибо! Это именно то, чего мне не хватало.