Rust Shrugged

Programming and a free mind

Netty(二)线程模型与 Bytebuf

ServerBootStrap 中的 EventLoopGroup

ServerBootStrap 实际上内部是两个 group,解释:

Most of the times using the same group for accepting and handling the accepted connections is working out quite well and so allows you to save some threads. The only time when you may not want to do this is if the handling logic of the accepted connections will keep the EventLoops so busy that you are not able to accept fast enough.

Norman SO answer

Vert.x 是一个基于 Netty 的框架,但是其初始化是将一个线程组(EventLoopGroup)同时用于 boss 与 worker。Norman回答到,这种方法大多数时候都挺不错,并且节省了一些线程。我认为这里说的节省了的线程是 boss 线程,在没有新的连接进来的时候可以参与到 handler 的任务中。然而,当 handler 要处理的任务消耗太多 CPU 时间时,EventLoopGroup 中的所有 EventLoop,也就是线程,会全都忙于处理任务,不能很快地、甚至无法 accept 新进来的连接。

对于 boss EventLoopGroup,什么情况下使用超过一个线程?

在构造函数中可以选择线程数:NioEventLoopGroup(threadsAmount)。但是,既然 boss group 的职责只是接受 accept 新连接并且分发,按照 Proactor 的模型,一个线程足矣。当一个 EventLoopGroup 被多个 ServerBoostrap 分享,则按需增加线程。

EventLoopGroup 默认线程数

如果在 EventLoopGroup 初始化时没有指定线程数,则由以下的静态初始块设置默认线程数。代码出自MultithreadEventLoopGroup.java 第 40 行。

1
2
3
4
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
// 在本人的双核机器上,有四个逻辑处理器,乘以二之后 DEFAULT_EVENT_LOOP_THREADS 为 8

ChannelHandler 与 EventLoopGroup 中的线程

用自己语言更具体解释一下“Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。”

我们之前已经阐明了不要阻塞当前 I/O 线程的重要性。我们再以另一种方式重申一次:“永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程执行的任何其他任务。” 如果必须要进行阻塞调用或者执行长时间运行的任务,我们建议使用一个专门的 EventExecutor。(见 6.2.1 节的“ChannelHandler 的执行和阻塞”)。

– 摘自 Netty in Action P96

![EventLoop 执行逻辑](/content/images/2019/EventLoop 执行逻辑.png)

总结一下,EventLoop 中的线程又被称为 I/O 线程,一个 Channel 中的所有事件(出站/入站)都只会被其所绑定的这个 I/O 线程处理。一个 Channel 永远一个 EventLoop I/O 线程绑定,而一个 I/O 线程则可能被分配给多个 Channel。如果这个 I/O 线程陷入执行一个 CPU 时间耗时长的任务,所有与之关联的 Channel 中的任务都会被搁置。

源码如下,来自 AbstractChannelHandlerContext.java#write()。在执行写任务时,先判断了执行线程是否是该 Channel 绑定的线程,如果是则交由执行,如果不是则新增一个 WriteTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
...
...
}

Netty 3 中的不和谐

这里需要注意一下,某个给定的 EventLoop 中的所有事件,都是由同一个线程处理。这是 Netty 4 对 Netty 3 的改进。在 Netty 3 中,出站事件可以由 I/O 线程执行,也可以由调用线程执行,这就使得 Race Condition 成为可能,比如一个 Channel 上的 Channel.write() 被同时调用时。

官方 Wiki 上有一个 Thread model 的页面是针对 Netty 3 的过期信息,可以看看当时开发者对这个问题的考量。

Bytebuf 源码

UnpooledHeapByteBuf 基于 JVM 堆内存进行内存分配,每次使用,都从头创建一个新的 实例。性能:相比堆外内存的分配和释放,成本仍然更低一些。

UnpooledHeapByteBuf 拥有三个成员变量:

1
2
3
private final ByteBufAllocator alloc;  // 用于分配内存
private byte[] array; // 以数组作为缓冲区
private ByteBuffer tmpNioBuf; // ByteBuf 与 ByteBuffer 转换时使用

注意这里为了提升性能与位操作,Netty 并没有直接使用原生的 ByteBuffer,而是使用数组作为缓存。

java.io 梳理与其中的三种设计模式

Reader 抽象类下的类为例。它们的主要构造函数如下:

1
2
3
4
5
6
FilterReader(Reader)
BufferedReader(Reader)

FileReader(File)
StringReader(String)
InputStreamReader(InputStream [, Charset])

Reader UML

大致上,java.io 可以分为:

  1. 字节流操作:InputStreamOutputStream
  2. 字符操作:ReaderWriter
  3. 针对磁盘操作:File

适配器模式 Adapter pattern

InputStreamReader 是字节流与字符流之间的桥梁,它的实现使用了适配器模式。适配器模式通过聚合将某个接口适配成为另一个接口来方便使用,类图如下:

InputStreamReader Adapter Pattern

