Alibaba Cloud Big Data MaxCompute: A Case Study of Parsing JSON Logs with a UDTF
Published: 2019-06-24


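MaxCompute's built-in functions are limited, so the platform provides powerful user-defined functions (UDFs) for complex data processing. Because of MaxCompute's sandbox mechanism, JSON log strings are parsed here with GSON. In this example the raw data may have been synchronized into MaxCompute from another database through Data Integration, so MaxCompute may hold a raw table to be processed like the one below (logs synchronized daily would also have a year-month-day partition; add it according to your actual business needs):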

create table t_biz_log(
    id         BIGINT,
    logcontent STRING
);

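A sample JSON log string from the logcontent column, to be parsed, is shown below (the VALUE text may contain special characters such as the backslash \):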

[{"acsRegion":"cn-huhehaote","apiVersion":"2016-04-28","errorCode":"Forbindden","errorMessage":"The specified Instance already bind eip","eventId":"01168520-E248-4949-84AC-48EF6FA59292","eventName":"CreateForwardEntry","eventSource":"aliyuncs.com","eventTime":"2018-04-11T07:32:47Z","eventType":"ApiCall","eventVersion":"1","isGlobal":false,"requestId":"01168520-E248-48EF6FA59292","requestParameters":{"_response_json_parse":"\"true\"","IpProtocol":"\"tcp\"","ExternalIp":"\"39.10.2.1\"","ForwardTableId":"\"ftb-hp3bbrmtlho\"","SecureTransport":"\"true\"","needarrayitemname":"\"true\"","RequestId":"\"01168-E248-4949-84AC-48EF6FA59292\"","ExternalPort":"\"12\"","RegionId":"\"cn-huhehaote\"","InternalPort":"\"112\"","HostId":"\"huhehaote.aliyuncs.com\"","InternalIp":"\"192.168.1.167\""},"serviceName":"Vpc","sourceIpAddress":"106.11.34.11","userIdentity":{"accountId":"44404","principalId":"44404","sessionContext":{"attributes":{"creationDate":"2018-04-11T07:32:47Z","mfaAuthenticated":"false"}},"type":"root-account","userName":"root"}}]
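In the sample, every value under requestParameters is a JSON string whose content is itself quoted: after normal JSON decoding, IpProtocol is the string "tcp" including its surrounding quote characters. As an alternative to the global string replacement used in Step 3 (which also deletes any legitimate backslash), one option is to let the parser decode the log unchanged and strip the extra quotes per field afterwards. A minimal sketch, assuming the value has already been decoded once; the class and method names are hypothetical:

```java
public final class StripQuotes {
    // Hypothetical helper: removes one pair of surrounding double quotes
    // from a field value that a JSON parser has already decoded once.
    static String stripQuotes(String value) {
        if (value != null && value.length() >= 2
                && value.charAt(0) == '"' && value.charAt(value.length() - 1) == '"') {
            return value.substring(1, value.length() - 1);
        }
        return value; // null, too short, or not quoted: returned unchanged
    }

    public static void main(String[] args) {
        String decoded = "\"tcp\"";               // decoded form of the JSON value "\"tcp\""
        System.out.println(stripQuotes(decoded)); // prints tcp
        System.out.println(stripQuotes("tcp"));   // prints tcp
    }
}
```

Because only the outer quote pair is removed, backslashes inside the value are left alone.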

The procedure is as follows:

Step 1: Analyze the JSON above, identify the key information you need, and create a MaxCompute table:

create table t_analysis_log(
    acsRegion       STRING,
    apiVersion      STRING,
    eventId         STRING,
    eventName       STRING,
    eventSource     STRING,
    eventTime       STRING,
    eventType       STRING,
    eventVersion    STRING,
    requestId       STRING,
    SourceCidrIp    STRING,
    SecurityGroupId STRING,
    IpProtocol      STRING,
    NicType         STRING,
    Policy          STRING,
    PortRange       STRING,
    serviceName     STRING,
    sourceIpAddress STRING,
    userAgent       STRING,
    accessKeyId     STRING,
    accountId       STRING,
    principalId     STRING,
    type            STRING,
    userName        STRING
);
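Each of these 23 columns needs a matching output type in the UDTF's @Resolve annotation in Step 3, and a miscount only shows up when the function runs. A minimal sketch that generates the signature string for one string input and n string outputs instead of typing it by hand; the class name is hypothetical:

```java
import java.util.Collections;

public final class ResolveSignature {
    // Builds a @Resolve signature: one string input, `outputs` string outputs.
    static String build(int outputs) {
        return "string->" + String.join(",", Collections.nCopies(outputs, "string"));
    }

    public static void main(String[] args) {
        // t_analysis_log has 23 columns, so the UDTF declares 23 outputs
        System.out.println(build(23));
    }
}
```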
Step 2: Create the entity classes (the JSON above has nested structures, so the classes must be nested in the same way)

package com.kangyu;

// Field names must match the JSON keys exactly: Gson maps keys to fields by name, case-sensitively
public class AnalysisObj {
    private String            acsRegion;
    private String            apiVersion;
    private String            eventId;
    private String            eventName;
    private String            eventSource;
    private String            eventTime;
    private String            eventType;
    private String            eventVersion;
    private String            requestId;
    private RequestParameters requestParameters; // nested "requestParameters" object
    private String            serviceName;
    private String            sourceIpAddress;
    private String            userAgent;
    private UserIdentity      userIdentity;      // nested "userIdentity" object

    public String getAcsRegion() { return acsRegion; }
    public void setAcsRegion(String acsRegion) { this.acsRegion = acsRegion; }
    public String getApiVersion() { return apiVersion; }
    public void setApiVersion(String apiVersion) { this.apiVersion = apiVersion; }
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getEventName() { return eventName; }
    public void setEventName(String eventName) { this.eventName = eventName; }
    public String getEventSource() { return eventSource; }
    public void setEventSource(String eventSource) { this.eventSource = eventSource; }
    public String getEventTime() { return eventTime; }
    public void setEventTime(String eventTime) { this.eventTime = eventTime; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getEventVersion() { return eventVersion; }
    public void setEventVersion(String eventVersion) { this.eventVersion = eventVersion; }
    public String getRequestId() { return requestId; }
    public void setRequestId(String requestId) { this.requestId = requestId; }
    public RequestParameters getRequestParameters() { return requestParameters; }
    public void setRequestParameters(RequestParameters requestParameters) { this.requestParameters = requestParameters; }
    public String getServiceName() { return serviceName; }
    public void setServiceName(String serviceName) { this.serviceName = serviceName; }
    public String getSourceIpAddress() { return sourceIpAddress; }
    public void setSourceIpAddress(String sourceIpAddress) { this.sourceIpAddress = sourceIpAddress; }
    public String getUserAgent() { return userAgent; }
    public void setUserAgent(String userAgent) { this.userAgent = userAgent; }
    public UserIdentity getUserIdentity() { return userIdentity; }
    public void setUserIdentity(UserIdentity userIdentity) { this.userIdentity = userIdentity; }
}

class RequestParameters {
    // Capitalized to match the JSON keys, e.g. "IpProtocol"
    private String SourceCidrIp;
    private String SecurityGroupId;
    private String IpProtocol;
    private String NicType;
    private String Policy;
    private String PortRange;

    public String getSourceCidrIp() { return SourceCidrIp; }
    public void setSourceCidrIp(String sourceCidrIp) { SourceCidrIp = sourceCidrIp; }
    public String getSecurityGroupId() { return SecurityGroupId; }
    public void setSecurityGroupId(String securityGroupId) { SecurityGroupId = securityGroupId; }
    public String getIpProtocol() { return IpProtocol; }
    public void setIpProtocol(String ipProtocol) { IpProtocol = ipProtocol; }
    public String getNicType() { return NicType; }
    public void setNicType(String nicType) { NicType = nicType; }
    public String getPolicy() { return Policy; }
    public void setPolicy(String policy) { Policy = policy; }
    public String getPortRange() { return PortRange; }
    public void setPortRange(String portRange) { PortRange = portRange; }
}

class UserIdentity {
    private String accessKeyId;
    private String accountId;
    private String principalId;
    private String type;
    private String userName;

    public String getAccessKeyId() { return accessKeyId; }
    public void setAccessKeyId(String accessKeyId) { this.accessKeyId = accessKeyId; }
    public String getAccountId() { return accountId; }
    public void setAccountId(String accountId) { this.accountId = accountId; }
    public String getPrincipalId() { return principalId; }
    public void setPrincipalId(String principalId) { this.principalId = principalId; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
}
Step 3: Implement the JSON processing logic

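Notes:

1. The input is a single String containing the JSON; the outputs are multiple parsed String values.

2. You need to reference the GSON package (among others); the corresponding jar files are in the lib directory of odpscmd_public.zip, available from the help documentation.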

package com.kangyu;

import java.util.ArrayList;
import java.util.List;

import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

// One string input, 23 string outputs, in the column order of t_analysis_log
@Resolve({ "string->string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string" })
public class AnalysisLog extends UDTF {

    @Override
    public void process(Object[] arg0) throws UDFException {
        String log = (String) arg0[0];
        if (log == null) {
            return;
        }
        // Remove backslashes and unwrap nested objects/arrays that were stored as quoted strings,
        // so that Gson can map them onto the nested classes
        log = log.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"", "}").replace("\"\"", "\"");
        log = log.replace("\"[", "[").replace("]\"", "]");
        // Skip failed API calls; the sample log above contains errorCode and therefore yields no row
        if (log.indexOf("\"errorCode\"") != -1) {
            return;
        }
        List<AnalysisObj> list = GsonUtil.fromJsonList(log, AnalysisObj.class);
        for (AnalysisObj obj : list) {
            // Reset per record so values from a previous array element do not leak into this row
            // requestParameters
            String sourceCidrIp = null;
            String securityGroupId = null;
            String ipProtocol = null;
            String nicType = null;
            String policy = null;
            String portRange = null;
            // userIdentity
            String accessKeyId = null;
            String accountId = null;
            String principalId = null;
            String type = null;
            String userName = null;

            RequestParameters paramObj = obj.getRequestParameters();
            if (paramObj != null) {
                sourceCidrIp = paramObj.getSourceCidrIp();
                securityGroupId = paramObj.getSecurityGroupId();
                ipProtocol = paramObj.getIpProtocol();
                nicType = paramObj.getNicType();
                policy = paramObj.getPolicy();
                portRange = paramObj.getPortRange();
            }
            UserIdentity identityObj = obj.getUserIdentity();
            if (identityObj != null) {
                accessKeyId = identityObj.getAccessKeyId();
                accountId = identityObj.getAccountId();
                principalId = identityObj.getPrincipalId();
                type = identityObj.getType();
                userName = identityObj.getUserName();
            }
            // forward() is positional: the order must match @Resolve and the columns of t_analysis_log
            forward(obj.getAcsRegion(), obj.getApiVersion(), obj.getEventId(), obj.getEventName(),
                    obj.getEventSource(), obj.getEventTime(), obj.getEventType(), obj.getEventVersion(),
                    obj.getRequestId(),
                    sourceCidrIp, securityGroupId, ipProtocol, nicType, policy, portRange,
                    obj.getServiceName(), obj.getSourceIpAddress(), obj.getUserAgent(),
                    accessKeyId, accountId, principalId, type, userName);
        }
    }
}

class GsonUtil {
    // Parse JSON data into the corresponding mapped object
    public static <T> T parseJsonWithGson(String jsonData, Class<T> type) {
        Gson gson = new Gson();
        return gson.fromJson(jsonData, type);
    }

    // Parse a JSON array into a list of mapped objects, one element at a time
    public static <T> ArrayList<T> fromJsonList(String json, Class<T> cls) {
        ArrayList<T> mList = new ArrayList<T>();
        Gson gson = new Gson();
        try {
            JsonArray array = new JsonParser().parse(json).getAsJsonArray();
            for (final JsonElement elem : array) {
                mList.add(gson.fromJson(elem, cls));
            }
        } catch (Exception e) {
            // Malformed input: log it and return whatever was parsed
            System.out.println("json=" + json);
            e.printStackTrace();
        }
        return mList;
    }
}
Step 4: Package the UDTF code into a jar. You can use the jar -cvf command, or export it directly from an IDE such as Eclipse:

File ---> Export, then choose JAR file under Java


Step 5: Upload the exported jar with DataWorks


Step 6: Create the user-defined function from the uploaded jar

create function analysis_log_udf as 'com.kangyu.AnalysisLog' using 'analysisLog.jar';
Step 7: Query with the function created above

select analysis_log_udf(logcontent) as (acsRegion, apiVersion, eventId, eventName, eventSource, eventTime,
    eventType, eventVersion, requestId, SourceCidrIp, SecurityGroupId, IpProtocol, NicType, Policy,
    PortRange, serviceName, sourceIpAddress, userAgent, accessKeyId, accountId, principalId, type, userName)
from t_biz_log;
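A UDTF returns its values by position, so the alias list in this query, the @Resolve signature, and the order of the forward() arguments must all follow the column order of t_analysis_log. A minimal sketch that keeps that order in a single list and derives the query from it; the class and constant names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;

public final class AliasClause {
    // Column order of t_analysis_log (Step 1); forward() must emit values in this order
    static final List<String> COLUMNS = Arrays.asList(
            "acsRegion", "apiVersion", "eventId", "eventName", "eventSource", "eventTime",
            "eventType", "eventVersion", "requestId", "SourceCidrIp", "SecurityGroupId",
            "IpProtocol", "NicType", "Policy", "PortRange", "serviceName", "sourceIpAddress",
            "userAgent", "accessKeyId", "accountId", "principalId", "type", "userName");

    // Builds the Step 7 query from the single column list above
    static String selectSql() {
        return "select analysis_log_udf(logcontent) as (" + String.join(", ", COLUMNS)
                + ")\nfrom t_biz_log;";
    }

    public static void main(String[] args) {
        System.out.println(COLUMNS.size()); // 23
        System.out.println(selectSql());
    }
}
```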
Step 8: Wrap the SQL above in an insert overwrite statement, and configure a synchronization task in DataWorks

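Note:

If your two tables are partitioned, declare the partition column with partitioned by when creating them, specify the target partition in the insert, and add a where condition on the source partition when querying.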

insert overwrite table t_analysis_log
select analysis_log_udf(logcontent) as (acsRegion, apiVersion, eventId, eventName, eventSource, eventTime,
    eventType, eventVersion, requestId, SourceCidrIp, SecurityGroupId, IpProtocol, NicType, Policy,
    PortRange, serviceName, sourceIpAddress, userAgent, accessKeyId, accountId, principalId, type, userName)
from t_biz_log;
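For the partitioned case, the statement writes into one target partition and reads the matching source partition. A minimal sketch that builds such a statement, assuming both tables were created with a hypothetical daily partition column ds STRING in yyyymmdd form; the class name is hypothetical, and the caller passes the full 23-column alias list:

```java
public final class PartitionedInsert {
    // Assumption: both tables have a partition column "ds" holding yyyymmdd values
    static String build(String ds, String aliasList) {
        // Reject anything that is not 8 digits so the value cannot break the SQL text
        if (ds == null || !ds.matches("\\d{8}")) {
            throw new IllegalArgumentException("ds must be yyyymmdd: " + ds);
        }
        return "insert overwrite table t_analysis_log partition (ds='" + ds + "')\n"
                + "select analysis_log_udf(logcontent) as (" + aliasList + ")\n"
                + "from t_biz_log\n"
                + "where ds='" + ds + "';";
    }
}
```

Filtering on the source partition keeps each run from scanning the whole log table, and overwriting a single target partition makes reruns for the same day idempotent.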

Reposted from: http://owkdo.baihongyu.com/
