配合线程池定义可继承的线程变量InheritableThreadLocal_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > 配合线程池定义可继承的线程变量InheritableThreadLocal

配合线程池定义可继承的线程变量InheritableThreadLocal

 2017/8/29 19:08:47  xiangshouxiyang  程序员俱乐部  我要评论(0)
  • 摘要:说到可继承的线程变量,大家可能会想到jdk里的实现java.lang.InheritableThreadLocal。它拥有和线程变量ThreadLocal一样的功能,并且,在当前线程上创建一个新的线程实例Thread时,会把这些线程变量从当前线程传递给新的线程实例。(此时线程变量不再线程安全,需要考虑线程安全问题)InheritableThreadLocal:publicclassInheritableThreadLocal<T>extendsThreadLocal<T>
  • 标签:thread 继承 线程

? ? ? ? 说到可继承的线程变量,大家可能会想到jdk里的实现java.lang.InheritableThreadLocal。它拥有和线程变量ThreadLocal一样的功能,并且,在当前线程上创建一个新的线程实例Thread时,会把这些线程变量从当前线程传递给新的线程实例。(此时线程变量不再线程安全,需要考虑线程安全问题)

InheritableThreadLocal:

?

class="java" name="code">public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
    protected T childValue(T parentValue) {
        return parentValue;
    }

    /**
     * Get the map associated with a ThreadLocal.
     *
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the table.
     * @param map the map to store.
     */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

?该类继承了ThreadLocal并重写了和ThreadLocalMap相关的方法。这个ThreadLocalMap其实是java线程对象Thread类的两个属性

?

class Thread implements Runnable {
    .......

    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    ........
}

?这两个ThreadLocalMap其实就是线程变量实际存放的地方,我们用了线程变量set内容,其实就是往ThreadLocalMap里put内容,key是你定义的ThreadLocal本身,value是你往线程变量set的内容。因为内容是存在线程本身上,所以,同一个线程跨了多少个方法都可以访问到,不同线程就访问不到或访问到不同的对象,实现了线程安全。

? ?ThreadLocal的set方法:

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

?其中

ThreadLocalMap map = getMap(t);

?这行代码,ThreadLocal的getMap:

ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

?而InheritableThreadLocal的getMap:

