博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo服务暴露过程源码分析
阅读量:7044 次
发布时间:2019-06-28

本文共 28564 字,大约阅读时间需要 95 分钟。

hot3.png

本例以一个简单典型的服务发布为例,spring配置如下

//dubbo协议 
//zk注册中心
//服务接口public interface DemoService { public String sayHello(String name);}//实现public class DemoServiceImpl implements DemoService {public String sayHello(String name) {Random random=new Random();try {    Thread.sleep(800* random.nextInt(6));} catch (InterruptedException e) {    e.printStackTrace();}System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();}}

有博文 ,可以看到dubbo服务发布注册的启动方法有两个入口,

第一在ServiceBean类的onApplicationEvent()方法,在容器初始化完成后,执行暴露逻辑
第二在ServiceBean类的afterPropertiesSet()方法,当前servicebean属性构造完成后,执行暴露逻辑
具体暴露方法在其父类ServiceConfig的ServiceConfig方法里

//这是个同步的方法    public synchronized void export() {        if (provider != null) {            if (export == null) {                export = provider.getExport();            }            if (delay == null) {                delay = provider.getDelay();            }        }        if (export != null && !export) {            return;        }        //延迟暴露 通过delay参数配置的,延迟暴露,放入单独线程中。        if (delay != null && delay > 0) {            delayExportExecutor.schedule(new Runnable() {                public void run() {                    doExport();                }            }, delay, TimeUnit.MILLISECONDS);        } else {            //暴露方法            doExport();        }    }
/***     * 也是个同步的方法,暴露过程具体过程     */    protected synchronized void doExport() {        //....属性检查和赋值,代码略        //暴露过程        doExportUrls();    }     private void doExportUrls() {        //获取注册中心信息        List
registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { //多个协议,暴露多次 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } //暴露过程 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
registryURLs) { //属性的解析和赋值..... //..... 代码略..... //获取服务暴露范围,本地或者远程 String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 //不配默认,服务暴露远程同时在本地暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //本地暴露 (***看这里***)关键1 exportLocal(url); } //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { //有多个注册中心,暴露到多个注册中心 for (URL registryURL : registryURLs) { //url 添加dynamic 属性值 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //(***看这里***)关键2 //默认走到JavassistProxyFactory.getInvoker方法 Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //这里的invoker的url的协议是register类型 Exporter
exporter = protocol.export(invoker); exporters.add(exporter); } } else { //没有注册中心 ,只在本机ip打开服务端口,生成服务代理,并不注册到注册中心。 //(***看这里***)关键3 此处的url协议为dubbo Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter
exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); }

上面方法中有三个地方涉及到具体的服务暴露先看,关键1

本地暴露的逻辑

