FLUME-247: Add Efficient HBase Sink with Flexible Event's Attributes Writing

Review Request #921 - Created Sept. 28, 2010 and submitted

Alex Baranau
old-flume
FLUME-247
flume
FLUME-247: Add Efficient HBase Sink with Flexible Event's Attributes Writing
Please refer to the JIRA issue; it has a detailed explanation.

Added implementation of the sink and a simple test-case for it:
src/java/com/cloudera/flume/handlers/hbase/Attr2HBaseEventSink.java
src/javatest/com/cloudera/flume/handlers/hbase/TestAttr2HBaseSink.java

Not included in the diff (because the patch applies to Patrick's http://github.com/phunt/flume/tree/hbase_sink2 branch, not trunk):

--- a/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java
+++ b/src/java/com/cloudera/flume/conf/SinkFactoryImpl.java
@@ -63,6 +63,7 @@ import com.cloudera.flume.handlers.endtoend.AckChecksumChecker;
 import com.cloudera.flume.handlers.endtoend.AckChecksumInjector;
 import com.cloudera.flume.handlers.endtoend.ValueDecorator;
 import com.cloudera.flume.handlers.hbase.HBaseEventSink;
+import com.cloudera.flume.handlers.hbase.Attr2HBaseEventSink;
 import com.cloudera.flume.handlers.hdfs.CustomDfsSink;
 import com.cloudera.flume.handlers.hdfs.DFSEventSink;
 import com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink;
@@ -143,6 +144,7 @@ public class SinkFactoryImpl extends SinkFactory {
 
       // experiemental
       { "hbase", HBaseEventSink.builder() },
+      { "attr2hbase", Attr2HBaseEventSink.builder() },
 
       // deprecated
       { "thrift", ThriftEventSink.builder() },


  1. 
      
  2. Thanks for writing the initial HBase sink!
    From my understanding of the code, we define a prefix and look for it in the event's attributes. What is the plan to make this generic, beyond looking for attributes with a pre-defined prefix?
  3. 
      
  1. 
      
  2. It can be defined by the user; take a look at the class javadoc for the "attrPrefix" param, there's some info there. Also, I'd add that if the prefix is "" then all of the event's attributes are written into the Put.
  3. 
      
  1. 
      
  2. Thanks for clarifying! LGTM. I can add it to Patrick's branch and test, if others don't have any suggestions/comments.
  3. 
      
  1. 
      
  2. "setScannerCaching" may also be required.
    Or is there some reason to avoid it?
  3. I guess Attr2HBaseEventSink must be flexible enough to include other optional parameters, like setScannerCaching or some other parameter of HTable...
  4. Some may need "checkAndPut" instead of put.
    Won't this be needed in the future for "end to end reliability"?
    Should it also be configurable?
  5. 
      
  1. 
      
  2. Hm.. We don't actually perform scans in a sink, so I don't see a reason for making their settings configurable.
  3. Re setScannerCaching: please see above. Re other params: I think the ones we included (writeToWal, client buffer size) are the main (the only?) ones that affect write performance (of those we can configure on the client side).
  4. Making use of checkAndPut during "bulk import" (the main use-case of a sink) might be too complicated. I can't figure out the use-case for this. Any ideas of a possible usage scenario?
    
    It also becomes very hard to make the sink configurable for checkAndPut. It might be easier (and more usable) for users to implement a custom sink with their app's logic in it. Thoughts?
    1. Thanks Alex for clarifying; I am new to HBase and was curious about these parameters. If others have no comments, LGTM.
      Alex, let's push the first cut into Cloudera/flume. Do you have a branch/fork with these changes? -Thanks
  5. 
      
  1. Alex,
    
    This looks pretty good, clean, and is well commented.  Awesome!
    
    I have a few minor things that you should address:
    1) a few minor nits about comments, 
    2) a concern with an error checking case (a potential ArrayIndexOutOfBoundsException) that you should define, and ideally add a test case for
    3) some state checking (no double opens, check to prevent NPE on close) that would be nice.
  2. this would ideally comment about the default settings.
  3. Probably want to javadoc comment that configuration here is HBase Configuration (instead of Hadoop/Hdfs/Flume/MapRed, etc)
  4. Add string message (extra arg) when this fails.  This is user visible.
  5. So you expect something like "2hb_user:name". 
    
    What happens if there are no ':' chars, such as in "prefix_thing_attr"?
    
    I think this will cause an ArrayIndexOutOfBoundsException.  Maybe check the size before usage and ignore the attribute if it is not long enough?
  6. Probably want to check if table != null before calling close.
  7. Probably want to check if table == null before allowing this to create a new table.  (Two opens without a close or error in between is illegal behavior, and should throw an IllegalStateException if this happens.)
  8. 
      
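The concern about attribute names without a ':' can be illustrated with a minimal sketch. It assumes the naming convention discussed in this review (prefix followed by "colfam:qualifier", e.g. "2hb_user:name"); the class and method names (`AttrKeyParser`, `parse`) are hypothetical and not taken from the patch. The point is only that the result of the split is checked before it is indexed, so a malformed name is ignored instead of causing an ArrayIndexOutOfBoundsException.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of the patch: splits "<prefix>colfam:qualifier". */
final class AttrKeyParser {
  private AttrKeyParser() {}

  /**
   * Returns {family, qualifier}, or null when the attribute should be ignored
   * (missing prefix, no ':' separator, or an empty family/qualifier).
   */
  static String[] parse(String attrPrefix, String attrName) {
    if (!attrName.startsWith(attrPrefix)) {
      return null; // not addressed to this sink
    }
    String rest = attrName.substring(attrPrefix.length());
    String[] parts = rest.split(":", 2); // at most two pieces
    if (parts.length < 2) {
      return null; // no ':' at all, e.g. "prefix_thing_attr"
    }
    if (parts[0].isEmpty() || parts[1].isEmpty()) {
      return null; // this sketch treats an empty family or qualifier as malformed
    }
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(parse("2hb_", "2hb_user:name")));
    System.out.println(Arrays.toString(parse("2hb_", "2hb_thing_attr")));
    System.out.println(Arrays.toString(parse("", "user:name")));
  }
}
```

Whether an empty family should instead fall back to a default column family is a design decision; this sketch simply rejects it.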
  1. Thank you for reviewing the code. All comments are valuable.
    I also did some small refactoring and added few more simple tests.
  2. Agree. Fixed.
  3. Added. However, I haven't noticed it being used anywhere in the Flume code. (Hint.)
    1. As code gets added or revisited we update.  Admittedly, when we started we were not consistent about this.
  4. Hm... It might be even more sophisticated logic needed.
    
    In case of not "well-formatted" attribute key, if "system" column family is specified and there's enough info (in an attribute key) to figure out qualifier then I'd put the data into "system" column family.
    It is also very convenient when specifying attrPrefix="" (empty string). I added changes, please take a look at more details in javadoc
  5. Yeah, although I think it shouldn't invoke close() if open() hasn't succeeded. It also shouldn't be invoked multiple times.
    
    Aha! On the other hand, when something like LazyOpenSource/Sink is used, open() might never have been called before close().
    
    Btw, it looks strange in LazyOpenSource that it invokes close() even if the source wasn't "actually open". So it looks like close() can be used to e.g. free some resources which were used in the ctor.. I don't think it's a good idea to do anything apart from the simplest initialization in ctors..
    
    Also, it looks like we need to set table to null for proper handling of multiple open/close cycles. This is missing from the FLUME-6 code.
    1. You are correct about resources and ctors.  All resource grabbing (like getting ports to bind to, connections, file handles etc) happen in open and are released in close.  It has been specified in the updated version of the plugin docs (FLUME-155).  If close is missing setting the table to null in FLUME-6, then yup, it was a bug there.
  6. 
      
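The open/close contract discussed above can be sketched as follows. This is a simplified stand-in, not the sink's actual code: `GuardedSink` is a hypothetical class, and a plain `Closeable` plays the role of the HBase table handle.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical stand-in for a sink; "table" plays the role of the HBase table handle. */
final class GuardedSink {
  private Closeable table; // null means "not open"

  void open() {
    if (table != null) {
      throw new IllegalStateException("open() called twice without close()");
    }
    // All resource acquisition happens here, never in the constructor.
    table = new Closeable() {
      @Override
      public void close() { /* release the resource */ }
    };
  }

  void close() throws IOException {
    if (table == null) {
      return; // never opened (e.g. behind a lazy-open wrapper) or already closed
    }
    try {
      table.close();
    } finally {
      table = null; // allows another open/close cycle
    }
  }

  boolean isOpen() {
    return table != null;
  }

  public static void main(String[] args) throws IOException {
    GuardedSink sink = new GuardedSink();
    sink.close(); // safe: nothing was opened
    sink.open();
    sink.close();
    sink.open();  // a second cycle works because close() reset the field
    System.out.println("open after second cycle: " + sink.isOpen());
    sink.close();
  }
}
```

Resetting the field in a `finally` block keeps the sink reusable even if releasing the resource fails.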
  1. With patches either way is fine -- reviewboard can generate diffs between the diffs.  For example, if full patch 1 uploaded, and then full but different patch 2 uploaded, reviewboard can generate diff between 1 and 2.  If a 3rd full one is uploaded, it can do diffs between 1 and 3 as well.
    
    The new logic seems pretty confusing -- it took me a long time to review and I don't quite understand why some of the tests have the values they do.  Some seem inconsistent from a user's point of view (Test lines 229-231).  Would it be more straightforward if you did the split(":",2) and did nothing on incomplete cases, instead of implicitly doing things?
    1. Maybe only allow "pre_colfam:col" (and possibly "pre_:col") but not "pre_colfam:" and "pre_:"?
  2. how about "native" event data
  3. Also should note that the row key is nanos when the attrPrefix attribute is not present.
    
    I feel that all of this implicit rowkey stuff is difficult to reason about, and could result in tables that have data with different schemas being inserted.  
    
    Let's say one event had the prefix, and then another did not.  Both will be put but have semantically different keys.  Yikes!
  4. qualifier == col name right?  col is in hbase javadoc but qualifier is not -- where did that term come from?
    
    Ah -- javadoc for hbase 20.2 says col and colfam, 20.6 says family and qualifier 
    
    (Put.get method was not present in 20.2)
  5. what about the other fields?
  6. Actually, my preference is to include body by default. 
    
    Code style -- keep TODO but remove commented out code.
  7. comment about what entries are?
    
    What is the meaning of the string and byte[]? (I think it is col and value, but I had to dig around a little to figure this out.)
  8. I think this is right but is dense and requires a bit of effort to grok. 
    
    How about inverting some of the tests and returning early to reduce the nesting depth?
    
    Since there is  some expression duplication, maybe  give intermediate expressions boolean vars to make intent clear?
  9. maybe return here.
  10. maybe return here.
    
  11. if nothing happens and we attempted the split, warn?
  12. This is straightforward to me.
  13. add checks to see that "native"/"system fields are *not* present?
  14. Ah, ok, nanos is the default if the attr prefix is missing.
  15. explicit check showing that "other" is not present?
  16. maybe rename "foo" to "row" or "key"
  17. Confused -- why isn't sysFam a family and the expected value 1?
    
    what is the default if no colfam is specified?  maybe add check to show that col "any" got value foo?
  18. Confused, I think here we have the "sysFam" colFamily and have a column called "any" with value "foo"?
  19. I think here we have the default colfam which is "sysFam" with col "any" with value "foo".
    
    
  20. I'm confused, I think we have colfam "any", with no col name (or colname ""?) -- so shouldn't this fall through and add no values?
  21. I'm confused why this isn't 2.
  22. I think this makes sense -- no col fam specified and no col fam in entry so nothing happens.
  23. I think this makes sense -- colfam "any", but no col so nothing happens?
  24. no colfam, col "any" but since no default colfam nothing happens.
  25. colfam overridden with "any" but no col so nothing happens.
  26. here we have a colfam and col name so something finally does actually happen.
  27. so this has colfam "any" (or column?)
  28. colfam "sysFam", col "any"
    
    
  29. colfam "any" col == ???
    
    Why is there 1 present?  
  30. colfam "columnFam", col "columnName"
  31. 
      
  1. Right, it looks like I overcomplicated things by trying to figure out the qualifier name from an improperly formatted attribute name...
    I made the logic simpler and added a table which is very explanatory (I hope).
    
    One Q: do you think it makes sense to put sink params "writeBufferSize" and "writeToWal" *before* "attrPrefix"? It seems to me that users will use them much more often.
    
    Thank you!
    1. Sorry I fell behind here.  I think I've been waiting for a code update and you've been waiting for more feedback and we got deadlocked.  
      
      I think I need to understand what your use case is -- what is in your events and what you want to save in hbase.  It seems like our intuition for defaults doesn't match up and I'm trying to figure out why.
      
      In either case, I think we need some examples (and higher level docs perhaps) to understand how this interface is supposed to work. 
      
      My first instinct is that settings like "writeBufferSize" and "writeToWal" are settings I would have put into flume-site.xml and FlumeConfiguration with reasonable defaults.  This is because these feel like settings that don't necessarily need to be set differently for each hbasesink.  (I could be wrong there -- your comment seems to imply this).  
      
  2. What do you mean by event's "native" data? Body?
    1. This is really a nit.  (low priority)
      
      I think the name "native" or "default" event data when referring to e.getBody() makes more sense than "system" data.
  3. That is true. But I think it would be great to give users ability to manipulate (read as "decorate") row keys.
    
    I see one major scenario when attributes with prefixes are added to the event: when certain decorator(s) are used to "prepare" event attributes to be stored with this sink (if I'm not mistaken, you suggested this decoupling as the correct way back on the ML). Thus, to me, either *all* events were processed by the decorator(s) or none were. If they weren't, then the default row key is used (currently nanos, but please see my comments in JIRA). If they were, then it's the responsibility of the decorator(s) configuration (or, in the case of a custom one, its implementation) to make sure row keys are consistent.
    
    Does it make sense?
    1. I agree that the user should have the ability to specify an attribute or combination of attributes for a key.   
      
      The assumption that all events are processed by the decorator is not a valid assumption, and cannot be enforced without a bunch more work.  Some decorators inject new events into the stream that may not have been decorated!  If we are unable to enforce it, we should at least try to be consistent.
      
      My concern is that inconsistent data can be inserted, which seems odd to me.  I think I'd rather have events that don't have the specified attribute key *not* put any data into the hbase table *at all*!    This would allow all of the rows to have a consistent key.
      
      If the user didn't specify any attribute as a key, nanos might make sense as a default.  Note in this case, all the rows would consistently have nanos as their key!
  4. I'd say that only these two are really relevant in this context and mean something "outside of Flume" (I got this understanding from FLUME-6 initially).
    Sorry, I just realized that "priority" is also meaningful here. Anything else you think might be included?
    
    P.S. hm.. there's no info about whether priority can be absent (== null); I will add a null check.
    1. Comment says there is updated code to look at, but I don't see it.
      
      If the goal is to be the definitive hbase sink, then I think we need to either include all the fields, or allow the user to specify which ones they care about.  
      
      I think FLUME-6 was basically a first cut to show that flume data can get into hbase, and lay out a bare bones skeleton.  I think here we are trying to flesh it out a little more.
      
      
  5. That makes sense, especially if this sink will be used in a way of FLUME-6 (simplest use-case). Thus, attr2hbase("tableName", "colFam") would do the same thing as hbase("tableName", "colFam"). Given that attr2hbase has more config options it can substitute the older one, right?
    
    I'd need to add "include body" param to sink: this can save a lot of space for those who don't need it.
    1. FLUME-6 was just a proof of concept, and was never committed because we needed to figure out the dependency and plugin story.  It would probably be sufficient for the current main use case for flume (collecting logs).  nanos would be a decent key and the body of the log is exactly what we want to keep!
      
      I don't think that maintaining the interface of a proof of concept is a requirement, but I don't think we can remove the capability of writing bodies!
      
  6. right, not very readable. 
    Btw, "Entry here represents event's attribute, key is attribute name and value is attribute value", so it makes even more sense to put such comment here
    1. yup.  I was just saying I had to dig around to find it.
  7. all makes sense (and, according to Joshua Bloch;))
  8. I do verify that there are no extra column families in a Put ("Matching column families added", 2 above).
  9. I check here that there's only the "system" column family (and its name as well) in a Put.
    
    More detailed tests for adding attributes are in tests for addAttribute method
  10. Mmm... the attribute ("any"->foo) isn't going anywhere: its name doesn't contain the required prefix.
    
    Do you think it makes sense to put all such attributes into the "system" column family?
    I wouldn't do it by default. I'd leave the user the option to use a decorator to add an attribute with a "proper" name here. Also, if he/she needs all attributes to be placed into the HBase table, then attrPrefix="" will do that.
    1. That would be one way to do it.  I just want to make sure there is an ability to put the "system"/"native"/"default" attributes.  
      
      I think of the recording raw logs use case as the default use case.  Thus in my mind, in this use case we definitely would put the body in, and would likely want timestamps and generating host (which are default attributes).   
      
      Can you tell me more about the use case you have in mind? 
      
      
  11. As I said (in general comment to the review) - it looks like I overcomplicated stuff. Will simplify.
  12. 
      
  1. 
      
  2. "testDoNotWriteBody" would be better name
  3. 
      
  1. I'm sorry: I think it mostly got stuck because of me. I agree that it's better/easier to move further in smaller steps; I will comment on this on the JIRA issue page. Some comments below. 
    
    Re use-case:
    The use-case I'm referring to is the following:
    * There's a system that produces logs (let's say in a simple text, tab-delimited format)
    * We want these logs to be imported (on an ongoing basis) into an HBase table so that:
      ** values are put into the corresponding colFam:qualifier places (e.g. if one has a user session id in the log, then he/she wants this value to be put into the "user:sessionId" column)
    Thus, we can further process the data as we want, in a more convenient way than if we put the whole log line in some column and parsed it each time the logs are processed by some job.
    
    Re "writeBufferSize" and "writeToWal":
    We can put "writeBufferSize" into Flume's config file, but I'd want it to be more manageable (so that it can be changed for a particular collector, at a particular time). Let me give an example which I believe explains this. Imagine some monitoring system which is fed with log data by a Flume collector gathering it from other systems. There can be a system with quite a high load and thus a lot of data throughput, and we want to set a reasonable writeBufferSize to prevent too many puts into HBase. On the other hand, we may want to serve another system which produces logs at a low rate. If we apply the same writeBufferSize value there, logs might be put into HBase only every several minutes (or hours), i.e. too rarely. So, we might want to set up a smaller buffer there.
    1. Alex, these explanations help a lot!
      
      In the first part, I've been assuming and biased towards saving all information "raw" and then doing post processing.  Your description assumes that some processing is done and that you care about only certain meta data that you've extracted from the log. I think both are valid, and ideally this should support both.  The good thing is that I think we are close.
      
      The second part really is a great example that explains why you might want the writebuffer/wal settings to be parameterizable. I totally buy that story.
      
  2. Agree. If we expect row key to be set in some attribute and it is absent then we should not put this record in HBase at all. That looks consistent.
    
    The problem is that, by default, attrPrefix is set (to "2hb_"). It can only be overridden as a sink param. So, if we use this logic, we will never use nanos (or whatever event data) as a key.
    
    To fix this, I have this suggestion:
    Add a param telling where we should look for a key (attribute name); if it is not specified, we use the default logic (currently getNanos)
    
    1. My main concern here is whatever interface we pick, we make it consistent and simple.  I think adding yet another parameter is a possibility, but could make this more confusing.
      
      Here's another suggestion that seems even simpler and even more consistent:  don't even use nanos as a key at all and just drop if the attr_prefix is not present.   Explicitly push the responsibility to create a new attribute with nanos info to the user.
  3. Right: adding priority is missing. I have it in my code though: something got lost. Will upload the "freshest" diff
  4. 
      
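The writeBufferSize argument above can be made concrete with a toy model. This is not HBase code: `WriteBufferModel` is a hypothetical class that only mimics a size-triggered flush, and the event size and threshold are made-up numbers. It shows that, with one shared threshold, a low-rate source may not flush at all in a period in which a busy source flushes many times.

```java
/** Simplified, hypothetical model of a client-side write buffer; not HBase code. */
final class WriteBufferModel {
  private final long writeBufferSize; // flush threshold in bytes
  private long buffered;
  private int flushes;

  WriteBufferModel(long writeBufferSize) {
    this.writeBufferSize = writeBufferSize;
  }

  /** Buffers one record and flushes once the threshold is exceeded. */
  void put(int recordBytes) {
    buffered += recordBytes;
    if (buffered > writeBufferSize) {
      flushes++;
      buffered = 0;
    }
  }

  int flushes() {
    return flushes;
  }

  public static void main(String[] args) {
    // Made-up numbers: 100-byte events, 10,000-byte buffer for both sources.
    WriteBufferModel busy = new WriteBufferModel(10000);
    WriteBufferModel quiet = new WriteBufferModel(10000);
    for (int i = 0; i < 100000; i++) {
      busy.put(100); // high-rate source
    }
    for (int i = 0; i < 50; i++) {
      quiet.put(100); // low-rate source over the same period
    }
    System.out.println("busy flushes:  " + busy.flushes());
    System.out.println("quiet flushes: " + quiet.flushes());
  }
}
```

In this model the quiet source's events stay buffered until more data arrives, which is the motivation given for a per-sink setting.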
  1. 
      
  2. One suggestion: We can make this more generic, rather than always using "event" as the column name and writing to "familyName". Since we are receiving input from the user, we can decide:
    writeBody="" -> Event's body not written
    writeBody="someFamily:someCol" -> Event's body written to "someFamily:someCol"
    writeBody="someCol" -> Event's body written to "familyName:someCol"
    
    I am suggesting this since Event's body can be quite huge and we may not want it in same ColFamily
    
    Maybe we can do in next cut ?
    1. My suggestion is to punt for a future patch/review.  I'd like to get something in that we can iterate on and move design decisions into the JIRA instead of in the code review (ideally we should agree on interfaces there and then just have to verify/confirm that the code does that here.)
  3. Need to change, if writeBody is made more generic
    1. I suggest punting on making this generic for the time being.  
      
      Slightly different issue -- be careful about the semantics of Boolean.valueOf(String)!    If any string other than a case-insensitive "true" is provided, it returns false! http://download.oracle.com/javase/1.4.2/docs/api/java/lang/Boolean.html#valueOf(java.lang.String)
      
      ex: Boolean.valueOf("yes") == false
      
  4. Since we are also writing Body, should it be still attr2hbase ? 
    1. I think the biggest confusion here is because we haven't nailed down the interface and semantics this sink is supposed to have.  This patch has been floating for a while, so I'd like to get a baseline in that we can iterate/evolve or get feedback on.
  5. 
      
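The Boolean.valueOf(String) caveat mentioned above is easy to demonstrate. The sketch below also shows one possible alternative: a strict parser that rejects anything other than "true" or "false". `StrictBooleanArg` is a hypothetical helper, not something from the patch.

```java
/** Hypothetical strict parser for a boolean sink argument such as writeBody. */
final class StrictBooleanArg {
  private StrictBooleanArg() {}

  static boolean parse(String argName, String value) {
    if ("true".equalsIgnoreCase(value)) {
      return true;
    }
    if ("false".equalsIgnoreCase(value)) {
      return false;
    }
    throw new IllegalArgumentException(
        argName + " must be \"true\" or \"false\", got: " + value);
  }

  public static void main(String[] args) {
    // Boolean.valueOf silently maps anything that is not "true" to false.
    System.out.println(Boolean.valueOf("yes"));  // false
    System.out.println(Boolean.valueOf("TRUE")); // true
    try {
      parse("writeBody", "yes");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Failing loudly on a value like "yes" trades some leniency for a user-visible error instead of a silently disabled option.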
  1. I think this is pretty good!  
    
    Please confirm my questions -- about the last comment.   
    
    My main suggestions are just a couple of style/readability nits.
    
  2. maybe rename var to "defaultFamilyName" or "systemFamilyName" to aid readability?
  3. nit: maybe rename to attemptToAddAttribute?  addAttribute seems like it should always succeed and doesn't convey that it might do nothing.
  4. src/java/com/cloudera/flume/handlers/hbase/Attr2HBaseEventSink.java (Diff revisions 1 - 3)
    nit: using intermediate variables with names would be more verbose but way easier to read!
    
    String table = argv[0];
    String defaultFamily = xx ? argv[1] : xx;
    boolean writeBody = xx ? ...(argv[2]) : xx;
    String attrPrefix = xx ? argv[3] : xx;
    Long buffersize = xx ? xxx(argv[4]) : xx;
    boolean writeToWal = xx ? ..(argv[5]) : xx;
    return new xxx(table, defaultFamily, writeBody, attrPrefix, buffersize, writeToWal);
    
  5. nit: If a col isn't present does it return null (or do something else)?  Might be more direct if you look up the body and have it fail/return null.
  6. "system" should be replaced with "sysFam"?
  7. 
      
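The named-intermediate-variables suggestion above could look roughly like this when written out. The argument order follows the pseudocode in the comment, and the "2hb_" default for attrPrefix is the one mentioned in this review; the `SinkArgs` class and all other defaults are hypothetical placeholders, not the patch's actual values.

```java
/** Hypothetical holder for parsed sink arguments; most defaults are placeholders. */
final class SinkArgs {
  final String table;
  final String defaultFamily;
  final boolean writeBody;
  final String attrPrefix;
  final long writeBufferSize;
  final boolean writeToWal;

  private SinkArgs(String table, String defaultFamily, boolean writeBody,
      String attrPrefix, long writeBufferSize, boolean writeToWal) {
    this.table = table;
    this.defaultFamily = defaultFamily;
    this.writeBody = writeBody;
    this.attrPrefix = attrPrefix;
    this.writeBufferSize = writeBufferSize;
    this.writeToWal = writeToWal;
  }

  static SinkArgs parse(String... argv) {
    if (argv.length < 1 || argv.length > 6) {
      throw new IllegalArgumentException("expected between 1 and 6 arguments");
    }
    // One named variable per positional argument keeps the intent readable.
    String table = argv[0];
    String defaultFamily = argv.length > 1 ? argv[1] : null;                   // placeholder default
    boolean writeBody = argv.length > 2 ? Boolean.parseBoolean(argv[2]) : true; // placeholder default
    String attrPrefix = argv.length > 3 ? argv[3] : "2hb_";
    long writeBufferSize = argv.length > 4 ? Long.parseLong(argv[4]) : 0L;     // placeholder default
    boolean writeToWal = argv.length > 5 ? Boolean.parseBoolean(argv[5]) : true; // placeholder default
    return new SinkArgs(table, defaultFamily, writeBody, attrPrefix, writeBufferSize, writeToWal);
  }

  public static void main(String[] args) {
    SinkArgs a = parse("tableName", "colFam");
    System.out.println(a.table + " " + a.defaultFamily + " " + a.attrPrefix);
  }
}
```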
Review request changed

Change Summary:

Updated diff. Changes include:
* (major) write nothing to HBase when attribute with row key is absent
* (minor) code style changes
* (minor) small unit-test adjustments
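The major change listed above (writing nothing when the row-key attribute is absent) can be sketched roughly as follows. This is not the patch's code: `RowKeyFilter` is hypothetical, a plain map stands in for an HBase Put, and the sketch assumes that the row key is carried in the attribute whose name is exactly attrPrefix.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: a plain map stands in for an HBase Put. */
final class RowKeyFilter {
  private RowKeyFilter() {}

  /**
   * Returns the values to write, keyed by the attribute name with the prefix
   * removed, or null when the event carries no row key and must be skipped.
   * Assumption: the row key is stored under the attribute named exactly attrPrefix.
   */
  static Map<String, byte[]> toColumns(String attrPrefix, Map<String, byte[]> attrs) {
    byte[] rowKey = attrs.get(attrPrefix);
    if (rowKey == null || rowKey.length == 0) {
      return null; // no row key: drop the event instead of inventing a key
    }
    Map<String, byte[]> columns = new LinkedHashMap<String, byte[]>();
    for (Map.Entry<String, byte[]> e : attrs.entrySet()) {
      String name = e.getKey();
      if (name.startsWith(attrPrefix) && name.length() > attrPrefix.length()) {
        columns.put(name.substring(attrPrefix.length()), e.getValue());
      }
    }
    return columns;
  }

  public static void main(String[] args) {
    Map<String, byte[]> attrs = new LinkedHashMap<String, byte[]>();
    attrs.put("2hb_user:name", "alex".getBytes(StandardCharsets.UTF_8));
    System.out.println(toColumns("2hb_", attrs)); // no row key attribute yet
    attrs.put("2hb_", "row1".getBytes(StandardCharsets.UTF_8));
    System.out.println(toColumns("2hb_", attrs).keySet());
  }
}
```

With this rule every row written through the sink has a key of the same kind, which is the consistency argument made earlier in the review. Validation of the "colfam:qualifier" form is left out of this sketch.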

Diff:

Revision 4 (+269 -43)


  1. LGTM. I can commit this, after integrating it as a plug-in. 
  2. 
      
  1. LGTM.  Let's check this in on a separate branch, and work on it there until it has been plugin-ified.  
    1. I hope that the patch file I submitted is enough. Please let me know if I need to do a fork and apply the changes to it on GitHub.
  2. 
      