package safeluck.drive.evaluation.util; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.anyun.basecommonlib.MyLog; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * MyApplication2 * Created by lzw on 2019/12/12. 16:22:49 * 邮箱:632393724@qq.com * All Rights Saved! Chongqing AnYun Tech co. LTD * * * * *

@ClassName: CThreadPoolExecutor.java

* * * *

@Description: 自定义固定大小的线程池 * * 每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。 * * 线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。 * *

* * 合理利用线程池能够带来三个好处: * * 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 * * 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 * * 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。 * * 我们可以通过ThreadPoolExecutor来创建一个线程池: * * new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler); * *

* * corePoolSize(线程池的基本大小): * * 当提交一个任务到线程池时,线程池会创建一个线程来执行任务, * * 即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。 * * 如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 * *

* * runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。 * * ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 * *

* * LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。 * * 静态工厂方法Executors.newFixedThreadPool()使用了这个队列。 * *

* * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态, * * 吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。 * *

* * PriorityBlockingQueue:一个具有优先级的无限阻塞队列。 * *

* * maximumPoolSize(线程池最大大小): * * 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。 * *

* * ThreadFactory: * * 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。 * *

* * RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。 * * 这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。 * * AbortPolicy:直接抛出异常。 * * CallerRunsPolicy:只用调用者所在线程来运行任务。 * * DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 * * DiscardPolicy:不处理,丢弃掉。 * * 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。 * *

* * keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。 * * 所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。 * *

* * TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES), * * 毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

* *
* *

@author: FreddyChen

* *

@date: 2019/2/3 15:35

* *

@email: chenshichao@outlook.com

* * * * @see http://www.infoq.com/cn/articles/java-threadPool * */ public class CThreadPoolExecutor { private static final String TAG = CThreadPoolExecutor.class.getSimpleName(); private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// CPU个数 // private static final int CORE_POOL_SIZE = CPU_COUNT + 1;// 线程池中核心线程的数量 // private static final int MAXIMUM_POOL_SIZE = 2 * CPU_COUNT + 1;// 线程池中最大线程数量 private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));// 线程池中核心线程的数量 private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;// 线程池中最大线程数量 private static final long KEEP_ALIVE_TIME = 30L;// 非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长 private static final int WAIT_COUNT = 128; // 最多排队个数,这里控制线程创建的频率 private static ThreadPoolExecutor pool = createThreadPoolExecutor(); private static ThreadPoolExecutor createThreadPoolExecutor() { if (pool == null) { pool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue(WAIT_COUNT), new CThreadFactory("CThreadPool", Thread.NORM_PRIORITY - 2), new CHandlerException()); } return pool; } public static class CThreadFactory implements ThreadFactory { private AtomicInteger counter = new AtomicInteger(1); private String prefix = ""; private int priority = Thread.NORM_PRIORITY; public CThreadFactory(String prefix, int priority) { this.prefix = prefix; this.priority = priority; } public CThreadFactory(String prefix) { this.prefix = prefix; } public Thread newThread(Runnable r) { Thread executor = new Thread(r, prefix + " #" + counter.getAndIncrement()); executor.setDaemon(true); executor.setPriority(priority); return executor; } } /** * 抛弃当前的任务 */ private static class CHandlerException extends ThreadPoolExecutor.AbortPolicy { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { MyLog.d(TAG, "rejectedExecution:" + r); MyLog.e(TAG, logAllThreadStackTrace().toString()); // Tips.showForce("任务被拒绝", 5000); if (!pool.isShutdown()) { pool.shutdown(); pool = null; } pool = createThreadPoolExecutor(); } } private static ExecutorService jobsForUI = Executors.newFixedThreadPool( CORE_POOL_SIZE, new CThreadFactory("CJobsForUI", Thread.NORM_PRIORITY - 1)); /** * 启动一个消耗线程,常驻后台 * * @param r */ public static void startConsumer(final Runnable r, final String name) { runInBackground(new Runnable() { public void run() { new CThreadFactory(name, Thread.NORM_PRIORITY - 3).newThread(r).start(); } }); } /** * 提交到其他线程去跑,需要取数据的时候会等待任务完成再继续 * * @param task * @return */ public static Future submitTask(Callable task) { return jobsForUI.submit(task); } /** * 强制清理任务 * * @param task * @return */ public static void cancelTask(Future task) { if (task != null) { task.cancel(true); } } /** * 从 Future 中获取值,如果发生异常,打日志 * * @param future * @param tag * @param name * @return */ public static T getFromTask(Future future, String tag, String name) { try { return future.get(); } catch (Exception e) { Log.e(tag, (name != null ? name + ": " : "") + e.toString()); } return null; } public static void runInBackground(Runnable runnable) { if (pool == null) { Log.i(TAG, "runInBackground: "); createThreadPoolExecutor(); } pool.execute(runnable); // Future future = pool.submit(runnable); // try { // future.get(); // } catch (InterruptedException e) { // e.printStackTrace(); // } catch (ExecutionException e) { // e.printStackTrace(); // } } private static Thread mainThread; private static Handler mainHandler; static { Looper mainLooper = Looper.getMainLooper(); mainThread = mainLooper.getThread(); mainHandler = new Handler(mainLooper); } public static boolean isOnMainThread() { return mainThread == Thread.currentThread(); } public static void runOnMainThread(Runnable r) { if (isOnMainThread()) { r.run(); } else { mainHandler.post(r); } } public static void runOnMainThread(Runnable r, long delayMillis) { if (delayMillis <= 0) { runOnMainThread(r); } else { mainHandler.postDelayed(r, delayMillis); } } // 用于记录后台等待的Runnable,第一个参数外面的Runnable,第二个参数是等待中的Runnable private static HashMap mapToMainHandler = new HashMap(); public static void runInBackground(final Runnable runnable, long delayMillis) { if (delayMillis <= 0) { runInBackground(runnable); } else { Runnable mainRunnable = new Runnable() { @Override public void run() { mapToMainHandler.remove(runnable); pool.execute(runnable); } }; mapToMainHandler.put(runnable, mainRunnable); mainHandler.postDelayed(mainRunnable, delayMillis); } } /** * 对runOnMainThread的,移除Runnable * * @param r */ public static void removeCallbackOnMainThread(Runnable r) { mainHandler.removeCallbacks(r); } public static void removeCallbackInBackground(Runnable runnable) { Runnable mainRunnable = mapToMainHandler.get(runnable); if (mainRunnable != null) { mainHandler.removeCallbacks(mainRunnable); } } public static void logStatus() { StringBuilder sb = new StringBuilder(); sb.append("getActiveCount"); sb.append(pool.getActiveCount()); sb.append("\ngetTaskCount"); sb.append(pool.getTaskCount()); sb.append("\ngetCompletedTaskCount"); sb.append(pool.getCompletedTaskCount()); Log.d(TAG, sb.toString()); } public static StringBuilder logAllThreadStackTrace() { StringBuilder builder = new StringBuilder(); Map liveThreads = Thread.getAllStackTraces(); for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) { Thread key = i.next(); builder.append("Thread ").append(key.getName()) .append("\n"); StackTraceElement[] trace = liveThreads.get(key); for (int j = 0; j < trace.length; j++) { builder.append("\tat ").append(trace[j]).append("\n"); } } return builder; } public static void main(String[] args) { for (int i = 0; i < 10000; i++) { final int index = i; System.out.println("index=" + index); CThreadPoolExecutor.runInBackground(new Runnable() { @Override public void run() { System.out.println("正在运行第[" + (index + 1) + "]个线程."); } }); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }