中文在线免费看视频_国产成人精品亚洲日本在线观看_亚洲精品第一综合99久久_国产亚洲精品日韩综合网

當前位置: 首頁 / 技術干貨 / 正文
數據采集工具之Flume 的攔截器

2023-03-15

攔截器    interceptor 事件

  在Flume運行過程中 ,Flume有能力在運行階段修改/刪除Event,這是通過攔截器(Interceptors)來實現的。攔截器有下面幾個特點:

  攔截器需要實現org.apache.flume.interceptor.Interceptor接口。

  攔截器可以修改或刪除事件基于開發者在選擇器中選擇的任何條件。

  攔截器采用了責任鏈模式,多個攔截器可以按指定順序攔截。

  一個攔截器返回的事件列表被傳遞給鏈中的下一個攔截器。

  如果一個攔截器需要刪除事件,它只需要在返回的事件集中不包含要刪除的事件即可。

  一、系統內置攔截器

  Timestamp Interceptor :時間戳攔截器,將當前時間戳(毫秒)加入到events header中,key名字為:timestamp,值為當前時間戳。用的不是很多

  Host Interceptor:主機名攔截器。將運行Flume agent的主機名或者IP地址加入到events header中,key名字為:host(也可自定義)

  Static Interceptor:靜態攔截器,用于在events header中加入一組靜態的key和value。

  二、內置攔截器的使用

  2.1. Timestamp+HTTP+File+HDFS

  通過時間攔截器,數據源為HTTP,傳送的通道模式是FileChannel,最后輸出的目的地為HDFS

  采集方案

  a1.sources = r1

  a1.channels = c1

  a1.sinks = s1

  a1.sources.r1.type=http

  a1.sources.r1.bind = qianfeng01

  a1.sources.r1.port = 6666

  a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler

  a1.sources.r1.handler.nickname = JSON props

  a1.sources.r1.interceptors=i1 i2 i3

  a1.sources.r1.interceptors.i1.type=timestamp

  #如果攔截器中已經有了時間戳,直接替換成現在的

  a1.sources.r1.interceptors.i1.preserveExisting=false

  a1.sources.r1.interceptors.i2.type=host

  a1.sources.r1.interceptors.i2.preserveExisting=false

  a1.sources.r1.interceptors.i2.useIP=true

  a1.sources.r1.interceptors.i2.hostHeader=hostname

  a1.sources.r1.interceptors.i3.type=static

  a1.sources.r1.interceptors.i3.preserveExisting=false

  a1.sources.r1.interceptors.i3.key=hn

  a1.sources.r1.interceptors.i3.value=qianfeng01

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.channels.c1.keep-alive=3

  a1.channels.c1.byteCapacityBufferPercentage=20

  a1.channels.c1.byteCapacity=800000

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/%H%M

  a1.sinks.s1.hdfs.filePrefix=%{hostname}

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1

  a1.sinks.s1.channel=c1

  啟動 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./ts.conf -n a1 -Dflume.root.logger=INFO,console

  測試數據

  [root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"hn":"qianfeng01","pwd":"123456"},"body":"this is my content qianfeng01"}]' http://qianfeng01:6666

  三、自定義攔截器

  為了提高Flume的擴展性,用戶可以自己定義一個攔截器, 對每一組的item_type和active_time都過濾出相應的HOST和USERID

  處理數據樣例:

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  結果樣例:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  3.1. pom.xml

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.48</version>
  </dependency>
