Netty中组件及源码
关于Netty中重要的组件 EventLoop, Pipeline , Handler , FastThreadLocal ,ByteBuf ,Allocator等组件源码分析。
1. 组件
1.1 DefaultChannelPipeline 组件
Pipeline是Netty异步处理的核心,开发者实现将Handler(处理回调)注册到Pipeline中。 Pipeline注册到EventLoop中,
当io.netty.channel.Channel监听到了IO事件以后,会将Message传播到 Pipeline中。
DefaultChannelPipeline是 ChannelPipeline的默认实现。
整个Pipeline就是责任链模式。
责任链设计思路的核心就是:每个handler都处理自己份内的事儿,当前的handler处理消息完毕,event传递到下一个handler。
使用责任链值得注意的点: Handler的先后顺序有讲究,需要特别注意前后Handler对Message的传递情况。
Netty有入站、出站2种event区分。 “引用计数对象”的retain 和 release。
fireChannelRead() 方法
Pipeline的fireChannelRead() 和 context 的fireChannelRead() 传递源头是不同的。
pipeline 从head传播
context 从下一个handler开始传播
每一个io.netty.channel.Channel都有独立的Pipeline。每一个Pipeline的Handler链都有 Head 和 Tail节点。
pipeline 从head传播:
1 | |
context 从下一个handler开始传播:
1 | |
DefaultChannelPipeline写出数据的流程
channel.writeAndFlush() 调用了AbstractChannel.write(Object,boolean,ChannelPromise)和 AbstractChannel.flush()两个方法。
flush()方法做的操作就是将 ChannelOutboundBuffer内保存的消息标记输出,并调用AbstractChannel.flush0()方法,向远程的socket写出数据。
如果调用pipeline的write方法,消息将通过TailContext层层向上传播,直到HeadContext 统一写出为止。
下面是源码分析:
1 | |
写出数据是否有线程安全问题?
write() 、writeAndFlush() 方法,都会将写出的消息(Object对象) 通过pipeline层层处理传递,最终交付给 HeadContext对象。
虽然每个ChannelHandler可能绑定了不同的 executor,但最终写出操作都由HeadContext来完成。 HeadContext对象由pipeline创建,它绑定的executor是reactor线程。因此不会出现线程安全问题。
writeAndFlush比write多了什么?
AbstractChannel内拥有一个 Unsafe对象,它来完成数据的交换任务。
数据写出时,有一个缓冲区(ChannelOutboundBuffer),如果消息不flush的话,消息不会向socket发出。
write()消息时,会把消息添加到ChannelOutboundBuffer, 当调用flush以后,会把buffer内所有消息都发射出去。
HeadContext 的write()方法由Netty的Unsafe接口写出数据。
Unsafe负责io.netty.channel.Channel中底层的IO操作。
Unsafe 有一个 ChannelOutboundBuffer对象,是channel写出数据的缓冲区。
1 | |
1 | |
当真正调用了 AbstractUnsafe.flush() 标记消息的flush状态,并flush0() 以后,才会调用doWrite()写到remote socket
1 | |
1.2 TailContext 组件
DefaultChannelPipeline的尾节点类型 TailContext。
在Handler消费一个ReferenceCounted对象时,必须要先调用retain()防止它被意外回收。
这么做的原因在于,DefaultChannelPipeline的尾节点,默认会释放1次ReferenceCounted对象的引用。
1 | |
1.3 DefaultChannelHandlerContext 组件
默认的HandlerContext对象。
由于Pipeline的存在,在这条链上的Handler都有上下游的关系。 而有些Handler是 Shareable的,向上下游传播状态信息需要则需要借助 Context。
万幸的是Netty提供了这样的功能(当然,对于非Shareable的Handler , Netty更推荐成员内部类的方式,它更直观简洁)。
使用 attr() 在上下文之间传递状态。
1 | |
context的attr绑定在了 Channel中 , io.netty.channel.Channel 继承了DefaultAttributeMap。
DefaultAttributeMap 是属性存储的实现。
1.4 ChannelOutboundBuffer
为什么要设计这个Buffer
首先他是一个buffer,拥有buffer的作用和好处。
通过高水位线,控制channel是否不可写。
当超过高水位线后。 unwritable 标志位将设置非0值。 只有unwritable==0时才表示channel可写。
触发
ChannelWritabilityChanged(通道可写状态改变事件)。Netty设计了一个 channel可写数据大小的水位线。
在ChannelOutboundBuffer中的消息实际还没有被写入socket中。
当ChannelOutboundBuffer中等待写的数据大小超过水位线以后,将更新 unwritable 的状态。当由0变化为>0时,将会触发
ChannelWritabilityChanged事件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74//ChannelOutboundBuffer
//添加消息。但没有flush。主要是在更新 消息Entry的状态。
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
if (msg instanceof AbstractReferenceCountedByteBuf) {
((AbstractReferenceCountedByteBuf) msg).touch();
} else {
ReferenceCountUtil.touch(msg);
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
//增加 还没输出数据的占用的字节数量。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
//当占用字节数量超过了高水位。
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
//更改 unwritable状态位的值
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1; //按位或。 无论原本值是几,最终结果值一定大于等于1
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0) {
//当由0变为其他值时,触发通道可写性事件。是否稍后执行传播。false表示立即传播。
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
//稍后执行,添加到任务队列里。
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
//立即执行(会在addMessage时立即执行)。
pipeline.fireChannelWritabilityChanged();
}
}
写数据流程
通过AbstrachChannel的writeAndFlush() 方法,我们知道期望写出的Messge,先通过pipeline的tail层层传递到 headContext,
然后通过 ChannelOutboundBuffer,触发高水位线,改变 isWritable()的返回值。
随后,调用了留给子类实现的抽象方法AbstractChannel.doWrite() 来写出数据。
下面,我们看一下NioSocketChannel.doWrite()的实现。
对于Nio来说最终调用写出数据一定是 java.nio.channels.SocketChannel.write()方法。
TCP的 SO_SNDBUF参数会随着OS而改变,因此Netty为OS进行调整适配。
1 | |
1.5 FastThreadLocal
FastThreadLocal是 ThreadLocal的一个变种。 使用FastThreadLocalThread(继承自Thread,内部有InternalThreadLocalMap)来访问FastThreadLocal时可以提供更好的访问性能 。
FastThreadLocal为什么快?
FastThreadLocal 通过索引常量,从数组中读取数据。 而不是使用 hashcode ,hash table 来寻找变量 ,每一个ThreadLocal对象都存储了一个int类型的索引。表示他在InternalThreadLocalMap中的位置。
虽然比使用hashtable只有轻微的性能优势, 但在频繁访问ThreadLocal的场景下,非常有用。
为了使用这样的优势(FastThreadLocal),你必须使用 FastThreadLocalThread或它的子类。
默认情况下,由io.netty.util.concurrent.DefaultThreadFactory创建的线程都是 FastThreadLocalThread 的原因也是如此(获得更高的ThreadLocal性能)。
reference https://zhuanlan.zhihu.com/p/662172520
必须是Thread的子类FastThreadLocalThread才可以使用 FastThreadLocal。
每一个FastThreadLocal对象都有一个 InternalThreadLocalMap的内部成员,以及 一个 Int类型表示index , 在InternalThreadLocalMap指定索引的位置存放ThreadLocal存储的数据。
详细流程参看
ThreadLocal.get()方法
FastThreadLocal 允许存储null值 ,null代表了一种合法的存储值。
InternalThreadLocalMap
FastThreadLocal内部存储映射,是FastThreadLocal的设计核心。
所有的存储对象,都存放在了 Object[] 数组中。
1 | |
get()
首先拿到FastThreadLocalThread中的存储结构 InternalThreadLocalMap ,然后在 index位置上,获取数据。
若该位置为UNSET表示这个ThreadLocal还没有设置值或者值已经被清空。(注意,FastThreadLocal中可以存储一个null ,null可以拥有实际的业务意义。)
没有设置值,或者值被清空时, 存储的是UNSET对象。 我们可以ThreadLocal.set(null);来表示业务上这个值设置过,并且为null。
1 | |
removeAll()
移除绑定到当前线程的全部ThreadLocal。
ThreadLocalMap中有一个slot存储了所有的ThreadLocal。 Slot中存放了一个Set<ThreadLocal> ,在 ThradLocal.set()时都会添加到Set中。
1 | |