У меня есть следующее потоковое приложение, которое читает сообщения Protobuf из темы Kafka и записывает их в приемник паркета FileSystem:
class ProtoDeserializer extends DeserializationSchema[User] {
override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])
override def deserialize(message: Array[Byte]): User =
User.parseFrom(message.slice(6, message.length))
override def isEndOfStream(nextElement: User): Boolean = false
}
object StreamingKafkaProtoToParquetLocalFs {
private val brokers = "localhost:9092"
private val topic = "test-topic-proto"
private val consumerGroupId = "test-consumer-proto"
private val targetPath = "file:///my/path"
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])
val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer)
.setStartingOffsets(OffsetsInitializer.earliest())
.build
val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")
val sink: StreamingFileSink[User] = StreamingFileSink
.forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
.build()
input.addSink(sink)
env.execute()
}
}
Когда я запускаю программу, я вижу, что все выходные файлы, записанные в целевой путь, пусты (размер 0) и inprogress
, хотя я включил контрольную точку.
Важно отметить, что тема не пустая и когда я меняю сток на print()
сообщения печатаются корректно.
Что мне не хватает? почему print
и паркетная мойка ведут себя по-разному?
Похоже, вы используете последнюю версию Flink, поэтому попробуйте внести это изменение:
val sink: FileSink[User] = FileSink
.forBulkFormat(new Path(s"$targetPath/data"),
ParquetProtoWriters.forType(classOf[User]))
.build()
input.sinkTo(sink)
StreamingFileSink
устарела и заменяется на FileSink
.
Кажется, что явное добавление зависимости для Apache Parquet Protobuf решает проблему.
Для пользователей Maven в pom.xml
добавлена следующая зависимость:
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>1.11.1</version>
</dependency>
Спасибо, Дэвид, но, к сожалению, это не решило проблему. Все сгенерированные файлы по-прежнему пусты, а новый выходной файл создается ~ каждую секунду. Моя версия Flink — 1.14.2 (со scala 2.12).