@hadoopMan 2018-09-13

Reading the Spark source: how a Spark RDD reads a textFile and a sequenceFile from HDFS

Spark source code reading


1. textFile entry point

    sc.textFile(args(0))
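For context, here is a minimal usage sketch of the call we are about to trace (the HDFS path and app name are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    object TextFileDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("textFile-demo"))
        // Each element is one line of the file; minPartitions is only a lower-bound hint.
        val lines = sc.textFile("hdfs:///tmp/input.txt", minPartitions = 4)
        println(lines.count())
        sc.stop()
      }
    }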

2. textFile source

The default value of textFile's minPartitions parameter:

    minPartitions: Int = defaultMinPartitions

which is defined as:

    defaultParallelism = conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
    defaultMinPartitions = math.min(defaultParallelism, 2)
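As a worked example, assume the executors have registered 8 cores in total and spark.default.parallelism is not set; then:

    // worked example under the assumption totalCoreCount = 8, no explicit setting
    val defaultParallelism = math.max(8, 2)    // = 8
    val defaultMinPartitions = math.min(8, 2)  // = 2, textFile's default lower bound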

textFile itself does little more than delegate to hadoopFile, as sketched below.
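Roughly, the body of textFile looks like this (paraphrased from memory of the Spark 2.x source, so treat it as a sketch rather than an exact quote):

    def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      // keys are byte offsets (LongWritable), values are lines (Text); only the value is kept
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
        minPartitions).map(pair => pair._2.toString).setName(path)
    }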

3. hadoopFile source

First, the Hadoop configuration is wrapped in a serializable holder and turned into a broadcast variable:

    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))

Next, a function that sets the input paths on the JobConf is built:

    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)

After processing the input path, setInputPaths ultimately boils down to:

    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, str.toString());

After this function is passed into HadoopRDD, it is closure-cleaned when the RDD object is constructed:

    sparkContext.clean(initLocalJobConfFuncOpt.get)

Finally, a HadoopRDD is constructed:

    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)

4. Inside HadoopRDD

1. getPartitions

First, get the job configuration:

    val jobConf = getJobConf()

Then get the input format:

    val inputFormat = getInputFormat(jobConf)

Get the input splits:

    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)

An array of HadoopPartition objects is then built from the RDD id, the partition index, and each split:

    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      // create a partition object, recording the RDD id, the partition index,
      // and the concrete split information (e.g. a FileSplit)
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
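A quick way to observe this (a sketch with a hypothetical path): the resulting RDD has exactly one partition per InputSplit, which is why it can end up with more partitions than the minPartitions hint:

    // getNumPartitions reflects the HadoopPartition array built above
    val rdd = sc.textFile("hdfs:///tmp/big-input", minPartitions = 2)
    println(rdd.getNumPartitions)  // may be larger than 2 if the input spans many blocks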

2. Computing the input splits

Stepping into getSplits, we will only look at the important parts. The first is listStatus. Inside that method, the input directories are fetched first:

    Path[] dirs = getInputPaths(job);

Then delegation tokens are obtained for the namenodes:

    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

Then it reads the boolean that controls whether input directories are listed recursively; the default is non-recursive:

    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

INPUT_DIR_RECURSIVE corresponds to the configuration key mapreduce.input.fileinputformat.input.dir.recursive.
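If nested directories should be read, the flag can be flipped on the SparkContext's Hadoop configuration before the RDD is built (a sketch; the path is hypothetical):

    // make FileInputFormat.listStatus descend into subdirectories
    sc.hadoopConfiguration.setBoolean(
      "mapreduce.input.fileinputformat.input.dir.recursive", true)
    val nested = sc.textFile("hdfs:///logs/2018/*")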

Then path filters are created to screen out files we do not want, such as those whose names start with _ or . :

    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);
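A user-supplied filter ends up in jobFilter above. As a hypothetical example (it would be registered with FileInputFormat.setInputPathFilter on the JobConf), a filter that additionally skips *.tmp files could look like this:

    import org.apache.hadoop.fs.{Path, PathFilter}

    // hypothetical filter: on top of hiddenFileFilter, also skip *.tmp files
    class NoTmpFileFilter extends PathFilter {
      override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
    }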

Then it reads the number of threads to use for listing file statuses:

    int numThreads = job.getInt(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

If the thread count is 1:

    List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);

If it is greater than one, things get slightly more involved because synchronization and merging come into play: the listing is done by a fixed-size thread pool, whose size is set by mapreduce.input.fileinputformat.list-status.num-threads.

    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
          job, dirs, recursive, inputFilter, false);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Iterables.toArray(locatedFiles, FileStatus.class);
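When an input consists of very many files, that thread count can be raised through the same configuration key (a sketch; 16 is an arbitrary value):

    // speed up input listing for directories containing many files
    sc.hadoopConfiguration.setInt(
      "mapreduce.input.fileinputformat.list-status.num-threads", 16)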

LocatedFileStatusFetcher builds that fixed-size thread pool when it is constructed:

    rawExec = Executors.newFixedThreadPool(
        numThreads,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("GetFileInfo #%d").build());
    exec = MoreExecutors.listeningDecorator(rawExec);

Inside locatedFileStatusFetcher.getFileStatuses(), every input directory is submitted as a task to that pool:

    runningTasks.incrementAndGet();
    for (Path p : inputDirs) {
      runningTasks.incrementAndGet();
      ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
          .submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
      Futures.addCallback(future, processInitialInputPathCallback);
    }
    runningTasks.decrementAndGet();

Back in getSplits, the goalSize is computed:

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

Then the minimum split size, which can be set through mapreduce.input.fileinputformat.split.minsize:

    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
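Raising this minimum is a common way to obtain fewer, larger splits, for example (a sketch; 256 MB is an arbitrary choice):

    // force each split to be at least 256 MB, regardless of the HDFS block size
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)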

Finally the files are iterated to produce the splits. Two operations matter most here. The first is deciding, based on the compression codec (or the absence of one), whether a file can be split at all. The call is dispatched to the input format itself, here TextInputFormat.isSplitable:

    isSplitable(fs, path)

    protected boolean isSplitable(FileSystem fs, Path file) {
      final CompressionCodec codec = compressionCodecs.getCodec(file);
      if (null == codec) {
        return true;
      }
      return codec instanceof SplittableCompressionCodec;
    }

If the file is not splittable, the whole file becomes a single split:

    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
    splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
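For example, GzipCodec is not a SplittableCompressionCodec, so a .gz file always becomes a single split no matter what hint we pass (a sketch; the path is hypothetical):

    // a gzipped file cannot be split, so the whole file is one partition
    val gz = sc.textFile("hdfs:///tmp/big.txt.gz", minPartitions = 8)
    println(gz.getNumPartitions)  // 1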

If the file is uncompressed, or compressed with a splittable codec such as BZip2Codec, it can be split. The split size is computed first:

    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(goalSize, minSize, blockSize);

where computeSplitSize is simply:

    Math.max(minSize, Math.min(goalSize, blockSize));
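As a worked example, assume a 1 GB uncompressed file, a 128 MB HDFS block size, the default minimum split size of 1, and minPartitions = 2:

    val totalSize = 1024L << 20   // 1 GB file
    val goalSize  = totalSize / 2 // numSplits = minPartitions = 2, so 512 MB
    val minSize   = 1L            // default mapreduce.input.fileinputformat.split.minsize
    val blockSize = 128L << 20    // 128 MB HDFS block
    val splitSize = math.max(minSize, math.min(goalSize, blockSize))  // = 128 MB
    // totalSize / splitSize = 8 splits, not the 2 we hinted at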

From the above we can see that the partition hint we pass in does not by itself determine the number of partitions (or, in MapReduce terms, the number of map tasks); it is only an auxiliary parameter. The real determining factors are the number of files, their compression codecs, and their actual sizes. The last step is the split loop:

    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
          length - bytesRemaining, splitSize, clusterMap);
      splits.add(makeSplit(path, length - bytesRemaining, splitSize,
          splitHosts[0], splitHosts[1]));
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
          - bytesRemaining, bytesRemaining, clusterMap);
      splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
          splitHosts[0], splitHosts[1]));
    }
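One detail worth noting is SPLIT_SLOP, which is 1.1 in FileInputFormat: a tail smaller than 10% of the split size is folded into the previous split instead of producing a tiny extra one. A small simulation of the loop above (assumed sizes, just to illustrate the arithmetic):

    // simulate the split loop for a 260 MB file with a 128 MB split size
    val SPLIT_SLOP = 1.1
    val splitSize = 128L << 20
    var bytesRemaining = 260L << 20
    val splits = scala.collection.mutable.ArrayBuffer[Long]()
    while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
      splits += splitSize
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += bytesRemaining
    println(splits.map(_ >> 20))  // ArrayBuffer(128, 132): the 4 MB tail is merged into the last split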

3. compute

Get the split for this partition:

    val split = theSplit.asInstanceOf[HadoopPartition]

Get the RecordReader:

    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

Since the inputFormat here is TextInputFormat, the reader we get back is:

    new LineRecordReader(job, (FileSplit) genericSplit,
        recordDelimiterBytes);
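As an aside, the record delimiter handed to LineRecordReader can be customized through the textinputformat.record.delimiter key (a sketch; using a blank line as the delimiter is just an example for paragraph-style records):

    // split records on blank lines instead of single newlines
    sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
    val paragraphs = sc.textFile("hdfs:///tmp/docs.txt")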

5. sequenceFile entry point

    sc.sequenceFile[ImmutableBytesWritable, Array[Byte]](fileName)

Stepping inside:

    withScope {
      assertNotStopped()
      val kc = clean(kcf)()
      val vc = clean(vcf)()
      val format = classOf[SequenceFileInputFormat[Writable, Writable]]
      val writables = hadoopFile(path, format,
        kc.writableClass(km).asInstanceOf[Class[Writable]],
        vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
      writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
    }
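For completeness, a minimal round trip with a sequence file (a sketch; the path is hypothetical). Thanks to the WritableConverter machinery shown above, plain Scala types can be used on the read side:

    // write a (String, Int) pair RDD as a SequenceFile, then read it back
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
    pairs.saveAsSequenceFile("hdfs:///tmp/seq-demo")

    // keys and values are converted from Text / IntWritable by WritableConverters
    val back = sc.sequenceFile[String, Int]("hdfs:///tmp/seq-demo")
    println(back.collect().toSeq)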

The main differences between the two input formats are:

1. listStatus

    @Override
    protected FileStatus[] listStatus(JobConf job) throws IOException {
      FileStatus[] files = super.listStatus(job);
      for (int i = 0; i < files.length; i++) {
        FileStatus file = files[i];
        if (file.isDirectory()) { // it's a MapFile
          Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
          FileSystem fs = file.getPath().getFileSystem(job);
          // use the data file
          files[i] = fs.getFileStatus(dataFile);
        }
      }
      return files;
    }

2. getRecordReader

    public RecordReader<K, V> getRecordReader(InputSplit split,
                                              JobConf job, Reporter reporter)
        throws IOException {
      reporter.setStatus(split.toString());
      return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
    }