FLUME-720: CollectorSink doesn't pass the new format parameter

Review Request #1886 - Created Aug. 3, 2011 and submitted

Jonathan Hsieh
Repository: old-flume (apache)
Bugs: flume-720
Groups: flume
CollectorSink doesn't properly pass the format parameter down to the EscapedCustomDfs sink. 
For example, this is working fine: 
collectorSource(54001) | escapedCustomDfs("hdfs://hadoop1-m1:8020/", "test", seqfile("SnappyCodec") ); 

However, this one uses the codec defined in flume-conf.xml instead:
collectorSource(54001) | collectorSink("hdfs://hadoop1-m1:8020/", "test-", 600000, seqfile("SnappyCodec") ); 

By itself this bug would not be very serious. The real problem is that escapedCustomDfs/customDfs use the same compressor and apply it to the whole file, on top of the compression done natively by the sequence file; this double compression makes the sequence file invalid.
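
To see why the double compression is fatal, here is a framework-free sketch of the failure mode, using java.util.zip in place of the Hadoop codec (all names here are illustrative, not the actual Flume code): a reader that decompresses once recovers the inner compressed stream rather than the original data.

    import java.io.*;
    import java.util.zip.*;

    public class DoubleCompression {
      public static void main(String[] args) throws IOException {
        byte[] payload = "some event data".getBytes("UTF-8");

        // Compress once: what the sequence file already does natively.
        ByteArrayOutputStream once = new ByteArrayOutputStream();
        try (OutputStream gz = new GZIPOutputStream(once)) { gz.write(payload); }

        // Compress the result again: what the sink mistakenly adds on top.
        ByteArrayOutputStream twice = new ByteArrayOutputStream();
        try (OutputStream gz = new GZIPOutputStream(twice)) { gz.write(once.toByteArray()); }

        // Decompressing once yields the inner compressed bytes, not the payload,
        // so any reader expecting singly-compressed data fails to parse the file.
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(twice.toByteArray()))) {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] buf = new byte[4096];
          for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
          System.out.println(java.util.Arrays.equals(out.toByteArray(), payload));            // false
          System.out.println(java.util.Arrays.equals(out.toByteArray(), once.toByteArray())); // true
        }
      }
    }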

-- 

The root cause of the problem is that escapedCustomDfs and customDfs do not read the format placed into the context by the collector. This patch refactors the code so that the output format is resolved in the proper order: local sink argument first, then the context, then the config file (see the sketch below).
Added a unit test. Other unit tests are running.
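
A minimal sketch of that resolution order, with hypothetical names (resolveFormat, the "output.format" context key, and the property default are illustrative, not the actual Flume identifiers):

    import java.util.Map;
    import java.util.Properties;

    public class FormatResolution {
      // Returns the output format to use, checking the three sources in order.
      static String resolveFormat(String local, Map<String, String> context, Properties config) {
        if (local != null) {
          return local;                                    // 1. format given directly on the sink
        }
        String fromContext = context.get("output.format"); // 2. format the collector placed in the context
        if (fromContext != null) {
          return fromContext;
        }
        // 3. fall back to the flume-conf.xml setting ("avrojson" is assumed as the default here)
        return config.getProperty("flume.collector.output.format", "avrojson");
      }
    }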

Ran local tests:
----
Create data that is supposed to be a seq file:
bin/flume sink 'collectorSink("file:///tmp/bz","bzip",5000, seqfile("bzip2"))' 
... 
Type stuff and write some events. 

Read back the file that is supposed to be a seq file:
bin/flume source 'seqfile("/tmp/bz/bzipxxxxxx")' 

The latter command will fail if the file is not a seq file. Alternatively, inspect the generated files directly: avrojson output is plain text, while a valid sequence file starts with the magic bytes "SEQ" followed by the Java class names of the selected codec.
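
For a quick programmatic check of those magic bytes (a sketch; the file name below is the same placeholder used above):

    import java.io.FileInputStream;
    import java.io.IOException;

    public class CheckMagic {
      public static void main(String[] args) throws IOException {
        try (FileInputStream in = new FileInputStream("/tmp/bz/bzipxxxxxx")) {
          byte[] magic = new byte[3];
          int n = in.read(magic);
          // A Hadoop sequence file begins with the ASCII bytes 'S', 'E', 'Q'.
          System.out.println(n == 3 && new String(magic, "US-ASCII").equals("SEQ")
              ? "looks like a sequence file" : "not a sequence file");
        }
      }
    }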
  1. No review for 3 weeks. Committing.