/**
     * Get the map associated with a ThreadLocal.
     *
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

? ? 即InheritableThreadLocal重写了和ThreadLocalMap相关的方法,其实就是把set的内容放在线程对象的inheritableThreadLocals属性上, 而普通的ThreadLocal则是把set的内容放在线程对象的threadLocals属性上。我们平时新建一个线程,new Thread(),在Thread类实例化的时候调用init()方法:

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }

        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            /* Determine if it's an applet or not */

            /* If there is a security manager, ask the security manager
               what to do. */
            if (security != null) {
                g = security.getThreadGroup();
            }

            /* If the security doesn't have a strong opinion of the matter
               use the parent thread group. */
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }

        /* checkAccess regardless of whether or not threadgroup is
           explicitly passed in. */
        g.checkAccess();

        /*
         * Do we have the required permissions?
         */
        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }

        g.addUnstarted();

        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        this.name = name.toCharArray();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        if (parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

?其中

if (parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

?这行代码把当前线程对象的inheritableThreadLocals属性传递给新建的线程对象inheritableThreadLocals属性,即实现了线程变量的传递。

? ? 上面是可继承的线程变量inheritableThreadLocals的实现原理。但是在实际的应用场景里,绝大多数都是使用线程池来进行多线程编程,所以jdk提供的inheritableThreadLocals类实用性不高。在线程池(ThreadPoolExecutor)中运行一个Runable实例并不会去新建一个线程,而是把Runable实例添加到队列中(在核心线程数已实例化满的时候),让ThreadPoolExecutor的workers去从队列里拿出Runable实例(这是一个典型的生产者消费者模式),然后运行Runable实例.run()方法。故jdk的inheritableThreadLocals这种实现方式没法适用。

? ? ?所以我就想着写一个能在executor上传递的可继承线程变量。而要实现这个功能,单单线程变量本身是不够的,还需要线程池的配合。通过我以前写的博客 http://xiangshouxiyang.iteye.com/blog/2354074 《线程池增强实现》给的思路,设计了自定义的InheritableThreadLocal:

package com.hcd;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 可在特定线程池中继承的线程变量(配合InheritableThreadLocalExecutor使用)
 * Created by cd_huang on 2017/8/28.
 */
public class InheritableThreadLocal<T> extends ThreadLocal<T>{
	private static List<InheritableThreadLocal> inheritableExecutorThreadLocalList =new CopyOnWriteArrayList<>();

	public InheritableThreadLocal(){
        this(true);
	}

	public InheritableThreadLocal(boolean isAdd){
		/**
		 * 一般线程变量本身也不需要被垃圾回收
		 */
		if(isAdd){
			inheritableExecutorThreadLocalList.add(this);
		}
	}

	/**
	 * 从map里取出内容set线程变量(protected方法,可重写,但不提倡直接调用)
	 * @param map
	 */
	protected void setThreadLocalFromMap(Map map){
		T obj = (T)map.get(this);
		this.set(obj);
	}

	/**
	 * get线程变量装到map里(protected方法,可重写,但不提倡直接调用)
	 * @param map
	 */
	protected void getThreadLocalputMap(Map map){
		T obj = this.get();
		map.put(this,obj);
	}

	/**
	 * 移除掉线程变量(protected方法,可重写,但不提倡直接调用)
	 */
	protected void removeThreadLocal(){
        this.remove();
	}

	/**
	 * 把当前线程可传递的线程变量内容放在map里,在task放进线程池队列前调用
	 * @return
	 */
	public static Map<Object,Object> getThreadLocalsMap(){
		Map<Object,Object> threadLocalMap =new HashMap<>();
		List<InheritableThreadLocal> list =inheritableExecutorThreadLocalList;
		for(InheritableThreadLocal threadLocal:list){
			threadLocal.getThreadLocalputMap(threadLocalMap);
		}
		return threadLocalMap;
	}

	/**
	 * 把map里的内容重新set线程变量内容,在task真正运行run方法前调用
	 * @param threadLocalMap
	 */
	public static void setThreadLocalsFromMap(Map<Object,Object> threadLocalMap){
		List<InheritableThreadLocal> list =inheritableExecutorThreadLocalList;
		for(InheritableThreadLocal threadLocal:list){
			threadLocal.setThreadLocalFromMap(threadLocalMap);
		}
	}

	/**
	 * 把setThreadLocalsFromMap方法set的线程变量内容清空,在task真正运行run方法后调用
	 */
	public static void removeThreadLocals(){
		List<InheritableThreadLocal> list =inheritableExecutorThreadLocalList;
		for(InheritableThreadLocal threadLocal:list){
			threadLocal.removeThreadLocal();
		}
	}
}

?与之配合的InheritableThreadLocalExecutor:

package com.hcd;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.*;

/**
 * 支持可继承的线程变量的线程池(配合InheritableThreadLocal使用)
 * Created by cd_huang on 2017/8/29.
 */
public class InheritableThreadLocalExecutor extends ThreadPoolExecutor {

	private static Logger logger = LoggerFactory.getLogger(InheritableThreadLocalExecutor.class);

	public InheritableThreadLocalExecutor(int corePoolSize,
	                                      int maximumPoolSize,
	                                      long keepAliveTime,
	                                      TimeUnit unit,
	                                      BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	public InheritableThreadLocalExecutor(int corePoolSize,
	                                      int maximumPoolSize,
	                                      long keepAliveTime,
	                                      TimeUnit unit,
	                                      BlockingQueue<Runnable> workQueue,
	                                      ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
	}
	public InheritableThreadLocalExecutor(int corePoolSize,
	                                     int maximumPoolSize,
	                                     long keepAliveTime,
	                                     TimeUnit unit,
	                                     BlockingQueue<Runnable> workQueue,
	                                     RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
	}

	public InheritableThreadLocalExecutor(int corePoolSize,
	                                     int maximumPoolSize,
	                                     long keepAliveTime,
	                                     TimeUnit unit,
	                                     BlockingQueue<Runnable> workQueue,
	                                     ThreadFactory threadFactory,
	                                     RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	/**
	 * 重写执行线程实例的方法
	 * @param command
	 */
	@Override
	public void execute(Runnable command) {
		if (command == null){
			throw new NullPointerException();
		}
		TaskWithThreadLocal task =new TaskWithThreadLocal(command,InheritableThreadLocal.getThreadLocalsMap());
		super.execute(task);
	}

	/**
	 * 新线程执行时带上指定的线程信息
	 * @param
	 */
	private static class TaskWithThreadLocal implements Runnable{
		private Map<Object,Object> threadLocalMap;
		private Runnable delegate;

		public TaskWithThreadLocal(Runnable delegate, Map<Object,Object> threadLocalMap){
			this.delegate =delegate;
			this.threadLocalMap =threadLocalMap;
		}

		/**
		 * 重写run方法,在执行run方法前设置线程变量,执行run方法后清除线程变量
		 * 同时,打印了运行时的异常信息,并吞掉了delegate.run()运行时的异常,不往外抛
		 * (线程池默认会在任务运行异常后抛出异常,并销毁掉线程对象本身,也就是如果每个任务都运行异常了,那么用线程池的效率还不如直接新建线程,详情见ThreadPoolExecutor类1123行runWorkers方法 )
		 * jdk线程池这样处理的意义应该是希望通过将异常抛出,将异常交给线程对象本身自带的异常处理拦截器或JVM默认的全局异常处理拦截器捕获并处理,
		 * 这里直接去调用拦截器处理,不往外抛异常,避免线程实例的销毁
		 */
		@Override
		public void run() {
			InheritableThreadLocal.setThreadLocalsFromMap(threadLocalMap);
			try{
				try{
					delegate.run();
					//由于callable的call()方法执行过程的异常会被它的调用上级FutureTask的run()方法中处理而使异常不往外抛,为了打印异常日志这里统一进行异常日志打印的处理
					if(delegate instanceof FutureTask){
						try{
							((FutureTask)delegate).get();
						}catch (Throwable e){
							logger.error(e.getMessage(),e);
							Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),e);
						}
					}
				}catch (Throwable e){
					logger.error(e.getMessage(),e);
					Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),e);
				}
			}finally {
				InheritableThreadLocal.removeThreadLocals();
			}
		}
	}
}

?在Runable实例放进线程池的时候,new 一个TaskWithThreadLocal类,把线程变量放到threadLocalMap里面。实际放到线程池队列里的是TaskWithThreadLocal,TaskWithThreadLocal类的run方法执行时,会执行真正想执行的Runable实例的run方法。用TaskWithThreadLocal类传递threadLocalMap内容,有点类似jdk的InheritableThreadLocal类在Thread实例化时传递ThreadLocal.ThreadLocalMap inheritableThreadLocals。

? ? ?举个例子,比如我们用线程变量记录userId。UserIdUtil:

public class UserIdUtil {
	private static final InheritableThreadLocal<String> userIdLocal = new InheritableThreadLocal<>();

	public static String getUserId(){
		return userIdLocal.get();
	}

	public static void setUserId(String userId){
		userIdLocal.set(userId);
	}

	public static void removeUserId(){
		userIdLocal.remove();
	}
}

只需要把原本的ThreadLocal类改成InheritableThreadLocal类,即可在InheritableThreadLocalExecutor线程池中传递线程变量。

?

?

monospace;">?

?

?

发表评论
用户名: 匿名