接了一个新需求:需要做数据仓库的血缘关系。正所谓兵来将挡水来土掩,那咱就动手吧。
血缘关系是数据治理的一块,其实有专门的第三方数据治理框架,但考虑到目前的线上环境已经趋于稳定,引入新的框架无疑是劳民伤财,伤筋动骨,所以就想以最小的代价把这个事情给做了。目前我们考虑做的血缘关系呢只是做输入表和输出表,最后会形成一张表与表之间的链路图。这个东西的好处就是有助于仓库人员梳理业务,后面可能还会做字段之间的血缘关系等,后面做了再说,今天只是记录一下输入表和输出表的血缘关系。
我们线上的环境用来做etl的是hive sql和spark sql,所以想到的就是将hive sql和spark sql都拦截下来,然后通过语法解析,解析出其中的输入表和输出表。搞完这个这事就算是大功告成了。
首先第一步是拦截hive sql和spark sql,这个呢我们是直接在hive和spark的源码中切入了一小段代码用来拦截sql,然后将拦截到的sql存入到mysql中,这个暂且不做记录,后续补充,这里的重点是sql的解析。
其实做hive的血缘关系分析在源码中是有一个类可以参考的:org.apache.hadoop.hive.ql.tools.LineageInfo, 不过呢,这个例子不全面,不能覆盖到我们线上的情况。比如 hive中的with语法,create table语法就不能覆盖到,好巧不巧,跟仓库的同事聊过之后,with这种语法也是用的很多的,所以只需要在这个例子上加上一些东东,就基本可以满足我们的需求啦。
总结一下:其实做表与表之间的血缘关系只需要考虑到下面几种语法就差不多了:with, create table , insert, select; 可能我说的不全面,但我目前了解的情况大概就是这些。下面是hive的解析的核心代码:
public class LineageUtils implements NodeProcessor { // 存放输入表 TreeSetinputTableList = new TreeSet (); // 存放目标表 TreeSet outputTableList = new TreeSet (); //存放with子句中的别名, 最终的输入表是 inputTableList减去withTableList TreeSet withTableList = new TreeSet (); public TreeSet getInputTableList() { return inputTableList; } public TreeSet getOutputTableList() { return outputTableList; } public TreeSet getWithTableList() { return withTableList; } public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { ASTNode pt = (ASTNode) nd; switch (pt.getToken().getType()) { //create语句 case HiveParser.TOK_CREATETABLE: { String createName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)); outputTableList.add(createName); break; } //insert语句 case HiveParser.TOK_TAB: { // System.out.println(pt.getChildCount() + "tab"); String insertName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)); outputTableList.add(insertName); // System.out.println("insertName " + insertName); break; } //from语句 case HiveParser.TOK_TABREF: { ASTNode tabTree = (ASTNode) pt.getChild(0); String fromName = (tabTree.getChildCount() == 1) ? BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) : BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1); inputTableList.add(fromName); break; } // with.....语句 case HiveParser.TOK_CTE: { for (int i = 0; i < pt.getChildCount(); i++) { ASTNode temp = (ASTNode) pt.getChild(i); String cteName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) temp.getChild(1)); withTableList.add(cteName); } break; } } return null; } public void getLineageInfo(String query) throws ParseException, SemanticException { ParseDriver pd = new ParseDriver(); ASTNode tree = pd.parse(query); while ((tree.getToken() == null) && (tree.getChildCount() > 0)) { tree = (ASTNode) tree.getChild(0); } inputTableList.clear(); outputTableList.clear(); withTableList.clear(); Map rules = new LinkedHashMap (); Dispatcher disp = new DefaultRuleDispatcher(this, rules, null); GraphWalker ogw = new DefaultGraphWalker(disp); ArrayList topNodes = new ArrayList(); topNodes.add(tree); ogw.startWalking(topNodes, null); } //进行测试,sql语句是瞎写的,但是语法是对的 public static void main(String[] args) throws IOException, ParseException, SemanticException { //String query = "insert into qc.tables_lins_cnt partition(dt='2016-09-15') select a.x from (select x from cc group by x) a left join yy b on a.id = b.id left join (select * from zz where id=1) c on c.id=b.id"; // String query ="from (select id,name from xx where id=1) a insert overwrite table dsl.dwm_all_als_active_d partition (dt='main') select id group by id insert overwrite table dsl.dwm_all_als_active_d2 partition (dt='main') select name group by name"; String query = "with q1 as ( select key from src where key = '5'), q2 as ( select key from with1 a inner join with2 b on a.id = b.id) insert overwrite table temp.dt_mobile_play_d_tmp2 partition(dt='2018-07-17') select * from q1 cross join q2"; LineageUtils lep = new LineageUtils(); lep.getLineageInfo(query); System.out.println("Input tables = " + lep.getInputTableList()); System.out.println("Output tables = " + lep.getOutputTableList()); System.out.println("with tables = " + lep.getWithTableList()); }}
上述代码的运行需要引入maven依赖:
<dependency>
<groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.1.0</version> </dependency>这样核心的解析基本上就大功告成啦,spark sql虽然与hive sql有些差别,但是核心的解析还是照样可以用到这个类的,只不过有些地方需要注意,后面会继续进行记录。
注:这个类只是进行解析的工具类,还有一些细节需要进行考虑,比如sql语句中没有带 database,也就是如果sql中的表不是 database.tablename的形式,该怎么处理?这个时候就需要考虑上下文中的切库(比如 use temp)处理了, 不过这不是什么大问题,是可以解决的。我们想到的解决步骤就在拦截sql的那一层进行的处理,如果有切库的操作,就先把库记录下来,等sql解析完成之后,再去遍历哪些表没有带数据库,将没有带库的表面拼接上先前记录的库即可。