Review Board 1.6.3

FLUME-752: Refactor HBase sinks to re-use common code/functionality

Review Request #1900 - updated 2 years, 8 months ago

Alex Baranau Reviewers
flume
FLUME-752
None flume
As agreed in comments at https://review.cloudera.org/r/1847/ this is the part of patch dedicated solely to refactoring of HBase sinks.

New base AbstractHBaseSink was extracted which holds common functionality (common sink attributes which are basically HBase client attributes and HTable connection management code: open, close, etc.)

 
plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/AbstractHBaseSink.java
New File

   
1
/**

   
2
 * Licensed to Cloudera, Inc. under one

   
3
 * or more contributor license agreements.  See the NOTICE file

   
4
 * distributed with this work for additional information

   
5
 * regarding copyright ownership.  Cloudera, Inc. licenses this file

   
6
 * to you under the Apache License, Version 2.0 (the

   
7
 * "License"); you may not use this file except in compliance

   
8
 * with the License.  You may obtain a copy of the License at

   
9
 *

   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

   
11
 *

   
12
 * Unless required by applicable law or agreed to in writing, software

   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

   
15
 * See the License for the specific language governing permissions and

   
16
 * limitations under the License.

   
17
 */

   
18
package com.cloudera.flume.hbase;

   
19

   

   
20
import java.io.IOException;

   
21
import java.util.ArrayList;

   
22
import java.util.Arrays;

   
23
import java.util.List;

   
24

   

   
25
import com.cloudera.flume.conf.Context;

   
26
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;

   
27
import com.cloudera.flume.core.Event;

   
28
import com.cloudera.flume.core.EventSink;

   
29
import com.cloudera.util.Pair;

   
30
import com.google.common.base.Preconditions;

   
31
import org.apache.hadoop.conf.Configuration;

   
32
import org.apache.hadoop.hbase.HBaseConfiguration;

   
33
import org.apache.hadoop.hbase.HColumnDescriptor;

   
34
import org.apache.hadoop.hbase.client.HTable;

   
35
import org.apache.hadoop.hbase.client.Put;

   
36
import org.slf4j.Logger;

   
37
import org.slf4j.LoggerFactory;

   
38

   

   
39
/**

   
40
 * Implements basic functionality for HBase sinks

   
41
 *

   
42
 */

   
43
public class AbstractHBaseSink extends EventSink.Base {

   
44
  private static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseSink.class);

   
45

   

   
46
  final String tableName; // not escapeable

   
47
  final long writeBufferSize;

   
48
  final boolean writeToWal;

   
49
  final Configuration config;

   
50

   

   
51
  private HTable table;

   
52

   

   
53
  public AbstractHBaseSink(String tableName, long writeBufferSize, boolean writeToWal, Configuration config) {

   
54
    Preconditions.checkNotNull(tableName, "Must specify table name.");

   
55
    this.tableName = tableName;

   
56
    this.writeBufferSize = writeBufferSize;

   
57
    this.writeToWal = writeToWal;

   
58
    this.config = config;

   
59
  }

   
60

   

   
61
  @Override

   
62
  synchronized public void close() throws IOException, InterruptedException {

   
63
    if (table != null) {

   
64
      table.close(); // performs flushCommits() internally, so we are good when

   
65
                     // autoFlush=false

   
66
      table = null;

   
67
      LOG.info("HBase sink successfully closed");

   
68
    } else {

   
69
      LOG.warn("Double close of HBase sink");

   
70
    }

   
71

   

   
72
  }

   
73

   

   
74
  @Override

   
75
  synchronized public void open() throws IOException, InterruptedException {

   
76
    if (table != null) {

   
77
      throw new IllegalStateException(

   
78
          "HTable is already initialized. Looks like sink close() hasn't been proceeded properly.");

   
79
    }

   
80
    // This instantiates an HTable object that connects you to

   
81
    // the tableName table.

   
82
    table = new HTable(config, tableName);

   
83
    if (writeBufferSize > 0) {

   
84
      table.setAutoFlush(false);

   
85
      table.setWriteBufferSize(writeBufferSize);

   
86
    }

   
87
    LOG.info("HBase sink successfully opened");

   
88
  }

   
89

   

   
90
  protected void write(Put p) throws IOException, InterruptedException {

   
91
    p.setWriteToWAL(writeToWal);

   
92
    table.put(p);

   
93
  }

   
94

   

   
95
  protected final HTable getTable() {

   
96
    return table;

   
97
  }

   
98
}
plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/Attr2HBaseEventSink.java
Revision 90e342d New Change
 
plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/HBaseSink.java
Revision 50cd30d New Change
 
plugins/flume-plugin-hbasesink/src/test/java/com/cloudera/flume/hbase/TestHBaseSink.java
Revision 95c0e3b New Change
 
  1. plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/AbstractHBaseSink.java: Loading...
  2. plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/Attr2HBaseEventSink.java: Loading...
  3. plugins/flume-plugin-hbasesink/src/main/java/com/cloudera/flume/hbase/HBaseSink.java: Loading...
  4. plugins/flume-plugin-hbasesink/src/test/java/com/cloudera/flume/hbase/TestHBaseSink.java: Loading...