Kafka Source Code Study: Network Communication Layer (2)

The previous post covered how the Kafka server receives and sends requests. This one looks at how the data received by the network layer is handed over to the business logic layer.

The previous post left off in the processCompletedReceives method, where each NetworkReceive taken from the completedReceives queue is turned into a RequestChannel.Session object, and that Session is then placed into a RequestChannel.Request object.
A Session consists of a principal and the remote address.
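
For reference, Session is roughly the following case class in the 0.10.x sources (a simplified sketch; surrounding members and imports are elided, as in the other snippets here):

object RequestChannel extends Logging {
  // principal identifies the authenticated sender; clientAddress is the remote end
  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
  // ...
}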

The Request, in turn, is made up of the payload of the request read earlier (NetworkReceive.payload) plus information such as the current processor ID and connectionId, along with two members, header: RequestHeader and body: AbstractRequest, both of which are constructed from the payload.
The request header contains fields such as api_key and api_version.
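
A minimal sketch of how the Request constructor derives header and body from the payload buffer (simplified from the 0.10.x sources; the legacy requestObj path, metrics fields, and imports are omitted):

case class Request(processor: Int, connectionId: String, session: Session,
                   private var buffer: ByteBuffer, startTimeMs: Long,
                   securityProtocol: SecurityProtocol) {
  // the first short of the payload is the api key, i.e. the request id
  val requestId = buffer.getShort()
  buffer.rewind()
  // parse the fixed-layout header, then dispatch on api_key/api_version for the body
  val header: RequestHeader = RequestHeader.parse(buffer)
  val body: AbstractRequest =
    AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
}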

public class RequestHeader extends AbstractRequestResponse {

    private static final Field API_KEY_FIELD = REQUEST_HEADER.get("api_key");
    private static final Field API_VERSION_FIELD = REQUEST_HEADER.get("api_version");
    private static final Field CLIENT_ID_FIELD = REQUEST_HEADER.get("client_id");
    private static final Field CORRELATION_ID_FIELD = REQUEST_HEADER.get("correlation_id");

    private final short apiKey;
    private final short apiVersion;
    private final String clientId;
    private final int correlationId;
    // omitted
}

Which concrete request body gets constructed depends on the apiKey in the request header:

// AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
public static AbstractRequest getRequest(int requestId, short versionId, ByteBuffer buffer) {
    ApiKeys apiKey = ApiKeys.forId(requestId);
    switch (apiKey) {
        case PRODUCE:
            return ProduceRequest.parse(buffer, versionId);
        case FETCH:
            return FetchRequest.parse(buffer, versionId);
        case LIST_OFFSETS:
            // remaining cases omitted

With that, a complete request has been constructed. It is then put into the request queue that all processors share (RequestChannel.requestQueue.put(request)), and processCompletedReceives finishes.
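
The key structures inside RequestChannel are this single shared request queue plus one response queue per processor; roughly (again simplified from the 0.10.x sources):

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  // all processors put parsed requests into this one bounded queue
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // each processor owns a response queue, indexed by its processor id
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for (i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

  /** send a request to be handled, potentially blocking until there is room in the queue */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

  /** get the next request, or block until the timeout elapses */
  def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
}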

Who takes these requests back out of the queue?

Queued requests are taken out and processed by a class called KafkaRequestHandler (I found it via the IDE's Open Call Hierarchy). The code is below:

/**
 * A thread that answers kafka requests.
 */
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

  def run() {
    while (true) {
      try {
        var req: RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = time.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = time.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if (req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = time.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}


class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for (handler <- runnables)
      handler.shutdown
    for (thread <- threads)
      thread.join
    info("shut down completely")
  }
}

As you can see, this is a Runnable that keeps pulling requests out in a while(true) loop and finally hands each one to apis.handle(req) for processing.

These KafkaRequestHandler instances are started by the KafkaRequestHandlerPool in the listing above: it directly spins up numThreads threads, all sharing a single apis and requestChannel instance.

Both the apis instance and the KafkaRequestHandlerPool are created in KafkaServer.startup:

// in KafkaServer.startup
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
  kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
  clusterId, time)

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
  config.numIoThreads)

From the code above you can see that the requestChannel of the KafkaRequestHandlerPool, the thread pool that processes requests, is taken directly from the socketServer, and that the pool size is config.numIoThreads (num.io.threads). Note that this is configured independently of the number of processor threads, which comes from num.network.threads.
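
For reference, the two pool sizes come from separate broker configs; with the stock defaults of this era, server.properties would be equivalent to:

# network (processor) threads vs. IO (request handler) threads
num.network.threads=3
num.io.threads=8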

Now back in KafkaRequestHandler: its run method ends with apis.handle(req), so let's take a quick look at how a request is handled.

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      // remaining cases omitted
    }
  } // catch clause omitted
}

You can see this is a pattern match on the requestId (think of it as Java's switch; Scala has pattern matching instead of a switch statement). It then dispatches into the corresponding handleXXX function, and once the actual business logic finishes, it calls

requestChannel.sendResponse(new RequestChannel.Response(request, respBody))

to send the response to the response queue of the corresponding processor, after which the response goes out via the flow described in the previous post.
Because the handleXXX methods are invoked concurrently by multiple handler threads, a fair amount of their code is protected by locks.
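
Under the hood, sendResponse just indexes into the per-processor response queues and wakes up the owning processor; a rough sketch of the 0.10.x version:

/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response) {
  // enqueue onto the queue of the processor that read the original request
  responseQueues(response.processor).put(response)
  // registered listeners wake up that processor's selector so it starts writing
  for (onResponse <- responseListeners)
    onResponse(response.processor)
}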

To sum up:
The Processor threads parse requests and put them into the request queue, while the handler pool keeps taking parsed requests out of that queue and runs the corresponding business logic. Once the logic completes, the result is wrapped into a Response and put into the response queue of the processor ID that owns the connection, and that processor sends it out.
The size of this handler pool equals the number of IO threads (num.io.threads).