0%

与传统测试区别

image-20211231104504478

功能测试

数据质量

  • 主要包括4种测试方法

image-20211231104800758

常用的功能测试方法

数据约束检查

  • 如数据类型、长度、索引、主键等是否符合要求

数据存储检查

  • 是否需要压缩文件形式存储
  • hive表类型是否合理(内外部表、分区、分桶表)
  • 代码中读取、写入文件目录是否正确

SQL文件检查

开发规范检查。一般公司都有自己的规范,如hiveql中的:

  • 注释
  • 字段,不行用*代替全部
  • …..

SQ语法检查

  • 合理使用insert into、insert overwrite、order by、group by等

数据处理逻辑验证

  • 脏数据处理是否符合预期
  • 去重处理

shell脚本测试

  • 测试jar包是否引用正确
  • mapper、reducer文件、mapreduce依赖文件等路径、运行配置参数是否合理
  • 我对shell不熟悉,一般采用python

调度任务测试

  • 是否支持重跑
  • 依赖层次是否合理
  • 任务是否在规定时间完成等

性能测试

主要由六种测试:基准测试、并发测试、负载测试、压力测试、容量测试、稳定性测试

性能测试步骤

image-20211231111135709

性能测试案例

  • 第六章用户行为分析平台为例子,采用YCSB工具,对平台底层Kudu进行性能测试

  • YCSB 是雅虎开源的性能测试工具,对NoSql产品进行测试和评估,如HBase、MongoDB等

测试场景

image-20211231152400144

测试结果

image-20211231152443995

image-20211231153058203

大数据基准测试

对大数据框架、大数据平台、工具的出现进行基准测试,测试步骤分为:

  • 数据准备

  • 负载选择

image-20211231153711579

  • 指标度量

基准测试工具

image-20211231153834017

大数据ETL测试

ETL测试类型

元数据测试

  • 验证表定义是否符号数据模型和应用程序的设计规范,包括对数据类型、数据长度、索引和约束等

数据完整性测试

  • 目的是验证是否已将预期数据从源加载到目标中,主要测试:比较源和目标之间的计数,如最大、最小、总和、均值和实际数据量

数据转换测试

  • 白盒测试。用sql或pl/sql对数据转换,用转换后的数据与目标中数据比较
  • 黑盒测试。外部界面方式造数据完成转换,用转换后的数据与目标中数据比较

增量ETL测试

目的是验证源上的更新能否正确加载到目标系统中

ETL集成测试

如下几个步骤:

  • 在源系统中,设置测试数据
  • 执行ETL过程把数据加载到目标中
  • 查看或处理目标系统中数据
  • 验证数据和使用该数据的应用程序的功能

ETL性能测试

ETL测试场景

实时数据ETL和测试

  • 实时数据一般指分钟级别以下,通常包括实时计算、存储、展示和分析等。

image-20211231161902589

  • 原始数据:可以理解为上游原始数据。对于整个上层消费应用,除数据本身以外,其他都是“黑盒,即不可见。目前,对于接入的数据源,常见的提供数据的方式人 Kafka、MQ 等。
  • 实时数据处理:这是整个数据流转路径的核心,负责根据业务需求对原始数据进行理并转发。常见的处理实时数据的应用框架有Flink、Storm和SparkStreaming等。
  • 数据存储:用于保存处理后的数据。对于业务功能,可以在这里获取需要使用的数据。在这里,我们一般使用基于内存的key-value数据库 Redis,以及列式数据医。 ClickHouse、MongoDB和HDFS等。另外,我们可以将数据转发至另一个数握 通道。
  • 数据应用:数据的具体使用。在这里,我们可以对数据进行业务层面的处理、数据的可视化展示。

离线数据ETL和测试

  • 离线数据一般采用T+1模式,即每天凌晨处理前一天的数据,一般采用Sqoop,datax,Flume和MapReduce等

image-20211231163101257

  • 对于不同的数据仓库结构,有不同的测试点

image-20211231163336807

大数据ETL测试工具

  • iCDQ
  • Talend

大数据项目开发概览

一个完整的大数据项目架构可以分为数据采集层、数据存储层、数据计算层、数据接入层、数据应用层和基础服务层,如下图所示:

image-20211229113510858

数据的采集和存储

数据采集方式主要有:

  • 网络数据采集

  • 服务端日志采集

  • 客户端日志采集

服务端日志采集

  • 服务端日志是重要的数据来源,它可以支持业务问题排查、业务数据分析等。下图是常见的服务端日志架构。客户端通过API( Application Programming Interface,应用程序编程接口)向服务端发送请求,服务端会在服务器本地记录日志文件。日志文件可能包括访问日志、Lnux系统日志和业务日志等。这些日志会被服务端日志采集服务收集、同步发给日志服务器,以便后续使用。

image-20211229150453592

一个完整的服务端日志采集流程如图4-3所示。

  • 在客户端向服务端发送请求后,服务端会在服务器本地记录请求日志或业务日志。此外,服务端会通过服务直接向Kafka推送日志。

  • 服务器的本地日志在写入的同时,还会通过Flume等工具进行持续收集,并推送到 Kafka集群中指定的主题。在收集、推送的过程中,会进行一些日志数据处理工作,如请求头信息解析、接口参数提取和日志信息格式化等。

  • 我们可以使用SparkStreaming实时消费处理Kafka消息,并将处理好的数据写入指定的数据库中,如MySQL、Redis等。此种方式可以提供实时的服务端日志应用,如错误日志监控报警等。对于Spark Streaming处理完的数据,将继续推送到Kafka,供其他业务

  • 我们也可以使用flume消费kafka的消息,并将处理号的数据写入HDFS,为后续的离线使用提供基础数据

    image-20211229151331689

客户端日志采集

移动端分类

移动端分为两种:

  • Native App:原生应用程序。也就是基于智能手机系统的原生程式编写运行的应用程序,比如Android,IOS等
  • Hybrid App:混合应用程序。原生应用程序嵌入h5的方式

采集方式

  • 浏览器页面的日志采集。如采集Hybrid App中h5浏览器
    • 浏览量PV
    • 访问量UV
  • 移动端客户端日志的采集。如采集Native App
    • 埋点SDK

数据同步

  • 数据仓库的特性之一是集成,即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层,一般情况下,这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(通常是Hive或者MaxCompute)是非常重要的一个环节。
  • 针对不同数据类型和业务场景,我们可以选择不同的数据同步方式:
    • 直连同步
    • 数据文件同步
    • 数据库日志解析同步

直连同步

  • 直连同步是指通过定义好的规范接口API和基于动态链接库的方式直接连接业务库
  • 直连同步的方式配置十分简单,很容易上手操作,比较适合操作型业务系统的数据同步,但是会存在随着业务规模的增长,数据同步花费的时间会越来越长、连数据库查询数据,对数据库影响非常大,容易造成慢查询,可能会影响业务线上的正常服务

image-20211229155528410

数据文件同步

  • 对文件进行格式约定,直接从源系统生成数据(mysql,oracle,DB2等)的文件,通过FTP服务器传输到目标系统,最终加载到目标系统中。

image-20211229155932718

  • 通过文件服务进行上传、下载,可能出现丢包或出现错误。可以加一些校验机制

数据库日志解析同步

  • 数据库日志解析的同步方式可以实现实时与准实时的同步,延迟可以控制在毫秒级别的,其最大的优势就是性能好、效率高,不会对源数据库造成影响,目前,从业务系统到数据仓库中的实时增量同步,广泛采取这种方式。当然,这种方式也会存在一些问题,比如批量补数时造成大量数据更新,日志解析会处理较慢,造成数据延迟。除此之外,这种方式比较复杂投入也较大,因为需要一个实时的抽取系统去抽取并解析日志

image-20211229160911594

大数据存储

对于大数据存储选项,需要从存储成本、数据规模、数据访问特性和查询性能等方面进行考虑,下图列举了不同场景大数据存储选型。

image-20211229161451827

大数据计算

  • 一般分为离线计算、批量计算、实时计算和流式计算,业界一般都使用离线计算和实时计算,下图是各个计算方式的区别

离线计算

image-20211229164031999

  • 离线计算使用的多数场景是周期(小时、天,月)性执行一个job任务,离线计算应用中常用的是离线ETL的处理。如MapReduce就是一个离线的计算框架,下图举例说明了离线计算工作流

image-20211229165345162

  • 离线计算需要上下游各组件合作,一般会由多个任务单元构成(HiveQL,shell,Shell和MapReduce等),多个单元由很强的依赖关系

实时计算

  • 对数据计算要求较高,如实时的ETL,实时监控等,延时一般为毫秒级别,目前笔比较流行的实时框架有Spark StreamingFlink
  • 下图展示了一个大数据计算平台架构,其中包含离线计算和实时计算

image-20211229170148516

大数据监控

在大数据进行采集、存储和处理后,下面就是项目的上线和日常运行。在运行过程中监控和报警就非常重要

数据监控

一些常见的监控内容:

  • 以时间维度对数据记录进行监控。比如某个时间段出现明显波动
  • 对数据的NULL和0值进行监控
  • 对数据的值域进行监控。如对数据中某字段出现合理域值以外的值
  • 对数据的重复度进行监控。如电商业务的交易记录出现重复等

运维监控

  • 在当前大数据项目中,使用服务器集群方式支撑业务已经成为常态,开发自动化运维监控系统非常重要。
  • 监控常见的cpu,men,io,tps,对大数据生态系统进行监控,如zookeeper节点可用性,yarn资源空闲情况、kafka消息堆积情况,以及spark job完成进度等
  • 其他的一些监控,如端口,nginx的延迟

大数据项目开发案例

  • 数据分析平台是一类重要的大数据应用,广泛应用于互联网金融、银行、电子商务和在线教育等领域。
  • 数据分析平台通常需要实时计算、查询并借助可视化平台展示数据。通过数据分析平台,用户可以更加直观、高效和全面地了解数据分布情况,观察数据变化趋势,最终达到数据驱动业务决策的目标。
  • 与单纯的数据可视化类平台或BI报表相比,数据分析平台的链路更长,除最终的可视化数据展示以外,还包括源头的数据采集;而BI报表通常只是对数据的汇总和查询展示,不涉及数据采集、数据存储等环节。如图4-15所示,根据不同的数据分析类型,数据分析平台可分为如下几类:**用户行为类(应用比较多)**、App分析类、应用市场监控类、流量分析类和广告效果监控类等。

image-20211230104909426

项目背景介绍

  • 分析手机App用户数量和交易的某些行为习惯,实现精准营销。类似于购物推荐这种

项目需求分析

  • 与传统应用开发相同,在迸行大数据项目开发前,首先要明确项目的需求。用户行为分析平台需要提供多种数据釆集方式,通过对埋点数据的采集、处理、建模和存储,进行深度分析和应用,帮助企业高效获取海量数据,并进行多维、实时和准确的数据分析,还原真实业务场。用户行为分析平台的业务流程包括数据釆集、数据处理、数据建模、数据存储、数据簑理、智能分析和基础看板。图4-16是用户行为分析平台的业务流程。

image-20211230110240930

  • 用户行为分析主要包括:基础看板和智能分析
  • 基础看板:如实时统计在线人数、启动次数
  • 智能分析:如从商品浏览率到提交订单,支付订单,各个节点用户转化率

项目开发流程

  • 大数据项目的完整流程如图4-18所示,在进行大数据项目开发时,首先,大数据开发工程师需要根据产品需求文档进行架构设
    计、模型设计和调度设计,产出架构设计文档、ETL设计文档和调度设计文档;然后,项目
    相关人员(包括测试人员)需要对这些设计进行评审等。

image-20211230111333106

  • 图4-19为大数据项目开发流程。

image-20211230111422715

结构设计

根据平台需求的特点,在架构设计时,将用户行为分析平台拆分为4个服务:

  • 数据采集与预处理服务( og service)
  • 数据处理服务(实时数据处理服务(real-task)使用Flink,离线数据处理服务采用 Azkaban进行任务调度),
  • 数据查询服务(DAS),
  • 数据管理服务(DMS)

如图4-20所示。数据从源数据到可视化展示需要从下至上经过这些服务的处理。

image-20211230112924605

用户行为分析平台的核心是数据处理,即从数据采集到数据应用的数据流转,如图4-21

image-20211230113054864

模型设计

根据用户行为分析平台的功能和特点,我们需要设计能够满足其应用需求的数据仓库架构。图4-24是一个数据仓库架构方案:

image-20211230114656565

  • RB层(实时层)KRB的全称是 Kudu ReaBase,该层的表全部为未统计汇总的实时明细数如 krb. trs_app_event_log表是埋点日志事件表
  • KRS层(实时汇总层)KRS的全称是Kucu Real Sum,该层的表全部为统计汇总的实时明细数据,如ks. rs app_user_open_dh表是App用活跃统计表(以小时为单位进行分区)表名的结尾后缀用于表明该表分区间单位,如本例中的dh表示小时进行分区
  • HDB层(贴源层):HDB的全称是 Hive Data Base,该层的表全部为未统计汇总的离线明细数据。该层数据是通过mpaa从KRB层同步过来的,如 hdb db app_equ_install_info表是App安装设备信息表。
  • HDS层(汇总层):HDS的全称是 Hive Data stat,该层的表全部为统计汇总的离线数据明细
  • HTP层(中间层):HTP的全称是 Hive Temporary,该层的表全部为数据处理过程中生成的中间临时数据,如 htp. tmp skynet user_retention d表是日用户留存分析中间表(以天为单位进行分区)
  • IVW层(视图层):IVW的全称是 mpala View,该层的表全部为mpaa操作的视图表,如ⅳ w.trs app_event log表是KRB层(实时层)中提到的 krs. trs app event_log表的视图。

在后续测试案例中,我们会使用实时层中的埋点日志事件表 krb. trs_ app_event_loc
这里只举例说明该表的结构设计。表4-7汇总了(贴出部分)

  • 以log_i作为主键。
  • 以log_id作为hash分区,以log_dt作为rang分区
  • 历史数据分区和最新数据分区一直保留
  • 只新增数据,增量分区,使用 Flink SQL写数据

