单机应用内,在进程内部,我们可以使用ThreadLocal传递应用上下文的方式. 当前的 Spring Secrucity , Spring TransactionManager, ?Log4J MDC, Struts2 ActionContext等等应用场景随处可见.
?
但在是分布式系统下,由于不是在同一个进程内,所以无法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是将一个系统中的ThreadLocal信息可以传递至下一个系统,将两者的调用可以关联起来。如对应用有一个调用,我们生成一个请求ID (traceId),在后面所有分布式系统调用中,可以通过这个traceId将所有调用关联起来,这样查找调用日志都将十分方便.
?
我们现在使用的通讯协议,一般都包含两部分:Header,Body. 如 Soap Header,Http Header. 通过自定义Header,可以带上我们的自定义信息。 然后在服务器端解析Header,再得到自定义信息。那么就可以完成Distributed ThreadLocal的功能。
?
?
?
如上图,通过两个拦截器,client在调用之前,将DistrbiutedThreadLocal中的信息放在soap header中,在服务端方法调用之前,从soap header中取回?DistrbiutedThreadLocal信息。
?
?
以下为CXF webservice的实现代码,一个DistributedThreadLocal及增加了两个拦截器. hessian 也可以自定义Header,完成传递.
?
DistributedThreadLocal
?
/** * 分布式 ThreadLocal, 存放在ThreadLocal中的数据可以传输至另外一台机器上 * @author badqiu */ public class DistributedThreadLocal { public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_"; public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>(); public static void putAll(Map<String, String> map) { getMap().putAll(map); } public static void put(String key, String value) { getMap().put(key, value); } public static String get(String key) { Map<String, String> map = threadLocal.get(); if (map == null) return null; return (String) map.get(key); } public static Map<String, String> getMap() { Map<String, String> map = threadLocal.get(); if (map == null) { map = new HashMap(); threadLocal.set(map); } return map; } public static void clear() { threadLocal.set(null); } }
?
DistributedThreadLocalInSOAPHeaderInterceptor
?
/** * 输入(In)拦截器,用于从 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中 * * @author badqiu */ public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor { private SAAJInInterceptor saajIn = new SAAJInInterceptor(); public DistributedThreadLocalInSOAPHeaderInterceptor() { super(Phase.PRE_PROTOCOL); getAfter().add(SAAJInInterceptor.class.getName()); } public void handleMessage(SoapMessage message) throws Fault { SOAPMessage doc = message.getContent(SOAPMessage.class); if (doc == null) { saajIn.handleMessage(message); doc = message.getContent(SOAPMessage.class); } Map<String,String> headers = toHeadersMap(doc); DistributedThreadLocal.putAll(headers); } private Map toHeadersMap(SOAPMessage doc) { SOAPHeader header = getSOAPHeader(doc); if (header == null) { return new HashMap(0); } Map<String,String> headersMap = new HashMap(); NodeList nodes = header.getChildNodes(); for(int i=0; i<nodes.getLength(); i++) { Node item = nodes.item(i); if(item.hasChildNodes()) { headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue()); } } return headersMap; } private SOAPHeader getSOAPHeader(SOAPMessage doc) { SOAPHeader header; try { header = doc.getSOAPHeader(); } catch (SOAPException e) { throw new RuntimeException(e); } return header; } }?
DistributedThreadLocalOutSOAPHeaderInterceptor
?
/** * 输出(Out)拦截器,用于将DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中 * * @author badqiu */ public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor { public DistributedThreadLocalOutSOAPHeaderInterceptor() { super(Phase.WRITE); } public void handleMessage(SoapMessage message) throws Fault { List<Header> headers = message.getHeaders(); Map<String,String> threadlocalMap = DistributedThreadLocal.getMap(); for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) { headers.add(getHeader(entry.getKey(), entry.getValue())); } } private Header getHeader(String key, String value) { QName qName = new QName(key); Document document = DOMUtils.createDocument(); Element element = document.createElement(key); element.appendChild(document.createTextNode(value)); SoapHeader header = new SoapHeader(qName, element); return (header); } }?
CXF spring配置文件:
?
server端:?
?
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core" xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" default-lazy-init="true"> <description>Apache CXF的Web Service配置</description> <import resource="classpath:META-INF/cxf/cxf.xml" /> <import resource="classpath:META-INF/cxf/cxf-servlet.xml" /> <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" /> <!-- jax-ws endpoint定义 --> <jaxws:endpoint address="/hello" > <jaxws:implementor ref="hello" /> <jaxws:inInterceptors> <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/> </jaxws:inInterceptors> </jaxws:endpoint> <!-- WebService的实现Bean定义 --> <bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" /> </beans>
?
client端:
?
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jaxws="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core" xsi:schemaLocation="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" default-lazy-init="true"> <description>Apache CXF Web Service Client端配置</description> <jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello" address="http://localhost:8080/service/hello" > <jaxws:outInterceptors> <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/> </jaxws:outInterceptors> </jaxws:client> </beans>?
通过分布式应用上下文,暂时想到的几个应用场景.
?
1. Log4j MDC traceId传递. ?通过一个traceId,将所有相关的 操作所有的日志信息关联起来。
2. sessionId 传递, 让我们的应用也有状态,可以使用session什么的
3. Security(username,password)传递. 在需要安全调用的地方,避免污染接口,需要显式的在接口传递username,password. 相对应的 WSSecurity也可以走这个通道
?
?
?
?
?
?
分布式应用上下文的概念,全球首创,欢迎转载(因为google 搜索不到相关文章,或许早已经有相同的概念了,欢迎提醒我)。
?
?
?
?
?