我正在对Spark使用缓存。现在,我使用了几个缓存,其中一些内存约为20 in。我先是尝试了cache(),然后又尝试了persist和MEMORY_SER,它的大小很大,所以我改成了java序列化,其中一些序列化达到了20 of左右。现在,我想使用Kryo,我已经注册了类,我没有得到任何错误,但它的大小与我在大多数缓存中使用Kryo执行它时相同。
我想缓存的一些对象如下所示:
case class ObjectToCache(id: Option[Long],
listObject1: Iterable[ObjectEnriched],
mp1: Map[String, ObjectEnriched2],
mp2: Map[String, ObjectEnriched3],
mp3: Map[String, ObjectEnriched4])我已经在Kryo注册为:
kryo.register(classOf[Iterable[ObjectEnriched2]])
kryo.register(classOf[Map[String,ObjectEnriched3]])
kryo.register(classOf[Map[String,ObjectEnriched4]])
kryo.register(ObjectEnriched)
kryo.register(ObjectEnriche2)
kryo.register(ObjectEnriched3)
kryo.register(ObjectEnriched4)我做错了什么吗?有没有办法知道它是否使用了Kryo?我认为它在使用,因为在某种程度上,我得到了一个错误,因为我没有留下空间,如下所示:
Serialization trace:
mp1 (ObjectEnriched)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:183)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)我正在使用RDD和Spark Streaming。
发布于 2018-08-10 23:37:15
要检查数据帧( DF )是否被缓存,或者不仅仅是通过调用操作df.show来触发缓存,并检查http://localhost:4040/storage中的spark UI以查看DF是否为cached.You,应该可以在那里看到。
您还可以使用queryExecution或explain来查看InMemoryRelation
scala> df.queryExecution.withCachedData范围: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = InMemoryRelation id#0L,true,10000,StorageLevel(磁盘,内存,反序列化,1个副本) +- *范围(0,1,step=1,splits=Some(8))
也可以尝试使用Datasets而不是DataFrame。DataSet不使用标准的序列化方法。它们使用专门的列存储和自己的压缩方法,您甚至不需要使用Kryo序列化程序存储数据集。
https://stackoverflow.com/questions/51788731
复制相似问题