Kafka Source Code Notes: The Network Layer (1)


Since my day-to-day work has involved almost no direct programming against network APIs, I'm using this study of the Kafka source code as an opportunity to also work through the Java NIO API and the common patterns of network programming.

The main goal of this article is to learn how Kafka uses NIO to communicate with clients: establishing connections, reading data, and answering the client.

The Kafka source covered here is version 0.10.2.

The article is fairly long; if you don't care about the details, skip straight to the summary.
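Before diving into Kafka's code, it helps to have the canonical single-threaded NIO server loop in mind, because Kafka's design is essentially this loop split across threads: the Acceptor keeps the accept half, and the Processors keep the read/write half. Below is a minimal echo-server sketch of my own (not Kafka code; the port and buffer size are arbitrary):

object MiniNioServer extends App {
  import java.net.InetSocketAddress
  import java.nio.ByteBuffer
  import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

  val selector = Selector.open()
  val server = ServerSocketChannel.open()
  server.bind(new InetSocketAddress(9092))
  server.configureBlocking(false)
  server.register(selector, SelectionKey.OP_ACCEPT) // accept half (Kafka: Acceptor)

  while (true) {
    selector.select(500)
    val iter = selector.selectedKeys().iterator()
    while (iter.hasNext) {
      val key = iter.next()
      iter.remove()
      if (key.isAcceptable) {
        val client = server.accept()
        client.configureBlocking(false)
        client.register(selector, SelectionKey.OP_READ) // hand-off point (Kafka: newConnections)
      } else if (key.isReadable) { // read/write half (Kafka: Processor)
        val client = key.channel().asInstanceOf[SocketChannel]
        val buf = ByteBuffer.allocate(1024)
        if (client.read(buf) < 0) client.close()
        else { buf.flip(); client.write(buf) } // echo back (a real server would handle partial writes)
      }
    }
  }
}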

Overall server startup flow

The server is brought up by SocketServer.startup, called from the startup method of the KafkaServer class. Let's look at what SocketServer.startup actually does.

startup creates one Acceptor for each Endpoint (IP + port) in the configuration, and each Acceptor in turn starts num.network.threads Processors (Acceptor and Processor are both Runnable, and each runs on its own thread).
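For reference, the knobs involved here live in server.properties; a minimal sketch (the values shown are, to my knowledge, the 0.10.2 defaults):

# Number of Processor (network) threads started per Acceptor/listener
num.network.threads=3
# Capacity of the request queue inside RequestChannel (discussed below)
queued.max.requests=500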

The Acceptor's job is to accept connections. It registers its ServerSocketChannel with a Selector for SelectionKey.OP_ACCEPT so it can accept incoming connections. When an accept comes in, it picks a Processor in round-robin fashion, takes the client's channel, configures it as non-blocking, and puts it into the chosen Processor's newConnections queue.

When an Acceptor starts, it also spins up its num.network.threads Processors.

The code for this part is shown below.

// Initialize Acceptors and Processors
var processorBeginIndex = 0
config.listeners.foreach { endpoint =>
  val listenerName = endpoint.listenerName
  val securityProtocol = endpoint.securityProtocol
  val processorEndIndex = processorBeginIndex + numProcessorThreads

  for (i <- processorBeginIndex until processorEndIndex)
    processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

  val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
    processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
  acceptors.put(endpoint, acceptor)
  Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
  acceptor.awaitStartup()

  processorBeginIndex = processorEndIndex
}

// The Acceptor starts its Processors
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  this.synchronized {
    // start the Processor threads
    processors.foreach { processor =>
      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor, false).start()
    }
  }


// Main loop of the Acceptor
var currentProcessor = 0
while (isRunning) {
  try {
    val ready = nioSelector.select(500)
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()
          if (key.isAcceptable)
            accept(key, processors(currentProcessor))
          else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")

          // round robin to the next processor thread
          currentProcessor = (currentProcessor + 1) % processors.length
        } catch {
          case e: Throwable => error("Error while accepting connection", e)
        }
      }
    }
  }
  catch {
    // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
    // to a select operation on a specific channel or a bad request. We don't want
    // the broker to stop responding to requests from other clients in these scenarios.
    case e: ControlThrowable => throw e
    case e: Throwable => error("Error occurred", e)
  }
}


// Implementation of the accept method
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept() // this socketChannel is the client's channel
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

    debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
      .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
        socketChannel.socket.getSendBufferSize, sendBufferSize,
        socketChannel.socket.getReceiveBufferSize, recvBufferSize))

    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}

// processor.accept: it simply enqueues the channel into newConnections
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

The Processor's main jobs are setting up newly established connections, reading client requests, responding to clients, and handling client disconnects. Let's look at how the Processor is put together and how it handles each of these actions.

Processor structure

Three parts of the Processor are worth introducing first: a Selector (Kafka's wrapper around the NIO selector, which also holds the completed-receive and completed-send queues, among other things), the newConnections queue mentioned above, and a RequestChannel. RequestChannel is a class with two important structures, a requestQueue and the responseQueues; as the names suggest, they store requests and responses. The code:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for (i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
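The two queues are bridged by a pair of small methods on the same class: Processors put requests on the shared requestQueue for the API layer, and each Processor polls only the response queue matching its own id. Abridged from memory, so treat this as a sketch rather than a verbatim quote:

// RequestChannel (abridged sketch)
def sendRequest(request: RequestChannel.Request) {
  requestQueue.put(request) // blocks when the queue is full (backpressure)
}

def receiveResponse(processor: Int): RequestChannel.Response =
  responseQueues(processor).poll() // null if nothing is pending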

As mentioned above, Processor is a Runnable, so let's look straight at its run method:

override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      // (this only writes the response to the transportLayer and does not actually
      //  send it; the actual send happens in poll() below)
      processNewResponses()
      poll()
      processCompletedReceives()
      processCompletedSends()
      processDisconnected()
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }

  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

The body of the try block lists everything the Processor does. Let's go through the steps one by one.

Setting up new connections: configureNewConnections

Setting up a new connection is essentially one task: take the client channels that the Acceptor put into the newConnections queue, register each channel with the Selector for SelectionKey.OP_READ (so that a later select can pick up requests the client sends), wrap the channel into a KafkaChannel attached to the channel's SelectionKey, build a connection id from the channel's address information, and put the KafkaChannel into the channel map the Selector maintains. The code is shown below.

First, a word about the KafkaChannel class: it contains a TransportLayer and a Send field. A TransportLayer can be built from a SelectionKey, and it handles the low-level read, write, and close operations on the channel (it can get at the channel because the channel was attached to the SelectionKey earlier). The send field holds the pending response to be sent. Kafka is effectively a request-response model: a channel carries only one request at a time, never several, so a single send slot is enough.

// set up new connections
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        val remoteAddress = channel.getRemoteAddress
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    this.channels.put(id, channel);
}

Handling responses: processNewResponses

Using the current Processor's id (recall there are multiple Processors), the Processor takes its own response queue from the RequestChannel, dequeues each response, and acts according to its responseAction. There are three kinds (a sketch of the dispatch follows the list):

  1. NoOpAction: do nothing except re-register SelectionKey.OP_READ on the selector.
  2. SendAction: use the response's destination (the channel id) to find the corresponding KafkaChannel, set the Send object on that KafkaChannel's send field, and enable writes (this.transportLayer.addInterestOps(SelectionKey.OP_WRITE)). Note that nothing is actually sent at this point. The response is also recorded in inflightResponses.
  3. CloseConnectionAction: close the corresponding channel.
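Roughly, the dispatch in processNewResponses looks like the sketch below (reconstructed from the 0.10.2 source from memory; logging and metrics calls are omitted, so details may differ slightly):

// Sketch of processor.processNewResponses
private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id) // drain only this processor's queue
  while (curr != null) {
    curr.responseAction match {
      case RequestChannel.NoOpAction =>
        selector.unmute(curr.request.connectionId) // nothing to send; resume reading
      case RequestChannel.SendAction =>
        sendResponse(curr) // attach the Send and enable OP_WRITE (code below)
      case RequestChannel.CloseConnectionAction =>
        close(selector, curr.request.connectionId)
    }
    curr = requestChannel.receiveResponse(id)
  }
}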

The code related to SendAction:

// processor.sendResponse
protected[network] def sendResponse(response: RequestChannel.Response) {
  trace(s"Socket server received response to send, registering for write and sending data: $response")
  val channel = selector.channel(response.responseSend.destination)
  // `channel` can be null if the selector closed the connection because it was idle for too long
  if (channel == null) {
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
    response.request.updateRequestMetrics()
  }
  else {
    selector.send(response.responseSend)
    inflightResponses += (response.request.connectionId -> response)
  }
}

// selector.send
public void send(Send send) {
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

This makes it clear that a channel can have only one send at a time.

The real reads and writes: poll

The poll method selects on the selector to determine the number of ready keys; if it is greater than zero, it calls pollSelectionKeys. Let's look at both poll and pollSelectionKeys.

// processor.poll
private def poll() {
  try selector.poll(300)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      error(s"Closing processor $id due to illegal state or IO exception")
      swallow(closeAll())
      shutdownComplete()
      throw e
  }
}

// selector.poll
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            // this appears to be client-side connect logic
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            // read from the underlying channel, one size-prefixed packet at a time
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        }
    }
}

What matters here are the last two branches, the read and the write; the connect handling at the top appears to be client-side logic (this class is shared with the client, so I'm not certain yet). Let's start with the read logic.

// the read branch in pollSelectionKeys
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
    NetworkReceive networkReceive;
    while ((networkReceive = channel.read()) != null)
        addToStagedReceives(channel, networkReceive);
}

// the addToStagedReceives method
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

As you can see, if the channel is ready and readable and has no staged receive pending, the data is read out and appended to the stagedReceives queue. The while loop suggests there can be several request packets at once, yet we saw earlier that only one response can be in flight, which is a bit odd. (There is an article from January 2015 that also analyzes Kafka's network layer; at that time channel.read could only read a single packet.) This needs to be checked against the client code, to see whether the client also uses the single-slot send structure and can send only one request at a time.

The channel.read method reads data using standard length-prefixed framing: it first reads 4 bytes to determine the packet length, then allocates a buffer of that size and reads the rest in. The code:

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;

    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id);
    }

    receive(receive);
    if (receive.complete()) {
        receive.payload().rewind();
        result = receive;
        receive = null;
    }
    return result;
}

private long receive(NetworkReceive receive) throws IOException {
    return receive.readFrom(transportLayer);
}

public long readFrom(ScatteringByteChannel channel) throws IOException {
    return readFromReadableChannel(channel);
}

// Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
// See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
// This can go away after we get rid of BlockingChannel
@Deprecated
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size); // `size` is a 4-byte ByteBuffer
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");

            this.buffer = ByteBuffer.allocate(receiveSize);
        }
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }

    return read;
}

@Override
public boolean complete() {
    return !size.hasRemaining() && !buffer.hasRemaining();
}

As the code shows, when the data on the wire is incomplete, that is, when size or buffer still has bytes remaining, read() returns an empty result, and the receive resumes on the next poll.
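For illustration, here is the sending side of this framing as a hypothetical helper of my own (not Kafka code): a 4-byte big-endian length prefix followed by the payload, which is exactly what the size/buffer logic above parses.

import java.nio.ByteBuffer

// Frame a payload as [4-byte length][payload], the format readFromReadableChannel expects.
def frame(payload: Array[Byte]): ByteBuffer = {
  val buf = ByteBuffer.allocate(4 + payload.length)
  buf.putInt(payload.length) // the 4-byte `size` header that is read first
  buf.put(payload)
  buf.flip()
  buf
}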

The write path below simply sends out the Send held on the channel and then puts it into the completedSends queue.

// in selector.poll
if (channel.ready() && key.isWritable()) {
    Send send = channel.write();
    if (send != null) {
        this.completedSends.add(send);
        this.sensors.recordBytesSent(channel.id(), send.size());
    }
}

// channel.write
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}

Back in the poll method: after pollSelectionKeys, addToCompletedReceives moves staged receives into completedReceives. Note that both structures are fields of Kafka's own Selector wrapper class (which contains the underlying NIO Selector).

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

Handling completed receives: processCompletedReceives

// processor.processCompletedReceives
private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      val openChannel = selector.channel(receive.source)
      val session = {
        // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
        val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
      }
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
        buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
        securityProtocol = securityProtocol)
      requestChannel.sendRequest(req)
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}

This method wraps each request the selector has read into a Session object and a Request object, and puts them into the requestChannel queue described at the beginning. It then mutes the channel, i.e. transportLayer.removeInterestOps(SelectionKey.OP_READ).
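Mute and unmute boil down to toggling the read-interest bit on the connection's SelectionKey. A minimal sketch of the underlying NIO mechanism (the helper names here are mine, not Kafka's):

import java.nio.channels.SelectionKey

// The mechanism behind mute/unmute: remove or restore OP_READ on a registered key.
def mute(key: SelectionKey): Unit =
  key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)

def unmute(key: SelectionKey): Unit =
  key.interestOps(key.interestOps() | SelectionKey.OP_READ)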

Handling completed sends: processCompletedSends

// processor.processCompletedSends
private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    selector.unmute(send.destination)
  }
}

Based on the list of responses that processor.poll successfully sent, this method removes each response that was recorded in inflightResponses during processNewResponses, then unmutes the channel, i.e. adds OP_READ back.

Handling disconnected connections: processDisconnected

This method does cleanup: responses that were never sent are removed, and connectionQuotas is decremented. The selector's disconnected queue gets populated when the client initiates a close, when the client times out, when poll reaps expired connections, and so on. The selector's close first closes the channel and then adds its id to this queue.

// processor.processDisconnected
private def processDisconnected() {
  selector.disconnected.asScala.foreach { connectionId =>
    val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
      throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }.remoteHost
    inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
    // the channel has been closed by the selector but the quotas still need to be updated
    connectionQuotas.dec(InetAddress.getByName(remoteHost))
  }
}

// selector.doClose
private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
    try {
        channel.close();
    } catch (IOException e) {
        log.error("Exception closing connection to node {}:", channel.id(), e);
    }
    this.sensors.connectionClosed.record();
    this.stagedReceives.remove(channel);
    if (notifyDisconnect)
        this.disconnected.add(channel.id());
}

To summarize the whole flow:

The server accepts connections with an Acceptor and handles reads and writes with multiple Processors. Each Processor maintains a Kafka-wrapped selector; the Selector keeps track of the channels registered on it and performs the actual reads and writes on those channels.

The Acceptor hands accepted connections to a Processor's newConnections queue; the Processor takes each connection and registers it on its own Selector.

The Processor obtains the responses it must handle from the requestChannel (which contains both the request queue and the response queues) and sends them via the selector; at this stage the selector only finds the matching channel, sets its send field, and enables OP_WRITE, while the response is recorded in the inflightResponses map.

Then the Processor's poll step calls the selector's poll, which reads client requests into the completedReceives queue, writes pending responses out to the clients, removes OP_WRITE, and puts finished sends into the completedSends queue.

Next, the Processor turns what it reads from completedReceives into requests and puts them onto the requestChannel queue, then removes OP_READ (the request has been read but not yet handled, so no more reading for now).

Then the Processor takes the successfully sent responses from the completedSends queue, removes the matching entries from inflightResponses, and re-enables OP_READ (the response has been written, so the connection can take requests again).

Finally it handles disconnects, removing the corresponding entries from inflightResponses.

Notice that the Selector itself maintains channels, completedSends, and completedReceives; the Processor fetches this information from the selector, and the actual sending and receiving are done by the Selector.

Performance-wise, having multiple selectors (one per Processor) scales the read/write work, because accepting a connection is a much smaller amount of work than all the reading and writing done over that connection's lifetime.

The next post will cover how received requests interact with the upper API layer.