2016年9月22日星期四

解决Pig中ERROR 1071: Cannot convert a map to a String

运行Pig脚本时遇到这样的问题:
ERROR 1071: Cannot convert a map to a String

后来发现原因是map的一个字段值为空字符串,我在后面取了这个字段的值:(chararray)$0#'field' AS field,然后就出错了。所以我在这之前将filed字段为''的都FILTER BY $0#'field' != ''之后问题解决。

2016年9月14日星期三

Spark中解析json

建议使用fastjson:
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.24"


其他一些方案(不推荐):

scala.util.parsing.json.JSON:
解析:JSON.parseFull(x).get.asInstanceOf[Map[String, Any]]
生成:JSONObject(Map("field" -> value, ...))

或者com.fasterxml.jackson解析和生成json。

2016年9月13日星期二

Spark join的用法

用一个例子说明一下:
val a = sc.parallelize(List((1, "a"), (3, "c"), (5, "e"), (6, "f")))
val b = sc.parallelize(List((1, "a"), (2, "b"), (3, "c"), (4, "d")))

a.join(b).collect
    Array[(Int, (String, String))] = Array((1,(a,a)), (3,(c,c)))

a.leftOuterJoin(b).collect
    Array[(Int, (String, Option[String]))] = Array((1,(a,Some(a))), (6,(f,None)), (3,(c,Some(c))), (5,(e,None)))

a.rightOuterJoin(b).collect
    Array[(Int, (Option[String], String))] = Array((4,(None,d)), (1,(Some(a),a)), (3,(Some(c),c)), (2,(None,b)))

a.fullOuterJoin(b).collect
    Array[(Int, (Option[String], Option[String]))] = Array((4,(None,Some(d))), (1,(Some(a),Some(a))), (6,(Some(f),None)), (3,(Some(c),Some(c))), (5,(Some(e),None)), (2,(None,Some(b))))

a.subtractByKey(b).collect
    Array[(Int, String)] = Array((5,e), (6,f))

如果a或者b中有重复的key,join的结果会有多条。

2016年9月12日星期一

解决scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror的问题

运行Spark的时候遇到这样一个问题:
java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

首先检查了一下我打包时候的Spark版本是基于scala 2.11的Spark 2.0.0,和spark-submit的版本是一致的,问题还是没有解决。最后发现是我的pom.xml中maven-scala-plugin的jvm版本设置的不对,修改了正确的jvm版本后运行成功。

2016年9月8日星期四

spark中dbscan使用中遇到的问题

Spark中目前没有集成dbscan聚类算法,见https://issues.apache.org/jira/browse/SPARK-5226

找了三个项目:
1、https://github.com/scalanlp/nak
nak中dbscan的输入必须是一个breeze.linalg.DenseMatrix的矩阵包含了所有数据,并不是我所需要的RDD[Vector],不可用。

2、https://github.com/irvingc/dbscan-on-spark
输入是一个RDD[Vector],但代码中只考虑了Vector的前两个值,所以该项目只能计算二维的dbscan,需要自己修改代码支持多维的Vector。

因为我用的Spark 2.0.0,而这个项目用的1.6.1,org.apache.spark.Logging类不存在了,我改为了org.apache.spark.internal.Logging。

运行时又遇到另外一个bug:
java.lang.NoSuchMethodError: scala.collection.immutable.$colon$colon.hd$1()Ljava/lang/Object;
看了一下代码,不知道为什么运行时不支持EvenSplitPartitioner.scala文件中的case (rectangle, count) :: rest => 这种List的写法,所以我将这段改为了获取List的head和tail,重新编译,提交spark运行,终于成功。

另外,该项目的repo中没有最新版本的jar,所以只能通过源码进行打包。

3、https://github.com/alitouka/spark_dbscan
目前只支持csv格式的文件作为输入,每行用逗号隔开。
同样,该项目是基于Spark 1.1.0写的,修改为2.0.0时要删去Logging类。

看了一下代码,如果不用IOHelper.readDataset方法,可以直接将数据转为RDD[Point]后进行计算。

运行了一下,发现vector维度为10000时候小数据量就报错,维度1000时大数据量也会出错,该问题目前还没有解决。

2016年9月1日星期四

mallet中LDA的使用

mallet是一个机器学习的Java库,不支持分布式,总结一下其中的Topic Model的用法。

命令行使用中有两种import data的方式,一种是import-dir,一个文件是一个Instance,有多个文件;另一种是import-file,只有一个文件,每一行是一个Instance。
命令行的使用方法移步http://mallet.cs.umass.edu/topics.php

mallet中以instance为单位表示文档,每一个Instance包括name、label、data、source。mallet中使用不同的Iterator来载入document的文件。
csv文件格式的例子(CsvIterator):http://mallet.cs.umass.edu/topics-devel.php
文本文件的例子(FileIterator):http://mallet.cs.umass.edu/import-devel.php
所有的Iterator见cc.mallet.pipe.iterator。

InstanceList的addThruPipe方法既可以添加Iterator<Instance>也可以添加单独的一个Instance。

两种级别的正则表达式:
1、line regex
用于从每一行文本中提取name、label、data:
instances.addThruPipe(new CsvIterator (fileReader, Pattern.compile("^(\\S*)[\\s,]*(\\S*)[\\s,]*(.*)$"), 3, 2, 1)); // data, label, name fields
参考http://stackoverflow.com/questions/27927556/what-do-the-parameters-of-the-csviterator-mean-in-mallet/27929358#27929358

2、data regex
用于从data中提取出token:
new CharSequence2TokenSequence(Pattern.compile("\\p{L}[\\p{L}\\p{P}]+\\p{L}"))
参考http://mallet.cs.umass.edu/import.php中关于--token-regex的说明。