[Spark][pyspark] Notes on using cache, persist, and checkpoint with RDDs and DataFrames
Conclusions
- cache is implemented by calling persist. By default it persists data to memory for RDDs (MEMORY_ONLY) and to memory plus disk for DataFrames (MEMORY_AND_DISK). It is fast, but carries risks such as running out of memory.
- persist lets you choose where and how data is persisted through its parameters: disk, memory, off-heap memory, whether to keep the data deserialized, and the number of replicas. The persisted files are temporary and are deleted automatically when the job finishes.
- checkpoint persists data to disk and truncates the lineage. Because it involves disk I/O it is slower, and the checkpoint files are not deleted automatically when the job finishes. (A minimal sketch contrasting the three calls follows the notes below.)
Notes:
- Calling checkpoint on an RDD only marks it for checkpointing. The actual doCheckpoint runs after a later action, once runJob has computed the RDD; only then is the data persisted, and the RDD has to be produced again at that point, either by a second computation or by reading it back from storage if it was also persisted.
- When checkpoint is called on a DataFrame, the default eager=True triggers an immediate count action, which materializes the DataFrame and returns a new DataFrame. This cuts off the upstream dependencies and reduces the complexity of the DAG and the physical plan.
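A minimal sketch contrasting the three calls on an RDD (the app name, master setting, and checkpoint path below are illustrative placeholders, not taken from the original post):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("persistOverview").master("local[2]").getOrCreate()
sc = spark.sparkContext
sc.setCheckpointDir("/tmp/spark-checkpoints")  # must be set before checkpoint(); placeholder path

pairs = sc.parallelize(range(10)).map(lambda x: (x % 2, x))

# cache(): shorthand for persist(StorageLevel.MEMORY_ONLY) on an RDD
pairs.cache()

# checkpoint(): only marks the RDD here; the files are written after the next action
pairs.checkpoint()
pairs.count()  # action: fills the cache and writes the checkpoint files

# persist(): pick the storage level explicitly (on a separate RDD, because a
# storage level can only be assigned once per RDD)
other = sc.parallelize(range(10))
other.persist(StorageLevel.MEMORY_AND_DISK)
other.count()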
Why persist data: lazy evaluation
Spark uses lazy evaluation, also called lazy execution: computation is deferred until it is absolutely needed. When you express operations on data, Spark does not modify the data immediately; instead it builds a plan of transformations to apply to the source data. Spark compiles this plan into an efficient, pipelined physical execution plan for the cluster and waits until the last possible moment before actually running it.
What lazy evaluation implies:
- Nothing is computed until an action runs, which avoids a lot of pointless computation and pointless intermediate results, and therefore the disk I/O and network transfer they would cause.
- More importantly, the more of the computation Spark can see before executing, the more opportunities it has to optimize it.
- Because only the operation graph is kept, not the data, objects are reusable but their data is not, which is why some scenarios require persistence (see the sketch below).
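A minimal sketch of lazy evaluation (names and sizes are made up for illustration): the transformations below only extend the plan, and nothing is computed until the action at the end.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazyEvalDemo").master("local[2]").getOrCreate()

df = spark.range(1000000)                       # transformation: no job is launched
doubled = df.selectExpr("id * 2 AS doubled")    # transformation: only extends the plan
filtered = doubled.where("doubled % 10 = 0")    # still nothing has been computed

filtered.explain()        # inspect the physical plan Spark has built so far
print(filtered.count())   # action: only now does Spark execute the pipeline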
Common use cases
1. Iterative, repeated joins, e.g. [Sample, feature1, feature2, ..., featureN] → Table
Join is a transformation, not an action, so under lazy execution each join only records an execution plan; nothing runs until the final table.show(). Repeated joins therefore build up an extremely complicated dependency chain, and when the engine finally computes it, the constant cycle of resource allocation and release can end up failing the job. Adding a persistence operator such as DF.persist() introduces an InMemoryTableScan dependency, so the data is persisted and each step no longer has to be recomputed from the beginning (see the sketch below).
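A minimal sketch of the iterative-join pattern (table and column names are invented for illustration); persisting the running result keeps the plan from accumulating every previous join:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iterativeJoinDemo").master("local[2]").getOrCreate()

samples = spark.createDataFrame([(1,), (2,), (3,)], ["sample_id"])

# stand-ins for N feature tables produced elsewhere
feature_tables = [
    spark.createDataFrame([(1, i * 10), (2, i * 20), (3, i * 30)], ["sample_id", "feature%d" % i])
    for i in range(1, 4)
]

table = samples
for feat in feature_tables:
    table = table.join(feat, ["sample_id"], "inner")
    table = table.persist()  # keep each intermediate result so the plan does not keep growing

table.show()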
2. Data reuse, e.g. df → operation A → result A; df → operation B → result B
Because of lazy execution, the object can be reused but its data cannot. If you apply different operations to the same RDD/DataFrame to produce different results without persisting it, each result is recomputed from the source. Persisting the data avoids computing the same RDD over and over and improves job efficiency (see the sketch below).
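A minimal sketch of the reuse case; without the persist() call, both actions below would recompute df from the source:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("reuseDemo").master("local[2]").getOrCreate()

df = spark.range(100).withColumn("squared", F.col("id") * F.col("id"))
df.persist()   # computed once, then reused by both actions below

result_a = df.filter("id % 2 = 0").count()      # operation A
result_b = df.agg(F.sum("squared")).collect()   # operation B
print(result_a, result_b)

df.unpersist()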
3. Data safety, e.g. data may be lost but performance still has to be guaranteed
When data is lost, Spark recomputes it from its lineage. To keep performance acceptable while guaranteeing safety, checkpoint the key datasets and write them to HDFS or local disk (see the sketch below).
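A minimal sketch of checkpointing a key intermediate result (the input and checkpoint paths are placeholders); persisting first avoids the recomputation discussed later in this post:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpointSafetyDemo").master("local[2]").getOrCreate()
sc = spark.sparkContext

# on a cluster this would typically be an HDFS directory; placeholder path
sc.setCheckpointDir("/tmp/spark-checkpoints")

lines = sc.textFile("input.txt")  # placeholder input file
critical = (lines.flatMap(lambda line: line.split())
                 .map(lambda w: (w, 1))
                 .reduceByKey(lambda a, b: a + b))

critical.persist()     # so the checkpoint write does not recompute the RDD
critical.checkpoint()  # lineage is truncated once the data has been written
critical.count()       # action: fills the cache and writes the checkpoint files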
RDD persistence
Persistence usage notes
RDD reuse demo
import findspark  # if you use findspark, it must be imported before pyspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
print("fun is call")
return (x,1)
maprdd = flatmaprdd.map(fun)
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() # action operator, triggers execution
grouprdd = maprdd.groupByKey()
grouprdd.collect()
#############################
##output (stage progress line [Stage 1:> (0 + 8) / 8] omitted)
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
# Without persistence, both reducerdd.collect() and grouprdd.collect() recompute from the beginning, so map runs 8 times
#############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
print("fun is call")
return (x,1)
maprdd = flatmaprdd.map(fun)
maprdd.cache() # persist the data
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() # action operator, triggers execution
grouprdd = maprdd.groupByKey()
grouprdd.collect()
##############################
##output
fun is call
fun is call
fun is call
fun is call
# With rdd.cache(), the data is reused and map runs only 4 times
##############################
Differences among the three persistence methods for RDDs
cache (pyspark docs / source / demo)
Persist this RDD with the default storage level (MEMORY_ONLY).
RDD cache
def cache(self):
"""
Persist this RDD with the default storage level (`MEMORY_ONLY`).
"""
self.is_cached = True
self.persist(StorageLevel.MEMORY_ONLY)
return self
- cache is itself lazy: the data is only persisted in memory after an action runs, and it keeps the RDD's lineage. The toDebugString() output below shows that rdd.cache() alone does not add anything to the lineage; after the action runs, an extra line appears: CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B, which means the data is now held in memory and will be read from there first.
rdd.toDebugString()
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
l = ["hello python","hello spark"]
rdd = sc.parallelize(l)
print(rdd.collect())
flatmaprdd = rdd.flatMap(lambda x:x.split(' '))
def fun(x):
print("fun is call")
return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.cache()
# maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() # action operator, triggers execution
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
###########################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
persist (pyspark docs / source)
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
RDD persist
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (`MEMORY_ONLY`).
Examples
--------
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
True
"""
self.is_cached = True
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
self._jrdd.persist(javaStorageLevel)
return self
StorageLevel(_useDisk: Boolean, _useMemory: Boolean, _useOffHeap: Boolean, _deserialized: Boolean, _replication: Int = 1)
The five parameters are: store on disk, store in memory, use off-heap memory, keep the data deserialized (i.e. not serialized), and the number of replicas (see the sketch below).
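For example, a minimal sketch: you can either pick one of the predefined levels or build one from these five arguments (the variable names here are illustrative):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("storageLevelDemo").master("local[2]").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(100))
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # predefined level: memory first, spill to disk
rdd.count()
print(rdd.getStorageLevel())               # inspect the level that was actually assigned

# hand-built level: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
two_replicas = StorageLevel(True, True, False, False, 2)
other = sc.parallelize(range(100))
other.persist(two_replicas)
other.count()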
checkpoint (pyspark docs / source / demo)
Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
RDD checkpoint
def checkpoint(self):
"""
Mark this RDD for checkpointing. It will be saved to a file inside the
checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
all references to its parent RDDs will be removed. This function must
be called before any job has been executed on this RDD. It is strongly
recommended that this RDD is persisted in memory, otherwise saving it
on a file will require recomputation.
"""
self.is_checkpointed = True
self._jrdd.rdd().checkpoint()
toDebugString
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
print("fun is call")
return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
# maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
reducerdd.collect() # action operator, triggers execution
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
grouprdd.collect()
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
-----------------------------------
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
fun is call
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 []
- To guarantee data safety, checkpoint() re-obtains the data when writing it out, which is why "fun is call" appears 8 times. It is also lazy: the checkpoint only happens once .collect() triggers an action.
If you do not want checkpoint() to recompute the data, combine it with cache()/persist():
checkpoint&persist
list0 = ["hello python","hello spark"]
rdd0 = sc.parallelize(list0)
print(rdd0.collect())
flatmaprdd = rdd0.flatMap(lambda x:x.split(' '))
def fun(x):
print("fun is call")
return (x,1)
maprdd = flatmaprdd.map(fun)
print(str(maprdd.toDebugString(),'utf-8'))
maprdd.persist()
maprdd.checkpoint()
print(str(maprdd.toDebugString(),'utf-8'))
print('-----------------------------------')
reducerdd = maprdd.reduceByKey(lambda x,y:x+y)
print(reducerdd.collect()) # action operator, triggers execution
#maprdd.unpersist()
print(str(maprdd.toDebugString(),'utf-8'))
grouprdd = maprdd.groupByKey()
##############################
##output
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]
-----------------------------------
fun is call
fun is call
fun is call
fun is call
[('python', 1), ('hello', 2), ('spark', 1)]
(8) PythonRDD[1] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
| CachedPartitions: 8; MemorySize: 311.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[7] at collect at D:/code/Test_CheckPoint.py:74 [Memory Serialized 1x Replicated]
DataFrame persistence
Persistence demo
df persistence
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
# tmp1.persist()
# tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
# tmp2.persist()
# tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
##############################
== Physical Plan ==
*(11) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(11) SortMergeJoin [name#0], [name#12], Inner
:- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
: +- *(5) SortMergeJoin [name#0], [name#6], Inner
: :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(name#0, 200)
: : +- *(1) Filter isnotnull(name#0)
: : +- Scan ExistingRDD[name#0,a#1L,b#2L]
: +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#6, 200)
: +- *(3) Filter isnotnull(name#6)
: +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
+- *(10) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(10) SortMergeJoin [name#12], [name#18], Inner
:- *(7) Sort [name#12 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#12, 200)
: +- *(6) Filter isnotnull(name#12)
: +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
+- *(9) Sort [name#18 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#18, 200)
+- *(8) Filter isnotnull(name#18)
+- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
##############################
import findspark
spark_home = r'D:\Programs\spark-2.4.5-bin-hadoop2.7'
python_home = r'D:\Programs\anaconda3\python'
findspark.init(spark_home,python_home)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('testPersist')\
.master("local[4]")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("Error")
sc.setCheckpointDir('D:\\code')
rdd1 = sc.parallelize([('Bob',13,65),('Alice',9,35),('Elf',45,24)])
rdd2 = sc.parallelize([('Bob',131,651),('Alice',91,351),('Elf',451,241)])
rdd3 = sc.parallelize([('Bob',132,652),('Alice',92,352),('Elf',452,242)])
rdd4 = sc.parallelize([('Bob',133,653),('Alice',93,353),('Elf',453,243)])
df1 = rdd1.toDF(['name','a','b'])
# df1.checkpoint()
df2 = rdd2.toDF(['name','aa','bb'])
# df2.checkpoint()
df3 = rdd3.toDF(['name','aaa','bbb'])
# df3.checkpoint()
df4 = rdd4.toDF(['name','aaaa','bbbb'])
# df4.checkpoint()
tmp1 = df1.join(df2,['name'],'inner')
tmp1.persist()
tmp1 = tmp1.checkpoint()
tmp1.show()
tmp2 = df3.join(df4,['name'],'inner')
tmp2.persist()
tmp2 = tmp2.checkpoint()
tmp2.show()
final = tmp1.join(tmp2,['name'],'inner')
# final.checkpoint()
final.explain()
final.show()
## Execution plan with persist()
== Physical Plan ==
*(2) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(2) BroadcastHashJoin [name#0], [name#12], Inner, BuildRight
:- *(2) Filter isnotnull(name#0)
: +- InMemoryTableScan [name#0, a#1L, b#2L, aa#7L, bb#8L], [isnotnull(name#0)]
: +- InMemoryRelation [name#0, a#1L, b#2L, aa#7L, bb#8L], StorageLevel(disk, memory, 1 replicas)
: +- *(5) Project [name#0, a#1L, b#2L, aa#7L, bb#8L]
: +- *(5) SortMergeJoin [name#0], [name#6], Inner
: :- *(2) Sort [name#0 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(name#0, 200)
: : +- *(1) Filter isnotnull(name#0)
: : +- Scan ExistingRDD[name#0,a#1L,b#2L]
: +- *(4) Sort [name#6 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#6, 200)
: +- *(3) Filter isnotnull(name#6)
: +- Scan ExistingRDD[name#6,aa#7L,bb#8L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *(1) Filter isnotnull(name#12)
+- InMemoryTableScan [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], [isnotnull(name#12)]
+- InMemoryRelation [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], StorageLevel(disk, memory, 1 replicas)
+- *(5) Project [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(5) SortMergeJoin [name#12], [name#18], Inner
:- *(2) Sort [name#12 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#12, 200)
: +- *(1) Filter isnotnull(name#12)
: +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L]
+- *(4) Sort [name#18 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#18, 200)
+- *(3) Filter isnotnull(name#18)
+- Scan ExistingRDD[name#18,aaaa#19L,bbbb#20L]
#############################
## Execution plan with checkpoint()
== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(name))
:- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Analyzed Logical Plan ==
name: string, a: bigint, b: bigint, aa: bigint, bb: bigint, aaa: bigint, bbb: bigint, aaaa: bigint, bbbb: bigint
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
:- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Optimized Logical Plan ==
Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- Join Inner, (name#0 = name#12)
:- Filter isnotnull(name#0)
: +- LogicalRDD [name#0, a#1L, b#2L, aa#7L, bb#8L], false
+- Filter isnotnull(name#12)
+- LogicalRDD [name#12, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L], false
== Physical Plan ==
*(3) Project [name#0, a#1L, b#2L, aa#7L, bb#8L, aaa#13L, bbb#14L, aaaa#19L, bbbb#20L]
+- *(3) SortMergeJoin [name#0], [name#12], Inner
:- *(1) Filter isnotnull(name#0)
: +- Scan ExistingRDD[name#0,a#1L,b#2L,aa#7L,bb#8L]
+- *(2) Filter isnotnull(name#12)
+- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L]
- Since DataFrame has no toDebugString(), explain() is used instead to inspect the execution plan (see the note on explain(True) below).
- With persist(), the execution plan gains InMemoryTableScan / InMemoryRelation entries.
- With checkpoint(), the plans for tmp1 and tmp2 are cut off and replaced by entries such as +- Scan ExistingRDD[name#12,aaa#13L,bbb#14L,aaaa#19L,bbbb#20L].
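Note that the multi-section output above (parsed, analyzed, and optimized logical plans plus the physical plan) comes from the extended form of explain; continuing from the final DataFrame in the demo above, a minimal sketch:

final.explain()      # physical plan only
final.explain(True)  # parsed, analyzed, and optimized logical plans plus the physical plan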
DAG screenshots (figures omitted)
DAG without checkpoint
DAG with checkpoint
Job DAGs with and without checkpoint / persist()
Important note:
DataFrame's checkpoint is implemented differently from RDD's:
DataFrame.checkpoint() returns a new DataFrame, whereas RDD.checkpoint() returns None and marks the existing RDD in place (see the demo below).
df&rdd checkpoint
rdd = spark.sparkContext.parallelize([[1,2,3]])
persisted_rdd = rdd.persist()
rdd == persisted_rdd
out: True
df = rdd.toDF(['A','B','C'])
spark.sparkContext.setCheckpointDir('.')
checkpointed_df = df.checkpoint()
checkpointed_df == df
out:False
type(checkpointed_df)
out:DataFrame
checkpointed_rdd = rdd.checkpoint()
checkpointed_rdd == rdd
out:False
type(checkpointed_rdd)
out:NoneType
Differences among the three persistence methods for DataFrames
cache (pyspark docs / source)
Persists the DataFrame with the default storage level (MEMORY_AND_DISK).
df cache
def cache(self):
"""Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).
.. versionadded:: 1.3.0
Notes
-----
The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
"""
self.is_cached = True
self._jdf.cache()
return self
persist (pyspark docs / source)
Sets the storage level to persist the contents of the DataFrame across operations after the first time it is computed. This can only be used to assign a new storage level if the DataFrame does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_AND_DISK_DESER)
df persist
def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK_DESER):
"""Sets the storage level to persist the contents of the :class:`DataFrame` across
operations after the first time it is computed. This can only be used to assign
a new storage level if the :class:`DataFrame` does not have a storage level set yet.
If no storage level is specified defaults to (`MEMORY_AND_DISK_DESER`)
.. versionadded:: 1.3.0
Notes
-----
The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0.
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
self._jdf.persist(javaStorageLevel)
return self
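A minimal, self-contained usage sketch (the app name and column expressions are illustrative), showing a non-default storage level and how to check what was assigned:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dfPersistDemo").master("local[2]").getOrCreate()

df = spark.range(1000).selectExpr("id", "id * id AS sq")
df.persist(StorageLevel.DISK_ONLY)  # override the MEMORY_AND_DISK default
df.count()                          # action: materializes the persisted data
print(df.storageLevel)              # inspect the level that was actually assigned
df.unpersist()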
checkpoint (pyspark docs / source)
Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir().
df checkpoint
def checkpoint(self, eager=True):
"""Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
logical plan of this :class:`DataFrame`, which is especially useful in iterative algorithms
where the plan may grow exponentially. It will be saved to files inside the checkpoint
directory set with :meth:`SparkContext.setCheckpointDir`.
.. versionadded:: 2.1.0
Parameters
----------
eager : bool, optional
Whether to checkpoint this :class:`DataFrame` immediately
Notes
-----
This API is experimental.
"""
jdf = self._jdf.checkpoint(eager)
return DataFrame(jdf, self.sql_ctx)
##############################################################
/**
* Returns a checkpointed version of this Dataset.
*
* @param eager Whether to checkpoint this dataframe immediately
* @param reliableCheckpoint Whether to create a reliable checkpoint saved to files inside the
* checkpoint directory. If false creates a local checkpoint using
* the caching subsystem
*/
private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
withAction(actionName, queryExecution) { physicalPlan =>
val internalRdd = physicalPlan.execute().map(_.copy())
if (reliableCheckpoint) {
internalRdd.checkpoint()
} else {
internalRdd.localCheckpoint()
}
if (eager) {
internalRdd.count()
}
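As the Scala source above shows, the same private method also backs localCheckpoint(), which keeps the data through the caching subsystem instead of writing to the checkpoint directory. A minimal sketch (faster, but not reliable if an executor is lost; the paths and names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("localCheckpointDemo").master("local[2]").getOrCreate()

df = spark.range(1000).selectExpr("id", "id * 2 AS doubled")

# reliable checkpoint: requires SparkContext.setCheckpointDir() and writes files there
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # placeholder path
reliable = df.checkpoint()    # eager=True by default, so a count runs immediately

# local checkpoint: no checkpoint directory needed, data is kept by the caching subsystem
local = df.localCheckpoint()  # also eager by default
local.explain()               # the plan is truncated to a scan of the checkpointed RDD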
References
- Exploring checkpoint: https://blog.csdn.net/liuwei063608/article/details/79805901
- Comparison of the three persistence methods
- Storage submodule analysis: https://blog.csdn.net/don_chiang709/article/details/84065510
- PySpark API documentation: http://spark.apache.org/docs/latest/api/python/search.html?q=checkpoint#
- Spark source (Dataset.scala): https://github.com/apache/spark/blob/v3.1.2/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala