@hadoopMan
2018-09-13T07:50:58.000000Z
Spark source code reading
sc.textFile(args(0))
The default parameter of textFile:
minPartitions: Int = defaultMinPartitions
Its meaning:
defaultParallelism = conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
defaultMinPartitions = math.min(defaultParallelism, 2)
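For illustration, a minimal sketch of how this plays out at the call site (assuming an existing SparkContext named sc and a hypothetical input path): the second argument to textFile overrides defaultMinPartitions and is passed down as the requested minimum number of splits.

// Sketch: override the default minimum partition count (hypothetical path).
val rdd = sc.textFile("hdfs:///data/input", 8)
// getSplits(jobConf, 8) is asked for at least 8 splits; the actual count still
// depends on file sizes and splittability, as the rest of this article shows.
println(rdd.getNumPartitions)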
Next, textFile goes on to build the RDD through hadoopFile. Inside hadoopFile:
First, the Hadoop configuration is serialized and wrapped in a broadcast variable:
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
Second, a function that sets the input paths is built:
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
After the input path has been processed, this ultimately ends up calling:
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, str.toString());
After this function is passed into HadoopRDD, it is cleaned into a serializable closure when the RDD object is constructed:
sparkContext.clean(initLocalJobConfFuncOpt.get)
Finally, a HadoopRDD is constructed:
new HadoopRDD(
  this,
  confBroadcast,
  Some(setInputPathsFunc),
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions).setName(path)
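To make the wiring concrete, here is a sketch of what textFile effectively expands to (not a verbatim copy of the SparkContext source; the path and partition count are assumptions): hadoopFile yields (LongWritable, Text) pairs and textFile keeps only the line text.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Roughly what sc.textFile(path, 4) does under the hood (hypothetical path).
val lines = sc.hadoopFile("hdfs:///data/input",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 4)
  .map(pair => pair._2.toString)   // drop the byte-offset key, keep the line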
Now look at HadoopRDD.getPartitions. The first step is to obtain the job configuration:
val jobConf = getJobConf()
Next, the input format is obtained:
val inputFormat = getInputFormat(jobConf)
Then the split information is obtained:
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
Based on the RDD id, the partition index, and the split information, an array of HadoopPartition objects is built:
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
  // Create a partition object that records the RDD id, the partition index,
  // and the concrete split information (a FileSplit)
  array(i) = new HadoopPartition(id, i, inputSplits(i))
}
Stepping inside the getSplits function, we will only analyze the important parts.
The first part is listStatus. Inside that method, the input dirs are obtained first:
Path[] dirs = getInputPaths(job);
Next, delegation tokens are obtained for the namenodes:
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
Next, a boolean is read that controls whether directories are listed recursively; the default is not to recurse:
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
This flag is backed by the mapreduce.input.fileinputformat.input.dir.recursive parameter.
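From the Spark side this flag can be flipped before the RDD is created, a sketch assuming the JobConf is derived from sc.hadoopConfiguration (which is what the hadoopFile path above does):

// Enable recursive listing of nested input directories (hypothetical path).
sc.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
val all = sc.textFile("hdfs:///data/nested")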
Next, path filters are created to screen out files we do not need, such as those whose names start with _ or . :
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
  filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
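The jobFilter slot is where a user-supplied filter plugs in. As an illustration only (the filter class, the path, and the use of sc.hadoopRDD are assumptions, not part of the textFile path above), a custom PathFilter can be registered on a JobConf like this:

import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Hypothetical filter: skip temporary files ending in ".tmp".
class SkipTmpFilter extends PathFilter {
  override def accept(p: Path): Boolean = !p.getName.endsWith(".tmp")
}

val jobConf = new JobConf(sc.hadoopConfiguration)
FileInputFormat.setInputPaths(jobConf, new Path("hdfs:///data/input"))   // hypothetical path
FileInputFormat.setInputPathFilter(jobConf, classOf[SkipTmpFilter])
val filtered = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])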
Next, the number of threads used for listing status is obtained:
int numThreads = job.getInt(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
If the thread count is 1:
List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
If it is greater than one, the problem becomes slightly more involved because synchronization and merging of results are required: the directories are listed in parallel through a fixed-size thread pool, and the thread count is configured with the mapreduce.input.fileinputformat.list-status.num-threads parameter.
Iterable<FileStatus> locatedFiles = null;
try {
  LocatedFileStatusFetcher locatedFileStatusFetcher =
      new LocatedFileStatusFetcher(job, dirs, recursive, inputFilter, false);
  locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
  throw new IOException("Interrupted while getting file statuses");
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
When a LocatedFileStatusFetcher is constructed, it builds a thread pool with that fixed number of threads:
rawExec = Executors.newFixedThreadPool(numThreads,
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
Inside the locatedFileStatusFetcher.getFileStatuses() method:
runningTasks.incrementAndGet();
for (Path p : inputDirs) {
  runningTasks.incrementAndGet();
  ListenableFuture<ProcessInitialInputPathCallable.Result> future =
      exec.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
  Futures.addCallback(future, processInitialInputPathCallback);
}
runningTasks.decrementAndGet();
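When an input directory contains a very large number of files, raising this thread count speeds up the listing phase. A sketch of how to set it from the Spark side (again assuming the JobConf inherits sc.hadoopConfiguration):

// List input directories with 8 threads instead of the default of 1.
sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.list-status.num-threads", 8)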
Back in getSplits, the goalSize parameter is computed next:
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
Then the minimum split size, which can be set via the mapreduce.input.fileinputformat.split.minsize parameter:
long minSize = Math.max(
    job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1),
    minSplitSize);
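Since getSplits reads that key, the minimum split size can be raised from the Spark side to force fewer, larger splits. A sketch (the 256 MB value is an arbitrary example):

// Ask for splits of at least 256 MB; must be set before the RDD computes its partitions.
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)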
Finally, the files are iterated over to produce the splits. The two most important operations here are:
The first is deciding, based on the compression type (including no compression at all), whether the file can be split. The call is dispatched to the input format class itself; in this case that is TextInputFormat.isSplitable:
isSplitable(fs, path)

protected boolean isSplitable(FileSystem fs, Path file) {
  final CompressionCodec codec = compressionCodecs.getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
If the file is not splittable, the whole file is treated as a single split:
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
If the file is uncompressed, or compressed with a splittable codec such as BZip2Codec, it can be split. The first step is to compute the split size:
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
which boils down to Math.max(minSize, Math.min(goalSize, blockSize));
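A worked example of that formula with assumed numbers (a 1 GB file, two requested splits, a 128 MB block size, and the default minSize of 1):

val totalSize = 1024L * 1024 * 1024                 // 1 GB file
val goalSize  = totalSize / 2                       // numSplits = 2  =>  512 MB
val blockSize = 128L * 1024 * 1024                  // 128 MB HDFS blocks
val minSize   = 1L                                  // default minimum
val splitSize = math.max(minSize, math.min(goalSize, blockSize))
// splitSize = 128 MB, so the file produces roughly 8 splits, not the 2 we asked for.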
From the above we can see that the split size we request does not by itself determine the number of partitions (or the number of map tasks in MapReduce); it is only an auxiliary parameter. The real deciding factors are the number of files, their compression types, and their actual sizes.
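A quick sketch that makes the point (assumed path; GzipCodec is not a SplittableCompressionCodec, so isSplitable returns false):

// No matter how many partitions we ask for, a gzip file is a single split.
val gz = sc.textFile("hdfs:///logs/big.log.gz", 10)   // hypothetical path
println(gz.getNumPartitions)                          // 1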
Finally comes the split loop itself:
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
      length - bytesRemaining, splitSize, clusterMap);
  splits.add(makeSplit(path, length - bytesRemaining, splitSize,
      splitHosts[0], splitHosts[1]));
  bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
  String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
      length - bytesRemaining, bytesRemaining, clusterMap);
  splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
      splitHosts[0], splitHosts[1]));
}
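The SPLIT_SLOP constant is 1.1, which avoids producing a tiny trailing split. A worked run of the same logic with assumed numbers, a 135 MB file and a 128 MB splitSize:

val SPLIT_SLOP     = 1.1
val splitSize      = 128L * 1024 * 1024
var bytesRemaining = 135L * 1024 * 1024
var nSplits        = 0
// 135/128 ≈ 1.05 <= 1.1, so the while-condition fails immediately and the whole
// file becomes one 135 MB split instead of a 128 MB split plus a 7 MB tail.
while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
  nSplits += 1
  bytesRemaining -= splitSize
}
if (bytesRemaining != 0) nSplits += 1
println(nSplits)   // 1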
Now look at how a partition is read. In HadoopRDD.compute, the split information is recovered first:
val split = theSplit.asInstanceOf[HadoopPartition]
Then the RecordReader is obtained:
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
Since the inputFormat here is TextInputFormat, the reader obtained at this point is:
new LineRecordReader(job, (FileSplit) genericSplit, recordDelimiterBytes);
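The recordDelimiterBytes argument comes from the textinputformat.record.delimiter setting, so record boundaries other than newlines can be used. A sketch (assumed path; splitting the input on blank lines instead of single newlines):

// Treat blank-line-separated paragraphs as records (hypothetical path).
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n\n")
val paragraphs = sc.textFile("hdfs:///data/docs.txt")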
Now turn to the second entry point, sequenceFile, for example:
sc.sequenceFile[ImmutableBytesWritable,Array[Byte]](fileName)
Stepping inside:
withScope {
  assertNotStopped()
  val kc = clean(kcf)()
  val vc = clean(vcf)()
  val format = classOf[SequenceFileInputFormat[Writable, Writable]]
  val writables = hadoopFile(path, format,
    kc.writableClass(km).asInstanceOf[Class[Writable]],
    vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
  writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
The main differences between the two (textFile vs. sequenceFile) lie in:
1. listStatus
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  FileStatus[] files = super.listStatus(job);
  for (int i = 0; i < files.length; i++) {
    FileStatus file = files[i];
    if (file.isDirectory()) { // it's a MapFile
      Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
      FileSystem fs = file.getPath().getFileSystem(job);
      // use the data file
      files[i] = fs.getFileStatus(dataFile);
    }
  }
  return files;
}
2. getRecordReader
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
    throws IOException {
  reporter.setStatus(split.toString());
  return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
}
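To close, a small end-to-end usage sketch of sequenceFile (the path is an assumption; the implicit WritableConverters map String and Int to Text and IntWritable, following the kc/vc conversion shown above):

// Write a SequenceFile, then read it back through sc.sequenceFile.
sc.parallelize(Seq(("a", 1), ("b", 2))).saveAsSequenceFile("hdfs:///tmp/seq-demo")
val back = sc.sequenceFile[String, Int]("hdfs:///tmp/seq-demo")
back.collect().foreach(println)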