+-
LocalExecutorManager.java 用于将线程池进行集中化管理,并且启动监控线程。监控间隔为5秒一次。
ExecutorMonitor.java 线程监控

LocalExecutor.java 定义自己的线程池实体
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * A named, fixed-size thread pool bundled with the queue length above which
 * the pool should be reported as backed up.
 *
 * <p>All fields are assigned once in the constructor and are {@code final},
 * so an instance can be handed to other threads (for example a monitoring
 * thread) without its configuration changing underneath them.
 */
public class LocalExecutor {
    /** Label used to identify this pool in monitoring output. */
    private final String name;
    /** Number of worker threads the pool was created with. */
    private final int poolSize;
    /** Pending-task count above which a warning should be raised. */
    private final int blockQueueWarningSize;
    /** The underlying pool; created eagerly in the constructor. */
    private final ExecutorService executorService;

    /**
     * Creates the pool immediately via {@link Executors#newFixedThreadPool(int)}.
     *
     * @param name                  label identifying this pool
     * @param poolSize              number of worker threads
     * @param blockQueueWarningSize queue length above which the pool counts as overloaded
     * @throws IllegalArgumentException if {@code poolSize} is not positive
     *                                  (raised by {@code newFixedThreadPool})
     */
    public LocalExecutor(String name, int poolSize, int blockQueueWarningSize) {
        this.name = name;
        this.poolSize = poolSize;
        this.blockQueueWarningSize = blockQueueWarningSize;
        this.executorService = Executors.newFixedThreadPool(poolSize);
    }

    /** @return the label given to this pool at construction */
    public String getName() {
        return name;
    }

    /** @return the number of worker threads requested at construction */
    public int getPoolSize() {
        return poolSize;
    }

    /** @return the queue length above which the pool counts as overloaded */
    public int getBlockQueueWarningSize() {
        return blockQueueWarningSize;
    }

    /** @return the underlying pool; the same instance on every call */
    public ExecutorService getExecutorService() {
        return executorService;
    }
}
LocalExecutorManager.java 用于对线程池进行集中化管理,并在首次获取线程池时启动监控线程。监控线程在启动 1 秒后首次执行,之后每 5 秒执行一次。
如果需要调整监控间隔,可以自行修改 init() 中 scheduleAtFixedRate 的参数。
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Central registry of the application's named thread pools.
 *
 * <p>The registry is built lazily on the first call to
 * {@link #getExecutorService(String)}. Building it also starts a single
 * background thread that runs an {@link ExecutorMonitor} over the registry,
 * first after 1 second and then every 5 seconds.
 *
 * <p>Initialization is safe to trigger from several threads at once: it runs
 * at most one time, and the registry only becomes visible after it has been
 * fully populated.
 */
public class LocalExecutorManager {
    /**
     * Pools keyed by name; {@code null} until the first lookup. Declared
     * {@code volatile} so that a thread reading a non-null reference also
     * sees the entries that were inserted before it was published.
     */
    private static volatile ConcurrentHashMap<String, LocalExecutor> hashMap = null;

    /** Static utility holder; not instantiable. */
    private LocalExecutorManager() {
    }

    /**
     * Builds the registry and starts the monitor. Synchronized, and a no-op
     * when the registry already exists, so concurrent first callers cannot
     * create duplicate pools or duplicate monitor threads.
     */
    private static synchronized void init() {
        if (hashMap != null) {
            return;
        }
        ConcurrentHashMap<String, LocalExecutor> pools = new ConcurrentHashMap<>();
        pools.put("processlog", new LocalExecutor("processlog", 5, 10));
        // Register additional pools here, for example:
        // pools.put("processCompareData", new LocalExecutor("processCompareData", 12, 100));

        // The monitor only reports; run it on a daemon thread so that it never
        // keeps the JVM alive once the application's own threads have finished.
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread monitorThread = new Thread(task, "local-executor-monitor");
            monitorThread.setDaemon(true);
            return monitorThread;
        });
        service.scheduleAtFixedRate(new ExecutorMonitor(pools), 1, 5, TimeUnit.SECONDS);

        // Publish last, so no caller can observe a half-populated registry.
        hashMap = pools;
    }

    /**
     * Looks up a registered pool by name, initializing the registry on first use.
     *
     * @param name the name the pool was registered under
     * @return the pool's executor service, or {@code null} when {@code name}
     *         is {@code null} or no pool is registered under it
     */
    public static ExecutorService getExecutorService(String name) {
        if (hashMap == null) {
            init();
        }
        if (name == null) {
            // ConcurrentHashMap rejects null keys; treat it like any unknown name.
            return null;
        }
        // Single lookup instead of containsKey + get.
        LocalExecutor registered = hashMap.get(name);
        return registered == null ? null : registered.getExecutorService();
    }
}
ExecutorMonitor.java 线程监控
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Periodic check that reports every registered pool whose pending-task queue
 * has grown beyond that pool's configured warning size.
 *
 * <p>Each {@link #run()} makes one pass over the registry and prints one line
 * to standard output per overloaded pool.
 */
public class ExecutorMonitor implements Runnable {
    /** The registry to inspect; shared with the manager, not copied. */
    private final ConcurrentHashMap<String, LocalExecutor> hashMap;

    /**
     * @param hashMap registry of pools to inspect on every run
     */
    public ExecutorMonitor(ConcurrentHashMap<String, LocalExecutor> hashMap) {
        this.hashMap = hashMap;
    }

    /**
     * Inspects every registered pool once and prints a warning line for each
     * one whose queue length exceeds its warning size.
     */
    @Override
    public void run() {
        for (LocalExecutor executor : hashMap.values()) {
            // Only ThreadPoolExecutor exposes its work queue. Skip anything else
            // rather than throwing: an exception escaping a task scheduled with
            // scheduleAtFixedRate suppresses all of its later executions.
            if (!(executor.getExecutorService() instanceof ThreadPoolExecutor)) {
                continue;
            }
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor.getExecutorService();

            // Read the size once so the reported number is the one that was compared.
            int queuedTasks = threadPoolExecutor.getQueue().size();
            if (queuedTasks > executor.getBlockQueueWarningSize()) {
                System.out.println("Thread name:" + executor.getName()
                        + ", Thread max:" + executor.getPoolSize()
                        + ", Thread queue:" + queuedTasks
                        + ", Exceed warning thread:" + executor.getBlockQueueWarningSize());
            }
        }
    }
}
TestLocalExecutormanager.java 测试
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.junit.Test;
/**
 * Manual smoke test for {@code LocalExecutorManager}: floods the
 * "processlog" pool with slow tasks, waits for all of them to finish, and
 * then shuts the pool down. It makes no assertions; the output is meant to be
 * read by a person.
 */
public class TestLocalExecutormanager {
    /**
     * Submits 100 tasks that each sleep for five seconds, blocks until every
     * one has completed, then shuts the pool down and prints an end marker.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the tasks to finish
     */
    @Test
    public void test() throws InterruptedException {
        ExecutorService pool = LocalExecutorManager.getExecutorService("processlog");
        final int taskCount = 100;
        CountDownLatch remaining = new CountDownLatch(taskCount);

        int submitted = 0;
        while (submitted < taskCount) {
            pool.execute(slowTask(remaining));
            submitted++;
        }

        remaining.await();
        pool.shutdown();
        System.out.println("==end==");
    }

    /**
     * Builds one unit of work: print the worker thread's name, sleep for five
     * seconds, and count the latch down whether or not the sleep was interrupted.
     */
    private static Runnable slowTask(CountDownLatch remaining) {
        return () -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                remaining.countDown();
            }
        };
    }
}