2016年2月1日星期一

Spark中数据分区

Spark程序可以通过控制RDD分区来减少通信开销,所有的键值对RDD都可以进行分区。只有当数据集多次在比如join这种基于键的操作中使用时,分区才会有帮助。

一个例子:
有一张固定的用户信息表,以RDD(UserID, UserInfo)的形式保存在UserInfo中,另外有一个用户行为表,以RDD(UserID, LinkInfo)形式保存在LinkInfo中,现在统计在LinkInfo中但不在UserInfo中的topic个数。

不够高效的实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[User, LinkInfo](logFileName)
  val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs.
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
以上代码中,连接操作需要将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。在每次调用时都对userData表进行哈希值计算和跨节点数据混洗,虽然userData的数据从来都不会变化。

使用数据分区实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://..."
                         .partitionBy(new HashPartitioner(100)).persist()
在构建userData时候调用了partitionBy,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join时,Spark就会利用到这一点。
当调用userData.join(events)时,Spark只会对events进行数据混洗操作,将events中特性UserID的记录发送到userData的对应分区所在的那台机器上。
注意,将分区之后的结果持久化是必要的,否则后面每次用到userData这个RDD时都会重新地对数据进行分区操作,partitionBy带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗。

可以从分区中获益的操作:
cogroup()、groupWith()、join()、leftOuterrJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、lookup()

对于RDD的pair,map()的结果不会有固定的分区,因为传给map()的函数理论上可以改变元素的键。不过Spark提供了mapValues()和flatMapValues(),可以保证每个二元组的键保持不变。

使用mapValues的PageRank的例子:
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
var ranks = links.mapValues(v => 1.0)  // 因为使用了mapValues,分区和links一样
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")

为了最大化分区相关优化的潜在作用,应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。

没有评论:

发表评论