博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Spark篇】---Spark中控制算子
阅读量:6574 次
发布时间:2019-06-24

本文共 2121 字,大约阅读时间需要 7 分钟。

一、前述

 

Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。

控制算子有三种,cache,persist,checkpoint以上算子都可以将RDD持久化持久化的单位是partitioncache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

 

二、具体算子

 

1、 cache

 

默认将RDD的数据持久化到内存cache是执行

chche () = persist()=persist(StorageLevel.Memory_Only)

 

SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("CacheTest"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD
lines = jsc.textFile("./NASA_access_log_Aug95"); lines = lines.cache(); long startTime = System.currentTimeMillis(); long count = lines.count();//count是action算子,到这里才能触发cache执行,所以这一次coun加载是从磁盘读数据,然后拉回到drive端。 long endTime = System.currentTimeMillis(); System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime)); long countStartTime = System.currentTimeMillis(); long countrResult = lines.count();//这一次是从内存种中读数据 long countEndTime = System.currentTimeMillis(); System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime- countStartTime)); jsc.stop();

 

2、persist(可以指定持久化的级别

 

解释:

 

1、MEMORY_AND_DISK 意思是先往内存中放数据,内存不够再放磁盘

2、最常用的是MEMORY_ONLYMEMORY_AND_DISK”_2”表示有副本数。

3、选择的原则是:首先考虑内存,然后考虑序列化之后再放入内存,最后考虑内存加磁盘。

4、尽量避免使用“_2”和DISK_ONLY级别。

5、deserialized是不序列化的意思。

 

注意事项:

 

  1. 1、cache和persist都是懒执行,必须有一个action类算子触发执行。
  2. 2、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
  3. 3、cache和persist算子后不能立即紧跟action算子。

错误:

rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了

 

3、 Checkpoint(对Lineage非常长时使用

 

     1、概念和特征:

 

           不仅可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系,checkpoint也是懒执行。

            Checkpoin不仅存储结果,还会存储逻辑,还可以存储元数据。

            Persisit切断不了RDD的依赖关系。

 

     2、checkpoint 的执行原理:

 

           2.1.Spark job执行完之后,spark会从finalRDD从后往前回溯。

           2.2.当回溯到对某个RDD进行了checkpoint,会对这个RDD标记。

           2.3.回溯完成之后,Spark会重新计算标记RDD的结果,然后将结果保存到Checkpint目录中。

 

   3、优化checekpoint

 

  • 因为最后是要触发当前application的action算子,所以在触发之前加一层cache操作,一样会往前执行cache操作,实现对数据的cache ,所以考虑将cache优化到checkpoin的优化流程里。
  • RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job(回溯完成之后重新开的job)只需要将内存中的数据(cache缓存好的checkpoint那个点的数据)拷贝到HDFS上就可以。
  • 省去了重新计算这一步,不需要重头开始来走到checkpoint这个点了。

 

总结:

 

持久化的最小单位是partition!!!

 

 

转载于:https://www.cnblogs.com/LHWorldBlog/p/8401983.html

你可能感兴趣的文章
jsp打印
查看>>
从类开始
查看>>
iOS中真机连接电脑运行程序出现问题
查看>>
java安卓如何实现定义接口
查看>>
Union大小
查看>>
南邮CTF--bypass again
查看>>
函数的渐近增长
查看>>
动态参数
查看>>
FirewallD常用命令及设置
查看>>
Slight difference between C++ and C
查看>>
c++类的嵌套(1)
查看>>
Android SqlLite数据库的创建、增、删、改、查、使用事务
查看>>
phpStorm无法使用svn1.8的解决办法
查看>>
Talk is cheap,show me the code
查看>>
[Java]知乎下巴第3集:来人啊快把知乎的答案装到篮子里去
查看>>
解决中文乱码的问题
查看>>
前端异常测试
查看>>
JSON与localStorage的爱恨情仇
查看>>
input验证码框,输入非数字或非12位时,红框提示;每4位加一个空格
查看>>
IOS上iframe的滚动条失效的解决办法
查看>>