Flume 配置文件概述

云计算 waitig 780℃ 百度已收录 0评论

flume Overview

Alias Conventions

Alias NameAlias Type
aagent
cchannel
rsource
ksink
gsink group
iinterceptor
ykey
hhost
sserializer

Component Summary

Component InterfaceType AliasImplementation Class
org.apache.flume.Sourceavroorg.apache.flume.source.AvroSource
org.apache.flume.Sourcenetcatorg.apache.flume.source.NetcatSource
org.apache.flume.Sourceseqorg.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Sourceexecorg.apache.flume.source.ExecSource
org.apache.flume.Sourcesyslogtcporg.apache.flume.source.SyslogTcpSource
org.apache.flume.Sourcemultiport_syslogtcporg.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Sourcesyslogudporg.apache.flume.source.SyslogUDPSource
org.apache.flume.Sourcespooldirorg.apache.flume.source.SpoolDirectorySource
org.apache.flume.Sourcehttporg.apache.flume.source.http.HTTPSource
org.apache.flume.Sourcethriftorg.apache.flume.source.ThriftSource
org.apache.flume.Sourcejmsorg.apache.flume.source.jms.JMSSource
org.apache.flume.Sourcecustomorg.example.MySource
org.apache.flume.Channelmemoryorg.apache.flume.channel.MemoryChannel
org.apache.flume.Channeljdbcorg.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channelfileorg.apache.flume.channel.file.FileChannel
org.apache.flume.Channelcustom-classorg.example.MyChannel
org.apache.flume.Sinknullorg.apache.flume.sink.NullSink
org.apache.flume.Sinkloggerorg.apache.flume.sink.LoggerSink
org.apache.flume.Sinkavroorg.apache.flume.sink.AvroSink
org.apache.flume.Sinkhdfsorg.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sinkhbaseorg.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sinkasynchbaseorg.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sinkelasticsearchorg.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sinkfile_rollorg.apache.flume.sink.RollingFileSink
org.apache.flume.Sinkircorg.apache.flume.sink.irc.IRCSink
org.apache.flume.Sinkthriftorg.apache.flume.sink.ThriftSink
org.apache.flume.Sinkcustom-classorg.example.MySink
org.apache.flume.interceptor.Interceptortimestamporg.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptorhostorg.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptorstaticorg.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptorregex_filterorg.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptorregex_extractororg.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.ChannelSelectorreplicatingorg.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelectormultiplexingorg.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelectorcustom-classorg.example.MyChannelSelector
org.apache.flume.SinkProcessordefaultorg.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessorfailoverorg.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessorload_balanceorg.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.SinkProcessorcustom-class

Sources

  • avro source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
  • Thrift source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
  • Exec Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
  • JMS Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
  • Spooling Directory Source
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
  • Kafka Source
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

or

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
  • NetCat UDP Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
  • Sequence Generator Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
  • Syslog TCP Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
  • Custom Source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

Sinks

  • HDFS Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
  • Hive Sink
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
  • Logger Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
  • Avro Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
  • Thrift Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
  • HBaseSink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
  • KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
  • HTTP Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true
  • Custom Sink
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Flume Channels

Channels are the repositories where the events are staged on a agent. Source adds the events and Sink removes it.

  • Memory Channel
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
  • JDBC Channel
a1.channels = c1
a1.channels.c1.type = jdbc
  • Kafka Channel
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
  • File Channel
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Flume Channel Selectors

If the type is not specified, then defaults to “replicating”.

  • Replicating Channel Selector (default)
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

tips:
In the above configuration, c3 is an optional channel. Failure to write to c3 is simply ignored. Since c1 and c2 are not marked optional, failure to write to those channels will cause the transaction to fail.

  • Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
  • Custom Channel Selector

A custom channel selector is your own implementation of the ChannelSelector interface. A custom channel selector’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel selector is its FQCN.

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

Flume Sink Processors

Sink groups allow users to group multiple sinks into one entity. Sink processors can be used to provide load balancing capabilities over all sinks inside the group or to achieve fail over from one sink to another in case of temporal failure.

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
  • Default Sink Processor

Default sink processor accepts only a single sink. User is not forced to create processor (sink group) for single sinks. Instead user can follow the source – channel – sink pattern that was explained above in this user guide.

  • Failover Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
  • Load balancing Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
  • Custom Sink Processor

Custom sink processors are not supported at the moment.

Event Serializers

The file_roll sink and the hdfs sink both support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.

  • Body Text Serializer
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
  • “Flume Event” Avro Event Serializer
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy
  • Avro Event Serializer
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

Flume Interceptors

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
  • Timestamp Interceptor

This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp (or as specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present in the configuration.

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
  • Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
  • Static Interceptor
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
  • Remove Header Interceptor

This interceptor manipulates Flume event headers, by removing one or many headers. It can remove a statically defined header, headers based on a regular expression or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

  • UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.

  • Regex Filtering Interceptor

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.

  • Regex Extractor Interceptor

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used:

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

本文由【waitig】发表在等英博客
本文固定链接:Flume 配置文件概述
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)