在开头的代码可以看到,InputStreamReader 的构造器需要传入一个字节流 InputSream,以及可选的字符集。如果没有传入字符集就会使用默认的。通过持有一个 InputStreamInputStreamReader 拥有了该接口的功能。同时,InputStreamReader 还是一个实现了 Reader 类,这样一来它就把 InputStream 适配成了一个 Reader 类。

装饰器模式 Decorator pattern

这个应该是 java.io 更为人熟知的,装饰器模式与适配器模式都可以看作包装模式 Wrapper。通过对类的层层包裹,装饰器模式增强了目标类的功能。类图示意如下:

IO Decorator Pattern

DecoratorComponent 的实现类,但是其内部也持有一个 Decorator 类。不同的 Decorator 类下的 ConcreteDecorator 子类包含了需要增强的功能。

java.io 中的典型使用:

1
2
FilterReader pbr = new PushbackReader(
new BufferedReader(new InputStreamReader(System.in)))

二者的不同

装饰器模式更多的是对目标类的功能上的增强;适配器没有对功能进行增进,只是为了使目标类能适应调用方的接口,方便被调用。

模板方法

在模板方法模式中,大的算法框架在抽象父类中被定义好,某些业务相关的实现则延迟到子类。

Reader 抽象类中,read 方法的默认实现都留给了被重载的 read(char[], int, int),如下:

1
2
3
4
int read()
int read(char[] cbuf)

abstract int read(char[] cbuf, int offset, int len)

具体的 read 的实现留给子类。

可变长泛型参数与堆污染

定义

什么是堆污染

一个类型为泛型 T 的变量被指向一个类型不是 T 的对象,这就是堆污染。典型的情况是

1
2
3
4
List<T> listOfTs = new ArrayList<>(Arrays.asList(t));
List<E> listOfEs = (List<E>)(Object)listOfTs; // 此时指向了含有 T 的 List, 但是不会抛出异常,只是在 Object -> List<E> 的转型中抛出警告

E e = listOfEs.get(0); // 运行时错误,java.lang.ClassCastException

之所以编译器无法在转型时检查出类型转换错误,是因为 Java 泛型是通过类型擦除实现的,在泛型代码内部无法获得任何有关泛型参数类型的信息。

一个更好一点的关于可变长泛型参数和堆污染一起的例子,来自 Oracle 官方文档

1
2
3
4
5
public static void faultyMethod(List<String>... l) {
Object[] objectArray = l; // Valid
objectArray[0] = Arrays.asList(42);
String s = l[0].get(0); // ClassCastException thrown here
}

关于泛型的几大限制中,最明显的一个就是不能创造泛型数组,如

1
List<Integer>[] arrayOfLists = new List<Integer>[2];  // compile-time error

但是这一限制在可变长的泛型参数中被打破了。在可变长参数中,一个数组会在函数调用时建立(Effective Java,Item 53),每次调用都会引起一次数组分配和初始化。当可变长参数是泛型时,一个泛型数组就被创造出来了。

实操时的问题

我在 tiny-spring 中写了一个方法,给定注解参数,获取所有带有该注解的类。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Set<Class<?>> getClassSetByAnnotation(Class<? extends Annotation>... cls) {
Set<Class<?>> classSet = new HashSet<>();

for (Class<?> c : getClassSet(basePkg)) {
for (Class<? extends Annotation> annotationCls : cls) {
if (c.isAnnotationPresent(annotationCls)) {
classSet.add(c);
}
}
}
return classSet;
}

这里就出现了一个可变长泛型参数的问题。另一个隐藏的问题是,这个方法并没有对传入的注解参数的数量检查参数为零的情况,而可变长参数允许传入零个参数。一个优美一点的解决方案是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SafeVarargs
public static Set<Class<?>> getClassSetByAnnotation(Class<? extends Annotation> firstCls, Class<? extends Annotation>... cls) {
Set<Class<?>> classSet = new HashSet<>();

for (Class<?> c : getClassSet(basePkg)) {
if (c.isAnnotationPresent(firstCls)) {
classSet.add(c);
}
for (Class<? extends Annotation> annotationCls : cls) {
if (c.isAnnotationPresent(annotationCls)) {
classSet.add(c);
}
}
}
return classSet;
}

这样既省去明显的检查参数数量,又能在调用处强制调用者传入至少一个参数,这也是这个方法的本意。

@SafeVarargs 的使用标准

如上所见,可以将报警用 @SafeVarargs 注解掉。但是我们知道,这个注解仅仅是消除编译警告,完全不能保证消除这个错误。那么什么时候应该使用这个注解呢?

在 StackOverflow 有一个票的回答

If your method has an argument of type T... (where T is any type parameter), then:

  • Safe: If your method only depends on the fact that the elements of the array are instances of T
  • Unsafe: If it depends on the fact that the array is an instance of T[]

Effective Java 也表示了类似的意思:如果你的方法使用可变长泛型参数的时候,只是用来一次传入多个参数,并且实际使用的时候只是把它们当作多个参数来看,那么就没有问题。如果你的方法体中把可变长参数当作数组来使用,那就会有问题,不要使用这个注解。

0%