Review Board 1.6.3

Unit test and fix for multi-threaded executor service

Review Request #1064 - updated 3 years, 6 months ago

Jonathan Gray Reviewers
trunk hbase
hbase-3139 stack
None hbase svn
See HBASE-3139

 
trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
Revision 1026564 New Change
... 21 lines hidden [Expand]
22
import java.util.List;
22
import java.util.List;
23
import java.util.Map;
23
import java.util.Map;
24
import java.util.Map.Entry;
24
import java.util.Map.Entry;
25
import java.util.concurrent.BlockingQueue;
25
import java.util.concurrent.BlockingQueue;
26
import java.util.concurrent.ConcurrentHashMap;
26
import java.util.concurrent.ConcurrentHashMap;
27
import java.util.concurrent.PriorityBlockingQueue;
27
import java.util.concurrent.LinkedBlockingQueue;
28
import java.util.concurrent.ThreadPoolExecutor;
28
import java.util.concurrent.ThreadPoolExecutor;
29
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.TimeUnit;
30
import java.util.concurrent.atomic.AtomicInteger;
30
import java.util.concurrent.atomic.AtomicInteger;
31

   
31

   
32
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.Log;
... 6 lines hidden [Expand]
39
/**
39
/**
40
 * This is a generic executor service. This component abstracts a
40
 * This is a generic executor service. This component abstracts a
41
 * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
41
 * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
42
 * and a <code>Runnable</code> that handles the object that is added to the queue.
42
 * and a <code>Runnable</code> that handles the object that is added to the queue.
43
 *
43
 *
44
 * <p>In order to create a new service, create an instance of this class and 
44
 * <p>In order to create a new service, create an instance of this class and
45
 * then do: <code>instance.startExecutorService("myService");</code>.  When done
45
 * then do: <code>instance.startExecutorService("myService");</code>.  When done
46
 * call {@link #shutdown()}.
46
 * call {@link #shutdown()}.
47
 *
47
 *
48
 * <p>In order to use the service created above, call
48
 * <p>In order to use the service created above, call
49
 * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
49
 * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
... 191 lines hidden [Expand]
public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
241
  }
241
  }
242

   
242

   
243
  /**
243
  /**
244
   * Executor instance.
244
   * Executor instance.
245
   */
245
   */
246
  private static class Executor {
246
  static class Executor {
247
    // default number of threads in the pool

   
248
    private int corePoolSize = 1;

   
249
    // how long to retain excess threads
247
    // how long to retain excess threads
250
    private long keepAliveTimeInMillis = 1000;
248
    final long keepAliveTimeInMillis = 1000;
251
    // the thread pool executor that services the requests
249
    // the thread pool executor that services the requests
252
    private final ThreadPoolExecutor threadPoolExecutor;
250
    final ThreadPoolExecutor threadPoolExecutor;
253
    // work queue to use - unbounded queue
251
    // work queue to use - unbounded queue
254
    BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
252
    final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
255
    private final AtomicInteger threadid = new AtomicInteger(0);

   
256
    private final String name;
253
    private final String name;
257
    private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
254
    private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
258

   
255

   
259
    protected Executor(String name, int maxThreads,
256
    protected Executor(String name, int maxThreads,
260
        final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
257
        final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
261
      this.name = name;
258
      this.name = name;
262
      this.eventHandlerListeners = eventHandlerListeners;
259
      this.eventHandlerListeners = eventHandlerListeners;
263
      // create the thread pool executor
260
      // create the thread pool executor
264
      this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
261
      this.threadPoolExecutor = new ThreadPoolExecutor(maxThreads, maxThreads,
265
          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
262
          keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
266
      // name the threads for this threadpool
263
      // name the threads for this threadpool
267
      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
264
      ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
268
      tfb.setNameFormat(this.name + "-" + this.threadid.incrementAndGet());
265
      tfb.setNameFormat(this.name + "-%d");
269
      this.threadPoolExecutor.setThreadFactory(tfb.build());
266
      this.threadPoolExecutor.setThreadFactory(tfb.build());
270
    }
267
    }
271

   
268

   
272
    /**
269
    /**
273
     * Submit the event to the queue for handling.
270
     * Submit the event to the queue for handling.
... 14 lines hidden [Expand]
trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
Revision 1026564 New Change
 
trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Revision 1026564 New Change
 
trunk/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
Revision 1026564 New Change
 
  1. trunk/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java: Loading...
  2. trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java: Loading...
  3. trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java: Loading...
  4. trunk/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java: Loading...