我在星火中有一个StreamingQuery (v2.2.0),也就是说,
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("parquet")
.option("checkpointLocation", "s3n://bucket/checkpoint/test")
.option("path", "s3n://bucket/test")
.start()当我运行query时,数据就会保存在AWS S3上,并且在s3n://bucket/checkpoint/test上创建检查点。但是,我还收到日志中的警告:
警告o.a.s.s.e.streaming.OffsetSeqLog无法使用FileContext API来管理路径s3n://bucket/checpoint/test/偏移量处的元数据日志文件。使用FileSystem API来管理日志文件。如果发生故障,日志可能不一致。
我无法理解为什么会出现这个警告。此外,如果出现故障,我的检查点会不会不一致?
有人能帮我解决这个问题吗?
发布于 2017-12-12 05:26:36
查看源代码,此错误来自HDFSMetadataLog类。守则中有一条注释指出:
注意:[HDFSMetadataLog]不支持类似S3的文件系统,因为它们不能保证目录中列出的文件总是显示最新的文件。
因此,问题在于使用AWS S3,它将迫使您使用FileSystemManager API。检查那个类的注释,我们看到,
使用旧的FileManager API实现FileSystem。请注意,此实现不能提供路径的原子重命名,因此可能导致一致性问题。如果不能使用FileContextManager,则只应将其用作备份选项。
因此,当多个作者希望并发重命名操作时,可能会出现一些问题。有一个相关的票证这里,然而,它已经关闭,因为问题不能解决在星火。
如果需要在S3上进行检查点,需要考虑以下几点:
发布于 2017-12-12 12:15:37
对对象存储的真正检查需要进行不同的操作。如果仔细观察,就会发现没有rename(),但是现有的代码都希望它是一个O(1)原子操作。
https://stackoverflow.com/questions/47765493
复制相似问题