Apache Dubbo 介绍以及扩展机制 SPI

Apache Dubbo 介绍以及扩展机制 SPI

Apache Dubbo 介绍

首先我们先问一句,Apahce Dubbo 是什么?这个问题恐怕我不会给太多答案给你,因为 Dubbo 的官网其实描述非常清楚明了,而且具备了非常全面的架构解析以及源码解析。所以,学习 Dubbo 非常有必要去 官网传送门

对于 Dubbo 我觉得从官网来的一句简介非常贴切:

Apache Dubbo™ 是一款微服务框架(Microservices Framework),它提供高性能 RPC 通信、服务发现、流量管理等服务治理能力,为你提供构建大规模微服务集群所需的全套解决方案。

从简介我们大致上可以了解,Dubbo 就是一个微服务框架。它提供了挺多的微服务相关的功能,例如:

  1. Transparent interface based RPC 基于接口的 RPC
  2. Intelligent load balancing 智能负载均衡
  3. Automatic service registration and discovery 服务发现与服务注册
  4. High extensibility 高性能扩展
  5. Runtime traffic routing 实时流量路由
  6. Visualized service governance 可视化服务治理

同样,其实如果你们平时使用 Spring Cloud 进行开发的话,我们也可以同样感受到非常多这种微服务的特性,例如 Spring Cloud Netflix 提供的服务注册/发现,负载均衡,服务调用,可视化治理等等。

Dubbo SPI

首先学习 Java 的同学都知道 Java 原生提供了一种基于接口的服务发现机制,它的名字就是 SPI (Service Provider Interface)。就是通过接口我们可以定义多个不同业务逻辑的实现类,并配置在制定文件中;而 Java 会通过读取文件,读取到你所指定的实现类并进行实例化/调用等操作从而实现以非侵入方式进行逻辑替换。典型的例子就是 JDBC 中的 DriverManager 通过 SPI 可以管理和注册不同数据库的 Driver,相信你们也可以感受到一个 DriverManager 可以接入不同数据库厂商的驱动。

那么 SPIDubbo 有什么关系呢?其实我们先来看一张 Dubbo 的架构图。

从架构图上来说,Dubbo 采用了分层的项目结构以及插件化的形式实现的。除了 ServiceConfig,实际上其他层面都是使用了 SPI,也就是 Dubbo 通过 SPI 这种机制来实现框架最大限度的灵活性,就好像 Spring 在框架外或者框架内都提供了非常多的扩展机制,可以说这是框架的必备技能。以下我举几个例子说明一下。例如 Proxy 是服务代理层,在这一层我们通过 SPI 可以自定义服务注册中心;Cluster 是属于路由层,在这层主要处理的多个服务提供者的路由规则/负载均衡/集群容错的实现。那么 Dubbo 也可以通过 SPI 来进行一个规则或均衡策略的选择实现逻辑处理替换;甚至在 Monitor 监控层,我们也可以进行自定义的实现等等。

现在你可以发现了吧,实际上 Dubbo 的分层架构使得 Dubbo 的每层都是可以替换,这个已经说明了 Dubbo 的扩展性是极强的。

但是 Java SPIDubbo SPI 是同一个东西吗?原理层面来说,他两是一样的;实现上来说,Dubbo SPIJava SPI 的增强版。它主要从几方面增强:

  1. 按需加载,非原生的一次性加载
  2. SPI 加载有着更好的故障追溯
  3. 更多的扩展点,例如 IOC 以及 AOP 的支持

这些东西都是非常优秀的一些设计理念,也是我们在源码解析的时候可以着重学习的地方!

Let’s Go!

Dubbo SPI 的例子

为了在源码解析的时候有更直观的感受,我们直接敲一个例子吧。

导入依赖

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>

创建 SPI 所需的一个接口

package com.dubbo.demo;

public interface UserService {
	public void findById(int id);
}

虚拟两个不同的业务场景,我们写两个 UserService 的实现类 UserServiceImpl1.javaUserServiceImpl2.java

package com.dubbo.demo.impl;

public class UserServiceImpl1{
	public void findById(int id) {
    	System.out.println("UserServiceImpl1 find User " + id);
    }
}