image-20211230115834627

调度设计

  • 上面列出的HDS层数据表位于用户行为分析平台的离线数据存储层。对于离线数据理,具有周期性、重复性的特点,因此,我们使用调度工具 Azkaban提供的可靠计划来处理数据。根据各数据表中数据的前后依赖关系,用户行为分析平台的调度设计如图4-25际示。由图4-25可知,为了提高项目的可读性,子任务节点的命名与表名保持形式上的统一(子任务节点生成与之名称对应的数据表中的数据)。

image-20211230151251867

  • 在完成用户行为分析平台的架构设计、模型设计和调度设计后,开发人员便可以根据它们进行相应的代码开发。通过对用户行为分析平台项目的介绍,我们不难发现,项目中需要开发的功能多、业务流程复杂,需要测试人员的参与来保证平台质量。、

说明

阅读本章,请把hadoop和hive环境搭建好,可以参考如下文章:

数据仓库实例

  • 在本节中,我们通过一个简单的实例介绍数据仓库对数据的处理过程。假设有一家连锁超市,它有多家分店。每一个分店都有很多种类的商品,包括日用品、肉类、冷冻食品、烘焙食品和花卉等。所有产
    品在整个连锁超市环境下有一个唯一的产品编号。图3-15为一张顾客结账清单。

    image-20211214170515688

  • 经过一段时间的商品销售后,连锁超市积累了大量销售数据,如下图所示,超市分店具有分店名、分店地址
    和开店时间属性,商品有商品类别、商品价格、唯一编号和生产地址属性。当然,地址可以进一步拆分为省、市等。

image-20211214170552603

  • 假设对商品A进行促销,如发放代金券、降价等,现在分析促销活动对商品A销售量的,为了简便,本实例统计超市分店中商品A每天的销售量、到店消费人数和购买商品A的消费者的比例
  • 我们在数据仓的设计与构建文章中的数据仓库的设计中提到过,数据仓库分为数据接入层、数据明细层、数据汇总层和数据集市等。数据接入层负责将业务系统中的商品相关销售数据导入;数据明细层负责对数据接入层的数据进行预处理,过滤”脏”数据等;数据汇总层将数据按照订单进行汇总;数据集市层负责聚合计算相应的指标。
  • 由于要对商品在时间、地点等维度的指标进行汇总计算,因此,我们在数据仓库层使用维度建模方式建表,(我们在数据仓的设计与构建中的数据仓库建模方法也说过相应概念)。显然,我们对日期、超市分店(地址)和商品等维度比较感兴趣。图3-17所示为商品的维度模型实际的建模过程比这复杂。以日期维度为例,在实际建模中,时间维度表一般会会有当天是一个月中的哪一天,当天是一年中的哪天,当前周是一年中的哪周,当前季度是年中哪季度,以及时间视计算肭表示等字段,方便将销售指标在各种时间点上进行同比。

image-20211214172404147

  • 假设超市业务系统中的销售数据是以实际购物清单拆分的形式存放,即在购物清单中,含有品、商品价格和交易时间(清单创建时间)等信息,则超市业务系统的数据库中会有如图下的表关系

    image-20211214172637088

  • 由于商品信息表和超市分店信息表的数据量不大,且基本无改动,因此可以选择全量更新的方式将数据加载到数据仓库。而来自各超市分店的商品销售清单的数据量很大,且每天会有新插入的数据记录,因此,在将数据加载到数据仓库时,可以选择增量加载方式

  • 在本实例中,对于数据仓库的存储,采用HDFS和Hive,在ETL过程中,使用 HiveQL。图3-19为各级数据表的关系。

image-20211215100757274

数据接入层ODS

创建接入层的表

首先,在Hive中,创建数据库接入层对应的表,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 切换到hadoop用户
su hadoop
# 进入到hive
hive

-- 创建超市分店信息表
DROP TABLE IF EXISTS ods_market_info;
create table ods_market_info(
market_id string comment '超市分店编号',
market_address string comment '超市分店地址',
start_time string comment '有效期起始时间',
end_time string comment '有效期终止时间',
market_name string comment '超市分店名称',
create_time string comment '创建时间',
update_time string comment '更新时间'
)
partitioned by(dt string)
row format delimited fields terminated by '\t';


--创建商品信息表
DROP TABLE IF EXISTS ods_product_info;
CREATE TABLE ods_product_info(
product_id int comment '商品id',
type_name string comment '类别名',
supplier_phone string comment '供应商手机号',
supplier_address string comment '供应商地址',
product_price string comment '商品价格',
product_desc string comment '商品说明',
start_time string comment '有效期起始时间',
end_time string comment '有效期终止时间',
product_name string comment '商品名称',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '商品信息表'
partitioned by(dt string)
row format delimited fields terminated by '\t';


--创建清单记录表
DROP TABLE IF EXISTS ods_sale_info;
CREATE TABLE ods_sale_info(
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '清单记录表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

# 查看到新建成功的表
hive> show tables;
OK
course
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
Time taken: 0.026 seconds, Fetched: 6 row(s)

准备业务数据

  • 批量造mysql表的数据,采用存储过程的方式

  • mysql中创建业务关系表,product_info(商品信息表)、market_info(超市分店信息表)、sale_info(清单记录表)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
mysql -uroot -p 
use hive

-- 创建商品信息表,以id为主键
create table product_info(
id int(10) not null auto_increment primary key,
product_id int comment '商品id',
type_name varchar(100) comment '类别名',
supplier_phone varchar(100) comment '供应商手机号',
supplier_address varchar(100) comment '供应商地址',
product_price varchar(100) comment '商品价格',
product_desc varchar(100) comment '商品说明',
start_time varchar(100) comment '有效期起始时间',
end_time varchar(100) comment '有效期终止时间',
product_name varchar(100) comment '商品名称',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;


-- 创建超市分店信息表
create table market_info(
id int(10) not null auto_increment primary key,
market_id varchar(100) comment '超市分店编号',
market_address varchar(100) comment '超市分店地址',
start_time varchar(100) comment '有效期起始时间',
end_time varchar(100) comment '有效期终止时间',
market_name varchar(100) comment '超市分店名称',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;

--创建清单记录表
create table sale_info(
id int(10) not null auto_increment primary key,
order_id varchar(100) comment '清单号',
order_status varchar(100) comment '清单状态',
market_id varchar(100) comment' 超市分店编号',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time varchar(100) comment '创建时间',
update_time varchar(100) comment '更新时间'
) engine=innodb default charset=utf8;


# 查看到各个新建的三个表
mysql> show tables;



| market_info |
| product_info |
| sale_info |
+-------------------------------+
77 rows in set (0.01 sec)

插入数据

超市分店
1
2
3
4
5
6
7
8
9
mysql -uroot -p 
use hive

insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000001','湖南省长沙市开福区万达广场1021号','2021-12-16','2028-12-17','大润发开福万达店','2021-12-12','2021-12-12');


insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000002','湖南省长沙市岳麓区万达广场1021号','2021-12-16','2028-12-17','大润发岳麓万达店','2021-12-12','2021-12-12');

insert into market_info(market_id,market_address,start_time,end_time,market_name,create_time,update_time) values ('1000003','湖南省长沙市雨花区万达广场1021号','2021-12-16','2028-12-16','大润发雨花万达店','2021-12-12','2021-12-12');
商品表-存储过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
mysql -uroot -p 
use hive

drop procedure insert_product_info;
delimiter //
create procedure insert_product_info(type_name varchar(100),product_price varchar(100),start_time varchar(100),end_time varchar(100),create_time varchar(100),update_time varchar(100),num int)
begin
declare str char(62) default 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
declare product_name char(100);
declare product_id int;
declare i int default 0;
while i<= num DO
-- 生成商品名称随机数
set product_name=concat("商品名称",substring(str,1+floor(rand()*61),2),substring(str,1+floor(rand()*61),3));
-- 生成商品ID随机数
set product_id = floor(rand()*1000);
set i=i+1;
INSERT INTO `hive`.`product_info` (`product_id`, `type_name`, `supplier_phone`, `supplier_address`, `product_price`, `product_desc`, `start_time`, `end_time`, `product_name`, `create_time`, `update_time`) VALUES (product_id, type_name, '18576759590', '湖南省常德市', product_price, '产品描述', start_time, end_time, product_name, create_time, update_time);
end while;
end;
//

# 下面这种方式调用,后面的100就是插入100条数据
mysql> call insert_product_info('食品','50','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //

mysql> call insert_product_info('酒水','100','2021-12-16', '2022-12-17', '2021-12-15','2021-12-15',100) //

# 查询到各个插入成功的数据
mysql> select count(*) from product_info;//
+----------+
| count(*) |
+----------+
| 202 |
+----------+

  • 数据列表

image-20211217165223721

清单记录表-存储过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
mysql -uroot -p 
use hive

drop procedure insert_sale_info;
delimiter //
create procedure insert_sale_info(order_status varchar(10),market_id varchar(100),product_num int,product_id int,create_time varchar(100),update_time varchar(100),num int)
begin
declare order_id int;
declare i int default 0;
while i<= num DO
set i=i+1;
-- 随机生成订单id
set order_id = floor(rand()*100);
INSERT INTO `hive`.`sale_info` (`order_id`, `order_status`, `market_id`, `product_num`, `product_id`, `create_time`, `update_time`) VALUES (order_id, order_status, market_id, product_num, product_id, create_time, update_time);
end while;
end;
//

# 注意//这个分隔符,是区分存储过程的,调用存储过程注意market_id,product_id的值,要从相应的超市分店,商品信息表中找到对应数据
mysql> call insert_sale_info('待付款','1000001',5, 221,'2021-12-15','2021-12-15',100) //
mysql> call insert_sale_info('已付款','1000002',10, 182, '2021-12-14','2021-12-14',100) //

# 查询到刚刚插入的数据
mysql> select count(*) from sale_info;//
+----------+
| count(*) |
+----------+
| 203 |
+----------+
1 row in set (0.00 sec)
  • 数据列表如下

image-20211217170539691

业务数据导入ODS-datax

datax 环境搭建

  • 建议自己用源代码编译方式,比较稳妥
  • 下载源文件解压
1
2
wget https://github.com/alibaba/DataX/archive/master.zip
unzip DataX-master.zip
  • 下载maven
1
2
3
4
5
6
7
8
9
10
sudo wget --no-check-certificate  https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz

tar -zxvf apache-maven-3.8.4-bin.tar.gz
# 配置maven环境变量
vi /etc/profile

export M2_HOME=/usr/local/apache-maven-3.8.4 //本地maven安装home目录
export PATH=$PATH:$M2_HOME/bin
# 生效环境变量设置
source /etc/profile

image-20211221090119715

  • 配置maven本地仓库, 进如本地maven安装目录里的conf目录, vi settings.xml进行如下修改
1
2
3
4
5
6
7
8
9
 -- 设置仓库地址
<localRepository>/usr/local/apache-maven-3.8.4/repo</localRepository>
-- 设置阿里云镜像
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
  • 最后查看maven安装结果 maven -version
1
2
3
4
5
6
7
[root@VM-24-13-centos resp]# mvn -version
Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
Maven home: /usr/local/apache-maven-3.8.4
Java version: 1.8.0_311, vendor: Oracle Corporation, runtime: /usr/local/jdk1.8.0_311/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1160.11.1.el7.x86_64", arch: "amd64", family: "unix"

  • 修改datax的目录中的pom.xml中的内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<mysql.driver.version>8.0.26</mysql.driver.version>

<!-- reader -->
<module>mysqlreader</module>
<module>hdfsreader</module>
<module>streamreader</module>

<!-- writer -->
<module>mysqlwriter</module>
<module>hdfswriter</module>
<module>streamwriter</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
  • hdfswrite目录下面的pom.xml修改hive和hadoop版本
1
2
3
4
<properties>
<hive.version>3.1.2</hive.version>
<hadoop.version>3.0.3</hadoop.version>
</properties>
  • 在 datax的目录执行编译命令
1
2
3
4
5
6
7
8
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
WARNING] Assembly file: /usr/local/DataX-master/target/datax is not a regular file (it may be a directory). It cannot be attached to the project build for installation or deployment.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO] kuduwriter ......................................... SUCCESS [ 2.148 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS

  • 把target目录中的datax.tar.gz移动到指定目录,解压
1
2
3
[root@VM-24-13-centos target]# cp datax.tar.gz /usr/local/
cd /usr/local/
tar -zxvf datax.tar.gz
数据超市导入ods表
  • 创建分区信息,手动创建分区路径

  • 不过奇怪的是我用下面命令的方式创建,用datax导入报错找不到创建的分区

    1
    hdfs dfs -mkdir -p /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
    • 用sql原生语句insert插入一条数据后,重新datax导入就成功了
    1
    insert into ods_market_info partition(dt = '2021-12-21')
1
2
3
4
5
6
7
8
9
10
11
12
13
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_market_info partition(dt = '2021-12-21') values ('111','222','33','44','55','66','77');
# 查看到刚刚插入的信息
hive>select * from ods_market_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_market_info;
hive>exit;
  • 查看到分区信息
1
2
3
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_market_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21
drwxr-xr-x - hadoop supergroup 0 2021-12-22 15:57 /user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-22
  • 在datax的job目录编写一个mysql_hive_ods_market_info.json文件,同步超市分店配置用

    1
    2
    [root@VM-24-13-centos job]# ls
    job.json mysql_hive_ods_market_info.json
  • 编辑mysql_hive_ods_market_info.json文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    {
    "job": {
    "setting": {
    "speed": {
    "channel": 1
    }
    },
    "content": [{
    "reader": {
    "name": "mysqlreader",
    "parameter": {
    "column": [
    "market_id",
    "market_address",
    "start_time",
    "end_time",
    "market_name",
    "create_time",
    "update_time"
    ],
    "connection": [
    {
    "jdbcUrl": [
    "jdbc:mysql://localhost:3306/hive"
    ],
    "table": [
    "market_info"
    ]
    }
    ],
    "password": "hive1234",
    "username": "hive"
    }
    },

    "writer": {
    "name": "hdfswriter",
    "parameter": {
    "defaultFS": "hdfs://localhost:9000",
    "fileType": "text",
    "path": "/user/hive/warehouse/hive.db/ods_market_info/dt=2021-12-21",
    "fileName": "ods_market_info",
    "column": [{
    "name": "market_id",
    "type": "string"
    },
    {
    "name": "market_address",
    "type": "string"
    },
    {
    "name": "start_time",
    "type": "string"
    },
    {
    "name": "end_time",
    "type": "string"
    },
    {
    "name": "market_name",
    "type": "string"
    },
    {
    "name": "create_time",
    "type": "string"
    },
    {
    "name": "update_time",
    "type": "string"
    }
    ],
    "writeMode": "append",
    "fieldDelimiter": "\t",
    }
    }
    }]
    }
    }

    hive> show create table hive.ods_market_info;
    …..
    LOCATION
    ‘hdfs://localhost:9000/user/hive/warehouse/hive.db/ods_market_info’
    ….

    执行命令后在结果中可以看到LOCATOIN,就是hive在hdfs中的存储目录。填写到writer下的path中,dt就是刚刚创建的分区

  • 运行datax命令

1
2
3
4
5
# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_market_info.json

image-20211222172711034

  • 查看hive中的超市表中是否有数据
1
2
3
4
5
6

[root@VM-24-13-centos bin]# su hadoop
[hadoop@VM-24-13-centos bin]$ hive
hive> select * from hive.ods_market_info;
000001 湖南省长沙市开福区万达广场1021号 2021-12-16 00:00:00 2028-12-16 23:59:59 大润发开福万达店 2021-12-12 16:00:00 2021-12-12 16:00:00 2021-12-21
1000002 湖南省长沙市岳麓万达广场1021号 2021-12-16 00:00:00 2028-12-16 23:59:59 大润发岳麓万达店 2021-12-12 16:00:00 2021-12-12 16:00:00 2021-12-21
商品信息导入ods表
1
2
3
4
5
6
7
8
9
10
11
12
13
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_product_info partition(dt = '2021-12-21') VALUES (11, '222', '18576759590', '湖南省常德市', '222', '产品描述', '333', '444', '555', '666', '77');
# 查看到刚刚插入的信息
hive>select * from ods_product_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_product_info;
hive>exit;
  • 查看到分区信息
1
2
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_product_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-23 09:31 /user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21
  • 在datax的job目录编写一个mysql_hive_ods_product_info.json文件,同步超市分店配置用

    1
    2
    3
    4
    [root@VM-24-13-centos job]# ls
    -rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
    -rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
    -rw-r--r-- 1 root root 1861 Dec 23 09:36 mysql_hive_ods_product_info.json
  • 编辑mysql_hive_ods_productinfo.json文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"product_id",
"type_name",
"supplier_phone",
"supplier_address",
"product_price",
"product_desc",
"start_time",
"end_time",
"product_name",
"create_time",
"update_time",
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/hive"
],
"table": [
"product_info"
]
}
],
"password": "hive1234",
"username": "hive"
}
},

