每个RDD在构建数据时,会根据自己来源一步步倒 导 到数据来源,然后再一步步开始构建RDD数据。
问题:如果一个RDD被触发多次,这个RDD就会按照依赖关系被构建多次,性能相对较差,怎么解决?
Spark的容错机制主要通过以下核心机制实现:
1. RDD的血缘关系(Lineage)
每个RDD都记录其父RDD的转换操作序列(称为血缘关系)。当节点故障导致数据丢失时,Spark会根据血缘关系重新计算丢失的分区数据。例如:
val rddA = sc.textFile("hdfs://data.txt") val rddB = rddA.map(_.toUpperCase) // 转换1 val rddC = rddB.filter(_.contains("SPARK")) // 转换2此时若rddC的分区丢失,系统会回溯到rddB重新执行filter转换。
2. 检查点(Checkpointing)
对于长血缘链的RDD,定期将数据持久化到可靠存储(如HDFS):
rddC.checkpoint() // 截断血缘链- 作用:避免重计算过长血缘链
- 触发条件:当RDD被多次使用或包含宽依赖转换时
3. 任务重试机制
- Executor故障:Driver重新调度受影响任务到其他Executor
- Task失败:默认重试4次(可通过
spark.task.maxFailures配置) - Stage重算:因Shuffle数据丢失时,重新计算整个Stage
4. 数据持久化级别
通过存储级别控制容错粒度:
rddC.persist(StorageLevel.MEMORY_AND_DISK_2) // 内存+磁盘+双副本常用级别:
MEMORY_ONLY:仅内存,故障需重算DISK_ONLY:磁盘持久化MEMORY_AND_DISK_2:内存+磁盘+跨节点双副本
5. DAG调度容错
Spark通过DAG调度器将作业分解为Stage: $$ \text{Stage} = \text{窄依赖转换链} + \text{Shuffle边界} $$
- 单个Task失败仅需重算所在Stage
- Shuffle数据写入持久化存储(默认
spark.shuffle.spill=true)
容错流程示例
graph LR A[节点故障] --> B[丢失RDD分区] B --> C{是否检查点?} C -->|是| D[从存储系统恢复] C -->|否| E[根据血缘重算]这种机制使得Spark能在保证效率的同时,实现分布式环境下的高容错性。