</dependencies>

  3.2. 代碼實現

  /**

  * @Author 千鋒大數據教學團隊

  * @Company 千鋒好程序員大數據

  * @Description 自定義攔截器:對每一組的item_type和active_time都過濾出相應的HOST和USERID

  */

  public class MyInterceptor implements Interceptor {

  @Override

  public void initialize() {

  //初始化方法,寫攔截器初始化時的業務

  }

  @Override

  public void close() {

  //關閉方法,寫攔截器關閉時的代碼

  }

  /**

  * 解析單條event

  * @param event

  * @return

  */

  @Override

  public Event intercept(Event event) {

  //輸入

  String inputeBody=null;

  //輸出

  byte[] outputBoday=null;

  //解析---這里定義對單條Event處理規則

  try {

  inputeBody=new String(event.getBody(), Charsets.UTF_8);

  ArrayListtemp = new ArrayList<>();

  JSONObject bodyObj = JSON.parseObject(inputeBody);

  //1)公共字段

  String host = bodyObj.getString("host");

  String user_id = bodyObj.getString("user_id");

  JSONArray data = bodyObj.getJSONArray("items");

  //2)Json數組=>every json obj

  for (Object item : data) {

  JSONObject itemObj = JSON.parseObject(item.toString());

  HashMap<string, object=""> fields = new HashMap<>();

  fields.put("host",host);

  fields.put("user_id",user_id);

  fields.put("item_type",itemObj.getString("item_type"));

  fields.put("active_time",itemObj.getLongValue("active_time"));

  temp.add(new JSONObject(fields).toJSONString());

  }

  //3)Json obj 拼接

  outputBoday=String.join("\n",temp).getBytes();

  }catch (Exception e){

  System.out.println("輸入數據:"+inputeBody);

  e.printStackTrace();

  }

  event.setBody(outputBoday);

  return event;

  }

  /**

  * 解析一批event

  * @param events

  * @return

  */

  @Override

  public Listintercept(Listevents) {

  //輸出---一批Event

  ArrayListresult = new ArrayList<>();

  //輸入---一批Event

  try{

  for (Event event : events) {

  //一條條解析

  Event interceptedEvent = intercept(event);

  byte[] interceptedEventBody = interceptedEvent.getBody();

  if(interceptedEventBody.length!=0){

  String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);

  String[] multiEventArr = multiEvent.split("\n");

  for (String needEvent : multiEventArr) {

  SimpleEvent simpleEvent = new SimpleEvent();

  simpleEvent.setBody(needEvent.getBytes());

  result.add(simpleEvent);

  }

  }

  }

  }catch (Exception e){

  e.printStackTrace();

  }

  return result;

  }

  /**

  * 實現內部類接口

  */

  public static class Builder implements Interceptor.Builder{

  @Override

  public Interceptor build() {

  return new MyInterceptor();

  }

  @Override

  public void configure(Context context) {

  }

  }

  }

  3.3. 打包上傳

  使用maven將攔截器打包,然后把此包和依賴的fastjson一起上傳到Flume lib目錄下

  3.4. 采集方案制定

  a1.sources = s1

  a1.channels = c1

  a1.sinks = r1

  a1.sources.s1.type = TAILDIR

  #文件以JSON格式記錄inode、絕對路徑和每個跟蹤文件的最后位置

  a1.sources.s1.positionFile = /root/flume/taildir_position.json

  #以空格分隔的文件組列表。每個文件組表示要跟蹤的一組文件

  a1.sources.s1.filegroups = f1

  #文件組的絕對路徑

  a1.sources.s1.filegroups.f1=/root/flume/data/.*log

  #是否添加存儲絕對路徑文件名的標題

  a1.sources.s1.fileHeader = true

  #使用自定義攔截器

  a1.sources.s1.interceptors = i1

  a1.sources.s1.interceptors.i1.type = flume.MyInterceptor$Builder

  a1.channels.c1.type = file

  a1.channels.c1.dataDirs = /root/flume/filechannle/dataDirs

  a1.channels.c1.checkpointDir = /root/flume/filechannle/checkpointDir

  a1.channels.c1.capacity = 1000

  a1.channels.c1.transactionCapacity = 100

  a1.sinks.r1.type = hdfs

  a1.sinks.r1.hdfs.path = hdfs://qianfeng01:8020/flume/spooldir

  a1.sinks.r1.hdfs.filePrefix =

  a1.sinks.r1.hdfs.round = true

  a1.sinks.r1.hdfs.roundValue = 10

  a1.sinks.r1.hdfs.roundUnit = minute

  a1.sinks.r1.hdfs.fileSuffix= .log

  a1.sinks.r1.hdfs.rollInterval=60

  a1.sinks.r1.hdfs.fileType=DataStream

  a1.sinks.r1.hdfs.writeFormat=Text

  a1.sources.s1.channels = c1

  a1.sinks.r1.channel = c1

  3.5. 啟動 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf/ -f ./mytest.conf -n a1 -Dflume.root.logger=INFO,console

  3.6. 測試數據

  [root@qianfeng01 ~]# vi my.sh

  #!/bin/bash

  log='{

  "host":"www.baidu.com",

  "user_id":"13755569427",

  "items":[

  {

  "item_type":"eat",

  "active_time":156234

  },

  {

  "item_type":"car",

  "active_time":156233

  }

  ]

  }'

  echo $log>> /root/flume/data/test.log

  [root@qianfeng01 ~]# bash my.sh

  執行后我們希望得到是數據格式:

  {"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"}

  {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}


