数据治理

数据治理考评平台

考评平台内部主要由两部分组成。

  • 一部分是最核心的“考评核算引擎”,主要功能是结合考评平台数据考核参数以及各个数据组件的相关信息,比如Hive元数据库、DolphinScheduler 中配置的任务信息、任务完成状况、 HDFS集群的相关信息,进行考评最终核算出分数,并进行保存。
  • 另一部分是服务于外部访问的请求,比如查看考评、维护参数、补充元数据等等
    架构图

项目背景:

随着大数据技术的广泛应用,企业搭建了各种数据仓库、数据湖,但由于数据源复杂、口径不统一等问题,导致数据质量低、数据孤岛严重、安全风险高。因此,数据治理成为企业必不可少的环节。本项目旨在建立一套数据治理考评平台,以评估和优化企业的数据治理水平。

项目目标:

量化数据治理效果,提供标准化评分体系
发现并改善数据质量、规范性、安全性等问题
通过自动化考评减少人工干预,提高治理效率

核心功能:

  1. 数据考评指标体系

规范性(如表命名规范、字段备注)
存储管理(如生命周期、空表检测)
计算质量(如长期无产出、计算报错)
数据质量(如数据产出时效、数据量监控)
安全性(如权限管理、安全等级)

数据治理考评指标

在数据治理考评平台中,我们将考评指标分为五个大类:规范、存储、计算、质量、安全,并针对不同考核项进行打分评估。

1. 规范 (Standardization)

考评指标 评分标准 需要信息
有技术负责人 (Technical Owner) 有则10分,无则0分 元数据
有业务负责人 (Business Owner) 有则10分,无则0分 元数据
表名是否合规 (Table Naming Compliance) 参考数仓表规范,不合规0分,部分合规5分,完全合规10分 元数据
表有备注 (Table Comment) 有则10分,无则0分 元数据
字段有备注信息 (Column Comment) (备注字段数 / 总字段数) * 10分 元数据

2. 存储 (Storage)

考评指标 评分标准 需要信息
生命周期合理 (Lifecycle Management) 未设置生命周期0分,普通生命周期10分,超长存储5分 元数据
是否为空表 (Empty Table) 空表0分,有数据10分 元数据
存在相似表 (Duplicate Tables) 相似表字段重复率超过一定百分比则0分,否则10分 元数据

3. 计算 (Computation)

考评指标 评分标准 需要信息
长期无产出 (Inactive Data) 近{days}天内无数据产出0分,否则10分 HDFS
长期无访问 (Unused Data) 近{days}天内无访问0分,否则10分 HDFS
计算任务有报错 (Job Errors) 任务执行有报错0分,无报错10分 任务信息
简单加工 (Simple Processing) SQL无 JOIN / GROUP BY / 过滤操作则0分,否则10分 任务信息

4. 质量 (Data Quality)

考评指标 评分标准 需要信息
表产出时效监控 (Data Timeliness) 产出时间超过历史均值一定比例则0分,否则10分 任务信息
表产出数据量监控 (Data Volume) 产出数据量超出历史均值上下限则0分,否则10分 HDFS

5. 安全 (Security)

考评指标 评分标准 需要信息
未明确安全等级 (Undefined Security Level) 未设置0分,已设置10分 元数据
目录文件访问权限 (Access Control) 目录或文件权限超出建议范围则0分,否则10分 HDFS

评分规则

数据治理考评平台会按照上述考评指标对企业的数据治理情况进行自动评估,并计算出最终得分,帮助企业发现问题并持续优化数据管理策略。

  1. 考评引擎

结合Hive元数据、HDFS存储信息、DolphinScheduler任务信息等计算评分
自动扫描数据仓库并生成考评报告
3. 数据治理可视化

通过Web页面展示各类考评指标及得分
详细展示每张表的元数据、存储情况、访问记录等
3. 技术栈:

