2016年2月17日星期三

Hive远程客户端配置

准备一台新的客户机,拷贝hive使用的hadoop及hadoop的配置,设置HADOOP_HOME。拷贝已配置好的hive文件夹到新的机器,设置HIVE_HOME,修改PATH增加hadoop和hive。确保$HIVE_HOME/conf/hive-site.xml配置中的hive.aux.jars.path的hive文件夹路径正确。
如果已经设置了HADOOP_HOME或者将hadoop添加到了PATH中,执行hive确依然说没有找到hadoop,不妨看一下$HIVE_HOME/conf/hive-env.sh中有没有已经设置了错误的HADOOP_HOME路径,删掉即可。

将hadoop目录下的jline改为高版本:
cp $HIVE_HOME/lib/jline-2.12.jar $HADOOP_HOME/share/hadoop/yarn/lib/
rm $HADOOP_HOME/share/hadoop/yarn/lib/jline-0.9.94.jar

执行hive,完成。

Hadoop远程客户端配置

拷贝集群中的hadoop文件夹到新的客户机(主要是etc/hadoop/目录下的配置文件),vim ~/.bashrc,增加环境变量:

export HADOOP_HOME=your_hadoop_path
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/hdfs/lib/*:$HADOOP_HOME/share/hadoop/httpfs/tomcat/lib/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/tools/lib/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/yarn/lib/*

source~/.bashrc,完成。

如果发现还是找不到class(已经设置了HADOOP_CLASSPATH,但是执行hadoop class命令时候发现classpath设置并没有生效),直接将HADOOP_CLASSPATH写入到hadoop和hdfs的脚本中去。问题原因不明。

将HDFS中JSON格式日志以日期分区导入Hive

下面看一个实例,按天进行分区。

数据格式如下,需要解析其中的部分字段:
{"version_name":"3.2.0","phone_system_fingerprint":"Xiaomi/libra/libra:5.1.1/LMY47V/V7.1.6.0.LXKCNCK:user/release-keys","phone_device_id":"ac614a689cdbb1ecd2713a4dbc008682","phone_device_type":"Mi-4c","watch_device_id":"7664e1b6eba4abc542ad5db0ce892fb7","properties":{"address":"中国,北京市,北京市,海淀区,海淀北二街,8号,39.989566,116.316318"},"type":"watch","version":"12600","deviceid":"ac614a689cdbb1ecd2713a4dbc008682","timestamp":"1455512510381","sys_version":"5.1.1","address":"中国,河北省,唐山市,路北区,和平路,,39.630073,118.136363","net_state":"","event":"user_connect","user_id":"ac614a689cdbb1ecd2713a4dbc008682","ticwear_version":"tic_3.2.1","watch_device_type":"Ticwatch","sys_model":"Mi-4c","channel":"main"}

1、首先用pig解析JSON,脚本如下:
REGISTER elephant-bird-pig.jar;
REGISTER elephant-bird-core.jar;
REGISTER elephant-bird-hadoop-compat.jar;
REGISTER json-simple.jar;

%default WATCH_UPLOAD_LOG /path/*.json

raw_log = LOAD '$WATCH_UPLOAD_LOG'
                USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');

tic_log = FOREACH raw_log GENERATE
                 (chararray)$0#'version_name' AS version_name,
                 (chararray)$0#'phone_device_id' AS phone_device_id,
                 (chararray)$0#'watch_device_id' AS watch_device_id,
                 (chararray)$0#'phone_device_type' AS phone_device_type,
                 (chararray)$0#'properties'#'address' AS address,
                 (chararray)$0#'type' AS type,
                 (chararray)$0#'event' AS event,
                 (chararray)$0#'user_id' AS user_id,
                 (chararray)$0#'deviceid' AS device_id,
                 (chararray)$0#'ticwear_version' AS ticwear_version,
                 (chararray)$0#'watch_device_type' AS watch_device_type,
                 (chararray)$0#'sys_model' AS sys_model,
                 (chararray)$0#'timestamp' AS timestamp;

STORE tic_log INTO '/path/tic_log';

2、使用Hive导入数据:
create external table watch_upload_info_temp(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) row format delimited fields terminated by '\t' stored as textfile location '/path/tic_log';

create table watch_upload_info(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) partitioned by (time string);

set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions = 10000;
set hive.exec.max.dynamic.partitions.pernode = 1000;

insert into table watch_upload_info partition(time) select *, from_unixtime(floor(`timestamp`/1000), "yyyy-MM-dd") time from watch_upload_info_temp where `timestamp` > 1417363200000;  -- 这里1417363200000是2014-12-01,timestamp是以ms为单位。

Pig

Please see this doc.

2016年2月16日星期二

Hive查看已存在的表是内部表或外部表

两种方法:
1) hive> desc formatted tbl_name
2) hive> desc extended tbl_name

Table Type是MANAGED_TABLE表示内部表,EXTERNAL_TABLE是外部表。

2016年2月15日星期一

pig中dump时无法连接server

pig默认的hdfs模式下dump数据到控制台出现以下提示信息:
org.apache.hadoop.ipc.Client - Retrying connect to server: host/ip:port.

解决:
Hadoop集群没有启动JobHistoryServer进程,执行命令:
mr-jobhistory-daemon.sh start historyserver

2016年2月2日星期二

reStructuredText语法

reST语法比较多,相比MarkDown还是有点复杂的,本文只记录了一部分。详情参考:链接

标题

for parts
#########

for chapters
*************

= for sections
- for subsections
^ for subsubsections
" for paragraphs
作为修饰的字符长度要大于等于文字长度

列表
# + - 都可以

无序列表:
# 列表1
    # 子列表
# 列表2

有序列表:
#. 列表1
#. 列表2

1. 列表1
2. 列表2

样式

**粗体**  *斜体*  ``引用``

保留原来的行样式:
| These lines are
| broken exactly like in
| the source file.

代码
::

    print(1)
    print(2)

下标
H\ :sub:`2`\ O

上标
E = mc\ :sup:`2`

表格

+------------------------+------------+----------+----------+
| Header row, column 1   | Header 2   | Header 3 | Header 4 |
| (header rows optional) |            |          |          |
+========================+============+==========+==========+
| body row 1, column 1   | column 2   | column 3 | column 4 |
+------------------------+------------+----------+----------+
| body row 2             | ...        | ...      |          |
+------------------------+------------+----------+----------+

=====  =====  =====
第1列  第2列  第3列
=====  =====  =====
8      1      6
3      5      7
4      9      2
=====  =====  =====

超链接
内嵌网址超链接:
`google <http://www.google.com.tw/>`_

可以把链接地址和目标定义分开:
google_

.. _google: http://www.google.com.tw/

注释
.. 注释

图片
.. image:: images/ball1.gif
    :height: 200
    :width: 300

2016年2月1日星期一

Spark中数据分区

Spark程序可以通过控制RDD分区来减少通信开销,所有的键值对RDD都可以进行分区。只有当数据集多次在比如join这种基于键的操作中使用时,分区才会有帮助。

一个例子:
有一张固定的用户信息表,以RDD(UserID, UserInfo)的形式保存在UserInfo中,另外有一个用户行为表,以RDD(UserID, LinkInfo)形式保存在LinkInfo中,现在统计在LinkInfo中但不在UserInfo中的topic个数。

不够高效的实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[User, LinkInfo](logFileName)
  val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs.
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
以上代码中,连接操作需要将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。在每次调用时都对userData表进行哈希值计算和跨节点数据混洗,虽然userData的数据从来都不会变化。

使用数据分区实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://..."
                         .partitionBy(new HashPartitioner(100)).persist()
在构建userData时候调用了partitionBy,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join时,Spark就会利用到这一点。
当调用userData.join(events)时,Spark只会对events进行数据混洗操作,将events中特性UserID的记录发送到userData的对应分区所在的那台机器上。
注意,将分区之后的结果持久化是必要的,否则后面每次用到userData这个RDD时都会重新地对数据进行分区操作,partitionBy带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗。

可以从分区中获益的操作:
cogroup()、groupWith()、join()、leftOuterrJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、lookup()

对于RDD的pair,map()的结果不会有固定的分区,因为传给map()的函数理论上可以改变元素的键。不过Spark提供了mapValues()和flatMapValues(),可以保证每个二元组的键保持不变。

使用mapValues的PageRank的例子:
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
var ranks = links.mapValues(v => 1.0)  // 因为使用了mapValues,分区和links一样
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")

为了最大化分区相关优化的潜在作用,应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。

Spark中的累加器和广播变量

在使用map()或者filter()之类的函数时,可以使用在map外面定义的变量,但是无法改变变量的值。共享变量(累加器和广播变量)突破了这一个限制。

累加器
将文本按照空格分隔,顺便统计空行个数:
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0)  // 创建Accumulator[Int]并初始化为0
val res = file.flatMap( line => {
    if (line == "") {
        blankLines += 1
    }
    lines.split(" ")
})

res.collect()  // 这里随便执行一个action,这样才会执行map中的运算
println(blankLines.value)

广播变量
在RDD的操作中如果每一次都使用同一个变量,驱动节点每次都会发送该变量到各个工作节点,如果变量很大,浪费了通信的开销,这种情况下可以使用广播变量,这个值只会被发送到各节点一次。
用法:
val broadcastVar = sc.broadcast(1)
sc.parallelize(1 to 10).map(_ + broadcastVar.value)

Spark的缓存

Spark在第一次调用动作并计算出RDD结果后,该动作的结果可以存储在集群的内存或者磁盘上,这样下一次需要调用依赖该RDD的动作时,就不需要从依赖关系中重新计算RDD,数据可以从缓存分区中直接返回。

例如,
cached.cache()
cached.count()
cached.take(10)
调用count时会导致第一次计算RDD,然后又需要计算take的动作,调用take时, 访问的是已经缓存好的元素,不需要再做计算。