DistCp 源码解析

说明

DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。
它使用 Map/Reduce 实现文件分发,错误处理和恢复,以及报告生成。
它把文件和目录的列表作为 map 任务的输入,每个任务会完成源列表中部分文件的拷贝。
由于使用了 Map/Reduce 方法,这个工具在语义和执行上都会有特殊的地方。 这篇文档会为常用 DistCp 操作提供指南并阐述它的工作模型。

源码详解

作业启动

作业的启动主要包含初始化和作业提交,在初始化阶段主要是 list 左右需要拷贝的文件信息,根据文件信息构造 split 信息。
作业提交阶段就是根据初始化阶段构造的 split 信息,将作业提交到 Yarn 上面。

作业初始化

初始化阶段主要是 list 左右需要拷贝的文件信息,根据文件信息构造 split 信息。

DistCp 的入口函数是 main 函数,在 main 函数里面主要做了两件事:

  • 注册 Cleanup。
  • 初始化和启动作业,核心处理函数为 execute 函数里面的 createAndSubmitJob

创建 Job 对象,主要是指定 Map 的处理类,InputFormat 和 outputFormat 信息:

Job job = Job.getInstance(getConf()); job.setJobName(jobName); job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context)); job.setJarByClass(CopyMapper.class); configureOutputFormat(job); job.setMapperClass(CopyMapper.class); job.setOutputFormatClass(CopyOutputFormat.class); job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");

根据需要拷贝的目录获取所有的文件信息。支持 snapshot 模式和普通模式。

snapshot 模式

核心函数为 SimpleCopyListing.doBuildListingWithSnapshotDiff。主要是通过 DistCpSync.getAllDiffs 获取 Snapshot 的差异文件。
差异文件主要包含创建、修改、删除类型,将差别的的文件输出到 fileList.seq 文件里面。fileList.seq 文件在 staging 目录下面的的 _distcp_随机的int值

普通模式

核心函数为 SimpleCopyListing.doBuildListing。对于非 snapshot 模式,核心处理逻辑就是通过 list 将所有的文件获取出来。添加到 fileList.seq 里面。
对于 XAttrs 等权限信息也会按照-p 参数指定的来获取。

作业提交

由于 DistCp 也是 MapReduce 作业,所以作业提交沿用了 MapReduce 作业提交的框架,对于 Map 和 Reduce 的处理类,
以及 InputFormat 和 outputFormat 都是 DistCp 自己实现的。

其中比较常用的是 DynamicInputFormat,DynamicInputFormat 主要是通过主要是按照文件数量分配的。

作业运行

AM 运行

在创建作业的时候定义了 outputFormat,在 CopyOutputFormat 中定义了 getOutputCommitter。

job.setOutputFormatClass(CopyOutputFormat.class);

Distcp 的 AM 结束时的核心处理类是 CopyCommitter。结束的时候会调用 commitJob 函数,在 commitJob 函数里面。

deleteMissing 函数

从目标端删除多余的文件,需要配置-delete 参数。

preserveFileAttributesForDirectories 函数

当前函数是用于检查并修改文件属性的功能。当前是单线程运行,在文件多的时候可能会比较慢。同步的权限包含:

  • ACL 权限。
  • 普通权限。
  • 副本数。
  • XATTR 属性。
  • 用户以及用户组。

Map 运行

Map 运行的核心类是 CopyMapper。当前类的核心函数为: setup()map()cleanup()run()

setup 函数

setup 函数主要是读取配置。setup 函数的入参是 Context,里面包含从客户端传入的配置文件信息。可以通过 context.getConfiguration() 获取。

map 函数

map 函数是复制数据的核心类,map 的入参定义如下:

public void map(Text relPath, CopyListingFileStatus sourceFileStatus, Context context) throws IOException, InterruptedException { }
  • relPath:目标文件路径。
  • sourceFileStatus: 源端文件信息,包含路径。

在 map 函数里面主要做了几件事:

  • 获取目标端文件信息,如果复制的属性里面包含 XATTR,则需要单独调用 getXAttrs 接口获取 XATTR 信息,当前会多一次请求,大大的增加复制时间。
  • 检查当前文件是否需要复制,如果需要 copy,则将文件拷贝到目标端。
  • 复制文件的属性到目标端。核心函数如下:
DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget, sourceCurrStatus, fileAttributes, preserveRawXattrs);

cleanup 函数

在当前 map 结束之后调用,主要是更新统计信息,主要是复制带宽。如下:

long secs = (System.currentTimeMillis() - startEpoch) / 1000; incrementCounter(context, Counter.BANDWIDTH_IN_BYTES, totalBytesCopied / ((secs == 0 ? 1 : secs)));

run 函数

主要是 MapReduce 框架层面的逻辑,控制 map 的所有流程,在处理完成之后调用 cleanup。

setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); }

Reduce 运行

DistCp 作业没有 reduce 任务,只有 map。

  • MapReduce
    10 引用 • 1 回帖
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    93 引用 • 122 回帖 • 619 关注
  • distcp
    1 引用
1 操作
zeekling 在 2025-06-01 22:35:40 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...