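I. Introduction to the DWD layer
1. Parse the user behavior data.
2. Filter out records whose core fields are null.
3. Remodel the business data with a dimensional model, i.e. dimension degeneration (related source tables are folded into one wider dimension table).
II. DWD layer: user behavior data
2.1 User behavior startup table dwd_start_log
1. Data source
ods_start_log -> dwd_start_log
2. Creating the table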
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');
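The data is stored as Parquet and compressed with LZO. Parquet files are splittable, so unlike LZO-compressed text files they need no separate index to be split. Parquet also compresses well and, being columnar, lets queries read only the columns they need.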
3. Loading the data
insert overwrite table dwd_start_log
PARTITION (dt='2020-03-10')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from ods_start_log where dt='2020-03-10';
4. Load script for the DWD user behavior startup table: ods_to_dwd_log.sh
#!/bin/bash
# Variables defined up front so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive
# Use the date passed as the first argument; if none is given, use yesterday's date
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ${APP}.dwd_start_log
PARTITION (dt='$do_date')
select
get_json_object(line,'$.mid') mid_id,
get_json_object(line,'$.uid') user_id,
get_json_object(line,'$.vc') version_code,
get_json_object(line,'$.vn') version_name,
get_json_object(line,'$.l') lang,
get_json_object(line,'$.sr') source,
get_json_object(line,'$.os') os,
get_json_object(line,'$.ar') area,
get_json_object(line,'$.md') model,
get_json_object(line,'$.ba') brand,
get_json_object(line,'$.sv') sdk_version,
get_json_object(line,'$.g') gmail,
get_json_object(line,'$.hw') height_width,
get_json_object(line,'$.t') app_time,
get_json_object(line,'$.nw') network,
get_json_object(line,'$.ln') lng,
get_json_object(line,'$.la') lat,
get_json_object(line,'$.entry') entry,
get_json_object(line,'$.open_ad_type') open_ad_type,
get_json_object(line,'$.action') action,
get_json_object(line,'$.loading_time') loading_time,
get_json_object(line,'$.detail') detail,
get_json_object(line,'$.extend1') extend1
from ${APP}.ods_start_log
where dt='$do_date';
"
$hive -e "$sql"
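The date handling at the top of the script can be factored into a small function that also rejects malformed input. This is only a minimal sketch and not part of the original script: the name resolve_do_date is hypothetical, and it assumes GNU date, as the script itself already does.

```shell
# Hypothetical helper: print the partition date to load.
# Uses the first argument when given, otherwise yesterday's date.
resolve_do_date() {
    if [ -n "$1" ]; then
        # Accept only the yyyy-MM-dd shape used by the dt partitions
        case "$1" in
            [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9])
                printf '%s\n' "$1"
                ;;
            *)
                echo "invalid date: $1 (expected yyyy-MM-dd)" >&2
                return 1
                ;;
        esac
    else
        # GNU date: yesterday in yyyy-MM-dd form
        date -d "-1 day" +%F
    fi
}
```

In the script it could be used as `do_date=$(resolve_do_date "$1") || exit 1`, so that a mistyped date stops the run before any partition is overwritten.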
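2.2 User behavior event tables
1. Data source and data splitting
2. Creating the base detail table dwd_base_event_log
The detail table stores the detail records converted from the raw ODS-layer table.
(1) Creating the table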
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');
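Note: event_name and event_json hold the event name and the whole event, respectively. This is where the one-to-many structure of the raw log (one log line carrying several events) is split apart. Flattening the raw log requires a UDF and a UDTF.
(2) Custom UDF (base_analizer)
A UDF takes one row in and produces one row out.
A. Approach
B. Code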
package com.atguigu.udf;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
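/**
 * @description: custom UDF that parses the common fields of a log line
 * @author: hyr
 * @time: 2020/4/21 14:00
 */
public class BaseFieldUDF extends UDF {
    public String evaluate(String line, String key) throws JSONException {
        // A NULL or blank line has nothing to parse
        if (StringUtils.isBlank(line)) {
            return "";
        }
        // A log line has the form "<server time>|<json>"
        String[] log = line.split("\\|");
        if (log.length != 2 || StringUtils.isBlank(log[1])) {
            return "";
        }
        JSONObject basejson = new JSONObject(log[1].trim());
        String result = "";
        // "st": return the server time
        if ("st".equals(key)) {
            result = log[0].trim();
        } else if ("et".equals(key)) {
            // "et": return the event array as a JSON string.
            // get(...).toString() is used because some org.json versions
            // make getString() throw when the value is not a string.
            if (basejson.has("et")) {
                result = basejson.get("et").toString();
            }
        } else {
            JSONObject cm = basejson.getJSONObject("cm");
            // Otherwise return the value of the common field named by key
            if (cm.has(key)) {
                result = cm.getString(key);
            }
        }
        return result;
    }

    /** For local testing. */
    public static void main(String[] args) throws JSONException {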
String line = "1583769785914|{\"cm\":{\"ln\":\"-79.5\",\"sv\":\"V2.3.1\",\"os\":\"8.2.5\",\"g\":\"1MF7Y4W4@gmail.com\",\"mid\":\"0\",\"nw\":\"WIFI\",\"l\":\"en\",\"vc\":\"14\",\"hw\":\"640*1136\",\"ar\":\"MX\",\"uid\":\"0\",\"t\":\"1583695967769\",\"la\":\"-45.9\",\"md\":\"HTC-13\",\"vn\":\"1.1.5\",\"ba\":\"HTC\",\"sr\":\"K\"},\"ap\":\"app\",\"et\":[{\"ett\":\"1583700386001\",\"en\":\"newsdetail\",\"kv\":{\"entry\":\"1\",\"goodsid\":\"0\",\"news_staytime\":\"12\",\"loading_time\":\"15\",\"action\":\"2\",\"showtype\":\"1\",\"category\":\"16\",\"type1\":\"102\"}},{\"ett\":\"1583706290595\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1583702836945\",\"action\":\"3\",\"type\":\"4\",\"content\":\"\"}},{\"ett\":\"1583681747595\",\"en\":\"active_foreground\",\"kv\":{\"access\":\"1\",\"push_id\":\"3\"}},{\"ett\":\"1583725227310\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"1\"}},{\"ett\":\"1583743888737\",\"en\":\"comment\",\"kv\":{\"p_comment_id\":0,\"addtime\":\"1583697745229\",\"praise_count\":901,\"other_id\":5,\"comment_id\":2,\"reply_count\":163,\"userid\":9,\"content\":\"诸帛咕死添共项饶伞锯产荔讯胆遇卖吱载舟沮稀蓟\"}}]}";
String events = new BaseFieldUDF().evaluate(line, "et");
System.out.println(events);
}
}
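The UDF relies on every log line having the form `server_time|json`. The shell sketch below illustrates that layout and can help when eyeballing raw ODS lines before running the Hive job. The function names are hypothetical and the sample line is an abbreviated version of the test line above. Note one difference: the Java code returns an empty string for any line that does not split into exactly two parts, whereas this sketch splits on the first `|` only.

```shell
# Hypothetical helpers illustrating the "<server time>|<json>" line layout.
# Both split on the FIRST '|' only.
log_server_time() {
    # Everything before the first '|'
    printf '%s\n' "${1%%|*}"
}

log_json_body() {
    # Everything after the first '|'
    printf '%s\n' "${1#*|}"
}

# Abbreviated sample line
line='1583769785914|{"cm":{"mid":"0","uid":"0"},"ap":"app","et":[]}'
log_server_time "$line"   # prints 1583769785914
log_json_body "$line"     # prints {"cm":{"mid":"0","uid":"0"},"ap":"app","et":[]}
```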
(3) Custom UDTF (flat_analizer)
A UDTF takes one row in and produces multiple rows out.
A. Approach
B. Code
package com.atguigu.udtf;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
/**
 * @description: custom UDTF that expands the event array into one row per event
 * @author: hyr
 * @time: 2020/4/21 14:44
 */
public class EventJsonUDTF extends GenericUDTF {
    // Declare the names and types of the output columns
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // One input record in, zero or more output records out
    @Override
    public void process(Object[] objects) throws HiveException {
        // Skip the record if the "et" argument is NULL or blank
        if (objects[0] == null) {
            return;
        }
        String input = objects[0].toString();
        if (StringUtils.isBlank(input)) {
            return;
        }
        try {
            // Parse the event array
            JSONArray ja = new JSONArray(input);
            // Iterate over the events
            for (int i = 0; i < ja.length(); i++) {
                String[] result = new String[2];
                try {
                    // Event name
                    result[0] = ja.getJSONObject(i).getString("en");
                    // The whole event as a JSON string
                    result[1] = ja.get(i).toString();
                } catch (JSONException e) {
                    // Skip a malformed event and carry on with the rest
                    continue;
                }
                // Emit one output row
                forward(result);
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }

    // Called when there are no more records to process; use it for cleanup or to emit extra output
    @Override
    public void close() throws HiveException {
    }
}
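The queries in this section call base_analizer and flat_analizer, so both classes must be registered as Hive functions before those queries run. The sketch below only generates the registration statements. The class names come from the code above; the helper name build_register_sql and the jar location are placeholder assumptions, and the jar URI must be replaced with wherever the packaged jar was actually uploaded.

```shell
# Hypothetical helper: print the HiveQL that registers the two functions.
# $1 = database name, $2 = jar URI (placeholder; use your own upload location)
build_register_sql() {
    cat <<EOF
create function ${1}.base_analizer as 'com.atguigu.udf.BaseFieldUDF' using jar '${2}';
create function ${1}.flat_analizer as 'com.atguigu.udtf.EventJsonUDTF' using jar '${2}';
EOF
}

# Example: print the statements for the gmall database
build_register_sql gmall 'hdfs:///path/to/your-udf.jar'
```

The output can be handed to Hive in the same way as the other scripts here, for example `$hive -e "$(build_register_sql gmall "$jar_uri")"`. A function created in one database can be called without a prefix only when that database is the current one; otherwise it has to be referenced as `db.function_name`.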
(4) Parsing the event log into the base detail table
insert overwrite table dwd_base_event_log partition(dt='2020-03-10')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='2020-03-10' and base_analizer(line,'et')<>'';
(5) Event log parsing script ods_to_dwd_base_log.sh
#!/bin/bash
# Variables defined up front so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive
# Use the date passed as the first argument; if none is given, use yesterday's date
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ${APP}.dwd_base_event_log partition(dt='$do_date')
select
base_analizer(line,'mid') as mid_id,
base_analizer(line,'uid') as user_id,
base_analizer(line,'vc') as version_code,
base_analizer(line,'vn') as version_name,
base_analizer(line,'l') as lang,
base_analizer(line,'sr') as source,
base_analizer(line,'os') as os,
base_analizer(line,'ar') as area,
base_analizer(line,'md') as model,
base_analizer(line,'ba') as brand,
base_analizer(line,'sv') as sdk_version,
base_analizer(line,'g') as gmail,
base_analizer(line,'hw') as height_width,
base_analizer(line,'t') as app_time,
base_analizer(line,'nw') as network,
base_analizer(line,'ln') as lng,
base_analizer(line,'la') as lat,
event_name,
event_json,
base_analizer(line,'st') as server_time
from ${APP}.ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
where dt='$do_date' and base_analizer(line,'et')<>'';
"
$hive -e "$sql"
3. Product click table dwd_display_log
(1) Creating the table
drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_display_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='display';
4. Product detail page table dwd_newsdetail_log
(1) Creating the table
drop table if exists dwd_newsdetail_log;
CREATE EXTERNAL TABLE dwd_newsdetail_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`goodsid` string,
`showtype` string,
`news_staytime` string,
`loading_time` string,
`type1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_newsdetail_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.showtype') showtype,
get_json_object(event_json,'$.kv.news_staytime') news_staytime,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.type1') type1,
get_json_object(event_json,'$.kv.category') category,
server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='newsdetail';
5. Product list page table dwd_loading_log
(1) Creating the table
drop table if exists dwd_loading_log;
CREATE EXTERNAL TABLE dwd_loading_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`loading_time` string,
`loading_way` string,
`extend1` string,
`extend2` string,
`type` string,
`type1` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_loading_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_loading_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='loading';
6. Advertisement table dwd_ad_log
(1) Creating the table
drop table if exists dwd_ad_log;
CREATE EXTERNAL TABLE dwd_ad_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`action` string,
`contentType` string,
`displayMills` string,
`itemId` string,
`activityId` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_ad_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_ad_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='ad';
7. Notification table dwd_notification_log
(1) Creating the table
drop table if exists dwd_notification_log;
CREATE EXTERNAL TABLE dwd_notification_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`noti_type` string,
`ap_time` string,
`content` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_notification_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_notification_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='notification';
8. User background activity table dwd_active_background_log
(1) Creating the table
drop table if exists dwd_active_background_log;
CREATE EXTERNAL TABLE dwd_active_background_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`active_source` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_background_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_active_background_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from
dwd_base_event_log
where
dt='2020-03-10' and event_name='active_background';
9. Comment table dwd_comment_log
(1) Creating the table
drop table if exists dwd_comment_log;
CREATE EXTERNAL TABLE dwd_comment_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`comment_id` int,
`userid` int,
`p_comment_id` int,
`content` string,
`addtime` string,
`other_id` int,
`praise_count` int,
`reply_count` int,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_comment_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_comment_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from
dwd_base_event_log
where dt='2020-03-10' and event_name='comment';
10. Favorites table dwd_favorites_log
(1) Creating the table
drop table if exists dwd_favorites_log;
CREATE EXTERNAL TABLE dwd_favorites_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` int,
`course_id` int,
`userid` int,
`add_time` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_favorites_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_favorites_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from
dwd_base_event_log
where
dt='2020-03-10' and event_name='favorites';
11. Praise (like) table dwd_praise_log
(1) Creating the table
drop table if exists dwd_praise_log;
CREATE EXTERNAL TABLE dwd_praise_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`id` string,
`userid` string,
`target_id` string,
`type` string,
`add_time` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_praise_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_praise_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from
dwd_base_event_log
where
dt='2020-03-10' and event_name='praise';
12. Error log table dwd_error_log
(1) Creating the table
drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`errorBrief` string,
`errorDetail` string,
`server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_error_log/'
TBLPROPERTIES('parquet.compression'='lzo');
(2) Loading the data
insert overwrite table dwd_error_log PARTITION (dt='2020-03-10')
select
mid_id,
user_id,
version_code,
version_name,
lang,
source,
os,
area,
model,
brand,
sdk_version,
gmail,
height_width,
app_time,
network,
lng,
lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from
dwd_base_event_log
where
dt='2020-03-10' and event_name='error';
13. Load script for the DWD event tables: ods_to_dwd_event_log.sh
#!/bin/bash
# Variables defined up front so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive
# Use the date passed as the first argument; if none is given, use yesterday's date
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ${APP}.dwd_display_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.place') place,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.category') category,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='display';

insert overwrite table ${APP}.dwd_newsdetail_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.goodsid') goodsid,
get_json_object(event_json,'$.kv.showtype') showtype,
get_json_object(event_json,'$.kv.news_staytime') news_staytime,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.type1') type1,
get_json_object(event_json,'$.kv.category') category,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='newsdetail';

insert overwrite table ${APP}.dwd_loading_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.loading_time') loading_time,
get_json_object(event_json,'$.kv.loading_way') loading_way,
get_json_object(event_json,'$.kv.extend1') extend1,
get_json_object(event_json,'$.kv.extend2') extend2,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.type1') type1,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='loading';

insert overwrite table ${APP}.dwd_ad_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.entry') entry,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.contentType') contentType,
get_json_object(event_json,'$.kv.displayMills') displayMills,
get_json_object(event_json,'$.kv.itemId') itemId,
get_json_object(event_json,'$.kv.activityId') activityId,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='ad';

insert overwrite table ${APP}.dwd_notification_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.action') action,
get_json_object(event_json,'$.kv.type') noti_type,
get_json_object(event_json,'$.kv.ap_time') ap_time,
get_json_object(event_json,'$.kv.content') content,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='notification';

insert overwrite table ${APP}.dwd_active_background_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.active_source') active_source,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='active_background';

insert overwrite table ${APP}.dwd_comment_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.comment_id') comment_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
get_json_object(event_json,'$.kv.content') content,
get_json_object(event_json,'$.kv.addtime') addtime,
get_json_object(event_json,'$.kv.other_id') other_id,
get_json_object(event_json,'$.kv.praise_count') praise_count,
get_json_object(event_json,'$.kv.reply_count') reply_count,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='comment';

insert overwrite table ${APP}.dwd_favorites_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.course_id') course_id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='favorites';

insert overwrite table ${APP}.dwd_praise_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.id') id,
get_json_object(event_json,'$.kv.userid') userid,
get_json_object(event_json,'$.kv.target_id') target_id,
get_json_object(event_json,'$.kv.type') type,
get_json_object(event_json,'$.kv.add_time') add_time,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='praise';

insert overwrite table ${APP}.dwd_error_log PARTITION (dt='$do_date')
select
mid_id, user_id, version_code, version_name, lang, source, os, area, model,
brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
get_json_object(event_json,'$.kv.errorBrief') errorBrief,
get_json_object(event_json,'$.kv.errorDetail') errorDetail,
server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='error';
"
$hive -e "$sql"
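The ten statements in this script differ only in the target table, the event name, and the list of fields read from `kv`. One possible way to cut the repetition is sketched below. It is not part of the original script: the names COMMON_COLS and build_event_insert are hypothetical, and the sketch assumes every output column has the same name as its key under `kv`, so a table whose column name differs from the JSON key would need extra handling.

```shell
# Columns shared by every event table, copied from dwd_base_event_log
COMMON_COLS="mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat"

# Hypothetical helper: print one insert statement.
# $1 = database, $2 = partition date, $3 = target table, $4 = event name,
# remaining arguments = keys under "kv" (each is also used as the column name)
build_event_insert() {
    db=$1; dt=$2; table=$3; event=$4
    shift 4
    printf "insert overwrite table %s.%s PARTITION (dt='%s')\n" "$db" "$table" "$dt"
    printf 'select %s,\n' "$COMMON_COLS"
    for field in "$@"; do
        # \$ keeps the dollar sign of the JSON path literal
        printf "get_json_object(event_json,'\$.kv.%s') %s,\n" "$field" "$field"
    done
    printf 'server_time\n'
    printf "from %s.dwd_base_event_log where dt='%s' and event_name='%s';\n" "$db" "$dt" "$event"
}

# Example: build the statements for two of the tables and print them
sql="$(build_event_insert gmall 2020-03-10 dwd_display_log display action goodsid place extend1 category)
$(build_event_insert gmall 2020-03-10 dwd_error_log error errorBrief errorDetail)"
printf '%s\n' "$sql"
```

The resulting string can be passed to `$hive -e "$sql"` exactly as in the script above. Keeping the field lists next to the table names makes it easier to spot a missing or misspelled key.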
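III. DWD layer: business data
3.1 Data warehouse modeling
3.2 Dimension tables
1. Product dimension table dwd_dim_sku_info (full load)
(1) Data sources
ods_sku_info, ods_base_trademark, ods_spu_info, ods_base_category3, ods_base_category2, ods_base_category1.
(2) Creating the table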
DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
`id` string COMMENT 'product id',
`spu_id` string COMMENT 'SPU id',
`price` double COMMENT 'product price',
`sku_name` string COMMENT 'product name',
`sku_desc` string COMMENT 'product description',
`weight` double COMMENT 'weight',
`tm_id` string COMMENT 'brand id',
`tm_name` string COMMENT 'brand name',
`category3_id` string COMMENT 'level-3 category id',
`category2_id` string COMMENT 'level-2 category id',
`category1_id` string COMMENT 'level-1 category id',
`category3_name` string COMMENT 'level-3 category name',
`category2_name` string COMMENT 'level-2 category name',
`category1_name` string COMMENT 'level-1 category name',
`spu_name` string COMMENT 'SPU name',
`create_time` string COMMENT 'creation time'
)
COMMENT 'product dimension table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");
(3) Loading the data
insert overwrite table dwd_dim_sku_info partition(dt='2020-03-10')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select * from ods_sku_info where dt='2020-03-10'
)sku
join
(
select * from ods_base_trademark where dt='2020-03-10'
)ob on sku.tm_id = ob.tm_id
join
(
select * from ods_spu_info where dt='2020-03-10'
)spu on spu.id = sku.spu_id
join
(
select * from ods_base_category3 where dt='2020-03-10'
)c3 on sku.category3_id=c3.id
join
(
select * from ods_base_category2 where dt='2020-03-10'
)c2 on c3.category2_id=c2.id
join
(
select * from ods_base_category1 where dt='2020-03-10'
)c1 on c2.category1_id=c1.id;
2. Coupon information table dwd_dim_coupon_info (full load)
(1) Data source
ods_coupon_info -> dwd_dim_coupon_info
(2) Creating the table
drop table if exists dwd_dim_coupon_info;
create external table dwd_dim_coupon_info(
`id` string COMMENT 'coupon id',
`coupon_name` string COMMENT 'coupon name',
`coupon_type` string COMMENT 'coupon type: 1 cash coupon, 2 discount coupon, 3 spend-threshold reduction coupon, 4 quantity-threshold discount coupon',
`condition_amount` string COMMENT 'threshold amount',
`condition_num` string COMMENT 'threshold item count',
`activity_id` string COMMENT 'activity id',
`benefit_amount` string COMMENT 'reduction amount',
`benefit_discount` string COMMENT 'discount',
`create_time` string COMMENT 'creation time',
`range_type` string COMMENT 'range type: 1 product, 2 category, 3 brand',
`spu_id` string COMMENT 'product id',
`tm_id` string COMMENT 'brand id',
`category3_id` string COMMENT 'category id',
`limit_num` string COMMENT 'maximum number of claims',
`operate_time` string COMMENT 'modification time',
`expire_time` string COMMENT 'expiry time'
) COMMENT 'coupon information table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");
(3) Loading the data
insert overwrite table dwd_dim_coupon_info partition(dt='2020-03-10')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from
ods_coupon_info
where
dt='2020-03-10';
3. Activity dimension table dwd_dim_activity_info (full load)
(1) Data sources
ods_activity_info, ods_activity_rule.
(2) Creating the table
drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`condition_amount` string COMMENT '满减金额',
`condition_num` string COMMENT '满减件数',
`benefit_amount` string COMMENT '优惠金额',
`benefit_discount` string COMMENT '优惠折扣',
`benefit_level` string COMMENT '优惠级别',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");
(3) Load data
insert overwrite table dwd_dim_activity_info partition(dt='2020-03-10')
select
info.id,
info.activity_name,
info.activity_type,
rule.condition_amount,
rule.condition_num,
rule.benefit_amount,
rule.benefit_discount,
rule.benefit_level,
info.start_time,
info.end_time,
info.create_time
from
(
select * from ods_activity_info where dt='2020-03-10'
)info
left join
(
select * from ods_activity_rule where dt='2020-03-10'
)rule on info.id = rule.activity_id;
4. Region dimension table dwd_dim_base_province (special)
(1) Data sources
ods_base_province, ods_base_region
(2) Create the table
DROP TABLE IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (
`id` string COMMENT 'id',
`province_name` string COMMENT 'province or municipality name',
`area_code` string COMMENT 'area code',
`iso_code` string COMMENT 'ISO code',
`region_id` string COMMENT 'region ID',
`region_name` string COMMENT 'region name'
) COMMENT 'province and region table'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");
(3) Load data
insert overwrite table dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from
ods_base_province bp
join
ods_base_region br
on
bp.region_id=br.id;
5. Date dimension table dwd_dim_date_info (special) (reserved)
(1) Create the table
DROP TABLE IF EXISTS `dwd_dim_date_info`;
CREATE EXTERNAL TABLE `dwd_dim_date_info`(
`date_id` string COMMENT 'date',
`week_id` int COMMENT 'week',
`week_day` int COMMENT 'day of the week',
`day` int COMMENT 'day of the month',
`month` int COMMENT 'month',
`quarter` int COMMENT 'quarter',
`year` int COMMENT 'year',
`is_workday` int COMMENT 'whether the day is a workday',
`holiday_id` int COMMENT 'whether the day is a holiday')
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info/';
(2) Upload the date_info.txt file to the /opt/module/db_log/ directory on hadoop151
(3) Load data
load data local inpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info;
3.3 Fact tables
1. Order detail fact table dwd_fact_order_detail (transaction fact table)
(1) Associated dimensions
(2) Data sources
ods_order_detail, ods_order_info
(3) Create the table
drop table if exists dwd_fact_order_detail;
create external table dwd_fact_order_detail (
`id` string COMMENT 'order detail ID',
`order_id` string COMMENT 'order ID',
`user_id` string COMMENT 'user ID',
`sku_id` string COMMENT 'SKU ID',
`sku_name` string COMMENT 'product name',
`order_price` decimal(10,2) COMMENT 'product price',
`sku_num` bigint COMMENT 'product quantity',
`create_time` string COMMENT 'creation time',
`province_id` string COMMENT 'province ID',
`total_amount` decimal(20,2) COMMENT 'total amount (order_price * sku_num)'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_detail/'
tblproperties ("parquet.compression"="lzo");
(4) Load data
insert overwrite table dwd_fact_order_detail partition(dt='2020-03-10')
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.order_price*od.sku_num
from
(
select * from ods_order_detail where dt='2020-03-10'
) od
join
(
select * from ods_order_info where dt='2020-03-10'
) oi
on od.order_id=oi.id;
2. Payment fact table dwd_fact_payment_info (transaction fact table)
(1) Associated dimensions
(2) Data sources
ods_payment_info, ods_order_info
(3) Create the table
drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (
`id` string COMMENT '',
`out_trade_no` string COMMENT 'external trade number',
`order_id` string COMMENT 'order ID',
`user_id` string COMMENT 'user ID',
`alipay_trade_no` string COMMENT 'Alipay transaction serial number',
`payment_amount` decimal(16,2) COMMENT 'payment amount',
`subject` string COMMENT 'transaction description',
`payment_type` string COMMENT 'payment type',
`payment_time` string COMMENT 'payment time',
`province_id` string COMMENT 'province ID'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");
(4) Load data
insert overwrite table dwd_fact_payment_info partition(dt='2020-03-10')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from
(
select * from ods_payment_info where dt='2020-03-10'
) pi
join
(
select id, province_id from ods_order_info where dt='2020-03-10'
) oi
on pi.order_id = oi.id;
3. Refund fact table dwd_fact_order_refund_info (transaction fact table)
(1) Associated dimensions
(2) Data source
ods_order_refund_info
(3) Create the table
drop table if exists dwd_fact_order_refund_info;
create external table dwd_fact_order_refund_info(
`id` string COMMENT 'ID',
`user_id` string COMMENT 'user ID',
`order_id` string COMMENT 'order ID',
`sku_id` string COMMENT 'product (SKU) ID',
`refund_type` string COMMENT 'refund type',
`refund_num` bigint COMMENT 'number of items refunded',
`refund_amount` decimal(16,2) COMMENT 'refund amount',
`refund_reason_type` string COMMENT 'refund reason type',
`create_time` string COMMENT 'refund time'
) COMMENT 'refund fact table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/'
tblproperties ("parquet.compression"="lzo");
(4) Load data
insert overwrite table dwd_fact_order_refund_info partition(dt='2020-03-10')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from
ods_order_refund_info
where
dt='2020-03-10';
4. Review fact table dwd_fact_comment_info (transaction fact table)
(1) Associated dimensions
(2) Data source
ods_comment_info
(3) Create the table
drop table if exists dwd_fact_comment_info;
create external table dwd_fact_comment_info(
`id` string COMMENT 'ID',
`user_id` string COMMENT 'user ID',
`sku_id` string COMMENT 'product SKU',
`spu_id` string COMMENT 'product SPU',
`order_id` string COMMENT 'order ID',
`appraise` string COMMENT 'rating',
`create_time` string COMMENT 'review time'
) COMMENT 'review fact table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_comment_info/'
tblproperties ("parquet.compression"="lzo");
(4) Load data
insert overwrite table dwd_fact_comment_info partition(dt='2020-03-10')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from
ods_comment_info
where
dt='2020-03-10';
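5. Add-to-cart fact table dwd_fact_cart_info (periodic snapshot fact table, daily snapshot)
(1) Introduction
The quantities in a shopping cart change over time, so an incremental import is not appropriate.
A snapshot is taken once a day and the full data set is imported, unlike a transaction fact table, which imports only each day's new records.
Drawback of periodic snapshot fact tables: the stored data volume is relatively large.
Mitigation: the data in a periodic snapshot fact table is mostly valuable while it is recent. Snapshots that are too old are of little use and can be deleted.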
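One way to carry out the cleanup described above is to drop snapshot partitions that have aged out. The statement below is a minimal sketch, not part of the original workflow: the cutoff date '2020-01-01' is an arbitrary assumption. Because dwd_fact_cart_info is created as an external table, dropping a partition by default removes only the metastore entry, so the files under the partition directory would still have to be deleted separately.

```sql
-- Hypothetical retention cleanup; the cutoff date is an assumed value.
-- Drops every daily snapshot partition older than the cutoff.
alter table dwd_fact_cart_info drop if exists partition (dt < '2020-01-01');

-- List the partitions that remain.
show partitions dwd_fact_cart_info;
```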
(2) Associated dimensions
(3) Data source
ods_cart_info
(4) Create the table
drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(
`id` string COMMENT 'ID',
`user_id` string COMMENT 'user ID',
`sku_id` string COMMENT 'SKU ID',
`cart_price` string COMMENT 'price when added to the cart',
`sku_num` string COMMENT 'quantity',
`sku_name` string COMMENT 'SKU name (redundant)',
`create_time` string COMMENT 'creation time',
`operate_time` string COMMENT 'modification time',
`is_ordered` string COMMENT 'whether an order has been placed: 1 = ordered, 0 = not ordered',
`order_time` string COMMENT 'order time'
) COMMENT 'add-to-cart fact table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_cart_info/'
tblproperties ("parquet.compression"="lzo");
(5) Load data
insert overwrite table dwd_fact_cart_info partition(dt='2020-03-10')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time
from
ods_cart_info
where
dt='2020-03-10';
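6. Favorites fact table dwd_fact_favor_info (periodic snapshot fact table, daily snapshot)
(1) Introduction
The favorite flag (whether the favorite has been cancelled) can change, so an incremental import is not appropriate.
A snapshot is taken once a day and the full data set is imported, unlike a transaction fact table, which imports only each day's new records.
(2) Associated dimensions
(3) Data source
ods_favor_info
(4) Create the table
drop table if exists dwd_fact_favor_info;
create external table dwd_fact_favor_info(
`id` string COMMENT 'ID',
`user_id` string COMMENT 'user ID',
`sku_id` string COMMENT 'SKU ID',
`spu_id` string COMMENT 'SPU ID',
`is_cancel` string COMMENT 'whether cancelled',
`create_time` string COMMENT 'time favorited',
`cancel_time` string COMMENT 'cancellation time'
) COMMENT 'favorites fact table'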
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_favor_info/'
tblproperties ("parquet.compression"="lzo");
(5) Load data
insert overwrite table dwd_fact_favor_info partition(dt='2020-03-10')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from
ods_favor_info
where
dt='2020-03-10';
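7. Coupon usage fact table dwd_fact_coupon_use (accumulating snapshot fact table)
(1) Introduction
Coupon lifecycle: claim the coupon -> place an order with the coupon -> the coupon is applied at payment.
An accumulating snapshot fact table is used here to count how many coupons were claimed, how many were used in orders, and how many were applied at payment.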
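The three counts mentioned above can be read from a single scan of the table. The query below is a sketch under assumptions: the time columns are taken to be 'yyyy-MM-dd HH:mm:ss' strings that stay NULL until the corresponding step happens, and 2020-03-10 is just the example date used elsewhere in this article.

```sql
-- Sketch: coupon funnel counts for one day from the accumulating snapshot table.
-- A NULL time column yields NULL from date_format, so the comparison is not true
-- and the row contributes 0 to that count.
select
    sum(if(date_format(get_time,'yyyy-MM-dd')='2020-03-10',1,0))   get_count,
    sum(if(date_format(using_time,'yyyy-MM-dd')='2020-03-10',1,0)) using_count,
    sum(if(date_format(used_time,'yyyy-MM-dd')='2020-03-10',1,0))  used_count
from dwd_fact_coupon_use
-- Coupons claimed on earlier days may be used today, so earlier partitions are scanned too.
where dt <= '2020-03-10';
```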
(2) Associated dimensions
(3) Data sources
ods_coupon_use, dwd_fact_coupon_use
(4) Create the table
drop table if exists dwd_fact_coupon_use;
create external table dwd_fact_coupon_use(
`id` string COMMENT 'ID',
`coupon_id` string COMMENT 'coupon ID',
`user_id` string COMMENT 'user ID',
`order_id` string COMMENT 'order ID',
`coupon_status` string COMMENT 'coupon status',
`get_time` string COMMENT 'claim time',
`using_time` string COMMENT 'time used (order placed)',
`used_time` string COMMENT 'time used (payment)'
) COMMENT 'coupon usage fact table'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_coupon_use/'
tblproperties ("parquet.compression"="lzo");
Note: the dt partition value is derived from the coupon claim time, get_time.
(5) Load data
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd_fact_coupon_use
where dt in
(
select
date_format(get_time,'yyyy-MM-dd')
from ods_coupon_use
where dt='2020-03-10'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from
ods_coupon_use
where
dt='2020-03-10'
)new on old.id=new.id;
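The full outer join above can be hard to picture, so here is a toy version that runs without any tables. All rows and the status codes '1401' and '1402' are made up for illustration, and only four columns are kept. nvl(new_value, old_value) is used as a shorter equivalent of if(new_value is null, old_value, new_value).

```sql
-- old_rows stands in for the rows already stored in the affected partitions,
-- new_rows for the day's rows from the ODS layer. All values are hypothetical.
with old_rows as (
    select '1' id, '1401' coupon_status, '2020-03-09 10:00:00' get_time, cast(null as string) using_time
    union all
    select '2' id, '1401' coupon_status, '2020-03-09 11:00:00' get_time, cast(null as string) using_time
),
new_rows as (
    select '2' id, '1402' coupon_status, '2020-03-09 11:00:00' get_time, '2020-03-10 08:30:00' using_time
    union all
    select '3' id, '1401' coupon_status, '2020-03-10 09:00:00' get_time, cast(null as string) using_time
)
select
    nvl(n.id, o.id)                       id,
    nvl(n.coupon_status, o.coupon_status) coupon_status,
    nvl(n.get_time, o.get_time)           get_time,
    nvl(n.using_time, o.using_time)       using_time,
    -- Same expression the load statement uses to choose the target partition.
    date_format(nvl(n.get_time, o.get_time),'yyyy-MM-dd') dt
from old_rows o
full outer join new_rows n
on o.id = n.id;
```

Coupon 1 exists only in the old data and is carried over unchanged, coupon 2 appears on both sides and takes the new values, and coupon 3 exists only in the new data and is added.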
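8. Order fact table dwd_fact_order_info (accumulating snapshot fact table)
(1) Introduction
Order lifecycle: creation time -> payment time -> cancellation time -> completion time -> refund time -> refund completion time.
The ODS-layer order table carries only two timestamps, the creation time and the operation time, which cannot express all of these milestones, so the order status log table has to be joined. The order fact table also adds an activity ID, so the activity-order table has to be joined as well.
(2) Associated dimensions
(3) Data sources
ods_order_info, ods_order_status_log, ods_activity_order, dwd_fact_order_info
(4) Create the table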
drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (
`id` string COMMENT 'order ID',
`order_status` string COMMENT 'order status',
`user_id` string COMMENT 'user ID',
`out_trade_no` string COMMENT 'payment serial number',
`create_time` string COMMENT 'creation time (unpaid status)',
`payment_time` string COMMENT 'payment time (paid status)',
`cancel_time` string COMMENT 'cancellation time (cancelled status)',
`finish_time` string COMMENT 'completion time (completed status)',
`refund_time` string COMMENT 'refund time (refund in progress status)',
`refund_finish_time` string COMMENT 'refund completion time (refund completed status)',
`province_id` string COMMENT 'province ID',
`activity_id` string COMMENT 'activity ID',
`original_total_amount` string COMMENT 'original amount',
`benefit_reduce_amount` string COMMENT 'discount amount',
`feight_fee` string COMMENT 'freight fee',
`final_total_amount` decimal(10,2) COMMENT 'order amount'
)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");
(5) Load data
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from (
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd_fact_order_info
where dt
in
(
select
date_format(create_time,'yyyy-MM-dd')
from ods_order_info
where dt='2020-03-10'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ods_order_status_log
where dt='2020-03-10'
group by order_id
)log
join (
select * from ods_order_info where dt='2020-03-10'
)info
on log.order_id=info.id
left join
(
select * from ods_activity_order where dt='2020-03-10'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
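The tms column in the statement above is a map from status code to operation time, built per order from the status log. The toy query below shows the mechanics on two made-up log rows for a hypothetical order '100'. The mapping of codes 1001, 1002 and 1003 to creation, payment and cancellation follows the load statement.

```sql
-- status_log stands in for one day's ods_order_status_log rows; values are hypothetical.
with status_log as (
    select '100' order_id, '1001' order_status, '2020-03-10 10:00:00' operate_time
    union all
    select '100' order_id, '1002' order_status, '2020-03-10 10:05:00' operate_time
)
select
    order_id,
    tms['1001'] create_time,
    tms['1002'] payment_time,
    tms['1003'] cancel_time   -- no such key in the map, so this is NULL
from (
    select
        order_id,
        -- Build 'status=time' pairs, join them with ',', then parse them into a map.
        str_to_map(concat_ws(',', collect_set(concat(order_status,'=',operate_time))), ',', '=') tms
    from status_log
    group by order_id
) t;
```

For this input create_time and payment_time are populated, while tms['1003'] has no entry and evaluates to NULL, which is why the load statement falls back to the old value in that case.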
9. DWD-layer business data import script
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
# Use the date passed as the second argument if given; otherwise default to yesterday
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
sql1="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select
sku.id, sku.spu_id, sku.price, sku.sku_name, sku.sku_desc, sku.weight,
sku.tm_id, ob.tm_name, sku.category3_id, c2.id category2_id, c1.id category1_id,
c3.name category3_name, c2.name category2_name, c1.name category1_name,
spu.spu_name, sku.create_time
from ( select * from ${APP}.ods_sku_info where dt='$do_date' ) sku
join ( select * from ${APP}.ods_base_trademark where dt='$do_date' ) ob on sku.tm_id=ob.tm_id
join ( select * from ${APP}.ods_spu_info where dt='$do_date' ) spu on spu.id = sku.spu_id
join ( select * from ${APP}.ods_base_category3 where dt='$do_date' ) c3 on sku.category3_id=c3.id
join ( select * from ${APP}.ods_base_category2 where dt='$do_date' ) c2 on c3.category2_id=c2.id
join ( select * from ${APP}.ods_base_category1 where dt='$do_date' ) c1 on c2.category1_id=c1.id;

insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
select
id, coupon_name, coupon_type, condition_amount, condition_num, activity_id,
benefit_amount, benefit_discount, create_time, range_type, spu_id, tm_id,
category3_id, limit_num, operate_time, expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
select
info.id, info.activity_name, info.activity_type, rule.condition_amount, rule.condition_num,
rule.benefit_amount, rule.benefit_discount, rule.benefit_level,
info.start_time, info.end_time, info.create_time
from ( select * from ${APP}.ods_activity_info where dt='$do_date' ) info
left join ( select * from ${APP}.ods_activity_rule where dt='$do_date' ) rule on info.id = rule.activity_id;

insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
select
od.id, od.order_id, od.user_id, od.sku_id, od.sku_name, od.order_price, od.sku_num,
od.create_time, oi.province_id, od.order_price*od.sku_num
from ( select * from ${APP}.ods_order_detail where dt='$do_date' ) od
join ( select * from ${APP}.ods_order_info where dt='$do_date' ) oi on od.order_id=oi.id;

insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
select
pi.id, pi.out_trade_no, pi.order_id, pi.user_id, pi.alipay_trade_no, pi.total_amount,
pi.subject, pi.payment_type, pi.payment_time, oi.province_id
from ( select * from ${APP}.ods_payment_info where dt='$do_date' ) pi
join ( select id, province_id from ${APP}.ods_order_info where dt='$do_date' ) oi on pi.order_id = oi.id;

insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
select
id, user_id, order_id, sku_id, refund_type, refund_num, refund_amount, refund_reason_type, create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
select id, user_id, sku_id, spu_id, order_id, appraise, create_time
from ${APP}.ods_comment_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
select
id, user_id, sku_id, cart_price, sku_num, sku_name, create_time, operate_time, is_ordered, order_time
from ${APP}.ods_cart_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
select id, user_id, sku_id, spu_id, is_cancel, create_time, cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';

insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(
select id, coupon_id, user_id, order_id, coupon_status, get_time, using_time, used_time
from ${APP}.dwd_fact_coupon_use
where dt in
(
select date_format(get_time,'yyyy-MM-dd')
from ${APP}.ods_coupon_use
where dt='$do_date'
)
)old
full outer join
(
select id, coupon_id, user_id, order_id, coupon_status, get_time, using_time, used_time
from ${APP}.ods_coupon_use
where dt='$do_date'
)new
on old.id=new.id;

insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
select
id, order_status, user_id, out_trade_no, create_time, payment_time, cancel_time, finish_time,
refund_time, refund_finish_time, province_id, activity_id, original_total_amount,
benefit_reduce_amount, feight_fee, final_total_amount
from ${APP}.dwd_fact_order_info
where dt in
(
select date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_order_info
where dt='$do_date'
)
)old
full outer join
(
select
info.id, info.order_status, info.user_id, info.out_trade_no, info.province_id, act.activity_id,
log.tms, info.original_total_amount, info.benefit_reduce_amount, info.feight_fee, info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
)log
join ( select * from ${APP}.ods_order_info where dt='$do_date' ) info on log.order_id=info.id
left join ( select * from ${APP}.ods_activity_order where dt='$do_date' ) act on log.order_id=act.order_id
)new
on old.id=new.id;

insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(
select
id, name, birthday, gender, email, user_level, create_time, operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info
where dt='$do_date'
union all
select
uh.id, uh.name, uh.birthday, uh.gender, uh.email, uh.user_level, uh.create_time, uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from ${APP}.dwd_dim_user_info_his uh
left join ( select * from ${APP}.ods_user_info where dt='$do_date' ) ui on uh.id=ui.id
)his
order by his.id, start_date;

insert overwrite table ${APP}.dwd_dim_user_info_his select * from ${APP}.dwd_dim_user_info_his_tmp;
"
sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select bp.id, bp.name, bp.area_code, bp.iso_code, bp.region_id, br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br on bp.region_id=br.id;
"
case $1 in
"first")
{
$hive -e "$sql1"
$hive -e "$sql2"
};;
"all")
{
$hive -e "$sql1"
};;
esac
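IV. Zipper table
1. What is a zipper table?
2. Why build a zipper table?
3. How is a zipper table used?
4. How is a zipper table formed?
5. Zipper table build flowchart
6. User dimension zipper table
Step 1: Initialize the zipper table (run once, on its own, the first time)
(1) Create the zipper table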
drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(
`id` string COMMENT 'user ID',
`name` string COMMENT 'name',
`birthday` string COMMENT 'birthday',
`gender` string COMMENT 'gender',
`email` string COMMENT 'email',
`user_level` string COMMENT 'user level',
`create_time` string COMMENT 'creation time',
`operate_time` string COMMENT 'operation time',
`start_date` string COMMENT 'effective start date',
`end_date` string COMMENT 'effective end date'
) COMMENT 'user zipper table'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");
(2) Initialize the zipper table
insert overwrite table dwd_dim_user_info_his
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-10',
'9999-99-99'
from
ods_user_info oi
where
oi.dt='2020-03-10';
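Step 2: Produce the day's change data (new and modified records); run daily
(1) How to obtain the daily change set
A. Ideally the table has both a creation time and a modification time.
B. If it does not, a third-party tool such as canal can monitor MySQL changes in real time and record them.
C. Compare the two days' data row by row, checking whether md5(concat(all columns that may change)) matches (a crude approach).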
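Approaches A and C above can be sketched in SQL roughly as follows. This is an illustration under assumptions, not the article's pipeline: user_info_full is a hypothetical table that holds a full snapshot of the user table in each dt partition, the column list is borrowed from the user zipper table, and md5() is assumed to be available (it was added in Hive 1.3.0).

```sql
-- Approach A: pick the rows created or modified on 2020-03-11.
-- user_info_full is a hypothetical full-snapshot table partitioned by dt.
select *
from user_info_full
where dt='2020-03-11'
  and (date_format(create_time,'yyyy-MM-dd')='2020-03-11'
       or date_format(operate_time,'yyyy-MM-dd')='2020-03-11');

-- Approach C: compare a hash of the mutable columns between two daily snapshots.
-- nvl(...,'') keeps a NULL column from being silently skipped by concat_ws.
select t.*
from (select * from user_info_full where dt='2020-03-11') t
left join (select * from user_info_full where dt='2020-03-10') y
  on t.id=y.id
where y.id is null   -- new user, not present the day before
   or md5(concat_ws('|',nvl(t.name,''),nvl(t.birthday,''),nvl(t.gender,''),nvl(t.email,''),nvl(t.user_level,'')))
   <> md5(concat_ws('|',nvl(y.name,''),nvl(y.birthday,''),nvl(y.gender,''),nvl(y.email,''),nvl(y.user_level,'')));
```

Approach C has to read and hash two full snapshots every day, which is one reason the list above rates it lowest.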
(2) Because ods_user_info, as imported, is already a table of new and changed records, no further processing is needed
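A. Add one day's data (2020-03-11) to the database.
B. Import all of the 2020-03-11 data with Sqoop.
C. Load the data into the ODS layer.
Step 3: Merge the changed records first, then append the new records, and insert the result into a temporary table
(1) Create the temporary table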
drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(
`id` string COMMENT 'user ID',
`name` string COMMENT 'name',
`birthday` string COMMENT 'birthday',
`gender` string COMMENT 'gender',
`email` string COMMENT 'email',
`user_level` string COMMENT 'user level',
`create_time` string COMMENT 'creation time',
`operate_time` string COMMENT 'operation time',
`start_date` string COMMENT 'effective start date',
`end_date` string COMMENT 'effective end date'
) COMMENT 'user zipper temporary table'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");
(2) Load data
insert overwrite table dwd_dim_user_info_his_tmp
select * from
(
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-03-11' start_date,
'9999-99-99' end_date
from
ods_user_info
where
dt='2020-03-11'
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from dwd_dim_user_info_his uh left join
(
select
*
from ods_user_info
where dt='2020-03-11'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
Step 4: Overwrite the zipper table with the temporary table
insert overwrite table dwd_dim_user_info_his select * from dwd_dim_user_info_his_tmp;
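Once the zipper table has been rebuilt, it is typically queried through its start_date and end_date columns. The two queries below are a minimal sketch of that usage. They rely only on the conventions visible in the statements above: dates are 'yyyy-MM-dd' strings and '9999-99-99' marks a record that is still current.

```sql
-- Current state of every user: the records that are still open-ended.
select *
from dwd_dim_user_info_his
where end_date='9999-99-99';

-- State of every user as of a given day (2020-03-10 is only an example):
-- the record whose validity interval covers that day.
select *
from dwd_dim_user_info_his
where start_date<='2020-03-10'
  and end_date>='2020-03-10';
```

Because the dates are fixed-width strings, plain string comparison orders them correctly, and '9999-99-99' compares greater than any real date.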
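V. DWD layer summary
1. The DWD layer uses Parquet storage with LZO compression.
2. The DWD layer is a key layer of the data warehouse; the warehouse's data modeling is done in this layer.
3. The DWD layer has 12 user behavior tables and 14 business data tables, 26 tables in total.
Published by: 全栈程序员-用户IM. Please credit the source when reposting: https://javaforall.cn/153197.html Original link: https://javaforall.cn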