2016年2月17日星期三

将HDFS中JSON格式日志以日期分区导入Hive

下面通过一个实例,演示如何将 JSON 日志按天分区导入 Hive。

数据格式如下,需要解析其中的部分字段:
{"version_name":"3.2.0","phone_system_fingerprint":"Xiaomi/libra/libra:5.1.1/LMY47V/V7.1.6.0.LXKCNCK:user/release-keys","phone_device_id":"ac614a689cdbb1ecd2713a4dbc008682","phone_device_type":"Mi-4c","watch_device_id":"7664e1b6eba4abc542ad5db0ce892fb7","properties":{"address":"中国,北京市,北京市,海淀区,海淀北二街,8号,39.989566,116.316318"},"type":"watch","version":"12600","deviceid":"ac614a689cdbb1ecd2713a4dbc008682","timestamp":"1455512510381","sys_version":"5.1.1","address":"中国,河北省,唐山市,路北区,和平路,,39.630073,118.136363","net_state":"","event":"user_connect","user_id":"ac614a689cdbb1ecd2713a4dbc008682","ticwear_version":"tic_3.2.1","watch_device_type":"Ticwatch","sys_model":"Mi-4c","channel":"main"}

1、首先用 Pig 解析 JSON,提取所需字段并以制表符(\t)分隔的文本形式写入 HDFS,脚本如下:
REGISTER elephant-bird-pig.jar;
REGISTER elephant-bird-core.jar;
REGISTER elephant-bird-hadoop-compat.jar;
REGISTER json-simple.jar;

-- Input glob for the raw JSON logs; can be overridden with -param on the command line.
%default WATCH_UPLOAD_LOG /path/*.json

-- Load every JSON line as a single map field; '-nestedLoad' keeps nested
-- objects (such as 'properties') as nested maps so they can be dereferenced.
json_rows = LOAD '$WATCH_UPLOAD_LOG' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');

-- Project the fields of interest as chararray columns.
-- The column order here must match the column order of the Hive table
-- that is later declared on top of the stored output.
parsed_rows = FOREACH json_rows GENERATE
    (chararray)$0#'version_name'          AS version_name,
    (chararray)$0#'phone_device_id'       AS phone_device_id,
    (chararray)$0#'watch_device_id'       AS watch_device_id,
    (chararray)$0#'phone_device_type'     AS phone_device_type,
    -- address is read from the nested 'properties' object, not the top-level key
    (chararray)$0#'properties'#'address'  AS address,
    (chararray)$0#'type'                  AS type,
    (chararray)$0#'event'                 AS event,
    (chararray)$0#'user_id'               AS user_id,
    (chararray)$0#'deviceid'              AS device_id,
    (chararray)$0#'ticwear_version'       AS ticwear_version,
    (chararray)$0#'watch_device_type'     AS watch_device_type,
    (chararray)$0#'sys_model'             AS sys_model,
    (chararray)$0#'timestamp'             AS timestamp;

-- No storage function is given, so Pig's default (PigStorage, tab-delimited text) is used.
STORE parsed_rows INTO '/path/tic_log';

2、使用 Hive 导入数据:先在 Pig 的输出目录上建立外部临时表,再通过动态分区将数据写入按天分区的目标表:
-- Staging table: an external table over the tab-delimited text files stored
-- under /path/tic_log. Column order must match the order in which the fields
-- were written to those files.
create external table if not exists watch_upload_info_temp (
    version_name      string,
    phone_device_id   string,
    watch_device_id   string,
    phone_device_type string,
    address           string,
    type              string,
    event             string,
    user_id           string,
    device_id         string,
    ticwear_version   string,
    watch_device_type string,
    sys_model         string,
    `timestamp`       bigint    -- epoch time in milliseconds
)
row format delimited fields terminated by '\t'
stored as textfile
location '/path/tic_log';

-- Target table, partitioned by calendar day ('yyyy-MM-dd' string).
-- `time` is backtick-quoted because it is a reserved word in newer Hive releases.
create table if not exists watch_upload_info (
    version_name      string,
    phone_device_id   string,
    watch_device_id   string,
    phone_device_type string,
    address           string,
    type              string,
    event             string,
    user_id           string,
    device_id         string,
    ticwear_version   string,
    watch_device_type string,
    sys_model         string,
    `timestamp`       bigint    -- epoch time in milliseconds
)
partitioned by (`time` string);

-- Dynamic partitioning must be enabled explicitly on installations where it is
-- off by default; nonstrict mode is required because no static partition value
-- is supplied in the insert below.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=10000;
set hive.exec.max.dynamic.partitions.pernode=1000;

-- Load the staged rows into daily partitions.
-- * Columns are listed explicitly so the insert does not depend on `select *`
--   expansion order; the dynamic partition column must come last.
-- * `timestamp` is in milliseconds, so it is divided by 1000 before being
--   formatted. NOTE(review): from_unixtime formats in the session/system time
--   zone, so the day boundary follows that zone -- confirm it is the intended one.
-- * 1417363200000 ms is 2014-12-01 00:00:00 in UTC+8 (2014-11-30 16:00:00 UTC);
--   only rows strictly after that instant are loaded.
-- * `insert into` appends: re-running this statement loads the same rows again.
insert into table watch_upload_info partition (`time`)
select
    version_name,
    phone_device_id,
    watch_device_id,
    phone_device_type,
    address,
    type,
    event,
    user_id,
    device_id,
    ticwear_version,
    watch_device_type,
    sys_model,
    `timestamp`,
    from_unixtime(floor(`timestamp` / 1000), 'yyyy-MM-dd') as `time`
from watch_upload_info_temp
where `timestamp` > 1417363200000;

没有评论:

发表评论