How to use the Flume E2E mode

E2E provides the most reliable mode:

You can tune an agent's reliability level simply by choosing a different kind of agent sink. Three levels are available: E2E (end-to-end), DFO (disk failover), and BE (best effort). E2E relies on acknowledgements (ACKs): the agent retries whenever an expected acknowledgement is not received.
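To make the acknowledge-and-retry idea concrete, here is a minimal Java sketch of the agent side. It is a simplified model, not Flume's implementation: the class and method names (E2EAgentSketch, send, onAcks, retryUnacked) are hypothetical, batches are identified by made-up string tags, and the way ACKs actually travel back to the agent is abstracted into a plain method call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of an E2E agent; NOT Flume's actual classes.
// Every batch stays in the pending map until the collector ACKs it;
// whatever is still pending when the retry timeout fires is sent again.
class E2EAgentSketch {
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    private final List<String> sendLog = new ArrayList<>();
    private int nextId = 0;

    // Send a batch of events and keep it until it is acknowledged.
    String send(List<String> events) {
        String tag = "batch-" + nextId++;
        pending.put(tag, new ArrayList<>(events));
        sendLog.add(tag);
        return tag;
    }

    // ACKs flushed back from the collector release the matching batches.
    void onAcks(Set<String> ackedTags) {
        pending.keySet().removeAll(ackedTags);
    }

    // Called when the retry timeout expires: resend every unacknowledged batch.
    List<String> retryUnacked() {
        List<String> resent = new ArrayList<>(pending.keySet());
        sendLog.addAll(resent);
        return resent;
    }

    int pendingCount() {
        return pending.size();
    }

    // Every send and resend, in order.
    List<String> sendLog() {
        return sendLog;
    }
}
```

Because a batch only leaves the pending set when its ACK arrives, a lost batch (or a lost ACK) leads to a resend instead of silent data loss; the price, in this model, is that a batch whose ACK was lost can be delivered twice.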

E2E limitations:

E2E mode works only with Hadoop. This means you can’t use:

  • text("..")

and can only use:

  • collectorSink("hdfs://…", "src")
  • collectorSink("file:///../..", "prefix")
  • Your own custom sink

Important: even if your custom sink doesn't use any Hadoop facilities, you must have Hadoop running on your collector!

The collector is the key for working with E2E:

In E2E mode, the agent sends data in the most reliable mode, but the collector must be set up to calculate and send ACKs.
To work with E2E, you must add a collector wrapper that calculates the ACKs.
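One way to picture "calculating ACKs" is a checksum comparison: the agent attaches a checksum to each batch, the collector recomputes it over the events it actually received, and only a match produces an ACK. The Java sketch below illustrates that idea with java.util.zip.CRC32. It is an assumption made for illustration; Flume's own checksum format is not reproduced here, and BatchChecksum and ackIfIntact are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.zip.CRC32;

// Hypothetical illustration of checksum-based ACKs; not Flume's wire format.
class BatchChecksum {
    // Combine per-event CRC32 values with XOR so the result does not
    // depend on the order in which the events arrive.
    static long checksum(List<String> bodies) {
        long combined = 0L;
        for (String body : bodies) {
            CRC32 crc = new CRC32();
            crc.update(body.getBytes(StandardCharsets.UTF_8));
            combined ^= crc.getValue();
        }
        return combined;
    }

    // Collector side: ACK the batch tag only if the received events match
    // the checksum the agent sent; otherwise stay silent so the agent retries.
    static Optional<String> ackIfIntact(String tag, List<String> received, long expected) {
        return checksum(received) == expected ? Optional.of(tag) : Optional.empty();
    }
}
```

XOR makes the check order-insensitive, which suits events that may be reordered in transit; the trade-off in this simplified scheme is that two identical events cancel each other out.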

For example, an E2E configuration looks like this:

agent : source | agentE2ESink(machine, port);
collector : collectorSource(port) | collector(time_in_ms) { collectorSink(...) };

The time_in_ms argument of the collector wrapper sets how often ACKs are flushed back along the path to the agents: once every time_in_ms milliseconds. In BE (best effort) and DFO (disk failover) modes, data should arrive continuously; in E2E mode, however, data tends to arrive in clumps. In all modes, each time the time_in_ms interval expires, the collectorSink is closed and then reopened.
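The roll-and-flush cycle described above can be modeled in a few lines of Java. This is a hypothetical sketch (CollectorRollSketch is not a Flume class) that assumes ACKs gathered during a period are released when the sink is closed at the end of that period; time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of collector(time_in_ms) { sink }; not Flume's code.
class CollectorRollSketch {
    private final long intervalMs;
    private long periodStartMs;
    private final List<String> openPeriodTags = new ArrayList<>();
    private final List<String> flushedAcks = new ArrayList<>();
    private int sinkOpens = 1; // the wrapped sink is opened once at startup

    CollectorRollSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.periodStartMs = startMs;
    }

    // Called on every event and on timer ticks: roll once the interval expires.
    void tick(long nowMs) {
        if (nowMs - periodStartMs >= intervalMs) {
            // Close the sink: this period's data is written out, so the
            // ACKs collected during it are flushed toward the agents.
            flushedAcks.addAll(openPeriodTags);
            openPeriodTags.clear();
            // Reopen the sink and start a new period.
            sinkOpens++;
            periodStartMs = nowMs;
        }
    }

    // Record the ACK tag of an incoming batch in the current period.
    void append(String ackTag, long nowMs) {
        tick(nowMs);
        openPeriodTags.add(ackTag);
    }

    List<String> flushedAcks() {
        return flushedAcks;
    }

    int sinkOpens() {
        return sinkOpens;
    }
}
```

In this model, a batch that arrives just after a roll waits almost a full time_in_ms for its ACK, so a shorter interval means faster ACKs but more frequent close/reopen cycles (and, with a file-writing sink, more and smaller files).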

Detailed examples (using Flume shell commands):

Writing to files:

exec config agent1 'tail("/tmp/logs.in")'  'agentE2ESink("agent2",35888)'
exec config agent2 'collectorSource( 35888 )' 'collector(10000) {collectorSink("file:///tmp/yy","zz-")}'

In this example, the logs from agent1 (file “/tmp/logs.in”) are collected and forwarded to agent2, which writes them to the directory “/tmp/yy” in files whose names start with “zz-”.
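Because the collector closes and reopens its sink on every roll, /tmp/yy accumulates several output files over time. The small Java helper below lists them. It is a hypothetical sketch (RolledFiles is not part of Flume) that assumes only that every output file name begins with the “zz-” prefix; the rest of the file name format is not assumed.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: lists regular files in dir whose names start with prefix,
// e.g. the files written by collectorSink("file:///tmp/yy", "zz-").
class RolledFiles {
    static List<String> list(Path dir, String prefix) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                // A plain prefix check avoids treating the prefix as a glob pattern.
                if (name.startsWith(prefix) && Files.isRegularFile(p)) {
                    names.add(name);
                }
            }
        }
        Collections.sort(names);
        return names;
    }
}
```

For the example above, RolledFiles.list(Path.of("/tmp/yy"), "zz-") would return the collected files in name order.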

Customized Sink:

exec config agent1 'tail("/tmp/logs.in")'  'agentE2ESink("agent2",35888)'
exec config agent2 'collectorSource( 35888 )' 'collector(10000) { mySimpleSink ()}'

In this example, the logs from agent1 (file “logs.in”) are collected and forwarded to mySimpleSink on agent2.
mySimpleSink is a simple custom sink that writes each event's body to a specific file in the /tmp directory. Note that because the collector closes and reopens its sink every time_in_ms milliseconds, your sink should open its file in append mode; otherwise each reopen would overwrite what was written before.
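The source of mySimpleSink isn't shown, so here is a hypothetical stand-in in Java that demonstrates the append-mode point. It deliberately avoids Flume's plugin API: a real custom sink would wrap this logic in Flume's sink interface and register it as a plugin, and the exact classes for that depend on your Flume version. The class name AppendingFileSink and its open/append/close methods are illustrative assumptions that mirror a sink's lifecycle.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for mySimpleSink: writes each event body to one file.
class AppendingFileSink {
    private final Path target;
    private OutputStream out;

    AppendingFileSink(Path target) {
        this.target = target;
    }

    // Called at startup and again after every collector roll. APPEND keeps
    // what earlier periods wrote instead of truncating the file.
    void open() throws IOException {
        out = Files.newOutputStream(target, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Write one event body followed by a newline.
    void append(byte[] body) throws IOException {
        if (out == null) {
            throw new IllegalStateException("sink is not open");
        }
        out.write(body);
        out.write('\n');
    }

    void close() throws IOException {
        if (out != null) {
            out.close();
            out = null;
        }
    }
}
```

Calling Files.newOutputStream with no options behaves as if CREATE, TRUNCATE_EXISTING and WRITE were given, so without APPEND every roll would discard the data written before it.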

Metrics information:

The metrics of an agent or a collector can be viewed on port 35862 (e.g. http://agent:35862/).
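To read that page from a program instead of a browser, a plain HTTP GET is enough. The Java 11+ sketch below uses java.net.http.HttpClient; the class name MetricsPage is hypothetical, and it returns the raw page body without assuming anything about the page's format.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for reading a node's metrics page on port 35862.
class MetricsPage {
    static final int METRICS_PORT = 35862;

    // Build the metrics URL for a node's host name, e.g. "agent".
    static URI uriFor(String host) {
        return URI.create("http://" + host + ":" + METRICS_PORT + "/");
    }

    // Fetch the page as text; fail loudly on a non-200 response.
    static String fetch(String host) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uriFor(host)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

Usage: MetricsPage.fetch("agent") returns the page served at http://agent:35862/, where "agent" is the node's host name.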