UserServiceImpl2.java

package com.dubbo.demo.impl;

public class UserServiceImpl2{
	public void findById(int id) {
    	System.out.println("UserServiceImpl2 find User " + id);
    }
}

然后我们在 resource 创建一个名为 services 文件夹,然后创建一个名字为接口全限定名配置的文件 com.dubbo.demo.UserService。在文件中键入内容:

userServiceImpl1=com.dubbo.demo.impl.UserServiceImpl1
userServiceImpl2=com.dubbo.demo.impl.UserServiceImpl2

然后最后我们测试类 SPIDemo.java

public class SPIDemo {
	public static void main(String[] args) {
        ExtensionLoader<UserService> extensionLoader = ExtensionLoader.getExtensionLoader(UserService.class);
        
        //loader userServiceImpl1
        UserService userServiceImpl1 = extensionLoader.getExtension("userServiceImpl1");
        userServiceImpl1.findById(1);
        
        //loader userServiceImpl2
        UserService userServiceImpl2 = extensionLoader.getExtension("userServiceImpl2");
        userServiceImpl2.findById(2);
    }
}

结果输出

UserServiceImpl1 find User 1
UserServiceImpl2 find User 2

Dubbo SPI 源码解析

首先我们先来介绍一下 ExtensionLoaderExtensionLoader 在它的作用主要有几个:自动注入扩展依赖;封装扩展在扩展类中;默认的扩展是一个包装的适配器。

下面是它的相关属性代码

    //全局变量 key:value=类:扩展 
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
    //全局变量 key:value=类:实例
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);		
    //所属 Class 的类型
    private final Class<?> type;		
    //加载扩展类的工厂
    private final ExtensionFactory objectFactory;		
    //全局变量 key:value=类:实例别名
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();
    //全局变量 key:value=实例别名:类
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
    //key:value=实例别名:cachedActivate 
    private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>();
    //key:value=实例别名:缓存实例的 Holder
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
    //缓存适配器实例
    private final Holder<Object> cachedAdaptiveInstance = new Holder<>();
    private volatile Class<?> cachedAdaptiveClass = null;
    //缓存的默认名字
    private String cachedDefaultName;
    //缓存的扩展类
    private Set<Class<?>> cachedWrapperClasses;
    //加载策略顺序:DUBBO_INTERNAL_DIRECTORY;DUBBO_DIRECTORY;SERVICES_DIRECTORY
    private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();		//策略加载

我们从 dmeo 中的 ExtensionLoader.getExtensionLoader 开始入手。

   public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
        }
        //判断有没有注解
        if (!withExtensionAnnotation(type)) {
            //throw Exception
        }
        //根据 类 去获取
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        //如果等于 null 就新建一个
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

上面代码还是比较简单,可以看出只是为了为类创建一个 Holder。但是实际上,在 new ExtensionLoader() 的时候,还会有一些操作,这个下面会讲。上面生成了 Holder 后就会返回。然后调用 ExtensionLoader.getExtension()

    //通过名称获取扩展
    public T getExtension(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        //如果为 true 就返回默认对象
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        //获取 holder
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                //从 holder 获取实例
                instance = holder.get();
                //如果实例为空,就创建实例
                if (instance == null) {
                    instance = createExtension(name, wrap);
                    holder.set(instance);
                }
            }
        }
        //返回
        return (T) instance;
    }

下面是创建实例的代码

   private T createExtension(String name, boolean wrap) {
        //从配置文件中加载所有的拓展类,可得到“配置项名称”到“配置类”的映射关系表
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            //EXTENSION_INSTANCES 是一个 Map,是 Class:实例 的映射表
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                //这里会创建对象
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            //向实例注入
            injectExtension(instance);

            if (wrap) {
                List<Class<?>> wrapperClassesList = new ArrayList<>();
                if (cachedWrapperClasses != null) {
                    wrapperClassesList.addAll(cachedWrapperClasses);
                    wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                    Collections.reverse(wrapperClassesList);
                }
                //循环创建 Wrapper
                if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
                    for (Class<?> wrapperClass : wrapperClassesList) {
                        //通过注解获取
                        Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                        //进行匹配
                        //讲实例作为参数传进 Wrapper 构造方法,并通过反射得到实例
                        //然后往 Wrapper 进行依赖注入,然后再 Wrapper 再次赋值给 instance 变量
                        if (wrapper == null
                                || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
                            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        }
                    }
                }
            }
            //若该对象实现了org.apache.dubbo.common.context.Lifecycle
            //则调用它的initialize()方法完成初始化
            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            //...
        }
    }

