@hadoopMan
2018-09-13T07:50:58.000000Z
字数 6061
阅读 1985
spark源码阅读
sc.textFile(args(0))
textFile的默认参数
minPartitions: Int = defaultMinPartitions
其意义
defaultParallelism=conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
defaultMinPartitions=math.min(defaultParallelism, 2)
接着就是构建了一个
hadoopFile对象
首先是,将hadoop的配置文件序列化,然后封装成广播变量
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
其次是,构建设置输入路径的函数。
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
其最终,将输入path处理后调用的方法是
conf.set(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.INPUT_DIR, str.toString());
该参数传入 HadoopRDD 后,会在构建对象的时候生成闭包
sparkContext.clean(initLocalJobConfFuncOpt.get)
最后构建了一个
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
首先获取job的配置文件
val jobConf = getJobConf()
接着获取输入格式
val inputFormat = getInputFormat(jobConf)
获取分片信息
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
根据RDD的id,分区的index,和分片信息构建HadoopPartition对象数组
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
//new 一个分区对象,并指定该分区对象的rdd的id,分区的位置,分片的具体信息FileSplit
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
进入 getSplits 函数内部,我们只对重要部分做分析
首先是listStatus,在该方法内部,首先是获取输入的dirs
Path[] dirs = getInputPaths(job);
接着是获取许可
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
接着是获取是否递归读取目录的布尔值,默认是不递归
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
也是该参数的值
mapreduce.input.fileinputformat.input.dir.recursive
接着是创建路径过滤器,筛选掉一些我们不需要的文件,如以 _、. 开头的
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
接着是获取list status的线程数目
int numThreads = job
.getInt(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
假如线程数目为1
List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
假如大于 1,由于牵涉到同步和合并的问题,所以稍微复杂了一些:通过固定大小的线程池以多线程方式去读取,具体线程数目可以由参数 mapreduce.input.fileinputformat.list-status.num-threads 设置。
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job, dirs, recursive, inputFilter, false);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
其中LocatedFileStatusFetcher,构建对象的时候构建固定线程数目的线程池
rawExec = Executors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
在locatedFileStatusFetcher.getFileStatuses();
方法中
runningTasks.incrementAndGet();
for (Path p : inputDirs) {
runningTasks.incrementAndGet();
ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
Futures.addCallback(future, processInitialInputPathCallback);
}
runningTasks.decrementAndGet();
获取goalsize这个参数
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
获取最小分片大小,可通过mapreduce.input.fileinputformat.split.minsize
参数设置
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
最后就是遍历文件,获取分片,这其中最重要的操作有两个
第一个是根据压缩类型(包括不压缩)判断文件是否可以进行分片。这个函数调用的是输入文件类型自身的,在这里是 TextInputFormat.isSplitable。
isSplitable(fs, path)
protected boolean isSplitable(FileSystem fs, Path file) {
  // A file can be split when it is not compressed at all, or when its
  // codec explicitly supports splitting (e.g. bzip2).
  final CompressionCodec codec = compressionCodecs.getCodec(file);
  return codec == null || codec instanceof SplittableCompressionCodec;
}
假如不可分片那么整个文件当做一个分片
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
假如是不压缩或 BZip2Codec 等可分割压缩类型,说明文件是可以分割的,首先是计算分片大小
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
实体就是
Math.max(minSize, Math.min(goalSize, blockSize));
从上边可以看出,我们输入的分片大小并不能决定分区数或者 MR 的 map 数目,只是一个辅助参数而已。真正的决定因素还是文件数、文件压缩类型、文件的真实大小。
最后就是
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
获取分片信息
val split = theSplit.asInstanceOf[HadoopPartition]
获取RecordReader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
由于这里的inputFormat是TextInputFormat,故而此处获得的reader是
new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
sc.sequenceFile[ImmutableBytesWritable,Array[Byte]](fileName)
进入之后呢
withScope {
assertNotStopped()
val kc = clean(kcf)()
val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
两者的主要区别在于:
1,liststatus
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
  // Delegate the raw listing to the parent, then patch up any entries that
  // are directories: a directory here is assumed to be a MapFile, so we
  // substitute the status of its embedded data file.
  final FileStatus[] statuses = super.listStatus(job);
  for (int idx = 0; idx < statuses.length; idx++) {
    final FileStatus entry = statuses[idx];
    if (entry.isDirectory()) {
      // Resolve the MapFile's data component and index that instead.
      final Path dataPath = new Path(entry.getPath(), MapFile.DATA_FILE_NAME);
      final FileSystem fileSystem = entry.getPath().getFileSystem(job);
      statuses[idx] = fileSystem.getFileStatus(dataPath);
    }
  }
  return statuses;
}
2,getRecordReader
public RecordReader<K, V> getRecordReader(InputSplit split,
                                          JobConf job, Reporter reporter)
    throws IOException {
  // Surface which split is being consumed, then build the reader for it.
  reporter.setStatus(split.toString());
  final RecordReader<K, V> reader =
      new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
  return reader;
}