@spiritnotes
2016-03-10T15:11:59.000000Z
字数 18274
阅读 4994
Spark
机器学习
读书笔记
直接下载预编译好的版本。
测试:MASTER=local2 ./bin/run-example org.apache.spark.examples.SparkPi
由两类程序组成:一个驱动程序和多个执行程序;
单机模式的Spark集群:一个运行在Spark单机主进程和驱动程序的主节点;各自运行一个执行程序进程的多个工作节点;
MASTER=spark://IP:PORT ./bin/run-example ...
Spark程序的编写是从SparkContext开始的。SparkContext初始化需要SparkConf对象。
Scala shell:./bin/spark-shell
python shell:./bin/pyspark
RDD(Resilient Distributed Dataset)是Spark的核心概念。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上。RDD具有容错性,当某个节点或任务失败时(非用户代码),RDD会在余下节点上自动重建。
创建RDD,sc.parallelize(list_)/sc.textFile("file")。
Spark编程模式下,所有的操作分为转换(transformation)和执行(action)两种。转换操作是对一个数据集中的所有记录执行某种函数,从而使其记录发生改变;而执行通常是运行某种计算或者聚合操作,并将结果返回运行SparkContext的那个驱动程序。
rdddata.map(lambda x:x.upper())
rdddata.count() 返回其个数
转换操作是延后的,调用转换操作并不会执行计算,只有执行操作被调用时才会高效的计算。大部分操作在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,提高效率。
from pyspark import SparkContext
sc = SparkContext("local[2]",'APP')
data = sc.....
all = data.count()
uniqueUses = data.map(lambda record:record[0]).distinct().count()
products = data.map(lambda record:(record[1],1.0)).reduceByKey(lambda x,y:x+y).collect()
#运行:
#>$SPARK_HOME/bin/spark-submit pythonapp.py
现代的大数据场景包含如下需求:
一个在线电影和电视节目的内容服务。
为什么要使用机器学习
监督学习
非监督学习
离线训练结果
在线模型,模型更新
user_data = sc.textFile('ml-100k/u.user')
user_data.first()
user_fields = user_data.map(lambda line:line.split('|'))
num_genders = user_fields.map(lambda filelds:filelds[1]).distinct().count()
年龄可视化
ages = user_fields.map(lambda x: int(x[1])).collect()
pylab.hist(ages, bins=20,normed=True)
职业分布
count_by_occuption = user_fields.map(lambda i:(i[3],1)).reduceByKey(lambda x,y:x+y).collect()
count_by_occuption.sort(key=operator.itemgetter(1))
xy = list(zip(*count_by_occuption))
pos = np.arange(len(xy[0]))
width = 1.0
ax=plt.axes()
ax.set_xticks(pos+(width/2))
ax.set_xticklabels(xy[0])
plt.bar(pos,xy[1],width,color='lightblue')
plt.xticks(rotation=30)
fig=plt.gcf()
fig.set_size_inches(10,6)
电影年龄查看
movie_ages2 = movie_fileds.map(lambda fields:1998 - covert_year(fields[2])).filter(lambda x:x!=98)
movie_ages = years_filtered.map(lambda r:1998-r).countByValue()
movie_ages2 = movie_fileds.map(lambda fields:1998 - covert_year(fields[2])).filter(lambda x:x!=98).collect()
pylab.hist(movie_ages2, bins=80,normed=True)
大致处理办法
特征是指那些用于模型训练的变量。
名义(nominal)变量:各个可能取值之间没有顺序关系
有序(ordinal)变量:存在顺序关系的
1-of-k编码: 用n位的向量来保存值,相应位赋值1, [0,0,...,0,1,0,...]
派生特征可以包括:平均值、中位值、方差、和、差、最大值、最小值、计数等
电影上可以创建每个用户评分的平均值、方差等
数值特征到类别特征的转换也很常见,比如划分为区间特征。进行这类转换常见的有年龄、地理位置和时间。
例如将电影影评时间转变为小时,将小时划区间再使用1-of-k编码。
自然语言处理是专注于文本内容的处理、表示和建模的一个领域。词袋方法
all_terms_bcast = sc.broadcast(all_terms_dict)
term_vectors = title_terms.map(lambda terms:create_vector(terms, all_terms_bcast.value)
可借助软件包提取特征:scikit-learn、gensim、scikit-image、matplotlib、NLTK、OpenNLP、Breeze
推荐引擎适合如下两类常见场景
基于内容的过滤
协同过滤
推荐模型中包括矩阵分解(matrix factorization)的实现
用户-商品矩阵可以分解为2个低阶且乘积近似的矩阵,因式分解 U×I->U×K K×I
这类模型试图发现对应“用户-物品”矩阵的内在行为结构的隐含特征(这里表现为因子矩阵),所以也称为隐特征模型。隐含特征或因子不能直接解释,但它可以表示了某种含义,如对电影的某个导演/种类/风格或某些演员的偏好
对用户进行预测,则分别取对应的行(用户因子向量)和列(物品因子向量)计算点积即可。
而对于物品之间相似度的计算,则直接应用物品因子向量进行计算。
好处:模型建立后求解相对容易,表现通常比较出色
弊端:物品和用户数量多时,存储和计算较多,不好解释
隐式反馈数据,其中偏好没有直接给出,而是隐含在用户与物品的交互之中。比如电影是否看,看了几次
MLlib将输入的评级数据视为两个矩阵:一个二元偏好矩阵P(是否购买)与一个信心权重矩阵C(购买次数或评分)。隐式模型创建的是二元偏好矩阵的而非评级矩阵的近似。此时用户因子向量和物品因子向量的点积所得到的分数就是对某一物品偏好的估值。
ALS
rating_data_raw = sc.textFile('ml-100k/u.data')
rating_data = rating_data_raw.map(lambda line:line.split('\t'))
rating = rating_data.map(lambda line:(int(line[0]), int(line[1]), float(line[2])))
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
rs_model = ALS.train(rating, 50, 10)
rs_model.userFeatures().count()
rs_model.productFeatures().count()
trainImplict,先将数据转换为隐式数据集,可以通过设置阀值实现
ALS初始化是随机的,因此不同模型结果不一样
rs_model.predict(789, 123)
rs_model.recommendProducts(789,10)
衡量方法有皮尔森相关系数、针对实数向量的余弦相似度以及针对二元向量的杰卡德相似系数
rs_model.productFeatures().map(lambda x: (x[0], cossimilarity(x[1], product_567))).sortBy(lambda x:-x[1]).take(10)
rs_model.productFeatures().map(lambda x: (x[0], cossimilarity(x[1], product_567))).top(10, lambda x:x[1])
评估指标(evaluation metric)是指那些衡量模型预测能力或准确度的方法。有些直接度量模型的预测的目标变量的好坏(比如均方差),有些则关注模型对那些其并未针对性优化过但又十分接近真实应用场景的预测能力(比如平均准确率)。
均方差(Mean Squared Error,MSE)直接衡量用户物品评级矩阵的重建误差,也ALS的最小化目标函数。
mse = rs_model.predictAll(all_user_items).map(lambda x:((x.user, x.product), x.rating)).join(rating.map(lambda x:((x[0],x[1]),x[2]))).map(lambda y:(y[1][0]-y[1][4])**2).sum()/100000
均方根误差(Root Mean Squared Erro,RMSE)在上面结果开根号
整个数据集上的K值平均准确率(Average Precision at K metric,APK)的均值。更适合于评估隐式数据集上的推荐,MSE此时不那么合适。
分布推荐
items = mod.productFeatures.map().collect()
ibroastcast = sc.broadcast(items_matrix)
def rocommend(user_vec):
scores = ibroadcast.value.mmul(user_vec).data.zipWithIndex.sortBy(lambda x:x[1]).map(_._2+ 1).toseq
return user_id, scores
model.userFeatures.map(lambda line: rocommend(line))
import org.apache.spark.mllib.evaluation.RegressionMetrics
x = RegressionMetrics(RDD[(predict, actual)])
x.meanSquaredError()
x.rootMeanSquaredError()
MAP
import org.apache.spark.mllib.evaluation.RankingMetrics
线性模型:容易扩展
决策树:非线性,性能好,
朴素贝叶斯:快速
可用于回归和分类
分类
回归 对等连接函数I
Mlib的分类模型通过 LabeledPoint 对象操作,构建函数为 (label:double, features:vector)
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.tree import DecisionTree,DecisionTreeModel
from pyspark.mllib.classification import NaiveBayes,NaiveBayesModel
svm_model = SVMWithSGD.train(digits, 10)
log_model = LogisticRegressionWithLBFGS.train(digits,10)
nb_model = NaiveBayes.train(digits)
dc_model = DecisionTree.trainClassifier(digits, 2, {})
svm_model.predict(p.features)
all_predict = svm_model.predict(digits.map(lambda x:x.features))
all_predict.take(5)
for model in [svm_model, nb_model, log_model, dc_model]:
print(model.__class__.__name__, digits.map(lambda x:1 if model.predict(x.features) == x.label else 0).sum() /digits.count()
SVMModel 0.9015817223198594
NaiveBayesModel 0.8963093145869947
LogisticRegressionModel 0.9824253075571178
准确率与召回率基本是负相关的。
准确率-召回率曲线,从左上到右下的曲线
ROC是指的真阳性-假阳性曲线
真阳性是正样本中判为阳性的比例,如召回率
假阳性是负样本中判为阳性的比例,误诊率
from pyspark.mllib.evaluation import BinaryClassificationMetrics
for model in [svm_model, nb_model, log_model, dc_model]:
pre = model.predict(features).zip(label)
metric_ = BinaryClassificationMetrics(pre)
print(model.__class__.__name__, metric_.areaUnderPR, metric_.areaUnderROC)
from pyspark.mllib.linalg.distributed import RowMatrix
summary = metric2.computeColumnSummaryStatistics()?
from pyspark.mllib.feature import StandardScaler
label = digits.map(lambda x: x.label)
features = digits.map(lambda x: x.features)
std_ = StandardScaler(withMean = True, withStd=True).fit(features)
scaled_data= label.zip(std_.transform(features))
iterresults = sc.para([1,2,3,4,5]).map(lambda para: trainwith ....)
SGD 随机梯度下降
两个回归模型:
标准的最小二乘回归不使用正则化,但是应用到错误预测值的损失函数会将错误做平方,从而放大损失。因此在实际中往往使用一定的正则化。岭回归,Lasso回归
决策树在回归使用的不纯度度量是方差。
将2到9列转变为二元编码的多列格式
def get_mapping(rdd, idx):
return rdd.map(lambda x:x[idx]).distinct().zipWithIndex().collectAsMap()
mappings = [get_mapping(bk_table, i) for i in range(2,10)]
from pyspark.mllib.regression import LabeledPoint
import numpy as np
def extract_features(record):
cat_vec = np.zeros(cat_len)
i, step = 0, 0
for field in record[2:9]:
m = mappings[i]
idx = m[field]
cat_vec[idx+step] = 1
i = i+1
step += len(m)
num_vec = np.array([float(field) for field in record[10:14]])
return np.concatenate((cat_vec, num_vec))
def extract_label(record):
return float(record[-1])
bk_data_re = bk_table.map(lambda x:LabeledPoint(extract_label(x), extract_features(x)))
决策树只需要使用原数据就可以了
def extract_featrues_dt(r):
return np.array([float(i) for i in r[2:14]])
bk_data_dt = bk_table.map(lambda r:LabeledPoint(extract_label(r), extract_featrues_dt(r)))
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
bk_re_model = LinearRegressionWithSGD.train(bk_data_re, iterations=10, step=0.1, intercept=False)
bk_re_true_vs_pre = bk_data_re.map(lambda x:(x.label, bk_re_model.predict(x.features)))
bk_dt_model = DecisionTree.trainRegressor(bk_data_dt, {})
preds = bk_dt_model.predict(bk_data_dt.map(lambda p:p.features))
actual = bk_data_dt.map(lambda p:p.label)
bk_dt_true_vs_pre = actual.zip(preds)
适用于目标变量值域很大,并且没有必要对预测值和目标值的误差进行惩罚的情况。适用于计算误差的百分率而不得hi误差的绝对值
也称判定系数,用来评估模型拟合数据的好坏。
原始目标变量
是非正态分布的
log后的目标变量
取平方根后的目标变量
对数变换影响
线性回归提高了RMSLE,对MSE和MAE无提升。
决策树模型性能下降
聚类的应用:
形式化的目标函数为类簇内的方差和(within cluster sum of squared errors,WCSS)
模糊K-均值 (fuzzy K-means)
混合模型:模糊K-均值的扩展
层次聚类(hierarchical clustering)
: 凝聚聚类(agglomerative clustering):每个样本为一个类,合并相近的类,一次一个
分裂式聚类(divisive clustering):将所有样本作为一个类,每次选择最好的分配方式,分配为两个类
MovieLens数据集
获取电影与类别对应关系
def get_movie_gen(line):
movie_id = int(line[0])
gens = []
for i, gen in enumerate(line[5:]):
if gen == '1':
gens.append(ml_gen_map[str(i)])
return (movie_id, gens)
ml_title_gen = ml_lines.map(lambda x:x.split('|')).map(get_movie_gen)
训练推荐模型
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating
ml_rating_lines = sc.textFile('ml-100k/u.data')
ml_rating = ml_rating_lines.map(lambda x:x.split('\t')).map(lambda x:Rating(int(x[0]),int(x[1]),float(x[2])))
ml_rating.cache()
ml_rating_rs_model = ALS.train(ml_rating, 50, 10, 0.1)
ml_movie_factors = ml_rating_rs_model.productFeatures()
ml_movie_vec = ml_movie_factors.map(lambda x:x[1])
ml_user_factors = ml_rating_rs_model.userFeatures()
ml_user_vec = ml_user_factors.map(lambda x:x[1])
归一化
K均值提供了随机和K-means||两种初始化方法,后者默认初始化。K-均值通常都不能收敛到全局最优解,所以实际应用中需要多次训练并选择最优的模型。
from pyspark.mllib.clustering import KMeans
ml_model_movie_kmeans = KMeans.train(ml_movie_vec, 5, 100, 100)
ml_model_movie_kmeans.predict(ml_movie_vec.first())
ml_model_movie_kmeans.predict(ml_movie_vec).take(5)
用数据集解释类别预测
WCSS、Davies-Bouldin指数、Dunn指数、轮廓系数(silhouette coefficient)
Rand measure、F-measure、雅卡尔系数(Jaccard index)
ml_model_movie_kmeans.computeCost(ml_movie_vec)
交叉验证选择K
划分训练集和测试集,在训练集上采取不同的K值进行计算在测试集上进行验证
随着K的增加,WCSS会下降,但是在下降到某个拐点后,下降变得很缓慢
降维方法从一个D维的数据输入提取出k维表示,k一般远远小于D。降维方法本身是一种预处理方法,或者说是一个特征转换的方法,而不是模型预测的方法。
降维中尤为重要的是,被抽取出的维度表示应该仍能捕捉大部分原始数据的变化和结构。这源于一个基本想法:大部分数据源包含某种内部结构,这种结构一般来说是未知的(常称为隐含特征或者潜在特征),但如果能发现结构中的一些特征,我们的模型就可以学习这种结构并从中预测,而不用从大量无关的充满噪声特征的原始数据中去学习预测。缩减维度可以排除数据中的噪声并保留数据原有的隐含结构。
数据降维使用场景
mllib提供了两种相似的降低维度的模型:PCA与SVD
PCA处理一个数据矩阵,提取矩阵中的k个主向量,主向量彼此不相关,计算结果中,第一个主向量表示输入数据的最大变化方向。之后的每个主向量依次代表不考虑之前计算过的所有方向时最大的变化方向。
每个主成分向量上有着与原始数据矩阵相同的特征维度,需要使用映射来做一次降维,将原来的数据被投影到主向量表示的k为空间
都是矩阵分解技术,将原来的矩阵分解为一些维度(或秩)较低的矩阵
将高维的特征向量使用K均值聚类为K个中心,根据原数据与K个聚类中心的远近(也就是计算出每个点到中心的距离)表示这些数据,结果就是一组k元距离。
通过使用不同的聚类矩阵,可以实现数据降维和非线性变化,或者可以通过高效的线性模型计算学习更复杂的模型。如使用高斯和指数距离函数可以实现非常复杂的非线性变换。
pics_all = sc.wholeTextFiles('lfw/*/*')
plt.imshow(plt.imread(pic_paths.first()))
plt.imshow(array(Image.open(pic_paths.first()).convert('L').resize((100,100))))
文本和语言有隐含的结构信息,使用原始的文本很难捕捉到
文本数据的有限维度一般都非常巨大甚至是无限的
自然语言处理(NLP)领域研究文本处理的技术包括特征提取、建模和机器学习。
TF-IDF:
特征哈希是一种处理高维数据的技术,并经常应用在文本和分类数据上。使用1-of-k方法需要建立表格。特征哈希通过使用哈希方程对特征向量赋予向量下标。
news_all = sc.wholeTextFiles('newsgroup/*/*/*')
news_train = news_all.filter(lambda x:'train' in x[0] )
news_test = news_all.subtractByKey(news_train)
white_space_splits = news_train.map(lambda x: x[1]).flatMap(lambda x:x.split()).map(lambda x:x.lower())
white_space_splits = news_train.flatMap(lambda x: (i.lower() for i in x[1].split()) )
white_space_splits = news_train.flatMap(lambda x: (i.lower() for i in re.findall(r'\w+',x[1])))
white_space_splits = news_train.flatMap(lambda x: (i.lower() for i in re.findall(r'\w+',x[1]))).filter(lambda x:re.fullmatch('[^0-9]*', x) is not None)
移除停用词,过滤掉单个字符的词语
white_space_splits = news_train.flatMap(lambda x: (i.lower() for i in re.findall(r'\w+',x[1]))).filter(lambda x:re.fullmatch('[^0-9]*', x) is not None)
temp= white_space_splits.countByValue()
temp2 = sorted(temp.items(), key=operator.itemgetter(1),reverse=True)
stopwords = [i[0] for i in temp2[:30]]
tokens = white_space_splits.distinct().filter(lambda x: len(x)!= 1).filter(lambda x: x not in stopwords).filter(lambda x:x not in onewords)
基于频率去掉词语
tokens_ = tokens.collect()
提取词干
def tokenize(doc):
return sc.parallelize([i.lower() for i in re.findall(r'\w+', doc)]).filter(lambda x:re.fullmatch('[^0-9]*', x) is not None).filter(lambda x: len(x)!= 1).filter(lambda x: x not in stopwords)
z = tokenize(news_train.first()[1])
z.collect()
train_tokens = (news_train
.map(lambda x: re.findall(r'\w+', x[1]))
.map(lambda x:[i.lower() for i in x if i.strip()])
.map(lambda x:[i for i in x if re.fullmatch('[^0-9]*', i) is not None])
.map(lambda x:[i for i in x if len(i)!= 1])
.map(lambda x:[i for i in x if i not in stopwords])
)
TF-IDF处理
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.feature import HashingTF,IDF
hash_tf = HashingTF(2**18)
tf = hash_tf.transform(train_tokens)
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
minMaxValues = tfidf.map(lambda x:(max(x.toArray()), min(x.toArray())))
def cosine(a,b):
return np.dot(a,b)/np.linalg.norm(a)/np.linalg.norm(b)
cosine(np.array([1,2]),np.array([2,2]))
x = hockey_tfidf.take(2)
cosine(x[0].toArray(), x[1].toArray())
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.evaluation import MulticlassMetrics
news_group = news_train.map(lambda x: x[0].split('/')[-2])
new_group_map = news_group.distinct().zipWithIndex().collectAsMap()
group_tfidf = news_group.zip(tfidf).map(lambda x: LabeledPoint(new_group_map[x[0]], x[1]))
group_tfidf.cache()
model = NaiveBayes.train(group_tfidf, lambda_=0.1)
test_tf = hash_tf.transform(news_test.map(lambda x: re.findall(r'\w+', x[1]))
.map(lambda x:[i.lower() for i in x])
.map(lambda x:[i for i in x if re.fullmatch('[^0-9]*', i) is not None])
.map(lambda x:[i for i in x if len(i)!= 1])
.map(lambda x:[i for i in x if i not in stopwords])
)
test_tfidf = idf.transform(test_tf)
test_labels = news_test.map(lambda x: x[0].split('/')[-2]).map(lambda x:new_group_map[x])
test_pred = model.predict(test_tfidf)
pre_with_actual = test_pred.zip(test_labels)
pre_with_actual.filter(lambda x:int(x[0])==int(x[1])).count()/pre_with_actual.count()
#80.12%
metrics = MulticlassMetrics(pre_with_actual)
metrics.weightedFMeasure()?
原始方法也不错
把一个单词表示成一个向量
分布向量表示,使用skip-gram模型,考虑了单词上下文来学习词向量表示的模型
from pyspark.mllib.feature import Word2Vec
w2v = Word2Vec()
w2v.setSeed(42)
w2v_model = w2v.fit(train_tokens)
list(w2v_model.findSynonyms('hockey',20))
在线学习以对训练数据进行完全增量的形式顺序处理一遍为基础(一次只训练一个样例)。
流处理是当数据产生时就开始处理
两种方式
streaming使用第二种方式,核心概念是离散化流(DStream、Discretized Stream)。一个DStream是一个小批量作业的序列,每一个小批量作业表示为一个Spark RDD。
输入源:接收端负责从数据源接受数据并转换成由Spark RDD组成的DStream,支持多种输入源,包括文件、网络等
转换:提供了一个可以在DStream上使用的转换集合;与RDD上的类似,该转换操作RDD包含的数据