上面的代码并不复杂,主要的步骤是:

  1. 通过 getExtensionClasses 获取所有拓展类
  2. 通过反射创建拓展对象
  3. 向拓展对象注入依赖
  4. 将拓展对象包裹进对应的 Wrapper 对象中
  5. 初始化对象

上面步骤为大致上的步骤,下面我们分步来细讲。

第一步

第一步主要是获取所有拓展类。我们来看看 getExtensionClasses() 的代码。

    private Map<String, Class<?>> getExtensionClasses() {
         //从spi注解中获取默认名字并挂载在cachedDefaultName这个属性上
        Map<String, Class<?>> classes = cachedClasses.get();
        //如果为空,则进行全部加载
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    //加载扩展类
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

上面代码主要是为加载拓展类进行判断

    private Map<String, Class<?>> loadExtensionClasses() {
        //缓存组件的名称
        cacheDefaultExtensionName();

        Map<String, Class<?>> extensionClasses = new HashMap<>();
        //加载策略
        for (LoadingStrategy strategy : strategies) {
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }

        return extensionClasses;
    }
    
    private void cacheDefaultExtensionName() {
        //获取注解 @SPI
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation == null) {
            return;
        }
        //注解的值
        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                //throw Exception
            }
            if (names.length == 1) {
                cachedDefaultName = names[0];
            }
        }
    }

loadDirectory 方法就是加载某目录下的资源,然后通过 loadResource 方法加载资源。我们继续跟下去。

    private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
                               boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
        String fileName = dir + type;
        try {
            Enumeration<java.net.URL> urls = null;
            //获取 classLoader
            ClassLoader classLoader = findClassLoader();

            //获取 ClassLoader 这里会尝试以拓展类所在的类加载器为首要对象
            if (extensionLoaderClassLoaderFirst) {
                ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
                if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                    urls = extensionLoaderClassLoader.getResources(fileName);
                }
            }

            if (urls == null || !urls.hasMoreElements()) {
                if (classLoader != null) {
                    urls = classLoader.getResources(fileName);
                } else {
                    urls = ClassLoader.getSystemResources(fileName);
                }
            }

            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL resourceURL = urls.nextElement();
                    loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
                }
            }
        } catch (Throwable t) {
            //...
        }
    }

loadResource 方法主要是读取文件,然后解析文件内容,通过反射加载类后,然后调用 loadClass 方法进行操作缓存。代码如下:

    private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
                              java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
        try {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // # 分割,只要 # 前面的内容
                    final int ci = line.indexOf('#');
                    if (ci >= 0) {
                        line = line.substring(0, ci);
                    }
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
                            // = 符号进行分割,分别拿到键值对
                            // 键 为别名,值为类的全限定名
                            int i = line.indexOf('=');
                            if (i > 0) {
                                name = line.substring(0, i).trim();
                                line = line.substring(i + 1).trim();
                            }
                            if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
                                loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden);
                            }
                        } catch (Throwable t) {
                            //throw Exception
                        }
                    }
                }
            }
        } catch (Throwable t) {
            //...
        }
    }

loadClass

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        //如果 class 有 Adapter 注解
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            cacheAdaptiveClass(clazz, overridden);
        } else if (isWrapperClass(clazz)) { //是否是扩展类,是的话就加入 cachedWrapperClasses 属性
            cacheWrapperClass(clazz);
        } else {                                 
            clazz.getConstructor();  //检测是否有默认构造起
            if (StringUtils.isEmpty(name)) {  
                //// 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为 name
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    //throw Exception
                }
            }
            //分割名字
            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
                //存储 name 和 Activate 到 cachedActivates
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                    //存储 Class 到名字的映射关系
                    cacheName(clazz, n);
                    //存储名字到 Class 的映射关系
                    saveInExtensionClass(extensionClasses, clazz, n, overridden);
                }
            }
        }
    }