好程序員公眾號

  • · 剖析行業發展趨勢
  • · 匯聚企業項目源碼

好程序員開班動態

More+
  • HTML5大前端 <高端班>

    開班時間:2021-04-12(深圳)

    開班盛況

    開班時間:2021-05-17(北京)

    開班盛況
  • 大數據+人工智能 <高端班>

    開班時間:2021-03-22(杭州)

    開班盛況

    開班時間:2021-04-26(北京)

    開班盛況
  • JavaEE分布式開發 <高端班>

    開班時間:2021-05-10(北京)

    開班盛況

    開班時間:2021-02-22(北京)

    開班盛況
  • Python人工智能+數據分析 <高端班>

    開班時間:2021-07-12(北京)

    預約報名

    開班時間:2020-09-21(上海)

    開班盛況
  • 云計算開發 <高端班>

    開班時間:2021-07-12(北京)

    預約報名

    開班時間:2019-07-22(北京)

    開班盛況
IT培訓IT培訓
在線咨詢
IT培訓IT培訓
試聽
IT培訓IT培訓
入學教程
IT培訓IT培訓
立即報名
IT培訓

Copyright 2011-2023 北京千鋒互聯科技有限公司 .All Right 京ICP備12003911號-5 京公網安備 11010802035720號

中文在线免费看视频_国产成人精品亚洲日本在线观看_亚洲精品第一综合99久久_国产亚洲精品日韩综合网

            日韩欧美亚洲国产精品字幕久久久| 美女网站一区二区| 亚洲国产毛片aaaaa无费看| 日日夜夜精品视频免费| 精品系列免费在线观看| 一本色道综合亚洲| 精品毛片乱码1区2区3区| 国产精品国产三级国产有无不卡| 亚洲成人tv网| 99久久精品费精品国产一区二区| 欧美日韩大陆在线| 国产精品美女久久久久久久久久久 | 中文字幕电影一区| 婷婷综合五月天| 色综合亚洲欧洲| 久久色在线视频| 乱一区二区av| 91精品国产黑色紧身裤美女| 日韩理论片一区二区| 日本不卡中文字幕| 欧美亚洲国产bt| 亚洲欧美另类综合偷拍| 丰满少妇在线播放bd日韩电影| 欧美成人在线直播| 日韩国产一区二| 欧美一级片在线| 亚洲成a人片综合在线| 欧美亚洲高清一区二区三区不卡| 亚洲国产精品ⅴa在线观看| 国产综合久久久久久鬼色| 日韩精品自拍偷拍| 国产一区二区三区在线观看精品| 日韩一区二区三区四区| 久久国产成人午夜av影院| 日韩精品中文字幕一区二区三区| 男男成人高潮片免费网站| 欧美一级生活片| 国产成人av一区二区| 国产精品区一区二区三区| 9l国产精品久久久久麻豆| 亚洲综合网站在线观看| 欧美一级xxx| 成人av一区二区三区| 一区二区成人在线| 日韩欧美一区在线| 99在线精品视频| 青青草97国产精品免费观看 | 国模娜娜一区二区三区| 欧美国产精品v| 欧美丝袜丝交足nylons图片| 日本视频中文字幕一区二区三区| 国产视频一区二区在线观看| 欧美日韩亚洲综合| 粉嫩av一区二区三区| 日日夜夜精品视频天天综合网| 久久久噜噜噜久噜久久综合| 欧美日韩久久一区| 成人av电影在线| 国内精品久久久久影院薰衣草 | 精品免费99久久| 在线观看日韩精品| www.欧美色图| 国产经典欧美精品| 蜜桃视频在线观看一区| 亚洲综合在线视频| 国产精品久久三| 欧美一卡二卡三卡| 欧美视频一区在线| 欧洲色大大久久| 91网站最新地址| 成人小视频免费观看| 国产黄人亚洲片| 国产99久久久久久免费看农村| 国产一区二区剧情av在线| 久久国产乱子精品免费女| 日韩av在线播放中文字幕| 亚洲午夜日本在线观看| 亚洲大片免费看| 日本少妇一区二区| 国内外成人在线| 成人黄色小视频在线观看| 成人激情视频网站| 99久久婷婷国产综合精品电影| 91网站最新地址| 欧美体内she精视频| 日韩欧美一级二级| 亚洲国产精品二十页| 亚洲人成网站色在线观看| 一区二区在线观看免费 | 精品电影一区二区三区| 日韩视频免费观看高清完整版在线观看 | 7777精品久久久大香线蕉| 精品国产免费人成在线观看| 国产日韩欧美不卡| 亚洲午夜成aⅴ人片| 狠狠色丁香九九婷婷综合五月| 国产精品白丝av| 欧美精选午夜久久久乱码6080| 日韩精品一区二区三区蜜臀| 一区二区三区久久| 国内精品在线播放| 欧美性生活影院| 国产清纯白嫩初高生在线观看91| 亚洲精品日日夜夜| 国产1区2区3区精品美女| 91精品国产免费久久综合| 国产精品国产三级国产aⅴ无密码| 亚洲成人av一区二区| 99re66热这里只有精品3直播| 精品欧美乱码久久久久久 | 精品视频999| 一二三区精品视频| 欧美午夜在线一二页| 亚洲日本va在线观看| 成人av资源站| 久久久久国产精品麻豆| 亚洲一区二区三区激情| 成人福利视频网站| 精品1区2区在线观看| 日韩精品五月天| 在线电影国产精品| 亚洲精品国产高清久久伦理二区| 国产一级精品在线| 日韩欧美美女一区二区三区| 日本一区二区免费在线| 精品一区二区日韩| 精品国产麻豆免费人成网站| 中文字幕免费观看一区| 国产成人免费9x9x人网站视频| 精品久久久久久无| 国产麻豆91精品| 久久这里只有精品首页| 美洲天堂一区二卡三卡四卡视频| 欧美三级日韩在线| 日韩国产在线观看一区| 日韩欧美电影一二三| 国产不卡在线播放| 国产精品第四页| 欧美伊人精品成人久久综合97| 亚洲成人久久影院| 日韩精品一区二区三区中文不卡 | 制服丝袜亚洲色图| 精品在线你懂的| 亚洲视频在线观看三级| 欧美日韩国产一二三| 成人在线一区二区三区| 亚洲一区二区不卡免费| 日韩精品一区二区三区中文精品| 国产麻豆午夜三级精品| 国产日韩欧美精品一区| 欧美精品 日韩| 韩国av一区二区三区四区| 国产精品成人一区二区三区夜夜夜| 91在线视频播放地址| 国产伦精品一区二区三区免费迷 | 色婷婷综合久久久| 极品美女销魂一区二区三区免费| 国产精品欧美极品| 欧美一级理论片| 欧美三级在线看| 国产一区激情在线| 国产福利一区二区三区视频| 三级久久三级久久久| 亚洲色图在线播放| 国产精品视频第一区| 久久综合色之久久综合| 欧美电影免费观看高清完整版在| 欧美日韩在线综合| 国产精品一二三在| 成人性生交大合| 国产精品1区2区3区| 激情图片小说一区| 精品视频资源站| 欧美在线视频你懂得| 在线视频你懂得一区| 欧美日韩精品一二三区| 国产.精品.日韩.另类.中文.在线.播放| 日韩激情在线观看| 久久精品免费观看| 麻豆视频观看网址久久| 精品一区二区三区香蕉蜜桃 | 另类成人小视频在线| 国产另类ts人妖一区二区| 精品在线亚洲视频| 成人av资源在线观看| 欧美亚洲综合久久| 91精品国产一区二区人妖| 精品入口麻豆88视频| 国产亚洲一区二区三区在线观看| 日韩欧美视频一区| 国产精品久久综合| 丝袜亚洲另类欧美| 国产另类ts人妖一区二区| 色网站国产精品| 日韩亚洲电影在线| 亚洲精品日日夜夜| 午夜精品久久久久久| 国产69精品一区二区亚洲孕妇| 色综合激情五月| 日韩一区二区三区高清免费看看|