"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"path": "/user/hive/warehouse/hive.db/ods_product_info/dt=2021-12-21",
"fileName": "ods_product_info",
"column": [{
"name": "product_id",
"type": "int"
},
{
"name": "type_name",
"type": "string"
},
{
"name": "supplier_phone",
"type": "string"
},
{
"name": "supplier_address",
"type": "string"
},
{
"name": "product_price",
"type": "string"
},
{
"name": "product_desc",
"type": "string"
},
{
"name": "start_time",
"type": "string"
},
{
"name": "end_time",
"type": "string"
},

{
"name": "product_name",
"type": "string"
},

{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}]
}
}
  • 运行datax命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_product_info.json

任务启动时刻 : 2021-12-23 10:30:52
任务结束时刻 : 2021-12-23 10:31:05
任务总计耗时 : 12s
任务平均流量 : 2.22KB/s
记录写入速度 : 20rec/s
读出记录总数 : 201
读写失败总数 : 0

  • 查看hive中的商品表中是否有数据
1
2
3
4
5
6
7
8
9
su hadoop
hive
use hive;
hive> select count(product_id) from ods_product_info;
Total MapReduce CPU Time Spent: 0 msec
OK
201
Time taken: 2.154 seconds, Fetched: 1 row(s)

销售事实导入ods表
1
2
3
4
5
6
7
8
9
10
11
12
13
# 切换用户
su hadoop
# 进入到hive模式
hive
# 使用hive数据库
use hive;
# 手动插入分区信息内容
hive>insert into ods_sale_info partition(dt = '2021-12-21') values (1, '222', '333', 4, 55, '666', '77');
# 查看到刚刚插入的信息
hive>select * from ods_sale_info;
# 新增完后,可以删除表中数据,也可不删
hive>truncate table ods_sale_info;
hive>exit;
  • 查看到分区信息
1
2
[hadoop@VM-24-13-centos root]$ hadoop fs -ls /user/hive/warehouse/hive.db/ods_sale_info/
drwxr-xr-x - hadoop supergroup 0 2021-12-23 11:09 /user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21
  • 在datax的job目录编写一个mysql_hive_ods_sale_info.json文件,同步销售事实表配置用

    1
    2
    3
    4
    5
    6
    [root@VM-24-13-centos job]# ls
    -rwxrwxrwx 1 root root 1587 Dec 21 18:05 job.json
    -rw-r--r-- 1 root root 1861 Dec 22 15:54 mysql_hive_ods_market_info.json
    -rw-r--r-- 1 root root 2267 Dec 23 10:28 mysql_hive_ods_product_info.json
    -rw-r--r-- 1 root root 2267 Dec 23 11:06 mysql_hive_ods_sale_info.json

  • 编辑mysql_hive_ods_productinfo.json文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"order_id",
"order_status",
"market_id",
"product_num",
"product_id",
"create_time",
"update_time"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://localhost:3306/hive"
],
"table": [
"sale_info"
]
}
],
"password": "hive1234",
"username": "hive"
}
},

"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"path": "/user/hive/warehouse/hive.db/ods_sale_info/dt=2021-12-21",
"fileName": "ods_sale_info",
"column": [{
"name": "order_id",
"type": "string"
},
{
"name": "order_status",
"type": "string"
},
{
"name": "market_id",
"type": "string"
},
{
"name": "product_num",
"type": "int"
},
{
"name": "product_id",
"type": "int"
},

{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
}
}
}]
}
}
  • 运行datax
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 回到root用户
[hadoop@VM-24-13-centos root]$ su
[root@VM-24-13-centos ~]# cd /usr/local/datax/bin/
# DHADOOP_USER_NAME一定要用hadoop用户,用其他用户会报错没有权限
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json


任务启动时刻 : 2021-12-23 11:17:21
任务结束时刻 : 2021-12-23 11:17:34
任务总计耗时 : 12s
任务平均流量 : 1.07KB/s
记录写入速度 : 20rec/s
读出记录总数 : 202
读写失败总数 : 0

  • 查看hive中的销售事实表是否有数据
1
2
3
4
5
6
7
8
9
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;

Stage-Stage-1: HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
202
  • 再次导入一次数据,造成重复的脏数据,为下一步数据清洗例子做准备
1
2
[root@VM-24-13-centos bin]# python datax.py -p "-DHADOOP_USER_NAME=hadoop" ../job/mysql_hive_ods_sale_info.json

  • 查看 hive中的销售事实表存在了404条数据,有一半重复的
1
2
3
4
5
6
7
8
9
su hadoop
hive
use hive;
hive> select count(order_id) from ods_sale_info;

Stage-Stage-1: HDFS Read: 27232 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
404

数据清洗

  • 在业务数据导入到ods层时,可能一些误操作,脏数据等,需要对ods层的数据进行清洗处理,本次就以ods_sale_info表中去重重复的order_id
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
su hadoop
hive
use hive;

hive>drop table if exists tmp_ods_to_dwd_sale_info;
create table tmp_ods_to_dwd_sale_info
as select a.order_id,a.order_status,a.market_id,a.product_num,a.product_id,a.create_time,a.update_time from
(select order_id,order_status,market_id,product_num,product_id,create_time,update_time, ROW_NUMBER() OVER(partition by order_id order BY create_time DESC) rn FROM ods_sale_info) a
WHERE a.rn=1;

# 查看到的只有85条数据
hive> select count(*) from tmp_ods_to_dwd_sale_info;
OK
85

数据明细层DWD

  • 数据清洗完毕后,把ODS层数据导入到OWD层

数据仓库建模

  • 在数据仓库层,采用星形模式创建超市分店维度表、商品维度表、日期维度表和销售事实表

维度建模

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 切换到hadoop用户
su hadoop
# 进入到hive
hive

-- 创建超市分维度表
DROP TABLE IF EXISTS dw_dim_market_info;
create table dw_dim_market_info(
market_id string comment '超市分店编号',
market_address string comment '超市分店地址',
effective_date string comment '有效期起始时间',
expriry_date string comment '有效期终止时间',
market_name string comment '超市分店名称'
) comment '创建超市分维度表'
partitioned by(dt string)
row format delimited fields terminated by '\t';


--创建商品维度表
DROP TABLE IF EXISTS dw_dim_product_info;
CREATE TABLE dw_dim_product_info(
product_id int comment '商品id',
type_name string comment '类别名',
supplier_phone string comment '供应商手机号',
supplier_address string comment '供应商地址',
product_price string comment '商品价格',
product_desc string comment '商品说明',
effective_date string comment '有效期起始时间',
expriry_date string comment '有效期终止时间',
product_name string comment '商品名称'
) comment '商品维度表'
partitioned by(dt string)
row format delimited fields terminated by '\t';

--创建日期维度表
DROP TABLE IF EXISTS dw_dim_date_info;
CREATE TABLE dw_dim_date_info(
date_id string comment '日期id',
year_value string comment '年',
month_value string comment'月',
day_value string comment '日',
date_value string comment '年-月-日',
is_weekend string comment '是否周末', -- 0表示非周末,1表示周末
day_of_week string comment '一周中的周几'
) comment '日期维度表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

