Lettuce 同步源码分析
在上一篇分享中分享了单机模式异步连接创建过程 Lettuce 创建连接过程源码分析; 在本次分享内容主要介绍同步命令的处理过程.
Lettuce 是基于 Netty 的 Redis 高级客户端, 对于异步命令来说是天然的, 那么 lettuce 中是如何处理同步命令的呢? 实际上同步连接还是对异步命令的一次封装; 下面我们就通过源码进行分析看看 Lettuce 中的具体实现.
通过上一篇文章中可以知道在 StatefulRedisConnectionImpl 中创建 异步模式, 同步模式以及响应式模式命令处理模式, 那么我们就从 该处看起
- public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
- super(writer, timeout);
- this.codec = codec;
- // 创建异步 redis 命令处理模式
- this.async = newRedisAsyncCommandsImpl();
- // 创建 redis 命令同步处理模式
- this.sync = newRedisSyncCommandsImpl();
- // 创建 redis 命令响应式处理模式
- this.reactive = newRedisReactiveCommandsImpl();
- }
通过这里似乎看不出同步处理模式同异步处理模式有什么关联, 那么我们在深入进去看一下
- protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
- return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
- }
在这段代码中可以看到 async(), 这个就是 redis 命令异步处理模式, 那么它是如何封装的呢?
- protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
- // 对异步 API 创建调用处理器
- FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
- // 创建动态代理
- return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
- }
通过上面对源码可以发现原来是对异步 api 创建了一个 JDK 动态代理; 那么关键的逻辑还是在 FutureSyncInvocationHandler 中, 对于动态代理的知识就不在展开了.
在 invoke 处理是在 AbstractInvocationHandler 中完成的, 它将一些基本公用的抽象在了基类中, 将特殊的实现延迟到子类中实现.
- public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // 如果参数为 null 则 将 args 设置为 "{}"
- if (args == null) {
- args = NO_ARGS;
- }
- // 如果参数长度为 0 同时方法名称为 hashCode 则直接返回 hashCode
- if (args.length == 0 && method.getName().equals("hashCode")) {
- return hashCode();
- }
- // 如果是 equals
- if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) {
- Object arg = args[0];
- if (arg == null) {
- return false;
- }
- if (proxy == arg) {
- return true;
- }
- return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg));
- }
- // 如果是 toString
- if (args.length == 0 && method.getName().equals("toString")) {
- return toString();
- }
- return handleInvocation(proxy, method, args);
- }
在 FutureSyncInvocationHandler 中实现了同步命令处理过程, 其源码如下:
- protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- // 获取当前 method 在 asyncApi 中对应的方法
- Method targetMethod = this.translator.get(method);
- // 调用异步接口
- Object result = targetMethod.invoke(asyncApi, args);
- // 如果返回结果是 RedisFuture 类型
- if (result instanceof RedisFuture<?>) {
- // 类型强转
- RedisFuture<?> command = (RedisFuture<?>) result;
- // 如果不是事务控制方法 同时还在事务中则返回 null
- if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
- return null;
- }
- // 是事务控制方法, 或不在事务中则进行如下处理
- // 等待超时或取消
- LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
- // 返回结果, 这里处理不是很好 上一步中就可以直接返回了
- return command.get();
- }
- // 如果不是 RedisFuture 类型则直接返回
- return result;
- } catch (InvocationTargetException e) {
- throw e.getTargetException();
- }
- }
在上文中有一段是获取获取指定方法在 delegate 中对应方法的处理, 下面就看看这个处理是如何实现的
- /**
- * 方法翻译器
- */
- protected static class MethodTranslator {
- private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);
- // 真实方法和代理类中方法映射表
- private final Map<Method, Method> map;
- private MethodTranslator(Class<?> delegate, Class<?>... methodSources) {
- map = createMethodMap(delegate, methodSources);
- }
- /**
- * 通过指定代理类, 和目标类创建方法翻译器
- */
- public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) {
- // 同步代码块
- synchronized (TRANSLATOR_MAP) {
- // 如果翻译器映射表中不存在 delegate 的翻译器则创建一个新的
- return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));
- }
- }
- private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) {
- Map<Method, Method> map;
- List<Method> methods = new ArrayList<>();
- // 遍历源类, 找到所有 public 方法
- for (Class<?> sourceClass : methodSources) {
- methods.addAll(getMethods(sourceClass));
- }
- map = new HashMap<>(methods.size(), 1.0f);
- // 创建方法和代理类的方法的映射表
- for (Method method : methods) {
- try {
- map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes()));
- } catch (NoSuchMethodException ignore) {
- }
- }
- return map;
- }
- // 获取目标方法中的所有方法
- private Collection<? extends Method> getMethods(Class<?> sourceClass) {
- // 目标方法集合
- Set<Method> result = new HashSet<>();
- Class<?> searchType = sourceClass;
- while (searchType != null && searchType != Object.class) {
- // 将目标类中所有 public 方法添加到集合中
- result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods())));
- // 如果 souceClass 是接口类型
- if (sourceClass.isInterface()) {
- // 获取 souceClass 的所有接口
- Class<?>[] interfaces = sourceClass.getInterfaces();
- // 遍历接口, 将接口的 public 方法也添加到方法集合中
- for (Class<?> interfaceClass : interfaces) {
- result.addAll(getMethods(interfaceClass));
- }
- searchType = null;
- } else {// 如果不是接口则查找父类
- searchType = searchType.getSuperclass();
- }
- }
- return result;
- }
- // 获取给定方法集合中所有 public 方法
- private Collection<? extends Method> filterPublicMethods(List<Method> methods) {
- List<Method> result = new ArrayList<>(methods.size());
- for (Method method : methods) {
- if (Modifier.isPublic(method.getModifiers())) {
- result.add(method);
- }
- }
- return result;
- }
- public Method get(Method key) {
- // 从方法映射表中获取目标方法
- Method result = map.get(key);
- // 如果目标方法不为 null 则返回, 否则抛出异常
- if (result != null) {
- return result;
- }
- throw new IllegalStateException("Cannot find source method" + key);
- }
- }
- }
来源: https://www.cnblogs.com/wei-zw/p/9249632.html