Implementing a Simple Thread Pool by Hand

 2018/5/22 3:11:54  getthrough  程序员俱乐部

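Foreword:

This article uses a BlockingQueue as the underlying data structure of the thread pool and handles multiple tasks with the producer/consumer pattern.

The implementation is fairly simple and does not implement every method. It can serve as an introductory reference for learning about thread pools and synchronized queues.

Given the limits of the author's knowledge, some of the methods here certainly leave room for optimization and better implementations. Discussion is welcome.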
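
The producer/consumer idea mentioned above can be sketched in isolation before looking at the pool itself. The following is a minimal sketch, not the article's implementation: the class name `ProducerConsumerSketch`, the `STOP` sentinel and the queue capacity of 4 are all assumptions made for illustration. One thread puts tasks into a bounded BlockingQueue while another takes and runs them.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, not part of the original project. */
class ProducerConsumerSketch {

    /** Sentinel task (an assumption of this sketch) that tells the consumer to stop. */
    private static final Runnable STOP = () -> { };

    /** Produces taskCount tasks, consumes them on one thread, and returns how many ran. */
    static int runTasks(int taskCount) {
        // A small capacity so that put() really blocks when the consumer falls behind.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(4);
        AtomicInteger executed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = queue.take();   // blocks while the queue is empty
                    if (task == STOP) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        try {
            for (int i = 0; i < taskCount; i++) {
                queue.put(executed::incrementAndGet);   // blocks while the queue is full
            }
            queue.put(STOP);
            consumer.join();                            // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing tasks", e);
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + runTasks(10));
    }
}
```

In the pool below, `execute` plays the producer role and each worker thread plays the consumer role.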

Written and tested with spring-boot.

1. Custom thread pool interface

package com.getthrough.threadpool.mythreadpool;

/**
 * <p>This top-level interface defines several necessary methods.
 * It imitates {@link java.util.concurrent.ExecutorService} and
 * {@link java.util.concurrent.ThreadPoolExecutor}
 * for personal learning.</p>
 * @author: getthrough
 * @date: 2018/5/20
 * @description:
 * @version:
 */
public interface ThreadPool {

    /**
     * Executes the given task at some time in the future;
     * it may be run by a single thread or by a thread pool.
     * @param runnable the given task
     */
    void execute(Runnable runnable);

    /**
     * Closes the thread pool after all submitted tasks have been executed;
     * new tasks are no longer accepted.
     */
    void shutdown();

    /**
     * Tests whether the thread pool has been shut down.
     * @return the boolean result.
     */
    boolean isShutdown();

}
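
For comparison, the three methods of this interface also exist on the JDK's `java.util.concurrent.ExecutorService`, which the Javadoc above says it imitates. The following is a small sketch of how the JDK version behaves around `shutdown()`; the class name `ExecutorServiceContractSketch` and its method are hypothetical and only serve the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, not part of the original project. */
class ExecutorServiceContractSketch {

    /**
     * Returns true if the JDK pool runs the task submitted before shutdown(),
     * reports isShutdown() == true, and rejects the task submitted afterwards.
     */
    static boolean shutdownContractHolds() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();

        pool.execute(done::incrementAndGet);    // accepted
        pool.shutdown();                        // already submitted tasks still run

        boolean rejected = false;
        try {
            pool.execute(done::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = true;                    // the default JDK policy throws on rejection
        }

        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected && pool.isShutdown() && done.get() == 1;
    }

    public static void main(String[] args) {
        System.out.println("contract holds: " + shutdownContractHolds());
    }
}
```

One design difference worth noting: the JDK pool signals a rejected task with an exception, whereas the implementation in the next section only logs a message and returns.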


2. Default implementation of the thread pool

package com.getthrough.threadpool.mythreadpool.impl;

import com.getthrough.threadpool.mythreadpool.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author: getthrough
 * @date: 2018/5/20
 * @description:
 * @version:
 */
public class DefaultThreadPool implements ThreadPool {

    private static final Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class);

    /**
     * The worker threads; each one takes tasks from {@code tasks} and runs them.
     * Unbounded, so that a corePoolSize above DEFAULT_POOL_SIZE does not make add() throw.
     */
    private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>();

    /**
     * The queue that accepts the submitted tasks.
     */
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(MAX_POOL_SIZE);

    private int corePoolSize = 0;

    private int maxPoolSize = 0;

    /**
     * How long a worker waits (stays alive) for a task when {@code tasks} is empty.
     */
    private volatile long aliveTime = 0L;

    /**
     * The default pool size.
     */
    private static final int DEFAULT_POOL_SIZE = 20;
    /**
     * The maximum pool size.
     */
    private static final int MAX_POOL_SIZE = 30;

    private volatile boolean isShutdown = false;

    public DefaultThreadPool() {
        // Delegate to the two-argument constructor. Calling "new DefaultThreadPool(...)" here
        // would build a second, unrelated pool and leave this one without any workers.
        this(DEFAULT_POOL_SIZE, MAX_POOL_SIZE);
    }

    public DefaultThreadPool(int corePoolSize, int maxPoolSize) {
        if (corePoolSize <= 0 || maxPoolSize <= 0)
            throw new IllegalArgumentException("ERROR:arguments must be greater than zero!");
        if (corePoolSize > maxPoolSize)
            throw new IllegalArgumentException("ERROR:corePoolSize can't be greater than maxPoolSize!");

        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;

        // Pre-start the core workers; they have no initial task and wait on the task queue.
        for (int i = 0; i < corePoolSize; i++) {
            Worker worker = new Worker(null);
            workers.add(worker);
            worker.start();
        }

    }

    @Override
    public void execute(Runnable runnable) {
        if (isShutdown) {
            logger.info("pool is closed, you should call start method");
            return;
        }

        if (workers.size() < corePoolSize) {
            // Fewer workers than the core size: start a new worker for this task.
            Worker worker = new Worker(runnable);
            workers.add(worker);
            worker.start();
            logger.info("task was taken immediately by worker : {}", worker.getName());
        } else {
            // Enough workers exist: queue the task; put() blocks while the queue is full.
            try {
                tasks.put(runnable);
                logger.info("task waiting in the task queue...");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can still see the interruption.
                Thread.currentThread().interrupt();
                logger.info("application is busy, please try again later!");
            }
        }

    }

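    @Override
    public void shutdown() {
        // reject new tasks
        isShutdown = true;

        // Wait for the task queue to drain, sleeping briefly instead of spinning at full CPU.
        while (!tasks.isEmpty()) {
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        // Clear the worker queue. Threads that are already running are not interrupted;
        // each one exits once its poll for a new task times out.
        workers.clear();

        logger.info("the pool has been shut down");

    }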

    @Override
    public boolean isShutdown() {
        // Report the shutdown flag; an empty worker queue alone does not mean shutdown() was called.
        return isShutdown;
    }


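    private Runnable getTask(long timeOut) {

        try {
            // Wait up to timeOut seconds for a task; returns null if none arrives in time.
            return tasks.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of only printing the stack trace.
            Thread.currentThread().interrupt();
        }
        return null;

    }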

    public void start() {
        isShutdown = false;
    }


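    private class Worker extends Thread {

        private Runnable task;

        Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {

            try {
                // Run the initial task (if any), then keep polling the queue for up to 60 seconds.
                while (task != null || (task = getTask(60L)) != null) {
                    try {
                        task.run();
                        logger.info("worker : {} has finished the task.", getName());
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker thread.
                        logger.error("worker : {} failed to run the task.", getName(), e);
                    } finally {
                        task = null;
                    }
                }
            } finally {
                // The worker is exiting: stop counting it as part of the pool.
                workers.remove(this);
            }

        }
    }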

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

}
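
The implementation above relies on the different behaviours of several BlockingQueue methods: `add` (used for the workers), `put` (used in `execute`) and the timed `poll` (used in `getTask`). The following is a small sketch of what these calls do on a full or empty bounded queue. The class name `BoundedQueueSketch` and the capacity of 1 are assumptions made for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of the original project. */
class BoundedQueueSketch {

    /** Exercises a queue of capacity 1 and reports what each call did. */
    static String describe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        StringBuilder report = new StringBuilder();

        queue.add("first");                                      // fits: the queue is now full
        report.append("offer=").append(queue.offer("second"));   // full: returns false, no exception

        try {
            queue.add("third");                                  // full: add() throws instead
            report.append(", add=ok");
        } catch (IllegalStateException e) {
            report.append(", add=IllegalStateException");
        }

        try {
            // poll(timeout, unit) waits at most the given time and returns null on timeout.
            report.append(", poll=").append(queue.poll(10, TimeUnit.MILLISECONDS));
            report.append(", pollEmpty=").append(queue.poll(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // prints: offer=false, add=IllegalStateException, poll=first, pollEmpty=null
    }
}
```

`put` is not exercised here because it would block forever on the full queue. That blocking is exactly what makes `execute` wait when the task queue is full.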


3. A simple test with a main method

package com.getthrough.threadpool;

import com.getthrough.threadpool.mythreadpool.ThreadPool;
import com.getthrough.threadpool.mythreadpool.impl.DefaultThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author: getthrough
 * @date: 2018/5/21
 * @description:
 * @version:
 */
public class TestClass {

    private static final Logger logger = LoggerFactory.getLogger(TestClass.class);

    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new DefaultThreadPool();

        for (int i = 0; i < 22; i++) {
            threadPool.execute(()-> {
                logger.info("TASK produced");
            });
            try {
                TimeUnit.MILLISECONDS.sleep(50L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        TimeUnit.SECONDS.sleep(1L);
        threadPool.shutdown();
        logger.info("shutdown : {}", threadPool.isShutdown());
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("task submit after shutdown");
            }
        });
        TimeUnit.SECONDS.sleep(1L);
        ((DefaultThreadPool)threadPool).start();
        logger.info("thread pool restarted ");

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("task submit after restart");
            }
        });
        TimeUnit.SECONDS.sleep(1L);
        threadPool.shutdown();


    }

}
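
The test above waits with fixed sleeps and relies on reading the log output. As a possible alternative, here is a sketch that waits on a `CountDownLatch` until every submitted task has actually run. It is written against the JDK's `java.util.concurrent.Executor` interface and, to stay self-contained, demonstrated with a JDK pool. The class name `LatchTestSketch` and its methods are hypothetical. Under the assumption that the pool from this article is on the classpath, it could be passed in as `threadPool::execute`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of the original project. */
class LatchTestSketch {

    /** Submits taskCount tasks and waits up to timeoutSeconds for all of them to finish. */
    static boolean allTasksFinish(Executor executor, int taskCount, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(latch::countDown);   // each task counts down once when it runs
        }
        try {
            // true only if the count reached zero before the timeout
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs the check against a JDK fixed-size pool so the sketch needs no other classes. */
    static boolean demo() {
        ExecutorService jdkPool = Executors.newFixedThreadPool(4);
        try {
            return allTasksFinish(jdkPool, 22, 5L);
        } finally {
            jdkPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks finished: " + demo());
    }
}
```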


Full source code: https://github.com/Getthrough/my-threadpool/tree/master