private void exportLocal(URL url) {        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {            URL local = URL.valueOf(url.toFullString())                    .setProtocol(Constants.LOCAL_PROTOCOL)//设置为injvm 协议                    .setHost(NetUtils.LOCALHOST)                    .setPort(0);            //这里的protocol是Protocol$Adpative的实例(spi机制)        //proxyFactory是ProxyFactory$Adpative实例(spi机制)            Exporter
exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); //放到暴露列表 exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }

ProxyFactory$Adpative的getInvoker方法

public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {        if (arg2 == null)             throw new IllegalArgumentException("url == null");        com.alibaba.dubbo.common.URL url = arg2;        String extName = url.getParameter("proxy", "javassist");        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);        //这里的extension默认是JavassistProxyFactory实例,	return extension.getInvoker(arg0, arg1, arg2);    }

JavassistProxyFactory的getInvoker方法

//proxy 是服务实现类,type是服务接口 public 
Invoker
getInvoker(T proxy, Class
type, URL url) { //利用Wrapper类通过服务接口生成对应的代理类。 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //实现抽象类AbstractProxyInvoker抽象方法doInvoke,并调用(proxy, type, url)构造函数实例化匿名类 return new AbstractProxyInvoker
(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class
[] parameterTypes, Object[] arguments) throws Throwable { //这里调用代理类的invokeMethod方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }

AbstractProxyInvoker的构造方法

public AbstractProxyInvoker(T proxy, Class
type, URL url) { if (proxy == null) { throw new IllegalArgumentException("proxy == null"); } if (type == null) { throw new IllegalArgumentException("interface == null"); } if (!type.isInstance(proxy)) { throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type); } this.proxy = proxy; this.type = type; this.url = url;//赋值ur到自身(invoker),这个后面用到 }

AbstractProxyInvoker的invoke方法

public Result invoke(Invocation invocation) throws RpcException {        try {	   //回调用doInvoke方法,会传入执行实例proxy,方法名和方法参数类型和值            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));        } catch (InvocationTargetException e) {            return new RpcResult(e.getTargetException());        } catch (Throwable e) {            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

这里以文章开头DemoService接口为例通过Wrapper.getWrapper返回的类代码,这里需要代码hack

package com.alibaba.dubbo.common.bytecode;import com.alibaba.dubbo.common.DemoService;import java.lang.reflect.InvocationTargetException;import java.util.Map;public class Wrapper0 extends Wrapper  implements ClassGenerator.DC{  public static String[] pns;  public static Map pts;  public static String[] mns;  public static String[] dmns;  public static Class[] mts0;  public String[] getPropertyNames()  {    return pns;  }  public boolean hasProperty(String paramString)  {    return pts.containsKey(paramString);  }  public Class getPropertyType(String paramString)  {    return (Class)pts.get(paramString);  }  public String[] getMethodNames()  {    return mns;  }  public String[] getDeclaredMethodNames()  {    return dmns;  }  public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2)  {    try    {      DemoService localDemoService = (DemoService)paramObject1;    }    catch (Throwable localThrowable)    {      throw new IllegalArgumentException(localThrowable);    }    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");  }  public Object getPropertyValue(Object paramObject, String paramString)  {    try    {      DemoService localDemoService = (DemoService)paramObject;    }    catch (Throwable localThrowable)    {      throw new IllegalArgumentException(localThrowable);    }    throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService.");  }     //(**看这里,关键方法实现****)  public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)    throws InvocationTargetException  {    DemoService localDemoService;    try    {    //赋值执行实例,这里是接口实现类,DemoServiceImpl对象      localDemoService = (DemoService)paramObject;    }    catch (Throwable localThrowable1)    {      throw new IllegalArgumentException(localThrowable1);    }    try    {    //根据传入的要调用的方法名paramString,方法参数值,调用执行实例方法      if (("sayHello".equals(paramString)) || (paramArrayOfClass.length == 1))        return localDemoService.sayHello((String)paramArrayOfObject[0]);    }    catch (Throwable localThrowable2)    {      throw new InvocationTargetException(localThrowable2);    }    throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.common.DemoService.");  }}

到这比较清楚了解,具体的代理过程了。

第一步上面invoker对象生成好后,接下来就要通过Protocol$Adpative的export方法暴露服务

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {        if (arg0 == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");        com.alibaba.dubbo.common.URL url = arg0.getUrl();        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");	    //根据上文本地调用这里的protocal协议别设置为injvm	    //所以这里会走到InjvmProtocol的export方法        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.export(arg0);    }

看下InjvmProtocol的export方法

public 
Exporter
export(Invoker
invoker) throws RpcException { //返回InjvmExporter对象 return new InjvmExporter
(invoker, invoker.getUrl().getServiceKey(), exporterMap); }

InjvmExporter构造函数

InjvmExporter(Invoker
invoker, String key, Map
> exporterMap) { super(invoker);//invoker赋给自身 this.key = key; this.exporterMap = exporterMap; //存的形式,serviceKey:自身(exporter) put到map关联起来,这样可以通过servciekey找到exporterMap然后找到invoker exporterMap.put(key, this); }

这里的exporterMap是由InjvmProtocol实例拥有,而InjvmProtocol有时单例的,因为InjvmProtocol类有以下实例和方法:

//静态自身成员变量    private static InjvmProtocol INSTANCE;    //构造方法,把自身赋值给INSTANCE对象    public InjvmProtocol() {        INSTANCE = this;    }

所以exporterMap对象也是单例的。同时这里顺便看下InjvmProtocol的refer方法,本地服务的引用查找也是通过自身exporterMap对象。

public 
Invoker
refer(Class
serviceType, URL url) throws RpcException { //把exporterMap对象赋值给InjvmInvoker return new InjvmInvoker
(serviceType, url, url.getServiceKey(), exporterMap); } //具体查找过程 public Result doInvoke(Invocation invocation) throws Throwable { //通过exporterMap获取exporter Exporter
exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0); return exporter.getInvoker().invoke(invocation); }

以上是本地服务的发布和引用过程

远程服务发布暴露过程

接下来看下本例中会用到的远程服务发布暴露过程,即暴露服务端口并发布服务信息到注册中心。也即是关键2代码处

通过上面本地服务暴露过程分析可以知道,远程服务的态代理生成过程和本地服务代理生成是一样的,唯一区别点是构造invoker是传入的url是registryURL
传入url的不同,会造成下面这句的执行过程的不同

Exporter<?> exporter = protocol.export(invoker);

这里protocol通过spi走的是Protocol$Adpative的export方法:

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {        if (arg0 == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");        com.alibaba.dubbo.common.URL url = arg0.getUrl();        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");        //由于传入的url是registryURL所以会走RegistryProtocol的export方法	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.export(arg0);    }

RegistryProtocol的export方法如下

public 
Exporter
export(final Invoker
originInvoker) throws RpcException { //export invoker 暴露invoker (***看doLocalExport方法**) final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker); //registry provider 获取对应注册中心操作对象 final Registry registry = getRegistry(originInvoker); //获取要注册到注册中心的地址 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //注册服务url到注册中心(***把服务信息注册到注册中心**) registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 //实现一个匿名类实现接口Exporter return new Exporter
() { public Invoker
getInvoker() { return exporter.getInvoker(); } //取消暴露的过程 public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消注册 registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消订阅 overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }

doLocalExport方法,这里方法里涉及过程比较多

private 
ExporterChangeableWrapper
doLocalExport(final Invoker
originInvoker) { //通过原始originInvoker构造缓存key String key = getCacheKey(originInvoker); ExporterChangeableWrapper
exporter = (ExporterChangeableWrapper
) bounds.get(key); //缓存没有,走具体暴露逻辑 if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper
) bounds.get(key); if (exporter == null) { //InvokerDelegete是RegistryProtocol类的静态内部类,继承自InvokerWrapper, //通过构造器赋值持有代理originInvoker和服务暴露协议url对象,算是包装一层 //而url 是通过getProviderUrl(originInvoker)返回的,此时url的协议已是dubbo,即服务暴露的协议 final Invoker
invokerDelegete = new InvokerDelegete
(originInvoker, getProviderUrl(originInvoker)); //ExporterChangeableWrapper是RegistryProtocol的私有内部类实现了Exporter接口。 //通过调用它的构造方法(Exporter
exporter, Invoker
originInvoker)构造exporterWrapper实例 //而这里传入的exporter是通过(Exporter
) protocol.export(invokerDelegete)语句创建 //由上一步知道,这里的invokerDelegete里url属性的protocol协议已经是dubbo //下面具体看下protocol.export(invokerDelegete)方法。 exporter = new ExporterChangeableWrapper
((Exporter
) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }

还对Protocol$Adpative类的export逻辑分析

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {        if (arg0 == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null)            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");        com.alibaba.dubbo.common.URL url = arg0.getUrl();        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());        if (extName == null)            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");        //由于这里invokerDelegete的url属性的protocol是dubbo,所以这里走DubboProtocol的export协议	com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.export(arg0);    }

 接着看下DubboProtocol的export方法

public 
Exporter
export(Invoker
invoker) throws RpcException { URL url = invoker.getUrl(); // get serviceKey. String key = serviceKey(url);//key的组成group/service:version:port //构造服务的exporter //如同InjvmProtocol一样,DubboProtocol也是单例的 所以这里exporterMap也是单例的 DubboExporter
exporter = new DubboExporter
(invoker, key, exporterMap); //通过key放入exporterMap,把持有invoker的exporter 和serviceKey关联 //这个在后面服务调用时,可以通过key找到对应的exporter进而找到invoker提供服务 exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据url开启一个服务,比如绑定端口,开始接受请求信息(**继续看这里***) openServer(url); return exporter; }
private void openServer(URL url) {        //key=host:port 用于定位server        String key = url.getAddress();        //client也可以暴露一个只有server可以调用的服务。        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);        if (isServer) {            //服务实例放到serverMap,key是host:port            //这里的serverMap也单例的            ExchangeServer server = serverMap.get(key);            if (server == null) {                //通过createServer(url)方法获取server (***看这里***)                serverMap.put(key, createServer(url));            } else {                //server支持reset,配合override功能使用                server.reset(url);            }        }    }     /***     * 开启服务     * @param url     * @return     */    private ExchangeServer createServer(URL url) {        //默认开启server关闭时发送readonly事件        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());        //默认开启heartbeat        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);        /***         * 通过server key 检查是否是dubbo目前spi扩展支持的传输框架。默认是netty         */        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))            throw new RpcException("Unsupported server type: " + str + ", url: " + url);         //通过codec key 获取编解码方案,兼容dubbo1,默认是dubbo1compatible ,否则默认dubbo 编解码方案        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;        try {            //构造具体服务实例,	    //Exchangers是门面类,里面封装了具体交换层实现,并调用它的bind方法(***看这里***)            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        //这里会验证一下,客户端传输层实现        //如果没有对应的实现,会抛出异常        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set
supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }

跟踪方法

//Exchangers.bind方法public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {        if (url == null) {            throw new IllegalArgumentException("url == null");        }        if (handler == null) {            throw new IllegalArgumentException("handler == null");        }        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");	//通过spi会走HeaderExchanger的bind逻辑        return getExchanger(url).bind(url, handler);    }    //HeaderExchanger的bind方法    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {        //Transporters也是门面类,封装了具体的传输层实现查找和bind过程        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));    }    //HeaderExchangeServer的构造函数      public HeaderExchangeServer(Server server) {        if (server == null) {            throw new IllegalArgumentException("server == null");        }        this.server = server;        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);        if (heartbeatTimeout < heartbeat * 2) {            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");        }        //开启心跳        startHeatbeatTimer();    }

  继续跟进方法

//Transporters.bind方法public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {        if (url == null) {            throw new IllegalArgumentException("url == null");        }        if (handlers == null || handlers.length == 0) {            throw new IllegalArgumentException("handlers == null");        }        ChannelHandler handler;        if (handlers.length == 1) {            handler = handlers[0];        } else {            handler = new ChannelHandlerDispatcher(handlers);        }	//根据spi 这里具体走 NettyTransporter.bind        return getTransporter().bind(url, handler);    }    //NettyTransporter的bind方法    public Server bind(URL url, ChannelHandler listener) throws RemotingException {       //可以看到这里是NettyServer实例        return new NettyServer(url, listener);    }   //NettyServer构造器    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {       //调用父类AbstractServer构造器        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    }

   父类AbstractServer的构造函数

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {        super(url, handler);        localAddress = getUrl().toInetSocketAddress();        String host = url.getParameter(Constants.ANYHOST_KEY, false)                || NetUtils.isInvalidLocalHost(getUrl().getHost())                ? NetUtils.ANYHOST : getUrl().getHost();        bindAddress = new InetSocketAddress(host, getUrl().getPort());        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);        try {            //打开端口,启动服务,在其子类实现,这里是NettyServer的doOpen()方法(***看这里**)            doOpen();            if (logger.isInfoEnabled()) {                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());            }        } catch (Throwable t) {            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);        }        //fixme replace this with better method        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));    }

NettyServer的doOpen()方法:

protected void doOpen() throws Throwable {        //通过netty 开启服务监听端口        NettyHelper.setNettyLoggerFactory();        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));        bootstrap = new ServerBootstrap(channelFactory);        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);        channels = nettyHandler.getChannels();        // https://issues.jboss.org/browse/NETTY-365        // https://issues.jboss.org/browse/NETTY-379        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() {                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);                ChannelPipeline pipeline = Channels.pipeline();                /*int idleTimeout = getIdleTimeout();                if (idleTimeout > 10000) {                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));                }*/                pipeline.addLast("decoder", adapter.getDecoder());//解码器                pipeline.addLast("encoder", adapter.getEncoder());//编码器                pipeline.addLast("handler", nettyHandler);//NettyHandler 扩展netty双向handler基类r 可以接受进站和出站数据流                return pipeline;            }        });        // bind 地址 开启端口        channel = bootstrap.bind(getBindAddress());    }

    以上基本梳理了的dubbo从service配置解析到生成服务代理,并通过netty开启服务端口的服务暴露过程。

转载于:https://my.oschina.net/u/146130/blog/1630870

你可能感兴趣的文章
《响应式Web设计全流程解析》一1.4 我们都是交互设计师
查看>>
搭建一个私有的Docker registry
查看>>
《手机测试Robotium实战教程》——第1章,第1.2节自动化测试和手动测试的对比...
查看>>
《精通QTP——自动化测试技术领航》—第1章1.6节对象库(下)之进阶编程篇
查看>>
深入实践Spring Boot2.1.1 MySQL依赖配置
查看>>
《树莓派开发实战(第2版)》——2.6 使用条件和约束指定证据
查看>>
《Windows网络与通信程序设计(第3版)》——第1章 计算机网络基础1.1 网络的概念和网络的组成...
查看>>
线程基础之遗漏和扩展部分
查看>>
导出SQL运行结果的方法总结
查看>>
support-v4包时会出现说有v4和v7有异常的处理
查看>>
【Spark Summit East 2017】使用Spark对仙女星系数据进行分析
查看>>
任务调度SchedulerX系列之什么是简单任务多机版
查看>>
MaxCompute2.0性能评测:更强大、更高效之上的更快速
查看>>
有限状态机问题编程实践
查看>>
PostgreSQL 10.0 preview 功能增强 - slave提前苏醒
查看>>
技术流乱入拜年帖 - 小鸡吉吉和小象(PostgreSQL)Pi吉的鸡年传奇
查看>>
保证数据一致性的常见做法
查看>>
用POSTFIX,DOVECOT,OPENWEBMAIL集成在CENTOS上
查看>>
View绘制原理——怎么画?
查看>>
HTTP/1.x 及 Service Worker 缓存实践小结
查看>>