--创建销售事实表
DROP TABLE IF EXISTS dwd_sale_fact;
CREATE TABLE dwd_sale_fact(
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
date_id string comment '日期id',
product_num int comment '商品数量',
product_id int comment '商品id',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '销售事实表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';




# 查看到新建成功的表
hive> show tables;
OK
course
dw_dim_date_info
dw_dim_market_info
dw_dim_product_info
dwd_sale_fact
ods_market_info
ods_product_info
ods_sale_info
stu
stu1
tmp_ods_to_dwd_sale_info

导入ODS层数据

  • 把数据接入层(ODS)导入到维度表中
日期维度表
  • 初始化一些测试数据,注意date_value这个字段的值,需要和tmp_ods_to_dwd_sale_info中的create_time有关联关系,要造一些相等条件的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
su hadoop
hive
hive>use hive;
hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122101', '2021', '12','15','2021-12-15', '0', '51');

hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122102', '2021', '12', '24','2021-12-24', '0', '51');

hive>insert into dw_dim_date_info partition(dt = '2021-12-21') VALUES ('2021122103', '2021', '12', '25','2021-12-25', '1', '51');


hive> select * from dw_dim_date_info;
OK
2021122101 2021 12 15 2021-12-15 0 51 2021-12-21
2021122102 2021 12 24 2021-12-24 0 51 2021-12-21
2021122103 2021 12 25 2021-12-25 1 51 2021-12-21

超市维度表
  • ods_market_info 表数据插入
  • 此次实例中,好像没有用到
1
2
3
4
5
6
7
8
9
hive>insert into dw_dim_market_info partition(dt = '2021-12-21') select market_id,market_address,market_name,start_time as effective_date,end_time as expiry_date
from ods_market_info;


hive> select * from dw_dim_market_info;
OK
1000001 湖南省长沙市开福区万达广场1021号 大润发开福万达店 2021-12-16 2028-12-17 2021-12-21
1000002 湖南省长沙市岳麓区万达广场1021号 大润发岳麓万达店 2021-12-16 2028-12-17 2021-12-21
1000003 湖南省长沙市雨花区万达广场1021号 大润发雨花万达店 2021-12-16 2028-12-16 2021-12-21
商品维度表
  • ods_product_info表数据插入
  • 此次实例中,好像没有用到
1
2
3
4
5
6
7
8
9
10
11
hive>insert into dw_dim_product_info partition(dt = '2021-12-21') select product_id,product_name,type_name,supplier_phone,supplier_address,product_price,product_desc,start_time as effective_date,end_time as expiry_date
from ods_product_info;

hive> select * from dw_dim_product_info;
OK
221 商品名称78ABC 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 2021-12-21
545 商品名称XYyzA 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 2021-12-21
639 商品名称GHdef 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 2021-12-21
459 商品名称cdtuv 食品 18576759590 湖南省常德市 50 产品描述 2021-12-16 2022-12-17 2021-12-21


销售事实表
  • tmp_ods_to_dwd_sale_info表是上述处理重复销售清单记录表的过滤后的临时表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hive>insert into dwd_sale_fact partition(dt = '2021-12-21') select a.order_id,a.order_status,a.market_id,b.date_id,a.product_num,a.product_id,a.create_time,a.update_time
from tmp_ods_to_dwd_sale_info a
inner join dw_dim_date_info b
on a.create_time=b.date_value;

# 查询到2021-12-15的关联数据
hive> select * from dwd_sale_fact;
OK
11 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
12 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
14 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
16 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
17 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
20 待付款 1000001 2021122101 5 221 2021-12-15 2021-12-15 2021-12-21
....

数据汇总层DWS

  • 由于我们要统计商品A的销售量,以及商品A的购买比例,因此在数据汇总层,对销售数据按照清单号进行汇总,并添加include_product_a 字段,用于表示该清单是否商品A(本实例中的商品id为221),处理过程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
-- 创建DWS层清单记录表
drop table if exists dws_order_info;
create table dws_order_info (
order_id string comment '清单号',
order_status string comment '清单状态',
market_id string comment' 超市分店编号',
include_product_a int comment '是否包括商品A',
date_id string comment '日期id',
a_num int comment '商品A数量',
product_info string comment '商品信息',
create_time string comment '创建时间',
update_time string comment '更新时间'
) comment '清单记录表'
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- 创建中间表,添加is_product_a字段
drop table if exists tmp_dwd_to_dws_order_info;
create table tmp_dwd_to_dws_order_info as select
order_id,order_status,market_id,date_id,
Case
when product_id=221 then 1
else 0
end as is_product_a, -- 是否为商品A
case
when product_id=221 then product_num
else 0
end as a_num, -- 商品A的数量
product_id,
product_num,
create_time,
update_time
from dwd_sale_fact;

# 查询到以及过滤的product_id为221的清单数据
hive> select * from tmp_dwd_to_dws_order_info;
OK
11 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
12 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
14 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
16 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
17 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
20 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
22 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
23 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
24 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15
25 待付款 1000001 2021122101 1 5 221 5 2021-12-15 2021-12-15



-- 按照清单号进行清单数据汇总
hive>insert into dws_order_info partition(dt = '2021-12-21')
select order_id,order_status,market_id,date_id,
case
when sum(is_product_a)>0 then 1
else 0
end as include_product_a,
sum(a_num) as a_num,
concat_ws('_',collect_list(cast(product_id as string)),collect_list(cast(product_num as string))) as product_info,
create_time,update_time
from tmp_dwd_to_dws_order_info group by order_id,order_status,market_id,date_id,create_time,update_time;
hive> select * from dws_order_info;
OK
11 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
12 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
14 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
16 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
17 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21
20 待付款 1000001 2021122101 1 5 221_5 2021-12-15 2021-12-15 2021-12-21

数据集市层DWM

  • 在数据集市层,需要对相关指标进行聚合计算,处理过程如下。
  • 此处商品A的id为221
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
drop table if exists dwn_order_info_by_day;
create table dwn_order_info_by_day
as select
count(distinct c.order_id) as consumption_num, -- 商品A销售清单
sum(c.a_num) as day_num, -- 商品A消费总数
sum(c.include_product_a)/count(distinct c.order_id) as buy_a_rate -- 购买商品A的消费比例
from
(
select
a.order_id as order_id,
a.a_num as a_num,
a.include_product_a as include_product_a,
b.year_value as year_value,
b.month_value as month_value,
b.day_value as day_value
from dws_order_info a
left join dw_dim_date_info b on a.date_id=b.date_id) c
group by c.day_value;

-- 查询到商品A的购买数据记录
hive> select * from dwn_order_info_by_day;
OK
62 310 2.021122101E9
Time taken: 0.098 seconds, Fetched: 1 row(s)

总结

  • 以上就是数据从数据源经过ETL处理最终加载到数据仓库的整个过程。在实际业务过程中,数据规模庞大、业务逻辑复杂,需要生成大量的ETL处理任务,因此在数据仓库设计过程中,需要考虑中间层数据的通用性。在调度系统(如Airflow、Azkaban等)的调度下,这些ETL任务分批有序执行,最终生成报表等应用所需的数据

说明

hive简单介绍

使用 hive 的命令行接口,感觉很像操作关系数据库,但是 hive 和关系数据库还是有很大的不同,下面我就比较下 hive 与关系数据库的区别,具体如下:

  • Hive 和关系数据库存储文件的系统不同,Hive 使用的是 hadoop 的 HDFS(hadoop 的分布式文件系统),关系数据库则是服务器本地的文件系统;
  • hive 使用的计算模型是 mapreduce,而关系数据库则是自己设计的计算模型;
  • 关系数据库都是为实时查询的业务进行设计的,而 Hive 则是为海量数据做数据挖掘设计的,实时性很差;实时性的区别导致 Hive 的应用场景和关系数据库有很大的不同;

安装hive

  • 下载版本选择为3.1.2
1
2
3
4
5
6
[root@VM-24-13-centos local]# wget --no-check-certificate  https://dlcdn.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz

sudo tar -zxvf ./apache-hive-3.1.2-bin.tar.gz -C /usr/local # 解压到/usr/local中
cd /usr/local/
sudo mv apache-hive-3.1.2-bin hive # 将文件夹名改为hive
sudo chown -R hadoop:hadoop hive # 修改文件权限

配置环境

1
2
3
4
5
6
[hadoop@VM-24-13-centos local]$ vi ~/.bashrc


export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
export HADOOP_HOME=/usr/local/hadoop
  • 生效环境变量

    1
    source ~/.bashrc
  • 修改/usr/local/hive/conf下的hive-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    cd /usr/local/hive/conf
    mv hive-default.xml.template hive-default.xml

    [hadoop@VM-24-13-centos conf]$ vi hive-site.xml

    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>username to use against metastore database</description>
    </property>
    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive1234</value>
    <description>记得在创建用户时,密码要和这个对应</description>
    </property>
    </configuration>

安装和配置mysql

安装mysql

  • 我之前已经装好了,省略此步骤

配置mysql

  • 新建hive数据库
1
2
[hadoop@VM-24-13-centos conf]$ mysql -u root -p
create database hive; #这个hive数据库与hive-site.xml中localhost:3306/hive的hive对应,用来保存hive元数据
  • 配置mysql允许hive接入
1
2
3
4
5
6
7
8
9
mysql> set global validate_password.policy='LOW'; # mysql8后,有密码策略要求,改为低
mysql >create user hive@localhost identified by 'hive1234';
# hive 代表你要创建的此数据库的新用户账号
# localhost 代表访问本地权限,不可远程访问,还有其他值
# %代表通配所有host地址权限(可远程访问)
# 指定特殊Ip访问权限 如10.138.106.10
# hive1234代表你要创建的此数据库的新用密码
mysql>grant all privileges on *.* to 'hive'@'%' # 授权数据库给hive用户
mysql> flush privileges; #刷新mysql系统权限关系表

配置hive

启动hive

  • 启动hive之前,请先启动hadoop集群。

    1
    2
    3
    start-all.sh #启动hadoop
    hive #启动hive

错误处理

1
2
hive> show databases;# 输入后报错
FAILED: HiveException java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  • Hive现在包含一个用于 Hive Metastore 架构操控的脱机工具,名为 schematool.此工具可用于初始化当前 Hive 版本的 Metastore 架构。此外,其还可处理从较旧版本到新版本的架构升级,用下面的命令:
1
2
3
4
5
[hadoop@VM-24-13-centos conf]$ schematool -dbType mysql -initSchema

# 出现下面的错误,提升驱动载入失败
org.apache.hadoop.hive.metastore.HiveMetaException: Failed to load driver
Underlying cause: java.lang.ClassNotFoundException : com.mysql.jdbc.Driver

image-20211208112332826

  • 下载安装mysql驱动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sudo wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.26-1.el7.noarch.rpm

# 可以看到其他都是文档,那么连接驱动文件是/usr/share/java/mysql-connector-java.jar
[root@study1 opt]# rpm -qpl mysql-connector-java-8.0.26-1.el7.noarch.rpm
警告:mysql-connector-java-8.0.26-1.el7.noarch.rpm: 头V3 DSA/SHA1 Signature, 密钥 ID 5072e1f5: NOKEY
/usr/share/doc/mysql-connector-java-8.0.26
/usr/share/doc/mysql-connector-java-8.0.26/CHANGES
/usr/share/doc/mysql-connector-java-8.0.26/INFO_BIN
/usr/share/doc/mysql-connector-java-8.0.26/INFO_SRC
/usr/share/doc/mysql-connector-java-8.0.26/LICENSE
/usr/share/doc/mysql-connector-java-8.0.26/README
/usr/share/java/mysql-connector-java.jar

# 安装,提示缺少依赖java-headless且版本要大于1.8版本
[root@study1 opt]# rpm -ivh mysql-connector-java-8.0.26-1.el7.noarch.rpm
警告:mysql-connector-java-8.0.26-1.el7.noarch.rpm: 头V3 DSA/SHA1 Signature, 密钥 ID 5072e1f5: NOKEY
错误:依赖检测失败:
java-headless >= 1:1.8.0 被 mysql-connector-java-1:8.0.26-1.el7.noarch 需要

# 安装依赖
[root@study1 opt]# yum -y install java-headless

# 再次安装
[root@study1 opt]# rpm -ivh mysql-connector-java-8.0.26-1.el7.noarch.rpm
警告:mysql-connector-java-8.0.26-1.el7.noarch.rpm: 头V3 DSA/SHA1 Signature, 密钥 ID 5072e1f5: NOKEY
准备中... ################################# [100%]
正在升级/安装...
1:mysql-connector-java-1:8.0.26-1.e################################# [100%]

# 查看驱动文件
[root@study1 opt]# ll /usr/share/java
总用量 2328
-rw-r--r--. 1 root root 2381198 9月 11 05:55 mysql-connector-java.jar
# 把驱动文件拷贝到hive的lib中
[root@study1 opt]# cp /usr/share/java/mysql-connector-java.jar /usr/local/hive/lib
  • 再次schematool初始化就可以了
1
[hadoop@VM-24-13-centos local]$ schematool -dbType mysql -initSchema
  • 进入hive
1
2
3
4
5
6
7
8
9
10
[hadoop@VM-24-13-centos local]$ hive

hive> show databases;
OK
default
Time taken: 0.02 seconds, Fetched: 1 row(s)
hive> show tables;
OK
Time taken: 0.036 seconds
hive>
  • 如果要退出Hive交互式执行环境,可以输入如下命令:
1
2
hive> exit;
[hadoop@VM-24-13-centos local]$

Hive的常用HiveQL操作

Hive基本数据类型

  • Hive支持基本数据类型和复杂类型, 基本数据类型主要有数值类型(INT、FLOAT、DOUBLE ) 、布尔型和字符串, 复杂类型有三种:ARRAY、MAP 和 STRUCT。

基本数据类型

  • TINYINT: 1个字节
  • SMALLINT: 2个字节
  • INT: 4个字节
  • BIGINT: 8个字节
  • BOOLEAN: TRUE/FALSE
  • FLOAT: 4个字节,单精度浮点型
  • DOUBLE: 8个字节,双精度浮点型STRING 字符串

复杂数据类型

  • ARRAY: 有序字段
  • MAP: 无序字段
  • STRUCT: 一组命名的字段

用的HiveQL操作命令

  • Hive常用的HiveQL操作命令主要包括:数据定义、数据操作。接下来详细介绍一下这些命令即用法(想要了解更多请参照《Hive编程指南》一书)

  • 数据定义:主要用于创建修改和删除数据库、表、视图、函数和索引。

创建、修改和删除数据库

1
2
3
4
5
6
create database if not exists hive;       #创建数据库
show databases; #查看Hive中包含数据库
show databases like 'h.*'; #查看Hive中以h开头数据库
use hive; # 使用数据库
show tables; # 查看表列表
drop table usr; # 删除表

创建、修改和删除表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#创建内部表(管理表)
create table if not exists hive.usr(
name string comment 'username', # name表示字段命,string表示字段类型,comment后面内容表示说明
pwd string comment 'password',
address struct<street:string,city:string,state:string,zip:int> comment 'home address',
identify map<int,tinyint> comment 'number,sex') comment 'description of the table'
tblproperties('creator'='me','time'='2016.1.1'); #tblproperties 设置表的属性

#创建外部表
create external table if not exists usr2(
name string,
pwd string,
address struct<street:string,city:string,state:string,zip:int>,
identify map<int,tinyint>)
row format delimited fields terminated by ',' # 字段分隔符来进行分割,如:test1,223456,湖南省
location '/usr/local/hive/warehouse/hive.db/usr';
# LOCATION一般与外部表(EXTERNAL)一起使用。一般情况下hive元数据默认保存在<hive.metastore.warehouse.dir>中。
# 这个字段的适用场景是:数据已经存在HDFS上不能移动位置了,那么就通过这个字段让表可以直接读到这份数据。另外,要注意建表的时候,应该让表变成外部表。

#创建分区表
create table if not exists usr3(
name string,
pwd string,
address struct<street:string,city:string,state:string,zip:int>,
identify map<int,tinyint>)
partitioned by(city string,state string); # 双分区

#复制usr表的表模式
create table if not exists hive.usr1 like hive.usr;
show tables in hive;
show tables 'u.*'; #查看hive中以u开头的表
describe hive.usr; #查看usr表相关信息
alter table hive.usr rename to custom; #重命名表
#为表增加一个分区
alter table usr3 add if not exists partition(city="beijing",state="China") location '/usr/local/hive/warehouse/usr3/China/beijing';

#修改分区路径
alter table usr3 partition(city="beijing",state="China") set location '/usr/local/hive/warehouse/usr3/CH/beijing';
#删除分区
alter table usr3 drop if exists partition(city="beijing",state="China");
#修改列信息,注意这里,如果使用 after时,交换元素类型不一致,就无法交换成功
alter table custom change column username username string after pwd;

alter table custom add columns(hobby string); #增加列
alter table custom replace columns(uname string); #删除替换列
alter table custom set tblproperties('creator'='liming'); #修改表属性
alter table usr3 partition(city="beijing",state="China") set fileformat sequencefile; #修改存储属性
use hive; #切换到hive数据库下
drop table if exists usr1; #删除表
drop database if exists hive cascade; #删除数据库和它中的表
参考

视图和索引的创建、修改和删除

  • 主要语法如下,用户可自行实现。
1
2
create view view_name as....;                #创建视图
alter view view_name set tblproperties(…); #修改视图
  • 因为视图是只读的,所以 对于视图只允许改变元数据中的 tblproperties属性。

    1
    2
    3
    4
    5
    #删除视图
    drop view if exists view_name;
    #创建索引
    create index index_name on table table_name(partition_name/column_name)
    as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild....;
  • 这里’org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler’是一个索引处理器,即一个实现了索引接口的Java类,另外Hive还有其他的索引实现。

1
alter index index_name on table table_name partition(...)  rebulid;   #重建索引
  • 如果使用 deferred rebuild,那么新索引成空白状态,任何时候可以进行第一次索引创建或重建。
1
2
show formatted index on table_name;                       #显示索引
drop index if exists index_name on table table_name; #删除索引
说明

用户自定义函数

  • 没有实践自定义函数,后续有需求在学

  • 在新建用户自定义函数(UDF)方法前,先了解一下Hive自带的那些函数。show functions; 命令会显示Hive中所有的函数名称:

1
2
3
4
5
6
7
8
9
10
11
12
hive> show functions;
OK
!
!=
$sum0
....
cardinality_violation
case
cbrt
ceil
ceiling
...
  • 若想要查看具体函数使用方法可使用describe function 函数名:
1
2
3
hive> describe function abs;
OK
abs(x) - returns the absolute value of x
  • 首先编写自己的UDF前需要继承UDF类并实现evaluate()函数,或是继承GenericUDF类实现initialize()函数、evaluate()函数和getDisplayString()函数,还有其他的实现方法,感兴趣的用户可以自行学习。

  • 另外,如果用户想在Hive中使用该UDF需要将我们编写的Java代码进行编译,然后将编译后的UDF二进制类文件(.class文件)打包成一个JAR文件,然后在Hive会话中将这个JAR文件加入到类路径下,在通过create function语句定义好使用这个Java类的函数。

1
2
3
add jar <jar文件的绝对路径>;                        #创建函数
create temporary function function_name;
drop temporary function if exists function_name; #删除函数

数据操作

  • 主要实现的是将数据装载到表中(或是从表中导出),并进行相应查询操作,对熟悉SQL语言的用户应该不会陌生。

向表中装载数据

这里我们以只有两个属性的简单表为例来介绍。首先创建表stu和course,stu有两个属性id与name,course有两个属性cid与sid。

1
2
3
4
create table if not exists hive.stu(id int,name string) 
row format delimited fields terminated by '\t';
create table if not exists hive.course(cid int,sid int)
row format delimited fields terminated by '\t';
  • 向表中装载数据有两种方法:从文件中导入和通过查询语句插入
从文件中导入
  • 假如这个表中的记录存储于文件stu.txt中,该文件的存储路径为/usr/local/hadoop/examples/stu.txt,内容如下。
1
2
3
4
# id后面的name分割符号要用键盘上的tab隔开,直接复制过去,导入进去会全部都是null
1 xiapi
2 xiaoxue
3 qingqing
  • 下面我们把这个文件中的数据装载到表stu中,操作如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    hive> use hive;
    hive> load data local inpath '/usr/local/hadoop/examples/stu.txt' overwrite into table stu;
    Loading data to table hive.stu
    OK
    # 查询到数据
    hive> select * from hive.stu;
    OK
    1 xiapi
    2 xiaoxue
    3 qingqing
    Time taken: 1.324 s
    • 如果stu.txt文件存储在HDFS 上,则不需要 local 关键字。
通过查询语句插入

使用如下命令,创建stu1表,它和stu表属性相同,我们要把从stu表中查询得到的数据插入到stu1中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
hive> create table stu1 as select id,name from stu;
...
Moving data to directory hdfs://localhost:9000/user/hive/warehouse/hive.db/.hive-staging_hive_2021-12-10_10-42-32_320_1543053100774530944-1/-ext-10002
Moving data to directory hdfs://localhost:9000/user/hive/warehouse/hive.db/stu1
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 31 HDFS Write: 114 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
# 查询到stu1表结构如下
hive> describe stu1;
OK
id int
name string
Time taken: 0.268 seconds, Fetched: 2 row(s)

上面是创建表,并直接向新表插入数据;若表已经存在,向表中插入数据需执行以下命令:

1
2
3
4
5
6
7
# 这里关键字overwrite的作用是替换掉表(或分区)中原有数据,换成into关键字,直接追加到原有内容后。
hive> insert overwrite table stu1 select id,name from stu where(id='1');

# 查询发现只有id为1的数据,其他数据全部清空
hive> select * from hive.stu1;
OK
1 xiapi

从表中导出数据

导出到本地文件
1
2
3
4
5
6
7
8
9
10
11
hive> insert overwrite local directory '/usr/local/hadoop/examples/export_stu' select * from hive.stu;
...
Moving data to local directory /usr/local/hadoop/examples/export_stu
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 30 HDFS Write: 0 SUCCESS

# 查看导出的文件
[hadoop@VM-24-13-centos local]$ cat /usr/local/hadoop/examples/export_stu/000000_0
1xiapi
2xiaoxue
3qingqing
导出到hdfs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hive> insert overwrite directory '/usr/local/hadoop/examples/export_hdfs_stu' select * from hive.stu;
...

Moving data to directory /usr/local/hadoop/examples/export_hdfs_stu
MapReduce Jobs Launched:
Stage-Stage-1: HDFS Read: 30 HDFS Write: 30 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK

# 查看导出成功的数据
hive> dfs -cat /usr/local/hadoop/examples/export_hdfs_stu/*;
1xiapi
2xiaoxue
3qingqing

查询操作

  • 和SQL的查询完全一样,这里不再赘述。主要使用select…from…where…等语句,再结合关键字group by、having、like、rlike等操作。这里我们简单介绍一下SQL中没有的case…when…then…句式、join操作和子查询操作。

  • case…when…then…句式和if条件语句类似,用于处理单个列的查询结果,语句如下:

1
2
3
4
5
6
7
8
9
10
11
hive> select id,name,
> case
> when id=1 then 'first'
> when id=2 then 'second'
> else 'third'
> end from stu;
OK
1 xiapi first
2 xiaoxue second
3 qingqing third
Time taken: 0.385 seconds, Fetched: 3 row(s)
  • 连接(join)是将两个表中在共同数据项上相互匹配的那些行合并起来, HiveQL 的连接分为内连接、左向外连接、右向外连接、全外连接和半连接 5 种。
内连接
  • 内连接使用比较运算符根据每个表共有的列的值匹配两个表中的行。
  • 首先,我们先把以下内容插入到course表中
1
2
3
hive> insert into course values(1,3);
hive> insert into course values(2,1);
hive> insert into course values(3,1);
  • 下面, 查询stu和course表中学号相同的所有行,命令如下:

    1
    2
    3
    4
    5
    6
    7
    8
    hive> select stu.*, course.* from stu join course on(stu.id=course.sid);


    OK
    1 xiapi 2 1
    1 xiapi 3 1
    2 xiaoxue 1 2
    Time taken: 11.049 seconds, Fetched: 3 row(s)
右连接

右连接是左向外连接的反向连接,将返回右表的所有行。如果右表的某行在左表中没有匹配行,则将为左表返回空值。命令

1
2
3
4
5
6
7
8

hive> select stu.*, course.* from stu right outer join course on(stu.id=course.sid);
Total MapReduce CPU Time Spent: 0 msec
OK
2 xiaoxue 1 2
1 xiapi 2 1
1 xiapi 3 1
Time taken: 10.887 seconds, Fetched: 3 row(s)
全连接

全连接返回左表和右表中的所有行。当某行在另一表中没有匹配行时,则另一个表的选择列表包含空值。如果表之间有匹配行,则整个结果集包含基表的数据值。命令如下:

1
2
3
4
5
6
7
8
hive> select stu.*, course.* from stu full outer join course on(stu .id=course .sid);


OK
1 xiapi 3 1
1 xiapi 2 1
2 xiaoxue 1 2
3 qingqing NULL NULL
半连接

半连接是 Hive 所特有的, Hive 不支持 in 操作,但是拥有替代的方案; left semi join, 称为半连接, 需要注意的是连接的表不能在查询的列中,只能出现在 on 子句中。命令如下:

1
2
3
4
5
6
7
8
hive> select stu.* from stu left semi join course on(stu .id=course .sid);


Total MapReduce CPU Time Spent: 0 msec
OK
1 xiapi
2 xiaoxue
Time taken: 9.267 seconds, Fetched: 2 row(s)
子查询

标准 SQL 的子查询支持嵌套的 select 子句,HiveQL 对子查询的支持很有限,只能在from 引导的子句中出现子查询。

Hive简单编程实践

  • 下面我们以词频统计算法为例,来介绍怎么在具体应用中使用Hive。词频统计算法又是最能体现MapReduce思想的算法之一,这里我们可以对比它在MapReduce中的实现,来说明使用Hive后的优势。

  • MapReduce实现词频统计的代码可以通过下载Hadoop源码后,在 $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar 包中找到(wordcount类),wordcount类由63行Java代码编写而成。下面首先简单介绍一下怎么使用MapReduce中wordcount类来统计单词出现的次数,具体步骤如下:

    • 创建input目录,output目录会自动生成。其中input为输入目录,output目录为输出目录。命令如下:
    1
    2
    3
    cd /usr/local/hadoop
    rm -r input
    mkdir input
    • 然后,在input文件夹中创建两个测试文件file1.txt和file2.txt,命令如下:
    1
    2
    3
    cd  /usr/local/hadoop/input
    [hadoop@VM-24-13-centos input]$ echo "hello world" > file1.txt
    [hadoop@VM-24-13-centos input]$ echo "hello world" > file1.txt
    • 执行如下hadoop命令:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    [hadoop@VM-24-13-centos hadoop]$ cd  ..
    # 先删除hdfs中output目录,不然会报错
    [hadoop@VM-24-13-centos hadoop]$ hadoop dfs -rm -r output
    # 删除input
    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -rm -r input
    # 在HDFS创建一个目录
    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -mkdir input

    # 复制input到hdfs中的input目录
    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -put input/* input

    [hadoop@VM-24-13-centos hadoop]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar wordcount input output

    ...
    2021-12-10 15:22:03,703 INFO mapreduce.Job: map 100% reduce 100%
    2021-12-10 15:22:03,704 INFO mapreduce.Job: Job job_local1659054277_0001 completed successfully

    • 我们可以到output文件夹中查看结果,结果如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -ls output/*
    -rw-r--r-- 1 hadoop supergroup 0 2021-12-10 17:12 output/_SUCCESS
    -rw-r--r-- 1 hadoop supergroup 25 2021-12-10 17:12 output/part-r-00000

    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -cat output/part-r-00000
    hadoop 1
    hello 2
    world 1

    • 下面我们通过HiveQL实现词频统计功能,此时只要编写下面7行代码,而且不需要进行编译生成jar来执行。HiveQL实现命令如下:

      1
      2
      3
      4
      5
      6
      7
      8
      [hadoop@VM-24-13-centos hadoop]$ hive

      hive> create table docs(line string);
      hive> load data inpath 'input' overwrite into table docs;
      # create table word_count 表示创建数据库
      # as select word, count(1) as count 表示查询列表,一个word,一个统计值
      # from (select explode(split(line,' '))as word from docs) 这里就是从docs复制数据
      hive> create table word_count as select word, count(1) as count from (select explode(split(line,' '))as word from docs) w group by word order by word;
    • 执行后,用select语句查看,结果如下:

      1
      2
      3
      4
      5
      6
      hive> select * from word_count;
      OK
      hadoop 1
      hello 2
      world 1
      Time taken: 0.148 seconds, Fetched: 3 row(s)
  • 由上可知,采用Hive实现最大的优势是,对于非程序员,不用学习编写Java MapReduce代码了,只需要用户学习使用HiveQL就可以了,而这对于有SQL基础的用户而言是非常容易的。

环境

  • 服务器信息,是腾讯云服务器,2核cpu,4GB内存,80GB云硬盘,系统为centos 7.6_x64

介绍

Hadoop是用来处理大数据集合的分布式存储计算基础架构。可以使用一种简单的编程模式,通过多台计算机构成的集群,分布式处理大数据集。hadoop作为底层,其生态环境很丰富。hadoop基础包括以下四个基本模块:

  • hadoop基础功能库:支持其他hadoop模块的通用程序包。
  • HDFS: 一个分布式文件系统,能够以高吞吐量访问应用的数据。
  • YARN: 一个作业调度和资源管理框架。
  • MapReduce: 一个基于YARN的大数据并行处理程序。

安装配置

创建hadoop用户

1
2
3
su              #  root 用户登录
useradd -m hadoop -s /bin/bash # 创建新用户hadoop,并使用 /bin/bash 作为shell
passwd hadoop # 设置密码

设置权限

  • 为 hadoop 用户增加管理员权限,方便部署,避免一些对新手来说比较棘手的权限问题,输入命令visudo

    1
    2
    [hadoop@VM-24-13-centos hadoop]$ sudo visudo
    [sudo] password for hadoop:
  • 找到 root ALL=(ALL) ALL 这行,然后在这行下面增加一行内容:hadoop ALL=(ALL) ALL (当中的间隔为tab),如下图所示:

    image-20211207093705705

  • su hadoop 直接可以切换用户

安装SSH、配置SSH无密码登陆

  • 集群、单节点模式都需要用到 SSH 登陆(类似于远程登陆,你可以登录某台 Linux 主机,并且在上面运行命令),一般情况下,CentOS 默认已安装了 SSH client、SSH server,打开终端执行如下命令进行检验:

    1
    2
    3
    4
    5
    6
    [hadoop@VM-24-13-centos /]$ rpm -qa | grep ssh
    openssh-7.4p1-21.el7.x86_64
    openssh-clients-7.4p1-21.el7.x86_64
    openssh-server-7.4p1-21.el7.x86_64
    libssh2-1.8.0-4.el7.x86_64

    • 包含了 SSH client 跟 SSH server,则不需要再安装
  • 接着执行如下命令(ssh localhost)测试一下 SSH 是否可用:

    1
    2
    3
    4
    5
    6
    7

    [hadoop@VM-24-13-centos /]$ ssh localhost
    The authenticity of host 'localhost (::1)' can't be established.
    ECDSA key fingerprint is SHA256:v9LOJv5al8BNRGGZVJeqa2AdV3znIsa6cjyoj9CbWRQ.
    ECDSA key fingerprint is MD5:bd:51:9d:6f:1f:9c:1f:ad:34:ce:fb:90:4f:27:bc:b1.
    Are you sure you want to continue connecting (yes/no)? yes

    • 此时会有如下提示(SSH首次登陆提示),输入 yes 。然后按提示输入密码 hadoop,这样就登陆到本机了,类似于下面这样:

      1
      2
      3
      4
      5
      Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
      hadoop@localhost's password:
      Last login: Tue Dec 7 09:37:37 2021
      [hadoop@VM-24-13-centos ~]$

  • 但这样登陆是需要每次输入密码的,我们需要配置成SSH无密码登陆比较方便。

  • 首先输入 exit 退出刚才的 ssh,就回到了我们原先的终端窗口,

    1
    2
    3
    4
    5
    6

    [hadoop@VM-24-13-centos ~]$ exit
    logout
    Connection to localhost closed.
    [hadoop@VM-24-13-centos /]$

  • 然后利用 ssh-keygen 生成密钥,并将密钥加入到授权中

    1
    2
    3
    4
    cd ~/.ssh/                     # 若没有该目录,请先执行一次ssh localhost
    ssh-keygen -t rsa # 会有提示,都按回车就可以
    cat id_rsa.pub >> authorized_keys # 加入授权
    chmod 600 ./authorized_keys # 修改文件权限

    ~的含义

    在 Linux 系统中,~ 代表的是用户的主文件夹,即 “/home/用户名” 这个目录,如你的用户名为 hadoop,则 ~ 就代表 “/home/hadoop/”。 此外,命令中的 # 后面的文字是注释。

  • 此时再用 ssh localhost 命令,无需输入密码就可以直接登陆了,如下所示。

1
2
3
hadoop@VM-24-13-centos .ssh]$ ssh localhost
Last login: Tue Dec 7 09:44:00 2021 from ::1
[hadoop@VM-24-13-centos ~]$

java

  • java之前已经装好了,为1.8

    1
    2
    3
    4
    5

    [hadoop@VM-24-13-centos ~]$ java -version
    java version "1.8.0_311"
    Java(TM) SE Runtime Environment (build 1.8.0_311-b11)
    Java HotSpot(TM) 64-Bit Server VM (build 25.311-b11, mixed mode)

安装hadoop

  • 安装hadoop版本为3.0
1
2
3
4
5
6
7
8
9
su root
cd /usr/local
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.0.3/hadoop-3.0.3.tar.gz


[hadoop@VM-24-13-centos local]$ sudo tar zxvf hadoop-3.0.3.tar.gz
cd /usr/local/
sudo mv ./hadoop-3.0.3/ ./hadoop # 将文件夹名改为hadoop
sudo chown -R hadoop:hadoop ./hadoop # 修改文件权限
  • Hadoop 解压后即可使用。输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    cd /usr/local/hadoop
    ./bin/hadoop version


    [hadoop@VM-24-13-centos hadoop]$ ./bin/hadoop version
    Hadoop 3.0.3
    Source code repository https://yjzhangal@git-wip-us.apache.org/repos/asf/hadoop.git -r 37fd7d752db73d984dc31e0cdfd590d252f5e075
    Compiled by yzhang on 2018-05-31T17:12Z
    Compiled with protoc 2.5.0
    From source with checksum 736cdcefa911261ad56d2d120bf1fa
    This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-3.0.3.jar

Hadoop单机配置(非分布式)

  • Hadoop 默认模式为非分布式模式,无需进行其他配置即可运行。非分布式即单 Java 进程,方便进行调试。
  • 这里比较奇怪,安装原文中教程,示例已经验证通过了,当搭建了伪分布式配置时,第二天运行这里的实例,没有成功?然后把实例修改后,发现居然和伪分布式配置代码差不多?暂不做深究了

实例

  • 现在我们可以执行例子来感受下 Hadoop 的运行。Hadoop 附带了丰富的例子(运行 ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.3.jar 可以看到所有例子),包括 wordcount、terasort、join、grep 等。

  • 在此我们选择运行 wordcount例子,我们将 input 文件夹中的所有文件作为输入,最后输出结果到 output 文件夹中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
cd /usr/local/hadoop
mkdri iniput1
cp ./etc/hadoop/*.xml ./input1
# 在HDFS创建一个目录
hdfs dfs -mkdir /usr/local/hadoop/input
# 将配置文件作为输入文件上传到刚创建的HDFS目录中
hdfs dfs -put ./input1/* /usr/local/hadoop/input

hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount /usr/local/hadoop/input /usr/local/hadoop/output

# 查看运行结果列表
[hadoop@VM-24-13-centos hadoop]$ hdfs dfs -ls /usr/local/hadoop/output/*

-rw-r--r-- 1 hadoop supergroup 0 2021-12-10 16:48 /usr/local/hadoop/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 9405 2021-12-10 16:48 /usr/local/hadoop/output/part-r-00000

# 用cat查运行数据
[hadoop@VM-24-13-centos hadoop]$ hdfs dfs -cat /usr/local/hadoop/output/part-r-00000
...
datanodes 1
decryptEncryptedKey 1
default 13
default_priority={priority}] 1
defined. 4
delete-key 1
dfsadmin 1
different
  • 注意,Hadoop 默认不会覆盖结果文件,因此再次运行上面实例会提示出错,需要先将 ./output 删除。

    1
    [hadoop@VM-24-13-centos hadoop]$ hdfs dfs -rm -r /usr/local/hadoop/output

Hadoop伪分布式配置

  • Hadoop 可以在单节点上以伪分布式的方式运行,Hadoop 进程以分离的 Java 进程来运行,节点既作为 NameNode 也作为 DataNode,同时读取的是 HDFS 中的文件

设置环境变量

  • 在设置 Hadoop 伪分布式配置前,我们还需要设置 HADOOP 环境变量,执行如下命令在 ~/.bashrc 中设置:
1
2
3
4
5
6
7
8
9
10
11
12
vi ~/.bashrc

# Hadoop Environment Variables
export HADOOP_HOME=/usr/local/hadoop # hadoop的安装路径
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME # 设置MAPRED环境变量
export HADOOP_COMMON_HOME=$HADOOP_HOME # 设置COMMON环境变量
export HADOOP_HDFS_HOME=$HADOOP_HOME # HDFS的环境变量
export YARN_HOME=$HADOOP_HOME # YARN的环境变量
export JAVA_HOME=/usr/local/jdk1.8.0_311 # 设置jdk的安装目录(用which java查看到)
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
  • 生效环境变量

    source ~/.bashrc

  • 修改Hadoop-env.sh中的java_home,不然在启动集群(伪集群)时出现报错

    1
    2
    3
    4
    5
    sudo vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh

    # 设置如下
    export JAVA_HOME=/usr/local/jdk1.8.0_311

配置文件

  • Hadoop 的配置文件位于 /usr/local/hadoop/etc/hadoop/ 中,伪分布式需要修改2个配置文件 core-site.xmlhdfs-site.xml

  • Hadoop的配置文件是 xml 格式,每个配置以声明 property 的 name 和 value 的方式来实现。

  • 修改配置文件 core-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    sudo vi /usr/local/hadoop/etc/hadoop/core-site.xml 

    <configuration>
    <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/local/hadoop/tmp</value>
    <description>Abase for other temporary directories.</description>
    </property>

    # 设置HDFS的默认名称,在使用命令调用时,可以用此名称
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
    </property>
    </configuration>
  • 同样的,修改配置文件 hdfs-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <configuration>
    <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>设置blocks副本数</description>

    </property>
    <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    <description>设置存放NameNode的数据存储目录</description>
    </property>
    <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    <description>设置存放DataNode的数据存储目录</description>
    </property>
    </configuration>
  • 配置完成后,执行 NameNode 的格式化:

    1
    ./bin/hdfs namenode -format
  • 接着开启 NaneNodeDataNode 守护进程,./sbin/start-dfs.sh

    1
    2
    3
    4
    5
    [hadoop@VM-24-13-centos hadoop]$ ./sbin/start-dfs.sh
    Starting namenodes on [localhost]
    Starting datanodes
    Starting secondary namenodes [VM-24-13-centos]

  • 启动完成后,可以通过命令 jps 来判断是否成功启动,若成功启动则会列出如下进程: NameNodeDataNodeSecondaryNameNode(如果 SecondaryNameNode 没有启动,请运行 sbin/stop-dfs.sh 关闭进程,然后再次尝试启动尝试)。如果没有 NameNode 或 DataNode ,那就是配置不成功,请仔细检查之前步骤,或通过查看启动日志排查原因。

1
2
3
4
5
6
7

[hadoop@VM-24-13-centos hadoop]$ jps
12433 Jps
11741 DataNode
11597 NameNode
11934 SecondaryNameNode
# HDFS功能:NameNode,SecondaryNameNode,DataNode已经启动

通过查看启动日志分析启动失败原因

有时 Hadoop 无法正确启动,如 NameNode 进程没有顺利启动,这时可以查看启动日志来排查原因,注意几点:

  • 启动时会提示形如 “dblab: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hadoop-namenode-dblab.out”,其中 dblab 对应你的主机名,但启动的日志信息是记录在 /usr/local/hadoop/logs/hadoop-hadoop-namenode-dblab.log 中,所以应该查看这个后缀为 .log 的文件;
  • 每一次的启动日志都是追加在日志文件之后,所以得拉到最后面看,看下记录的时间就知道了。
  • 一般出错的提示在最后面,也就是写着 Fatal、Error 或者 Java Exception 的地方。
  • 可以在网上搜索一下出错信息,看能否找到一些相关的解决方法。

实例

  • 上面的单机模式,grep 例子读取的是本地数据,伪分布式读取的则是 HDFS 上的数据。要使用 HDFS,首先需要在 HDFS 中创建用户目录:

    1
    2
    [hadoop@VM-24-13-centos hadoop]$ ./bin/hdfs dfs -mkdir -p /user/hadoop
    2021-12-07 16:25:32,492 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  • 接着将·./etc/hadoop中的 xml 文件作为输入文件复制到分布式文件系统中,即将 /usr/local/hadoop/etc/hadoop 复制到分布式文件系统中的/user/hadoop/input中。我们使用的是 hadoop 用户,并且已创建相应的用户目录 /user/hadoop ,因此在命令中就可以使用相对路径如 input,其对应的绝对路径就是 /user/hadoop/input

1
2
./bin/hdfs dfs -mkdir input
./bin/hdfs dfs -put ./etc/hadoop/*.xml input
  • 复制完成后,可以通过如下命令查看 HDFS 中的文件列表:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [hadoop@VM-24-13-centos hadoop]$ ./bin/hdfs dfs -ls input

    Found 9 items
    -rw-r--r-- 1 hadoop supergroup 7861 2021-12-07 16:28 input/capacity-scheduler.xml
    -rw-r--r-- 1 hadoop supergroup 1071 2021-12-07 16:28 input/core-site.xml
    -rw-r--r-- 1 hadoop supergroup 10206 2021-12-07 16:28 input/hadoop-policy.xml
    -rw-r--r-- 1 hadoop supergroup 1339 2021-12-07 16:28 input/hdfs-site.xml
    -rw-r--r-- 1 hadoop supergroup 620 2021-12-07 16:28 input/httpfs-site.xml
    -rw-r--r-- 1 hadoop supergroup 3518 2021-12-07 16:28 input/kms-acls.xml
    -rw-r--r-- 1 hadoop supergroup 682 2021-12-07 16:28 input/kms-site.xml
    -rw-r--r-- 1 hadoop supergroup 758 2021-12-07 16:28 input/mapred-site.xml
    -rw-r--r-- 1 hadoop supergroup 690 2021-12-07 16:28 input/yarn-site.xml
  • 伪分布式运行 MapReduce 作业的方式跟单机模式相同,区别在于伪分布式读取的是HDFS中的文件(可以将单机步骤中创建的本地 input 文件夹,输出结果 output 文件夹都删掉来验证这一点)。

    1
    2
    3
    4
    5
    [hadoop@VM-24-13-centos hadoop]$ ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar  wordcount input output 
    ....
    2021-12-07 16:30:56,021 INFO mapreduce.Job: Job job_local897809468_0002 completed successfully
    ...

  • 查看运行结果的命令(查看的是位于 HDFS 中的输出结果):

    1
    2
    3
    4
    5
    6
    7
    8
    [hadoop@VM-24-13-centos hadoop]$ ./bin/hdfs dfs -ls output/*

    -rw-r--r-- 1 hadoop supergroup 0 2021-12-10 17:58 output/_SUCCESS
    -rw-r--r-- 1 hadoop supergroup 9405 2021-12-10 17:58 output/part-r-00000

    [hadoop@VM-24-13-centos hadoop]$ ./bin/hdfs dfs -cat output/part-r-00000
    ....

  • 我们也可以将运行结果取回到本地:

    1
    2
    3
    4
    5
    6
    rm -r ./output    # 先删除本地的 output 文件夹(如果存在)
    ./bin/hdfs dfs -get output ./output # 将 HDFS 上的 output 文件夹拷贝到本机
    [hadoop@VM-24-13-centos hadoop]$ cat ./output/*
    .....


  • Hadoop 运行程序时,输出目录不能存在,否则会提示错误 org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://localhost:9000/user/hadoop/output already exists,因此若要再次执行,需要执行如下命令删除 output 文件夹:

1
./bin/hdfs dfs -rm -r output    # 删除 output 文件夹
  • 为防止覆盖结果,程序指定的输出目录(如 output)不能存在,否则会提示错误,运行前需要先删除输出目录。在实际开发应用程序时,可考虑在程序中加上如下代码,能在每次运行时自动删除输出目录,避免繁琐的命令行操作,可以采用类似于下面java代码:
1
2
3
4
5
6
Configuration conf = new Configuration();
Job job = new Job(conf);

/* 删除输出目录 */
Path outputPath = new Path(args[1]);
outputPath.getFileSystem(conf).delete(outputPath, true);
  • 若要关闭 Hadoop,则运行

    1
    ./sbin/stop-dfs.sh
  • 下次启动 hadoop 时,无需进行 NameNode 的初始化,只需要运行 ./sbin/start-dfs.sh 就可以!

完全分布式

后续打算采用本地三台虚拟机的模式搭建完全分布式。

请注意分布式运行中的这几个结点的区别:

  • 从分布式存储的角度来说,集群中的结点由一个NameNode和若干个DataNode组成,另有一个SecondaryNameNode作为NameNode的备份。
  • 从分布式应用的角度来说,集群中的结点由一个JobTracker和若干个TaskTracker组成,JobTracker负责任务的调度,TaskTracker负责并行执行任务。TaskTracker必须运行在DataNode上,这样便于数据的本地计算。JobTracker和NameNode则无须在同一台机器上。一个机器上,既当namenode,又当datanode,或者说 既 是jobtracker,又是tasktracker。没有所谓的在多台机器上进行真正的分布式计算,故称为”伪分布式”。
  • 真正的分布式,由3个及以上的实体机或者虚拟机组件的机群。
  • 来自这里

参考

什么是数据仓

  • 是BI(商业智能)、报表和数据挖掘等应用的基础

  • 大量的数据集合,4个特点主要包括:面向主题的、集成的、相对稳定的、反应历史变化的

  • 数据仓至少需要具备数据获取、数据存储、数据访问3个核心功能,这3个功能的实现过程是数据源到最终决策应用的流转过程。下图为数据流转图:

image-20211201171618652

  • 数据获取和数据存储这两个功能主要由ETL工具支撑。ETL是指从数据源提前,经过清洗、转换等过程,并最终存储到目标数据仓库的过程。如下图所示,ETL过程3个步骤

    image-20211201172008951

  • 为什么要用ETL

数据仓库、集市、数据湖、中台区别

数据集市

  • 数据仓库面向企业全局业务,而数据集市面向部门级业务

    image-20211206151311207

数据湖

  • 数据存储结构:数据仓主要存储和处理历史数据的机构化数据,而数据湖能存储结构和非结构化所有格式的数据

  • 数据转换处理:数据仓库需要对源数据进行清洗、转换等预处理,以和定义好的数据模型相吻合;而数据湖是从源数据导入,无数据流失,随去随用,只有在使用的时候对数据转换等处理

  • 数据场景:数据仓通常充当商业智能系统、数据仪表盘等可视化报表服务的数据源角色,支持历史分析;数据湖可以作为数据仓库或数据集市的数据源,更适合进行数据的挖掘、探索和预测,

数据中台

  • 由阿里巴巴提出,就是用大数据技术统一处理数据,然后提供API给外部使用,数据中台保护数据仓库和其他服务器中间件

数据仓库的设计

架构分层设计

  • 数据仓库通常可分为数据接入层、数据明细层、数据汇总层、数据集市层、数据应用层、临时层和公共维度层。其中数据明细层和数据汇总层又合称为数据仓库层。

image-20211206154539189

数据接入层ODS

  • (Operational Data Store,ODS),也称数据贴源层,通常从业务数据库直接导入,为了考虑后续可能需要追溯数据问题,因此对于这一层就不建议做过多的数据清洗工作,原封不动地接入原始数据即可,至于数据的去噪、去重、异常值处理等过程可以放在后面的DWD层来做

数据明细层DWD

  • (Data Warehouse Detail,DWD) ,这层和 ODS 层保持一样的数据结构,只不过在从 ODS 里抽取到 DWD 的时候这个过程叫 ETL,后面我们会再讲 ETL,在抽取时对数据进行清洗加工,提供一定的数据质量保证,提供更干净的数据。

数据汇总层DWS

  • (Data Warehouse Summary, DWS),对各个表进行JOIN操作,产生业务所需要的完整数据。该层主要存放明细事实宽表、聚合试试宽表等。

数据集市层DWM

  • 也叫数据中间件,(Data Warehouse Middle,DWM),该层是在DWD层的数据基础上,对数据做一些轻微的聚合操作,生成一些列的中间结果表,提升公共指标的复用性,减少重复加工的工作。
  • 简答来说,对通用的核心维度进行聚合操作,算出相应的统计指标
  • 从广度来说,它包含了所有的业务数据

数据应用层

  • 该层中,数据高度汇总,数据粒度较大,但不一定涵盖所有业务数据,也可能只是数据集市层数据的一个子集。
  • 该层主要是提供给数据产品和数据分析使用的数据,一般会存放在ES、Redis、PostgreSql等系统中供线上系统使用;也可能存放在hive或者Druid中,供数据分析和数据挖掘使用,比如常用的数据报表就是存在这里的

临时层TMP

  • 临时存放一些中间数据计算结果

公共维度层

  • 主要负责一些一致性维度建设,如地点区域表、时间维度表等,数据仓库的各层均可使用此层

数据仓库建模方法

  • 主要有范式建模、维度建模、实体建模

范式建模

是数据仓库逻辑模型设计的基本理论,在数据仓库的模型设计中,一般采用第三范式。一个符合第三范式的关系必须具有以下三个条件:

  • 每个属性的值唯一,不具有多义性;
  • 每个非主属性必须完全依赖于整个主键,而非主键的一部分;
  • 每个非主属性不能依赖于其他关系中的属性,因为这样的话,这种属性应该归到其他关系中去

维度建模

  • 是经典的面向分析的数据仓库建模方法。对数据进行分析时使用的度量。例如:抽取近10年的信用卡数据,分析年申请趋势

  • 经常出现实体表、维度表和事实表等

    • 实体表,用于存放商品的属性信息
    • 维度表,按照某个分析维度来组织的事实描述,如分析某商品近半年来每月下单量,则表中一定存在时间字段属性。
    • 事实表,是维度表各个维度的交点,如某商品在某地某月的销售额
  • 在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。

星型模式

image-20211206163244801

雪花模式

image-20211206163338893

  • 星型模型和雪花模型实例

image-20211206163931436

星座模式

星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。

image-20211206163425140

  • 星座模型与前两种情况的区别是事实表的数量,星座模型是基于多个事实表。
  • 基本上是很多数据仓库的常态,因为很多数据仓库都是多个事实表的。所以星座不星座只反映是否有多个事实表,他们之间
    是否共享一些维度表。所以星座模型并不和前两个模型冲突。
模型的选择
  • 首先就是星座不星座这个只跟数据和需求有关系,跟殳计没关系,不用选择。星型还是雪花,取决于性能优先,还是灵活更优先。
  • 目前实际企业开发中,不会绝对选择一种,根据情况灵活组合,甚至并存(一层维度和多层维度都保存)
  • 但是整体来看,更倾向于维度更少的星型模型。尤其是hadoop体系,减少Join就是减少 Shuffle,性能差距很大。(关系型数据可以依靠强大的主键索引)

实体建模

在数据仓建模中不常见,一般适用与业务建模和领域概念建模阶段

数据仓库构建

数据仓库的构建方法

  • 构建方法主要包括自顶向下和自底向上

自顶向下实现

  • 自顶向下的实现需要在项目开始时完成更多计划和设计工作,这就需要涉及参与数据仓库实现的每个工作组、部门或业务线中的人员。要使 用的数据源、安全性、数据结构、数据质量、数据标准和整个数据模型的有关决策一般需要在真正的实现开始之前就完成。
  • 此构建方法,实施周期长,难道略大

自底向上实现

  • 自底向上的实现包含数据仓库的规划和设计,无需等待安置好更大业务范围的数据仓库设计。这并不意味着不会开发更大业务范围的数据仓 库设计;随着初始数据仓库实现的扩展,将逐渐增加对它的构建。现在,该方法得到了比自顶向下方法更广泛的接受,因为数据仓库的直接结果可以实现, 并可以用作扩展更大业务范围实现的证明。

两者结合

  • 两者结合的折中实现:每种实现方法都有利弊。在许多情况下,最好的方法可能是某两种的组合。该方法的关键之一就是确定业务范围的架构需要用于支持 集成的计划和设计的程度,因为数据仓库是用自底向上的方法进行构建。在使用自底向上或阶段性数据仓库项目模型来构建业务范围架构中的一系列数据集 市时,您可以一个接一个地集成不同业务主题领域中的数据集市,从而形成设计良好的业务数据仓库。这样的方法可以极好地适用于业务。在这种方法中, 可以把数据集市理解为整个数据仓库系统的逻辑子集,换句话说数据仓库就是一致化了的数据集市的集合。

总结

无论采用哪种模式,数据仓库构建过程,都可以参考下图介绍的5个步骤。基于BI报表、数据挖掘等应用要求,可参考架构分层设计数据仓库结构进行适当的分层设计,并根据业务要求选择合适的建模方法,可参考数据仓库建模方法

image-20211206171202539

数据仓库实例

需求

  • 现有一excel如下,需要把日期和金额合并成一行

    image-20211203173955957

  • 最终需要实现效果如下:

    image-20211203174051646

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import pandas as pd
import math


class OperateExcel():
def __init__(self, file_path):
self.file_path = file_path
# 注意header参数,取值为前面两行
self.df = pd.read_excel(file_path, sheet_name="日记帐", header=[0, 1])

def get_head(self):
"""
头部处理,格式:
:return:
[year, headers]

year为年,如2021年
headers=[],把所有头部值存到[]中,日期 凭证编号 车牌号 拿货吨位 卖出吨位 摘要 收入 支出 余额
"""
headers = []
# 取头部数据
head = self.df.head(0)
year = ""
for items in head:
# ------ 处理日期,取年
for i in items:
if i.find("年") != -1:
year = i
break
# ------- 处理其他头部
if items[1].replace(" ", "").find("Unnamed") != -1:
headers.append(items[0].replace(" ", ""))
# ---- 金额后的头部,只取收入、支出、余额;
if items[0].replace(" ", "").find("金额") != -1:
headers.append(items[1].replace(" ", ""))

headers.insert(0, "日期")
return year, headers

def get_data(self):
"""
处理数据
:return:
[file_name, data]
file_name表示将要保持的名字
data 表示处理好的数据,给pd.DataFrame使用,格式 {"金额":[1,2,3],"收入":[]}
"""

g_head = self.get_head()
headers = g_head[1]
year = g_head[0]
month = ""
data = {}
for i in headers:
data[i] = []
for items in self.df.values:
# 合计不取数据;第一个单元格为空值说明此行为无效数据,也不取值
if items[0] == '合计' or math.isnan(items[0]):
break
# 处理日期值为:2021年1月1日
month = str(items[0]) + "月"
m_date = year + month + str(int(items[1])) + "日"
data["日期"].append(m_date)
# 凭证编号
num = 0
if str(items[2]) != "nan":
num = items[2]
data["凭证编号"].append(num)

# 车牌号
car_num = 0
if str(items[3]) != "nan":
car_num = items[3]
data["车牌号"].append(car_num)

# 拿货吨位
take_tonnage = 0
if str(items[4]) != "nan":
take_tonnage = items[4]
data["拿货吨位"].append(take_tonnage)

# 卖出吨位
sell_tonnage = 0
if str(items[5]) != "nan":
sell_tonnage = items[5]
data["卖出吨位"].append(sell_tonnage)

# 摘要
summary = ""
if str(items[6]) != "nan":
summary = items[6]
data["摘要"].append(summary)

# 收入
revenue = 0
if str(items[7]) != "nan":
revenue = items[7]
data["收入"].append(revenue)

# 支出
spend = 0
if str(items[8]) != "nan":
spend = items[8]
data["支出"].append(spend)

# 余额
balance = 0
if str(items[9]) != "nan":
balance = items[9]
data["余额"].append(balance)

# 生成的文件名
file_name = year + month + ".xlsx"
return file_name, data

def sum_data(self, data):
"""
新增统计处理的数据,最终给pd.DataFrame使用
:param data: list|表示处理好的数据,给pd.DataFrame使用,格式 {"金额":[1,2,3],"收入":[]}
:return: list
"""
for i in data:
# 插入一行空行
data[i].append(0)

# 手动合计拿货吨位、卖出吨位、收入、支出、余额
data["日期"].append("合计")
# data["拿货吨位"].append(sum(map(float,data["拿货吨位"])))
data["拿货吨位"].append(sum(data["拿货吨位"]))
data["卖出吨位"].append(sum(data["卖出吨位"]))
data["收入"].append(sum(data["收入"]))
data["支出"].append(sum(data["支出"]))
data["余额"].append(sum(data["余额"]))
# 对手动合计拿货吨位、卖出吨位、收入、支出、余额、日期之外的合计行,写入空值
for i in data:
if i not in ["日期", "拿货吨位", "卖出吨位", "收入", "支出", "余额"]:
data[i].append("")

# 如果数据为0,填入到excel中不美观,因此改为”“,默认填进去就是不可见
for j in data:
for k in range(len(data[j])):
if data[j][k] == 0:
data[j][k] = ""

return data

def generate_excel(self):
"""
重新生成excel
:return:
"""
g_data = self.get_data()
# 得到要保存的文件名
file_name = g_data[0]
# 得到处理好的数据,不包括统计
l_data = g_data[1]
# 得到处理好的数据,包括统计
s_data = self.sum_data(l_data)
info_marks = pd.DataFrame(s_data)
writer = pd.ExcelWriter(file_name)
info_marks.to_excel(writer, index=False)
writer.save()
print('生成excel成功,文件名为:%s' % file_name)


if __name__ == "__main__":
file_path = 'XXXX.xls'
o_excel = OperateExcel(file_path)
o_excel.generate_excel()

  • 取头部要取两行
  • 注意nan的处理

大数据技术生态

本文主要抄录《大数据测试技术与实践》

由下而上可以划分为:

  • 数据采集
    • 关系与非关系数据采集组件,分布式消息队列等,如kafka、sqoop
  • 数据存储
    • 分布式存储系统、关系和非关系数据库等,如HDFS、MySQL
  • 管理调度
    • 资源管理和调度YARN,容器Kubernetes、服务协调zookeeper、工作流调度平台(如Azkaban)等
  • 计算机分析
    • 批处理(MapReduce)、流计算(Flink)、查询分析(Impala)和图计算(Gelly)等
  • 组件应用
    • 各种数据分析和机器学习工具,如Hive、Pig、TensorFlow

大数据采集计算

  • 系统日志采集,如kafka、Flume
  • 网络数据采集,如爬虫
  • 其他数据采集

大数据存储计算

分布式文件系统 HDFS

  • 主要解决大数据存储问题

  • GFS(Google File System)的开源实现

  • Hadoop两大核心组成部分之一,另外一个是MapReduce

  • 遵循主从(master/salve)框架

  • 可以由单台服务器扩展到数千台服务器

  • NameNode关联和维护HDFS文件系统的读写操作

  • 多个DataNode1负责存储数据

image-20211201095407758

HDFS优点

  • 具有高度容错能力,能实时监测错误并且自动恢复。
    • 类似于服务器的容灾能力,当某台服务器挂了,就启用备用服务器,进行数据同步
  • 数据存储为Streaming流式数据存储 。批处理数据,而不是实时处理,提高了大量处理数据的能力,但是会牺牲响应时间
  • 大数据集。提供了cluster集群架构,集群可扩展为数百个节点
  • 数据简单一致性。一次性写入多次读取,一个文件创建后就不可再修改,这样可以简化数据一致性
  • 跨硬件和跨软件。平台的可移植性

海量数据列式存储:Hbase

  • HDFS容错率很高,即便是在系统崩溃的情况下,也能够在节点之间快速传输数据。HBase是非关系数据库,是开源的Not-Only-SQL数据库,它的运行建立在Hadoop上。HBase依赖于CAP定理(Consistency, Availability, and Partition Tolerance)中的CP项

  • HDFS最适于执行批次分析。然而,它最大的缺点是无法执行实时分析,而实时分析是信息科技行业的标配。HBase能够处理大规模数据,它不适于批次分析,但它可以向Hadoop实时地调用数据

  • HDFS和HBase都可以处理结构、半结构和非结构数据。因为HDFS建立在旧的MapReduce框架上,所以它缺乏内存引擎,数据分析速度较慢。相反,HBase使用了内存引擎,大大提高了数据的读写速度。

  • HDFS执行的数据分析过程是透明的。HBase与之相反,因为其结构基于NoSQL,它通过在不同的关键字下进行排序而获取数据。

image-20211201100832525

大数据分析技术

  • 批处理计算。针对大规模数据的批量处理,主要代表产品有MapReduce、Spark
  • 流计算。针对流数据的实时计算,主要代表产品有,Spark Streaming、Flink 、Storm等
  • 查询分析计算。针对大规模数据的存储管理和查询分析,主要代表产品有Hive、Impala等
  • 图计算。针对大规模图结构数据的处理,主要代表产品有Pregel、Gelly等

批处理计算-MapReduce

  • 进行大量数据处理时,用MapReduce进行分布式计算,这样可大量减少计算时间,还有Spark、Pig等就是类似的代表产品或技术

  • Map将任务分割成更小任务,由每台服务器分别执行

  • Reduce将所有服务器返回的结果汇总,整理成最终结果

image-20211201102027603

流计算

  • 流式处理假设数据的潜在价值是数据的新鲜度、实时性,需要尽快处理得到结果。在这种方式下,数据以流的方式到达。在数据连续到达的过程中,由于流携带了大量数据,只有小部分的流数据被保存在有限的内存中。流处理方式用于在线应用,通常工作在秒或毫秒级别。

  • 目前主流的流处理组件包括:StromSpark Streaming、KafKa、Flume、Flink、S3等

    • Spark 和 Strom、Flink对比,无法在对实施要求很高的流处理场景中

image-20211201104503379

OLAP引擎

  • 即联机分析处理。OLAP对业务数据执行多维分析,并提供复杂计算,趋势分析和复杂数据建模的能力。它主要用于支持企业决策管理分析,是许多商务智能(BI)应用程序背后的技术。
  • 目前开源的引擎很多,如Hive、Impala、Persto等

Hive

image-20211201111306662

其他参考

大数据管理调度技术

分布式集群资源调度框架-YARN

  • 针对Hadoop1.0中MR的不足,引入了Yarn框架。Yarn框架中将JobTracker资源分配和作业控制分开,分为Resource Manager(RM)以及Application Master(AM)。
  • Hadoop的MapReduce架构称为YARN(另一种资源协助者),是效率更高的资源管理器核心
  • Client客户端,用户向Resource Manage请求执行运算
  • 在NameNode会有Resource Manage统筹管理运算请求
  • 在其他的DateNode会有 Node Manager负责运行,监督每个任务运行情况,并向Resource Manage 汇报状态

image-20211201102156648

容器管理系统:Kubernetes

  • 常见的大数据技术组件一般有对应的开源项目支持部署,如Flink,Spark也有官方支持Spark on Kubernetes运行模式等

ZooKeeper

  • 是大数据的动物管理员,是一个开源的分布式的,是Hadoop的一个子项目

  • Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式

  • Zookeeper=文件系统+通知机制

常用的工作流调度平台

  • 对于简单的任务调度,可以使用Linux Crontab,但是对于在多台机器上、任务之前有依赖关系时,Linux Crontab就不能满足需求,因此需要分布式任务调度系统来进行任务编排
  • 业界常用开源调度平台有:Azkaban、Oozie、Airflow等

大数据商业产品

  • 大数据解决方案提供商
  • 大数据云计算服务商
  • 大数据SaaS服务商
  • 大数据开发平台

说明

  • jmeter 5.4.1
  • 测试论坛登录并发的拐点

第一次线程组脚本配置

  • 设置并发10个请求,永远

    image-20211126145852042

  • 设置RPS定时器,将RPS逐渐增加到50/S,并持续一段时间

image-20211126150023226

结果分析

  • 首先分析Hits per Second,在54秒后,RPS为84.7/S(可以理解为:最大支持1秒内84.7个用户同时登录),出现拐点,请求曲线开始收窄,有同学会问,怎么会是RPS?不是HPS吗?因为在单接口请求下,我么可以认为HPS和RPS是相等的

    image-20211129113816875

  • 查看聚合报告,得到RT的响应时间,平均值为203,,这时候可能会有同学说,你这里不能取平均时间,不具有代表性,我们还应该考虑90%,95%,99%不同比例的接口响应时间。对的,个人认为,这些数值在和平均值没有过大的区别的情况下,我们可以取平均值来计算,如果说出现了较大的波动,那么我们需要考虑是不是服务器的内存,cpu出现了问题。

  • 这里我们算下并发数:84.7*0.203=17.19,可以理解为,支持17.19个用户在1s内同时登录

  • 查看TPS ,发现在16秒支持最大tps为12,不然就会出现明显波动

    image-20211129114833158

第一次运行结果总结

  • 我们的最大请求数为84
  • 最大 TPS 为12
  • 最大系统并发数在 17左右
  • 超出这些范围就开始出现波动

第二次调整运行参数

  • 并发数调整为5个

    image-20211129145513074

    运行结果分析

  • 在54秒时,RPS的值为84.3/S

    image-20211129145631570

  • 查看聚合报告,得到RT的响应时间,平均值为175,这里我们算下并发数:84.3*0.175=14.17

image-20211129145944164

  • 查看TPS,在21秒是,tps的值为18为最佳,不然后续出现巨大波动

image-20211129150633788

第二次结果分析

  • 我们的最大请求数为84
  • 最大 TPS 为18
  • 最大系统并发数在 14左右
  • 超出这些范围就开始出现波动

CLI运行

  • 10个并发

命令运行

  • 发起压测请求jmeter -n -t resp.jmx -l result/report.jtl
1
2
3
4
5
6
7
8
9
10
E:\jmeter>jmeter -n -t resp.jmx -l result/report.jtl
Creating summariser <summary>
Created the tree successfully using resp.jmx
Starting standalone test @ Tue Nov 30 10:06:56 CST 2021 (1638238016471)
Waiting for possible Shutdown/StopTestNow/HeapDump/ThreadDump message on port 4445
summary + 9 in 00:00:03 = 2.6/s Avg: 140 Min: 117 Max: 230 Err: 0 (0.00%) Active: 5 Started: 5 Finished: 0
summary + 406 in 00:00:30 = 13.7/s Avg: 148 Min: 108 Max: 1160 Err: 0 (0.00%) Active: 5 Started: 5 Finished: 0
summary = 415 in 00:00:33 = 12.5/s Avg: 148 Min: 108 Max: 1160 Err: 0 (0.00%)
summary + 516 in 00:00:30 = 17.2/s Avg: 232 Min: 108 Max: 15728 Err: 0 (0.00%) Active: 5 Started: 5 Finished: 0
.....
  • 压测完毕后,转变为测试报告

    jmeter -g report.jtl -o report

    image-20211130102551759

    报告分析

  • RPS为80.48/S

    image-20211130102819087

  • 得到RT的响应时间,平均值为121.14,这里我们算下并发数:80.4*0.121=14.17

image-20211130103151709

  • tps的值为27为最佳,不然后续出现巨大波动

    image-20211130104030732

第三次结果分析

  • 我们的最大请求数为84
  • 最大 TPS 为27
  • 最大系统并发数在 14左右
  • 超出这些范围就开始出现波动

三次结果总结

  • 我们的最大请求数为84
  • 最大 TPS 为12-27
    • CLI模式下TPS支持最大
  • 最大系统并发数在 14-17
  • 这些范围就开始出现波动

其他

  • 可以测试多次,取平均值
  • 参考这里的测试方式

Concurrency Thread Group的介绍

  • Concurrency Thread Group提供了用于配置多个线程计划的简化方法
  • 该线程组目的是为了保持并发水平,意味着如果并发线程不够,则在运行线程中启动额外的线程
  • 和Standard Thread Group不同,它不会预先创建所有线程,因此不会使用额外的内存
  • 对于上篇讲到的Stepping Thread Group来说,Concurrency Thread Group是个更好的选择,因为它允许线程优雅地完成其工作
  • Concurrency Thread Group提供了更好的用户行为模拟,因为它使您可以更轻松地控制测试的时间,并创建替换线程以防线程在过程中完成

Concurrency Thread Group参数讲解

image-20211109151532330

  • Target Concurrency:目标并发(线程数),我设置为40
  • Ramp Up Time:启动时间;若设置 1 min,则目标线程在1 imn内全部启动
  • Ramp-Up Steps Count:阶梯次数;若设置 5 ,则目标线程在 1min 内分5次阶梯加压(启动线程);每次启动的线程数 = 目标线程数 / 阶梯次数 = 40 / 5 = 8
  • Hold Target Rate Time:持续负载运行时间;若设置 2 ,则启动完所有线程后,持续负载运行 2 min,然后再结束
  • Time Unit:时间单位(分钟或者秒)
  • Thread Iterations Limit:线程迭代次数限制(循环次数);默认为空,理解成永远,如果运行时间到达Ramp Up Time + Hold Target Rate Time,则停止运行线程【不建议设置该值】
  • Log Threads Status into File:将线程状态记录到文件中(将线程启动和线程停止事件保存为日志文件);

特别注意点

  • Target Concurrency只是个期望值,实际不一定可以达到这个并发数,得看上面的配置【电脑性能、网络、内存、CPU等因素都会影响最终并发线程数】
  • Jmeter会根据Target Concurrency的值和当前处于活动状态的线程数来判断当前并发线程数是否达到了Target Concurrency;若没有,则会不断启动线程,尽力让并发线程数达到Target Concurrency的值

Concurrency Thread Group和Stepping Thread Group的区别

官方说法

  • Stepping Thread Group不提供设置启动延迟时间,阶梯增压过渡时间,阶梯释放过渡时间,但Concurrency Thread Group提供
  • Stepping Thread Group可以阶梯释放线程,而Concurrency Thread Group是瞬时释放(具体看下面介绍)
  • Stepping Thread Group设置了需要启动多少个线程就会严格执行,Concurrency Thread Group会尽力启动线程达到Target Concurrency值

通俗点理解

  • Stepping Thread Group 是手动场景:测试过程,按照设定好的步骤执行
  • Concurrency Thread Group 是目标场景:达到某个目标运行场景,测试过程不可控,动态变化

类比 LR

  • Stepping Thread Group :设置并发用户数,持续时间等,每隔多少时间自动增加多少个用户
  • Concurrency Thread Group:预设一个目标并发数,每隔一段时间增加一部分并发数,直到 TPS 达到目标并发数,然后持续运行一段时间

Concurrency Thread Group + Active Threads Over Time

image-20211109153859390

第一个关注点:阶梯增压过程

看Concurrency Thread Group负载预览图每次阶梯增压都是瞬时增压的,但是实际测试结果可以看到它也是有一个过渡期,并不是瞬时增压

第二个关注点:持续负载运行结束后,所有线程瞬时释放

  • 从图最后可以看到,所有线程都是瞬时释放的
  • 普通的线程组有三种状态:启动、运行、释放;而Concurrency Thread Group的线程可以理解成只有两种状态:启动、运行;因为线程都在极短的时间内就结束了

Concurrency Thread Group的扩展

  • 当Concurrency Thread Group与Throughput Shaping Timer(吞吐量计时器)一起使用时,可以用tstFeedback 函数的调用来动态维护实现目标RPS所需的线程数
  • 使用此方法时, 需要将Ramp Up Time 和 Ramp-Up Steps Count 置空
  • 但要确保 Hold Target Rate Time ≥ Throughput Shaping Timer 时间表中指定的总持续时间值(Duration)

本文来自这里,强烈建议仔细学习此博主的jmeter系列文章