dubbo 剖析:记一个异步方法调用的坑_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > dubbo 剖析:记一个异步方法调用的坑

dubbo 剖析:记一个异步方法调用的坑

 2018/5/22 18:11:46  就是我leonardc  程序员俱乐部  我要评论(0)
  • 摘要:注:文章中的坑出现在2.5.4版本之前,这个坑在2.5.4版本已经得到修复。一、问题描述问题描述场景描述,如上图所示:客户端远程异步调用服务A,服务A在处理客户端请求的过程中需要远程同步调用服务B,服务A从服务B的响应中取数据时,得到的是null!!!二、原因分析RPC请求响应参数传递过程2.1Client的请求发送过程1)Client在发起RPC调用请求前,将请求参数构建成RpcInvocation;2)Client在发起RPC调用请求前,会经过Filter处理
  • 标签:方法 一个 异步

?

注:文章中的坑出现在2.5.4版本之前,这个坑在2.5.4版本已经得到修复。

一、问题描述

class="alignCenter" alt="" style="max-width: 96%; height: auto; vertical-align: middle; border-style: none; margin: 0px auto 10px; text-align: center; display: block;" src="/Upload/Images/2018052218/6FDE605FCC635835.jpg">

问题描述

场景描述,如上图所示:

monospace; border-radius: 3px; background-color: #f7f7f9; border: 0px; white-space: normal; font-weight: 600; font-size: 14px;">客户端?远程异步调用?服务A?,?服务A?在处理客户端请求的过程中需要远程同步调用?服务B?,服务A?从?服务B?的响应中取数据时,得到的是?null

!!!

二、原因分析

RPC请求响应参数传递过程

2.1 Client的请求发送过程

1)Client在发起RPC调用请求前,将请求参数构建成?RpcInvocation?;

2)Client在发起RPC调用请求前,会经过Filter处理:

  • ConsumerContextFilter?会将请求信息,如invoker、invocation、Address等,写入?RpcContext?;
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

3)Client在发起RPC调用请求前,会经过AbstractInvoker:

  • AbstractInvoker?会将?RpcContext?中的?attachments?内容写入到?RpcInvocation?,以实现附加参数的传递;
Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
            invocation.addAttachmentsIfAbsent(context);
        }
  • AbstractInvoker?会从RPC请求参数?URL?中?ASYNC_KEY?的值,并设置到?RpcInvocation?的attachment?中;
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
           invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
       }

4)Client在发起RPC调用请求时,会经过DubboInvoker:

  • DubboInvoker会优先从?RpcInvocation?的?attachment?中获取并判断?ASYNC_KEY?是否为true,以实现消费端的异步调用;
public static boolean isAsync(URL url, Invocation inv) {
        boolean isAsync;
        //如果Java代码中设置优先.
        if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {
            isAsync = true;
        } else {
            isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
        }
        return isAsync;
    }

5)Client在发起RPC调用请求时,会将?RpcInvocation?作为调用参数传递给服务提供方:

  • RpcInvocation?中的扩展属性?attachments?,实现了请求调用扩展信息传递的功能;

2.2 服务A的请求接收和执行过程

1)服务端在接收到RPC请求,调用真正实现接口前,会经过?ContextFilter?。

  • ContextFilter?会将请求信息,如invoker、invocation、Address等,写入?RpcContext?;
  • ContextFilter?会将请求参数?RpcInvocation?的?attachments?扩展信息取出,过滤掉某些特定KEY之后,将其余扩展属性设置到当前?RpcContext?的?attachments?中;
Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);//清空消费端的异步参数,2.5.4版本才新加进去的
        }
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setAttachments(attachments)
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

其中?attachments.remove(Constants.ASYNC_KEY);//清空消费端的异步参数?这行代码是在dubbo的2.5.4版本才加进去的,也就是之前的版本中并没有这行代码。

2)在2.5.4版本之前,对于?Client?发来的异步调用请求,其?RpcInvocation?参数中包含了?ASYNC=true?的?attachment?扩展信息:

  • 此时?ASYNC=true?的这个扩展信息就会被设置到服务A的?RpcContext?的扩展属性中;
  • 在?服务A?处理RPC调用,执行实际接口实现类的逻辑时,因为依赖的?服务B?,所以会继续发送RPC调用请求给?服务B?;
  • 服务A?调用?服务B?时,?服务A?的?RpcContext?的扩展属性会被写入到?A -> B?的?RpcInvocation?参数中,这就导致?ASYNC=true?的扩展属性参数被误传到?A -> B?的?RpcInvocation?参数中,进而导致在服务A发起RPC请求调用时触发了错误的异步调用逻辑;
  • 此时?服务A?获取到的RPC执行结果?RpcResult?的内容当然是个空;
else if (isAsync) {   //1. 异步,有返回值
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {    //3. 异步->同步(默认的通信方式)
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }

以上就是这个坑的产生原因

三、解决方法

自己写了个Filter,添加到Dubbo服务提供方接收请求后、实际处理请求前的Filter执行链中。

从请求参数?URL?中解析出?ASYNC?扩展参数标识,而不依赖?RpcInvocation?中的值。

@Activate(group = {Constants.PROVIDER}, order = -999)
public class DubboSyncFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //避免RpcContext透传,使用配置文件的async
        boolean isAsync = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false);
        RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, String.valueOf(isAsync));
        return invoker.invoke(invocation);
    }
}
上一篇: JAVA File URL平台兼容性 下一篇: 没有下一篇了!
发表评论
用户名: 匿名