后端:Spring Boot、MyBatis-Plus、MySQL
大数据组件:Hive、HDFS、DolphinScheduler
前端:基于Vue.js的可视化界面
4. 项目亮点:

量化考评机制:采用打分体系量化数据治理状况,清晰展示问题点
自动化数据获取:结合Hive、HDFS等多源数据,实时更新考评结果
高扩展性:支持定制考评规则,可适配不同企业需求

提取元数据(每天提取一次)
1 明确要提取哪些数据 ( 尽量覆盖,1 考评指标的要求,2 一般用户需要查看的内容)
2 根据要提取的数据建立目标表 (create table)
3 建立project , springboot 项目
4 依赖
5 根据表 生成代码
6 提取元数据业务逻辑编写
6.1 hive元数据

  • 创建hiveclient(帮你查询hive元数据,你也可以自己写)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>000000</value>
    </property>

    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/warehouse</value>
    </property>

    <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    </property>

    <property>
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
    </property>

    <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>hadoop102</value>
    </property>

    <property>
    <name>hive.metastore.event.db.notification.api.auth</name>
    <value>false</value>
    </property>

    <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
    </property>

    <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
    </property>
    <!--Spark依赖位置(注意:端口号8020必须和namenode的端口号一致)-->
    <property>
    <name>spark.yarn.jars</name>
    <value>hdfs://hadoop102:8020/spark-jars/*</value>
    </property>

    <!--Hive执行引擎-->
    <property>
    <name>hive.execution.engine</name>
    <value>spark</value>
    </property>
    <property>
    <name>hive.exec.dynamic.partition</name>
    <value>true</value>
    <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
    </property>
    <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
    <description>
    In strict mode, the user must specify at least one static partition
    in case the user accidentally overwrites all partitions.
    In nonstrict mode all partitions are allowed to be dynamic.
    </description>
    </property>
    <property>
    <name>hive.metastore.client.socket.timeout</name>
    <value>1000</value>
    </property>
    <!-- hiveserver2的高可用参数,开启此参数可以提高hiveserver2的启动速度 -->
    <property>
    <name>hive.server2.active.passive.ha.enable</name>
    <value>true</value>
    </property>

    </configuration>

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    public class TableMetaInfoServiceImpl extends ServiceImpl<TableMetaInfoMapper, TableMetaInfo> implements TableMetaInfoService {

    IMetaStoreClient hiveClient = getHiveClient();

    // 初始化 hive 客户端
    private IMetaStoreClient getHiveClient() {
    HiveConf hiveConf = new HiveConf();

    hiveConf.addResource(Thread.currentThread().getContextClassLoader().
    getResourceAsStream("hive-site.xml"));

    IMetaStoreClient client = null;
    try {
    // true表示用hive访问metaStore,false表示用metaserver访问metaStore
    client = RetryingMetaStoreClient.getProxy(hiveConf, true);
    } catch (Exception e) {
    throw new RuntimeException(e);

    }
    return client;
    }
    }
  • 获取hive元数据
    元数据到数据库表里
    hdfs元数据

  • 创建hdfs客户端

1
2
3
4
// 获取hdfs客户端
FileSystem fs = FileSystem.get(new URI(tableFsPath), new Configuration(), tableFsOwner);
// 获取当前目录或文件信息的状态
FileStatus[] fileStatuses = fs.listStatus(curPathFileStatus.getPath());
  • 获取hdfs元数据
  • 元数据到数据库表

元数据附加信息

  • 每张表一条数据(不用每天录入,其中有些信息可以通过hive或hdfs元数据获取,就不用都手动录入了)

保存元数据
7 保存到数据库表里

要给用户提供对于元数据管理界面
查看和修改 任务2

页面列表查询
参数部分
1 涉及到 根据条件搜索 库名 表名 分层
2 分页 : 第几页 和 每页多少行

结果部分
1 这些字段 在哪几张表 table_meta_info table_meta_info_extra
2 不考虑分页情况的总行数