Dobbo IOC

估计学 Java 的同学,对于 Spring IOC 都是非常了解的。实际上 Dubbo 也实现了 IOC,不过不像 Spring 那么健全,Dubbo 实现的更是为了符合自己的需求的。Dubbo IOC 主要是通过 setter 方法进行注入依赖的。步骤就是获取实例的所有方法,然后遍历。遍历的过程中看看是否名字有 setter 字段。如果有就通过 ObjectFactory 获取依赖对象,然后通过反射调用 setter 方法将依赖设置到目标对象当中。代码如下:

    private T injectExtension(T instance) {
        //如果为空,直接返回
        if (objectFactory == null) {
            return instance;
        }

        try {
            //循环方法
            for (Method method : instance.getClass().getMethods()) {
                //如果不是 setter 就跳过
                if (!isSetter(method)) {
                    continue;
                }
                
                //如果有 @DisableInject 注解的化就不会进行注入
                if (method.getAnnotation(DisableInject.class) != null) {
                    continue;
                }
                //获取第一个参数 因为是 setter 方法 所以没有多余的参数了
                Class<?> pt = method.getParameterTypes()[0];
                if (ReflectUtils.isPrimitives(pt)) {
                    continue;
                }

                try {
                    //获取 property
                    String property = getSetterProperty(method);
                    Object object = objectFactory.getExtension(pt, property);
                    if (object != null) {
                        //反射调用
                        method.invoke(instance, object);
                    }
                } catch (Exception e) {
                    //...
                }

            }
        } catch (Exception e) {
            //...
        }
        return instance;
    }

那么 ObjectFactory 我们知道类似 IOC 容器,但是它什么时候加载的呢?它其实是在第一次 new ExtensionLoader<T>(type) 的时候就加载了,我们看看 ExtensionLoader 构造起器。

    private ExtensionLoader(Class<?> type) {
        this.type = type;
        //通过 getExtensionLoader 获取 ExtensionFactory 类的拓展类
        //获取到后,直接获取它的适配类
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

那么 ExtensionFactory 的拓展类有哪些呢?其实我们可以根据它的全限定名就可以找到它的配置文件了。内容主要是:

adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory
spi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory

从配置文件我们知道了适配类是 AdaptiveExtensionFactory。那么这个 getAdaptiveExtension() 怎么获取的呢?我们继续跟!

    public T getAdaptiveExtension() {
        //还是从装载适配器实例缓存里面找
        Object instance = cachedAdaptiveInstance.get();
        //双重检查
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                //throw Exception
            }

            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        //创建适配器托赵雷
                        instance = createAdaptiveExtension();
                        //缓存起来
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        //...
                    }
                }
            }
        }
        //返回
        return (T) instance;
    }
    private T createAdaptiveExtension() {
        try {
            //
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            //throw Exception
        }
    }
    
    private Class<?> getAdaptiveExtensionClass() {
        //获取所有的拓展类
        getExtensionClasses();
        //有适配拓展类,就返回
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        //如果没有适配拓展类,就创建一个
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

    private Class<?> createAdaptiveExtensionClass() {
        //封装成对应的代码
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        //获取 ClassLoader
        ClassLoader classLoader = findClassLoader();
        //通过 ExtensionLoader 获取 Compiler 的拓展类
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        //编译
        return compiler.compile(code, classLoader);
    }

从上面其实我们已经知道 ExtensionFactory 类似于 SpringBeanFactory。所以它一定也有对应的查找 Bean 的逻辑处理。我们来看看!

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        //初始化所有的ExtensionFactory并挂载到factories属性上
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader
            .getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        //循环遍历所有的factory挨个获取一次
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

结语

Dubbo SPIDubbo 架构中非常重要的一个机制,承载了 Dubbo 框架由上至下的过程。这种架构思想可以应用于轻量级别的框架选择。


作者:野区杰西
链接:https://juejin.im/post/6891446230867116045