首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >压缩批次9时IllegalStateException:_spark_metadata/0不存在

压缩批次9时IllegalStateException:_spark_metadata/0不存在
EN

Stack Overflow用户
提问于 2019-05-31 15:20:13
回答 2查看 1.7K关注 0票数 6

我们有使用Spark Structured实现的流应用程序,它试图从Kafka主题读取数据并将其写入HDFS位置。

有时应用程序会失败,并出现异常:

代码语言:javascript
复制
_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

我们无法解决此问题。

我找到的唯一解决方案是删除检查点位置文件,这将使作业在我们再次运行应用程序时从头开始读取主题/数据。然而,对于生产应用来说,这并不是一个可行的解决方案。

有没有人可以在不删除检查点的情况下解决这个错误,这样我就可以从上次运行失败的地方继续运行?

应用程序示例代码:

代码语言:javascript
复制
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

[...] // do some processing

dfProcessed.writeStream
  .format("csv")
  .option("format", "append")
  .option("path",hdfsPath)
  .option("checkpointlocation","")
  .outputmode(append)
  .start
EN

回答 2

Stack Overflow用户

发布于 2021-03-18 22:27:10

错误消息

代码语言:javascript
复制
_spark_metadata/n.compact doesn't exist when compacting batch n+10

可以在以下情况下出现

  • 将一些数据处理到启用了检查点的FileSink中,然后
  • 停止您的流作业,
  • 更改FileSink的输出目录,同时保持相同的checkpointLocation,然后
  • 重新启动流作业

解决方案

由于您不想删除检查点文件,因此只需将丢失的spark元数据文件从文件接收器输出路径复制到输出路径即可。请看下面的内容,了解什么是“缺失的spark元数据文件”。

背景

要理解抛出此IllegalStateException的原因,我们需要了解所提供的文件输出路径中幕后发生的事情。让outPathBefore作为此路径的名称。当您的流式作业正在运行并处理数据时,该作业会创建一个文件夹outPathBefore/_spark_metadata。在该文件夹中,您将发现一个以微批处理标识符命名的文件,其中包含数据已写入的文件(分区文件)的列表,例如:

代码语言:javascript
复制
/home/mike/outPathBefore/_spark_metadata$ ls
0 1 2 3 4 5 6 7

在这种情况下,我们有8个微批次的详细信息。其中一个文件的内容如下所示

代码语言:javascript
复制
/home/mike/outPathBefore/_spark_metadata$ cat 0
v1
{"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}

默认情况下,在每十个微批中,这些文件将被压缩,这意味着文件0、1、2、...、9的内容将存储在名为9.compact的压缩文件中。

该过程在随后的十个批次中连续,即在微批次19中,作业聚集最后10个文件,这些文件是9.compact、10、11、12、...、19。

现在,假设您的流式作业一直运行到微批处理15,这意味着该作业已经创建了以下文件:

代码语言:javascript
复制
/home/mike/outPathBefore/_spark_metadata/0
/home/mike/outPathBefore/_spark_metadata/1
...
/home/mike/outPathBefore/_spark_metadata/8
/home/mike/outPathBefore/_spark_metadata/9.compact
/home/mike/outPathBefore/_spark_metadata/10
...
/home/mike/outPathBefore/_spark_metadata/15

在第15个微批之后,您停止了流作业,并将文件接收器的输出路径更改为outPathAfter。由于您保持相同的checkpointLocation,流作业将继续微批次16。但是,它现在会在新的输出路径中创建元数据文件:

代码语言:javascript
复制
/home/mike/outPathAfter/_spark_metadata/16
/home/mike/outPathAfter/_spark_metadata/17
...

现在,这就是抛出异常的地方:当到达微批19时,作业尝试压缩Spark元数据文件夹中的第十个最新文件。但是,它只能找到文件16、17、18,但没有找到9.compact、10等。因此,错误消息为:

代码语言:javascript
复制
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

文档

结构化流式编程指南在Recovery Semantics after Changes in a Streaming Query上进行了说明

“不允许对文件接收器的输出目录进行更改: sdf.writeStream.format("parquet").option("path","/somePath") to sdf.writeStream.format("parquet").option("path","/anotherPath")”

Databricks还在文章Streaming with File Sink: Problems with recovery if you change checkpoint or output directories中写了一些细节

票数 4
EN

Stack Overflow用户

发布于 2019-12-25 11:45:09

由于checkpointLocation存储旧的或已删除的数据信息而导致的checkpointLocation错误。您只需要删除包含checkpointLocation的文件夹。

了解更多信息:https://kb.databricks.com/streaming/file-sink-streaming.html

示例:

代码语言:javascript
复制
df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()

你需要删除目录checkpointLocation

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56390492

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档