+-
Java 多线程统一管理

LocalExecutor.java 定义自己的线程池实体

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Immutable descriptor of a named fixed-size thread pool.
 *
 * <p>Bundles the pool itself with the metadata the monitor reports: the pool
 * name, the number of worker threads, and the queue length above which a
 * warning is printed. All fields are {@code final}, so an instance can be
 * shared between the submitting threads and the monitoring thread.
 */
public class LocalExecutor {
	/** Name used to identify this pool in monitor output. */
	private final String name;

	/** Number of threads in the fixed pool. */
	private final int poolSize;

	/** Queue length above which the monitor prints a warning. */
	private final int blockQueueWarningSize;

	/** The underlying fixed-size pool, created once in the constructor. */
	private final ExecutorService executorService;

	/**
	 * Creates the descriptor and its backing fixed thread pool.
	 *
	 * @param name                  name of the pool
	 * @param poolSize              number of threads in the pool
	 * @param blockQueueWarningSize queue length above which a warning is reported
	 * @throws IllegalArgumentException if {@code poolSize <= 0} (raised by
	 *                                  {@link Executors#newFixedThreadPool(int)})
	 */
	public LocalExecutor(String name, int poolSize, int blockQueueWarningSize) {
		this.name = name;
		this.poolSize = poolSize;
		this.blockQueueWarningSize = blockQueueWarningSize;
		this.executorService = Executors.newFixedThreadPool(poolSize);
	}

	/** @return the name of this pool */
	public String getName() {
		return name;
	}

	/** @return the number of threads the pool was created with */
	public int getPoolSize() {
		return poolSize;
	}

	/** @return the queue length above which a warning is reported */
	public int getBlockQueueWarningSize() {
		return blockQueueWarningSize;
	}

	/** @return the backing executor; the same instance on every call */
	public ExecutorService getExecutorService() {
		return executorService;
	}

}

LocalExecutorManager.java 用于对线程池进行集中管理,并在首次获取线程池时启动监控线程。监控线程首次延迟 1 秒执行,之后每 5 秒执行一次。


如果需要,可以自行修改监控间隔,或在 init() 中注册更多线程池。

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Central registry of the application's named thread pools.
 *
 * <p>The registry is built lazily on the first call to
 * {@link #getExecutorService(String)}. Building it also starts a
 * single-threaded scheduler that runs an {@link ExecutorMonitor} over all
 * registered pools, first after 1 second and then every 5 seconds.
 */
public class LocalExecutorManager {
	/**
	 * Registered pools keyed by name. Stays {@code null} until {@link #init()}
	 * has finished; volatile so that the fully populated map is visible to
	 * every thread once it is published.
	 */
	private static volatile ConcurrentHashMap<String, LocalExecutor> hashMap = null;

	/** Static utility holder; not instantiable. */
	private LocalExecutorManager() {
	}

	/**
	 * Builds the registry and starts the monitor exactly once.
	 *
	 * <p>Synchronized and re-checked so that concurrent first callers cannot
	 * create duplicate pools or duplicate monitor threads. The map is populated
	 * in a local variable and assigned to the field only at the end, so no
	 * caller can observe a non-null but still empty registry.
	 */
	private static synchronized void init() {
		if (null != hashMap) {
			return;
		}

		ConcurrentHashMap<String, LocalExecutor> pools = new ConcurrentHashMap<>();
		pools.put("processlog", new LocalExecutor("processlog", 5, 10));
		// Register further pools here in the same way, for example a pool named
		// "processCompareData" with 12 threads and a warning threshold of 100.

		// The monitor runs on a daemon thread: there is no way to shut this
		// scheduler down from outside, so it must not keep the JVM alive.
		ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(runnable -> {
			Thread monitorThread = new Thread(runnable, "local-executor-monitor");
			monitorThread.setDaemon(true);
			return monitorThread;
		});
		service.scheduleAtFixedRate(new ExecutorMonitor(pools), 1, 5, TimeUnit.SECONDS);

		hashMap = pools;
	}

	/**
	 * Looks up the executor registered under the given name, initializing the
	 * registry on first use.
	 *
	 * @param name name the pool was registered under
	 * @return the matching executor, or {@code null} if no pool has that name
	 * @throws NullPointerException if {@code name} is {@code null}
	 */
	public static ExecutorService getExecutorService(String name) {
		ConcurrentHashMap<String, LocalExecutor> pools = hashMap;
		if (null == pools) {
			init();
			pools = hashMap;
		}
		LocalExecutor executor = pools.get(name);
		return null == executor ? null : executor.getExecutorService();
	}
}

ExecutorMonitor.java 线程池监控:当某个线程池的等待队列长度超过告警阈值时,输出告警信息。


import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Periodic task that checks every registered pool and prints a warning to
 * standard output when a pool's work queue is longer than that pool's
 * configured warning size.
 */
public class ExecutorMonitor implements Runnable {
	/** Pools to inspect, keyed by name; the live map, not a copy. */
	private final ConcurrentHashMap<String, LocalExecutor> hashMap;

	/**
	 * @param hashMap the pools to inspect on each run
	 */
	public ExecutorMonitor(ConcurrentHashMap<String, LocalExecutor> hashMap) {
		this.hashMap = hashMap;
	}

	/**
	 * Inspects each pool once and reports those whose queue exceeds the
	 * warning size.
	 */
	@Override
	public void run() {
		for (LocalExecutor executor : hashMap.values()) {
			// Only ThreadPoolExecutor exposes its queue. Skip anything else rather
			// than letting a ClassCastException escape: an exception thrown from a
			// fixed-rate scheduled task suppresses all of its later executions.
			if (!(executor.getExecutorService() instanceof ThreadPoolExecutor)) {
				continue;
			}
			ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor.getExecutorService();

			// Read the size once so the reported value is the one that was compared.
			int queued = threadPoolExecutor.getQueue().size();
			if (queued > executor.getBlockQueueWarningSize()) {
				System.out.println("Thread name:" + executor.getName() + ",  Thread max:" + executor.getPoolSize() + ", Thread queue:"
						+ queued + ", Exceed warning thread:" + executor.getBlockQueueWarningSize());
			}
		}
	}
}


TestLocalExecutormanager.java 测试

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

import org.junit.Test;

/**
 * Manual smoke test for {@link LocalExecutorManager}: floods the "processlog"
 * pool with slow tasks so that its queue grows past the warning size and the
 * monitor output can be observed.
 */
public class TestLocalExecutormanager {
	/**
	 * Submits 100 tasks that each print the worker thread name and sleep for
	 * 5 seconds, waits for all of them to finish, then shuts the pool down.
	 *
	 * @throws InterruptedException if the test thread is interrupted while
	 *                              waiting for the tasks to complete
	 */
	@Test
	public void test() throws InterruptedException {
		ExecutorService executor = LocalExecutorManager.getExecutorService("processlog");
		int maxNum = 100;
		CountDownLatch latch = new CountDownLatch(maxNum);
		try {
			for (int i = 0; i < maxNum; i++) {
				executor.execute(() -> {
					System.out.println(Thread.currentThread().getName());
					try {
						Thread.sleep(5000);
					} catch (InterruptedException e) {
						// Restore the flag instead of swallowing the interrupt.
						Thread.currentThread().interrupt();
					} finally {
						// Count down even on interrupt so await() cannot hang.
						latch.countDown();
					}
				});
			}
			latch.await();
		} finally {
			// Shut down even if await() was interrupted.
			executor.shutdown();
		}
		System.out.println("==end==");
	}
}