为了避免join 可以不可以
先查出一个列表
然后再遍历这个列表
用列表的每个元素 查询数据 获得 结果 对元素进行补充。

页面操作 即席查询 操作
不要循环操作数据库
尽量组合成 join 一次提交

在bean里加上以下表示下面这个字段是不在数据库里的,让mybatis不去映射这个字段。
@Tablefield(exist = false)
private List extraList;
数据库表 join操作的结果 如何 封装到一个主子类 中
策略
还是使用sql 进行join操作
1 要区分出同名的字段 每个同名字段单独起别名
2 其他可以※ ,最好还是别写 *
3 使用mapper.xml进行映射
4 映射分两部分 Java属性和字段名 不一致的需要手动映射
其他的可以自动映射
association 是负责 子对象映射关键词
5 配置
application.properties

如果要查多张表,就不建议用先查一张表,另外一张表查出来用set,因为要用循环,每一次循环都要连接数据库,效率低。
所有用join查询,但如何把两个表的字段分别对应起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Select(" select tm.id as tm_id ," +
"tm.schema_name as tm_schema_name, " +
"tm.table_name as tm_table_name," +
"tm.create_time as tm_create_time," +
"tm.update_time as tm_update_time," +
" te.id as te_id ," +
"te.schema_name as te_schema_name ," +
"te.table_name as te_table_name," +
"te.create_time as te_create_time," +
"te.update_time as te_update_time," +
" tm.* ,te.* from table_meta_info tm join table_meta_info_extra te" +
" on tm.table_name=te.table_name and tm.schema_name=te.schema_name " +
" where assess_date= (select max(assess_date) from table_meta_info )"
)
@ResultMap("meta_map")
public List<TableMetaInfo> selectTableMetaInfoAllList( );
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.dga.meta.mapper.TableMetaInfoMapper">
<resultMap id="meta_map" type="com.zzh.dga.meta.bean.TableMetaInfo" autoMapping="true">
<result property="id" column="tm_id"></result>
<result property="schemaName" column="tm_schema_name"></result>
<result property="tableName" column="tm_table_name"></result>
<result property="createTime" column="tm_create_time"></result>
<result property="updateTime" column="tm_update_time"></result>
<association property="tableMetaInfoExtra" javaType="com.zzh.dga.meta.bean.TableMetaInfoExtra" autoMapping="true">
<result property="id" column="te_id"></result>
<result property="schemaName" column="te_schema_name"></result>
<result property="tableName" column="te_table_name"></result>
<result property="createTime" column="te_create_time"></result>
<result property="updateTime" column="te_update_time"></result>
</association>
</resultMap>
</mapper>

还要在application.yml里配置mybatis的mapper扫描路径

1
2
3
4
#设定自动映射行为 :FULL标识对象存在嵌套的情况
mybatis.configuration.auto-mapping-behavior=FULL
#映射文件所在位置
mybatis.mapper-locations=classpath:mapper/*.xml

任务四:根据考评结果计算分数
清理当日元数据–>>提取hive元数据信息–>>补充hdfs元数据信息–>>提取ds元数据信息–>>

指标计算 优化
1 可以把在指标计算过程中可以复用计算 ,提取出来,提前统一计算

2 指标和指标间的计算 存不存在依赖关系: 不存在依赖关系
100张表 *20指标 2000次指标考评计算
并行

考评用多线程,但我统计考评分数要统一统计
如何 让多线程处理 完成 集结 在进行统一的处理进行
1.8以前的java countdown

异步编排

任务三: 对每张表每个指标进行考评

1 把数仓元数据提取出来 table_meta_info table_meta_info_extra

2 把指标清单提取出来 governance_metrics

3 元数据列表和指标列表 双层循环 依次 进行考评

4 获得一个 考评结果明细列表 进行保存

[up主专用,视频内嵌代码贴在这]