Flume简介与实践

Flume是一个分布式、高可用日志收集系统,可以收集不同来源的日志并集中存储。目前Flume是Apache顶级项目。

架构

数据流模型

UserGuide_image00
Flume Agent是一个JVM进程,包含三个基本组件:

  1. Source,用于从外部数据源获取数据;
  2. Channel,用于暂存数据;
  3. Sink,用于向目标发送数据。

Source

Flume支持以下Source:

  1. Avro Source,侦听Avro端口,接收外部Avro客户端发送的数据;
  2. Thrift Source,侦听Thrift端口,接收外部Thrift客户端发送的数据;
  3. Exec Source,执行Unix命令,并将标准输出作为外部数据源,例如,可以使用“cat [named pipe]”或“tail -F [file]”命令持续地输出数据作为外部数据源;
  4. JMS Source,从JMS消息队列或主题中读取消息作为外部数据源;
  5. Spooling Directory Source,将目录下的文件作为外部数据源,当有新文件加入目录时,则读取文件中的数据;
  6. Kafka Source,从Kafka topic中读取消息作为外部数据源;
  7. NetCat Source,侦听某个端口,按行接收数据作为外部数据源;
  8. Sequence Generator Source,产生从0开始,按1递增的序列作为外部数据源,主要用于测试;
  9. Syslog Sources,读取syslog作为外部数据源;
  10. HTTP Source,将HTTP GET和POST请求作为外部数据源;
  11. Stress Source,内部产生数据用于压力测试;
  12. Legacy Sources,接收Flume 0.9.4发送的数据并作适配,作为Flume 1.x的外部数据源;
  13. Scribe Source,适配Scribe,将Scribe作为外部数据源。

Channel

Flume支持以下Channel:

  1. Memory Channel,将数据暂存在内存队列中;
  2. JDBC Channel,将数据暂存在数据库中;
  3. Kafka Channel,将数据暂存在Kafka集群中;
  4. File Channel,将数据暂存在文件中;
  5. Spillable Memory Channel,同时使用内存和文件暂存数据。

Skin

Flume支持以下Sink:

  1. HDFS Sink,将数据发送至HDFS;
  2. Hive Sink,将数据发送至Hive;
  3. Logger Sink,以日志方式输出数据,主要用于测试;
  4. Avro Sink,以Avro方式将数据发送到其他的Avro服务器端;
  5. Thrift Sink,以Thrift方式将数据发送到其他的Thrift服务器端;
  6. File Roll Sink,将数据发送至本地文件;
  7. Null Sink,不再发送数据;
  8. HBaseSinks,将数据发送至HBase;
  9. MorphlineSolrSink,将数据发送至Solr;
  10. ElasticSearchSink,将数据发送至ElasticSearch;
  11. Kafka Sink,将数据发送至Kafka Topic。

安装

直接下载和解压可执行包:

tar -zxvf apache-flume-1.5.0.1-bin.tar.gz
ln -s apache-flume-1.5.0.1-bin flume

配置

以tail -F方式读取nginx日志,使用内存暂存并发送到Kafka topic,配置mp_pv_producer.properties如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#agent section
mp_pv_producer.sources=s
mp_pv_producer.channels=c
mp_pv_producer.sinks=r

#source section
mp_pv_producer.sources.s.type=exec
mp_pv_producer.sources.s.channels=c
mp_pv_producer.sources.s.restart=true
mp_pv_producer.sources.s.restartThrottle=500
mp_pv_producer.sources.s.command=tail -f /data/logs/nginx/access.log

# Each sink's type must be defined
mp_pv_producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
mp_pv_producer.sinks.r.metadata.broker.list=10.16.3.97:9092,10.16.3.172:9092
mp_pv_producer.sinks.r.partition.key=0
mp_pv_producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
mp_pv_producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
mp_pv_producer.sinks.r.request.required.acks=0
mp_pv_producer.sinks.r.max.message.size=1000000
mp_pv_producer.sinks.r.producer.type=sync
mp_pv_producer.sinks.r.custom.encoding=UTF-8
mp_pv_producer.sinks.r.custom.topic.name=mp_pv
#Specify the channel the sink should use
mp_pv_producer.sinks.r.channel=c

# Each channel's type is defined.
mp_pv_producer.channels.c.type=memory
mp_pv_producer.channels.c.capacity=1000

启动

nohup bin/flume-ng agent –conf conf –conf-file conf/mp_pv_producer.properties –name mp_pv_producer -Dflume.root.logger=INFO,console &