自定义flume拦截器
自定义flume拦截器
使用IDEA自定义flume拦截器
1.创建maven工程flume-interceptor
2.创建包com.flume.interceptor
3.在pom.xml中添加如下配置
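<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>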
4.在com.flume.interceptor包下创建JSONUtils类
package com.flume.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; public class JSONUtils { //验证数据是否json public static boolean isValidate(String log) { try { JSON.parse(log); return true; }catch (JSONException e){ return false; } } }
5.在com.flume.interceptor包下创建ETLInterceptor类
package com.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

/**
 * Flume interceptor that drops every event whose body is not valid JSON.
 *
 * <p>The body is decoded as UTF-8 and checked with
 * {@link JSONUtils#isValidate(String)}; events that pass are forwarded
 * unchanged.
 */
public class ETLInterceptor implements Interceptor {

    /** Charset used to decode event bodies; resolved once instead of per event. */
    private static final Charset BODY_CHARSET = StandardCharsets.UTF_8;

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    /**
     * Validates a single event.
     *
     * @param event the event to check
     * @return the same event if its body is valid JSON, or {@code null} to
     *         signal that the event should be dropped
     */
    @Override
    public Event intercept(Event event) {
        // Fetch the raw payload.
        byte[] body = event.getBody();

        // Defensive: an event without any payload cannot be a JSON document.
        if (body == null || body.length == 0) {
            return null;
        }

        String log = new String(body, BODY_CHARSET);

        // Keep the event only when the payload parses as JSON.
        if (JSONUtils.isValidate(log)) {
            return event;
        }
        return null;
    }

    /**
     * Validates a batch of events, removing the invalid ones in place.
     *
     * @param list the batch to filter; it is modified through its iterator
     * @return the same list instance with the rejected events removed
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            // The single-event overload returns null for events to discard.
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
        // Nothing to release.
    }

    /**
     * Builder that creates {@link ETLInterceptor} instances.
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This interceptor reads no configuration.
        }
    }
}
6.打包
7.将包放入Hadoop01的flume/lib目录下
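在工程的 target 目录下找到打包生成的 jar 包(名称带 jar-with-dependencies 后缀的那个,其中包含 fastjson 依赖),然后将其上传到 Hadoop01 的 flume/lib 目录下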
这样一个拦截器就完成了