Spark程序可以通过控制RDD分区来减少通信开销,所有的键值对RDD都可以进行分区。只有当数据集多次在比如join这种基于键的操作中使用时,分区才会有帮助。
一个例子:
有一张固定的用户信息表,以RDD(UserID, UserInfo)的形式保存在UserInfo中,另外有一个用户行为表,以RDD(UserID, LinkInfo)形式保存在LinkInfo中,现在统计在LinkInfo中但不在UserInfo中的topic个数。
不够高效的实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[User, LinkInfo](logFileName)
val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs.
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
以上代码中,连接操作需要将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。在每次调用时都对userData表进行哈希值计算和跨节点数据混洗,虽然userData的数据从来都不会变化。
使用数据分区实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://..."
.partitionBy(new HashPartitioner(100)).persist()
在构建userData时候调用了partitionBy,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join时,Spark就会利用到这一点。
当调用userData.join(events)时,Spark只会对events进行数据混洗操作,将events中特性UserID的记录发送到userData的对应分区所在的那台机器上。
注意,将分区之后的结果持久化是必要的,否则后面每次用到userData这个RDD时都会重新地对数据进行分区操作,partitionBy带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗。
可以从分区中获益的操作:
cogroup()、groupWith()、join()、leftOuterrJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、lookup()
对于RDD的pair,map()的结果不会有固定的分区,因为传给map()的函数理论上可以改变元素的键。不过Spark提供了mapValues()和flatMapValues(),可以保证每个二元组的键保持不变。
使用mapValues的PageRank的例子:
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
var ranks = links.mapValues(v => 1.0) // 因为使用了mapValues,分区和links一样
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")
为了最大化分区相关优化的潜在作用,应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。
没有评论:
发表评论