Dubbo线程池隔离以及动态线程池

本文最后更新于:2022年7月11日 下午

很久没发博客了,想着偶尔还是来写写东西~

内容中有不少省略的代码,其中也有很多关键性的东西,不过内容和公司框架高度耦合,涉及的东西也比较多,所以也懒得修改后放出来了,直接做了跳过。

1、背景

在目前我负责的系统中,有一个网关,有http请求,也有dubbo请求,这里主要讨论dubbo接口的请求。

当然,作为一个网关,必然承载了上万的QPS,在这几万的QPS请求中,有着各种各样的场景,不同场景的RT也不尽相同,少的只有约10-20ms,而高的则达到150-200ms,其中场景当然也分主要场景以及不那么重要的场景。

在这种QPS的请求下,所有的场景都使用的同一个dubbo线程池,就会带来以下两个问题:

1、无法隔离场景请求

有可能低重要度的场景下游出现问题,导致RT升高,从而影响线程池中的其他重要场景

2、无法精细地看到场景线程池的运行情况

这个就很好理解了,毕竟即使是线程池监控,那么也只能够看到一个总体的情况,具体某个场景的线程运行情况我们自然无法得知。

2、解决方案以及扩展

2.1 dubbo请求流程

其实想一下还是很简单的,我们只要根据请求参数中的场景信息来选择对应的线程池不就行了嘛。

这里直接上dubbo请求流程:

我们可以先把目光放到整张图的右下角,ChannelHandler <- Dispathcer -> ThreadPool,这三个就是我们要关注的地方,当dubbo请求经过server后,会被ChannelHandler处理,ChannelHandler会选择对应的处理器(Dispatcher),而Dispathcer也会相应地选择相应的线程池。

上一个dubbo的配置就可以很明显地看出来,这个dispatcher是message,而其使用的是core为200的fixedPool。

1
2
3
dubbo.protocol.threadpool=fixed
dubbo.protocol.threads=200
dubbo.protocol.dispatcher=message

那么这个message dispacther是干了什么呢?

因为就这个处理器就几行代码,所以我们直接看代码吧。

我们可以看到MessageDispacther实际只处理了接收的消息,然后把这个消息扔进了一个线程池里执行,这其实和我们的目的很接近了!因为我们想要的其实和这个message处理器很接近,区别就是我们想根据请求参数的不同,把他们扔进不同的线程池里处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MessageOnlyChannelHandler extends WrappedChannelHandler {

public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}

2.2 第一步,线程池隔离

既然这个message dispacther和我们的需求差别已经不远了,就直接写一个这个类的子类吧。

那么怎么获取这次dubbo的请求参数呢,别急,看received这个方法上的message参数,这个就是这次请求的请求参数,不过这个请求参数是encode的状态,我们需要把这个参数decode一下,转成我们需要的请求参数。因为在dpp网关中,两个推荐请求的入参的父类都是同一个,所以我们可以很轻松地进行类型的转换。

来看下代码吧,因为代码里有注释,这里就不多解释了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private String getRequestScene(Object message) {
try {
// 只处理recieved的Request
if (message instanceof Request) {
// decode Object
decode(((Request) message).getData());
RpcInvocation inv = (RpcInvocation) ((Request) message).getData();
// 获取具体的请求参数,我们只用获取带场景的那个参数就可以了
for (Object requestParam : inv.getArguments()) {
if (requestParam instanceof YourRequest) {
// 获取场景
return ((YourRequest) requestParam).getScene();
}
}
}
} catch (Exception e) {
log.warn("Decode message error: ", e);
}
return null;
}

有个这个基础,我们就能根据场景来进行线程池的隔离了,剩下的大部分代码其实都是从配置中心读取相应场景的线程池配置等等…

最后别忘了在dubbo中配置spi拓展,将你写的这个自定义dispacther注册到dubbo中。

2.2 动态线程池管理

既然都做到了这一步,为什么不再整点活呢?如果你看过美团那篇动态线程池的文章,其实可以知道:动态管理线程池并不是什么难事,毕竟JDK的线程池已经给你提供了对应的接口修改coreSize以及poolSize等参数,其实只用监听配置中心的变化,根据参数实时更改就可以了~

2.3 线程池监控

当然,监控也是必不可少的一环。既然根据场景隔离了线程池,那么我也当然想看各个线程池的运行情况对吧,这个实现起来也很简单。

其实dubbo本身提供了一个map存放线程池,然后根据handler以及端口等参数从中取(虽然实际上如果你使用默认参数来配置的话里面就一个线程池而已)。

综上,我们也没必要造轮子,直接把我们创建的自定义线程池扔进dubbo的线程池repo里去就行了,这样在监控线程池的时候,只用从一个地方读取参数,简单多了。

这里放一个获取dubbo线程池map的代码,因为本身是private的,所以需要反射来获取。

至于监控,就写个定时任务读取线程池,然后往promethus中扔就行啦~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 获取Dubbo的线程池map
* <br/>
* 获取到后将dpp创建的线程池放入该map中,用作监控
*
* @return Dubbo的线程池map
*/
@SuppressWarnings("all")
private static ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> getDubboThreadPoolMap() {
if (executorServiceMap != null) {
return executorServiceMap;
}
ExecutorRepository executorRepository =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
if (!(executorRepository instanceof DefaultExecutorRepository)) {
return null;
}
Field data;
try {
data = DefaultExecutorRepository.class.getDeclaredField("data");
data.setAccessible(true);
synchronized (DppSceneThreadPoolRepo.class) {
if (executorServiceMap != null) {
return executorServiceMap;
}
executorServiceMap =
(ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) data.get(executorRepository);
return executorServiceMap;
}
} catch (NoSuchFieldException | IllegalAccessException e) {
log.warn("无法获取dubbo线程池map[{}],线程池监控会受到影响", e.getMessage());
return null;
}
}

3、上线以及后记

到这里这篇文章基本就结束了,我们最后实现了一个:动态dubbo线程池,同时还具备场景隔离功能,然后带线程池详细数据监控,可以说基本完美了。

其实我们还可以继续举一反三,将服务中的各个线程都做到精细化、动态化配置管理以及监控,从而更好地掌握服务的运行情况。

在写下这篇博客的时候,这个功能已经在生产环境运行了大半年了,其中经历了春节、618等大促,以及大大小小各种压测,提升了服务稳定性,看着监控调线程池也是真爽~


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!