准备一台新的客户机,拷贝hive使用的hadoop及hadoop的配置,设置HADOOP_HOME。拷贝已配置好的hive文件夹到新的机器,设置HIVE_HOME,修改PATH增加hadoop和hive。确保$HIVE_HOME/conf/hive-site.xml配置中的hive.aux.jars.path的hive文件夹路径正确。
如果已经设置了HADOOP_HOME或者将hadoop添加到了PATH中,执行hive确依然说没有找到hadoop,不妨看一下$HIVE_HOME/conf/hive-env.sh中有没有已经设置了错误的HADOOP_HOME路径,删掉即可。
将hadoop目录下的jline改为高版本:
cp $HIVE_HOME/lib/jline-2.12.jar $HADOOP_HOME/share/hadoop/yarn/lib/
rm $HADOOP_HOME/share/hadoop/yarn/lib/jline-0.9.94.jar
执行hive,完成。
2016年2月17日星期三
Hadoop远程客户端配置
拷贝集群中的hadoop文件夹到新的客户机(主要是etc/hadoop/目录下的配置文件),vim ~/.bashrc,增加环境变量:
export HADOOP_HOME=your_hadoop_path
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/hdfs/lib/*:$HADOOP_HOME/share/hadoop/httpfs/tomcat/lib/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/tools/lib/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/yarn/lib/*
source~/.bashrc,完成。
如果发现还是找不到class(已经设置了HADOOP_CLASSPATH,但是执行hadoop class命令时候发现classpath设置并没有生效),直接将HADOOP_CLASSPATH写入到hadoop和hdfs的脚本中去。问题原因不明。
export HADOOP_HOME=your_hadoop_path
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/hadoop/hdfs/lib/*:$HADOOP_HOME/share/hadoop/httpfs/tomcat/lib/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/mapreduce/lib/*:$HADOOP_HOME/share/hadoop/tools/lib/*:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/yarn/lib/*
source~/.bashrc,完成。
如果发现还是找不到class(已经设置了HADOOP_CLASSPATH,但是执行hadoop class命令时候发现classpath设置并没有生效),直接将HADOOP_CLASSPATH写入到hadoop和hdfs的脚本中去。问题原因不明。
将HDFS中JSON格式日志以日期分区导入Hive
下面看一个实例,按天进行分区。
数据格式如下,需要解析其中的部分字段:
{"version_name":"3.2.0","phone_system_fingerprint":"Xiaomi/libra/libra:5.1.1/LMY47V/V7.1.6.0.LXKCNCK:user/release-keys","phone_device_id":"ac614a689cdbb1ecd2713a4dbc008682","phone_device_type":"Mi-4c","watch_device_id":"7664e1b6eba4abc542ad5db0ce892fb7","properties":{"address":"中国,北京市,北京市,海淀区,海淀北二街,8号,39.989566,116.316318"},"type":"watch","version":"12600","deviceid":"ac614a689cdbb1ecd2713a4dbc008682","timestamp":"1455512510381","sys_version":"5.1.1","address":"中国,河北省,唐山市,路北区,和平路,,39.630073,118.136363","net_state":"","event":"user_connect","user_id":"ac614a689cdbb1ecd2713a4dbc008682","ticwear_version":"tic_3.2.1","watch_device_type":"Ticwatch","sys_model":"Mi-4c","channel":"main"}
1、首先用pig解析JSON,脚本如下:
REGISTER elephant-bird-pig.jar;
REGISTER elephant-bird-core.jar;
REGISTER elephant-bird-hadoop-compat.jar;
REGISTER json-simple.jar;
%default WATCH_UPLOAD_LOG /path/*.json
raw_log = LOAD '$WATCH_UPLOAD_LOG'
USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');
tic_log = FOREACH raw_log GENERATE
(chararray)$0#'version_name' AS version_name,
(chararray)$0#'phone_device_id' AS phone_device_id,
(chararray)$0#'watch_device_id' AS watch_device_id,
(chararray)$0#'phone_device_type' AS phone_device_type,
(chararray)$0#'properties'#'address' AS address,
(chararray)$0#'type' AS type,
(chararray)$0#'event' AS event,
(chararray)$0#'user_id' AS user_id,
(chararray)$0#'deviceid' AS device_id,
(chararray)$0#'ticwear_version' AS ticwear_version,
(chararray)$0#'watch_device_type' AS watch_device_type,
(chararray)$0#'sys_model' AS sys_model,
(chararray)$0#'timestamp' AS timestamp;
STORE tic_log INTO '/path/tic_log';
2、使用Hive导入数据:
create external table watch_upload_info_temp(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) row format delimited fields terminated by '\t' stored as textfile location '/path/tic_log';
create table watch_upload_info(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) partitioned by (time string);
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions = 10000;
set hive.exec.max.dynamic.partitions.pernode = 1000;
insert into table watch_upload_info partition(time) select *, from_unixtime(floor(`timestamp`/1000), "yyyy-MM-dd") time from watch_upload_info_temp where `timestamp` > 1417363200000; -- 这里1417363200000是2014-12-01,timestamp是以ms为单位。
数据格式如下,需要解析其中的部分字段:
{"version_name":"3.2.0","phone_system_fingerprint":"Xiaomi/libra/libra:5.1.1/LMY47V/V7.1.6.0.LXKCNCK:user/release-keys","phone_device_id":"ac614a689cdbb1ecd2713a4dbc008682","phone_device_type":"Mi-4c","watch_device_id":"7664e1b6eba4abc542ad5db0ce892fb7","properties":{"address":"中国,北京市,北京市,海淀区,海淀北二街,8号,39.989566,116.316318"},"type":"watch","version":"12600","deviceid":"ac614a689cdbb1ecd2713a4dbc008682","timestamp":"1455512510381","sys_version":"5.1.1","address":"中国,河北省,唐山市,路北区,和平路,,39.630073,118.136363","net_state":"","event":"user_connect","user_id":"ac614a689cdbb1ecd2713a4dbc008682","ticwear_version":"tic_3.2.1","watch_device_type":"Ticwatch","sys_model":"Mi-4c","channel":"main"}
1、首先用pig解析JSON,脚本如下:
REGISTER elephant-bird-pig.jar;
REGISTER elephant-bird-core.jar;
REGISTER elephant-bird-hadoop-compat.jar;
REGISTER json-simple.jar;
%default WATCH_UPLOAD_LOG /path/*.json
raw_log = LOAD '$WATCH_UPLOAD_LOG'
USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');
tic_log = FOREACH raw_log GENERATE
(chararray)$0#'version_name' AS version_name,
(chararray)$0#'phone_device_id' AS phone_device_id,
(chararray)$0#'watch_device_id' AS watch_device_id,
(chararray)$0#'phone_device_type' AS phone_device_type,
(chararray)$0#'properties'#'address' AS address,
(chararray)$0#'type' AS type,
(chararray)$0#'event' AS event,
(chararray)$0#'user_id' AS user_id,
(chararray)$0#'deviceid' AS device_id,
(chararray)$0#'ticwear_version' AS ticwear_version,
(chararray)$0#'watch_device_type' AS watch_device_type,
(chararray)$0#'sys_model' AS sys_model,
(chararray)$0#'timestamp' AS timestamp;
STORE tic_log INTO '/path/tic_log';
2、使用Hive导入数据:
create external table watch_upload_info_temp(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) row format delimited fields terminated by '\t' stored as textfile location '/path/tic_log';
create table watch_upload_info(version_name string, phone_device_id string, watch_device_id string, phone_device_type string, address string, type string, event string, user_id string, device_id string, ticwear_version string, watch_device_type string, sys_model string, `timestamp` bigint) partitioned by (time string);
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions = 10000;
set hive.exec.max.dynamic.partitions.pernode = 1000;
insert into table watch_upload_info partition(time) select *, from_unixtime(floor(`timestamp`/1000), "yyyy-MM-dd") time from watch_upload_info_temp where `timestamp` > 1417363200000; -- 这里1417363200000是2014-12-01,timestamp是以ms为单位。
2016年2月16日星期二
Hive查看已存在的表是内部表或外部表
两种方法:
1) hive> desc formatted tbl_name
2) hive> desc extended tbl_name
Table Type是MANAGED_TABLE表示内部表,EXTERNAL_TABLE是外部表。
1) hive> desc formatted tbl_name
2) hive> desc extended tbl_name
Table Type是MANAGED_TABLE表示内部表,EXTERNAL_TABLE是外部表。
2016年2月15日星期一
pig中dump时无法连接server
pig默认的hdfs模式下dump数据到控制台出现以下提示信息:
org.apache.hadoop.ipc.Client - Retrying connect to server: host/ip:port.
解决:
Hadoop集群没有启动JobHistoryServer进程,执行命令:
mr-jobhistory-daemon.sh start historyserver
org.apache.hadoop.ipc.Client - Retrying connect to server: host/ip:port.
解决:
Hadoop集群没有启动JobHistoryServer进程,执行命令:
mr-jobhistory-daemon.sh start historyserver
2016年2月2日星期二
reStructuredText语法
reST语法比较多,相比MarkDown还是有点复杂的,本文只记录了一部分。详情参考:链接。
标题
for parts
#########
for chapters
*************
= for sections
- for subsections
^ for subsubsections
" for paragraphs
作为修饰的字符长度要大于等于文字长度
列表
# + - 都可以
无序列表:
# 列表1
# 子列表
# 列表2
有序列表:
#. 列表1
#. 列表2
1. 列表1
2. 列表2
样式
**粗体** *斜体* ``引用``
保留原来的行样式:
| These lines are
| broken exactly like in
| the source file.
代码
::
print(1)
print(2)
下标
H\ :sub:`2`\ O
上标
E = mc\ :sup:`2`
表格
+------------------------+------------+----------+----------+
| Header row, column 1 | Header 2 | Header 3 | Header 4 |
| (header rows optional) | | | |
+========================+============+==========+==========+
| body row 1, column 1 | column 2 | column 3 | column 4 |
+------------------------+------------+----------+----------+
| body row 2 | ... | ... | |
+------------------------+------------+----------+----------+
===== ===== =====
第1列 第2列 第3列
===== ===== =====
8 1 6
3 5 7
4 9 2
===== ===== =====
超链接
内嵌网址超链接:
`google <http://www.google.com.tw/>`_
可以把链接地址和目标定义分开:
google_
.. _google: http://www.google.com.tw/
注释
.. 注释
图片
.. image:: images/ball1.gif
:height: 200
:width: 300
标题
for parts
#########
for chapters
*************
= for sections
- for subsections
^ for subsubsections
" for paragraphs
作为修饰的字符长度要大于等于文字长度
列表
# + - 都可以
无序列表:
# 列表1
# 子列表
# 列表2
有序列表:
#. 列表1
#. 列表2
1. 列表1
2. 列表2
样式
**粗体** *斜体* ``引用``
保留原来的行样式:
| These lines are
| broken exactly like in
| the source file.
代码
::
print(1)
print(2)
下标
H\ :sub:`2`\ O
上标
E = mc\ :sup:`2`
表格
+------------------------+------------+----------+----------+
| Header row, column 1 | Header 2 | Header 3 | Header 4 |
| (header rows optional) | | | |
+========================+============+==========+==========+
| body row 1, column 1 | column 2 | column 3 | column 4 |
+------------------------+------------+----------+----------+
| body row 2 | ... | ... | |
+------------------------+------------+----------+----------+
===== ===== =====
第1列 第2列 第3列
===== ===== =====
8 1 6
3 5 7
4 9 2
===== ===== =====
超链接
内嵌网址超链接:
`google <http://www.google.com.tw/>`_
可以把链接地址和目标定义分开:
google_
.. _google: http://www.google.com.tw/
注释
.. 注释
图片
.. image:: images/ball1.gif
:height: 200
:width: 300
2016年2月1日星期一
Spark中数据分区
Spark程序可以通过控制RDD分区来减少通信开销,所有的键值对RDD都可以进行分区。只有当数据集多次在比如join这种基于键的操作中使用时,分区才会有帮助。
一个例子:
有一张固定的用户信息表,以RDD(UserID, UserInfo)的形式保存在UserInfo中,另外有一个用户行为表,以RDD(UserID, LinkInfo)形式保存在LinkInfo中,现在统计在LinkInfo中但不在UserInfo中的topic个数。
不够高效的实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[User, LinkInfo](logFileName)
val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs.
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
以上代码中,连接操作需要将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。在每次调用时都对userData表进行哈希值计算和跨节点数据混洗,虽然userData的数据从来都不会变化。
使用数据分区实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://..."
.partitionBy(new HashPartitioner(100)).persist()
在构建userData时候调用了partitionBy,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join时,Spark就会利用到这一点。
当调用userData.join(events)时,Spark只会对events进行数据混洗操作,将events中特性UserID的记录发送到userData的对应分区所在的那台机器上。
注意,将分区之后的结果持久化是必要的,否则后面每次用到userData这个RDD时都会重新地对数据进行分区操作,partitionBy带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗。
可以从分区中获益的操作:
cogroup()、groupWith()、join()、leftOuterrJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、lookup()
对于RDD的pair,map()的结果不会有固定的分区,因为传给map()的函数理论上可以改变元素的键。不过Spark提供了mapValues()和flatMapValues(),可以保证每个二元组的键保持不变。
使用mapValues的PageRank的例子:
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
var ranks = links.mapValues(v => 1.0) // 因为使用了mapValues,分区和links一样
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")
为了最大化分区相关优化的潜在作用,应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。
一个例子:
有一张固定的用户信息表,以RDD(UserID, UserInfo)的形式保存在UserInfo中,另外有一个用户行为表,以RDD(UserID, LinkInfo)形式保存在LinkInfo中,现在统计在LinkInfo中但不在UserInfo中的topic个数。
不够高效的实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[User, LinkInfo](logFileName)
val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs.
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
以上代码中,连接操作需要将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。在每次调用时都对userData表进行哈希值计算和跨节点数据混洗,虽然userData的数据从来都不会变化。
使用数据分区实现:
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://..."
.partitionBy(new HashPartitioner(100)).persist()
在构建userData时候调用了partitionBy,Spark就知道了该RDD是根据键的哈希值来分区的,这样在调用join时,Spark就会利用到这一点。
当调用userData.join(events)时,Spark只会对events进行数据混洗操作,将events中特性UserID的记录发送到userData的对应分区所在的那台机器上。
注意,将分区之后的结果持久化是必要的,否则后面每次用到userData这个RDD时都会重新地对数据进行分区操作,partitionBy带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗。
可以从分区中获益的操作:
cogroup()、groupWith()、join()、leftOuterrJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、lookup()
对于RDD的pair,map()的结果不会有固定的分区,因为传给map()的函数理论上可以改变元素的键。不过Spark提供了mapValues()和flatMapValues(),可以保证每个二元组的键保持不变。
使用mapValues的PageRank的例子:
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()
var ranks = links.mapValues(v => 1.0) // 因为使用了mapValues,分区和links一样
for (i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")
为了最大化分区相关优化的潜在作用,应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。
Spark中的累加器和广播变量
在使用map()或者filter()之类的函数时,可以使用在map外面定义的变量,但是无法改变变量的值。共享变量(累加器和广播变量)突破了这一个限制。
累加器
将文本按照空格分隔,顺便统计空行个数:
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // 创建Accumulator[Int]并初始化为0
val res = file.flatMap( line => {
if (line == "") {
blankLines += 1
}
lines.split(" ")
})
res.collect() // 这里随便执行一个action,这样才会执行map中的运算
println(blankLines.value)
广播变量
在RDD的操作中如果每一次都使用同一个变量,驱动节点每次都会发送该变量到各个工作节点,如果变量很大,浪费了通信的开销,这种情况下可以使用广播变量,这个值只会被发送到各节点一次。
用法:
val broadcastVar = sc.broadcast(1)
sc.parallelize(1 to 10).map(_ + broadcastVar.value)
累加器
将文本按照空格分隔,顺便统计空行个数:
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // 创建Accumulator[Int]并初始化为0
val res = file.flatMap( line => {
if (line == "") {
blankLines += 1
}
lines.split(" ")
})
res.collect() // 这里随便执行一个action,这样才会执行map中的运算
println(blankLines.value)
广播变量
在RDD的操作中如果每一次都使用同一个变量,驱动节点每次都会发送该变量到各个工作节点,如果变量很大,浪费了通信的开销,这种情况下可以使用广播变量,这个值只会被发送到各节点一次。
用法:
val broadcastVar = sc.broadcast(1)
sc.parallelize(1 to 10).map(_ + broadcastVar.value)
Spark的缓存
Spark在第一次调用动作并计算出RDD结果后,该动作的结果可以存储在集群的内存或者磁盘上,这样下一次需要调用依赖该RDD的动作时,就不需要从依赖关系中重新计算RDD,数据可以从缓存分区中直接返回。
例如,
cached.cache()
cached.count()
cached.take(10)
调用count时会导致第一次计算RDD,然后又需要计算take的动作,调用take时, 访问的是已经缓存好的元素,不需要再做计算。
例如,
cached.cache()
cached.count()
cached.take(10)
调用count时会导致第一次计算RDD,然后又需要计算take的动作,调用take时, 访问的是已经缓存好的元素,不需要再做计算。
订阅:
博文 (Atom)