2/25/2009

Tomcat6中Http11Protocol的线程模型

记得大学期间拨号上网,查询信息还是通过Yahoo和Excite这两个网站.记得为了了解电脑上AGP接口和IDE接口有什么不同,自己会花好长时间去查询,研究.很多新事物,新科技产品的工作原理都可以在 HowStuffWorks找到答案,Howstuffworks的动画或者图解能够帮助我们理解一个高科技设备的基本原理,所以这个网站至今一直收藏,阅读。昨天突然想查一下Tomcat6中Http11Protocol中是如何工作的。 Howstuffworks没有software的专栏,如果有的话,一定可以从这个网站找到满意的答案。昨天的问题没有通过Google找到答案,所以开始Dig code ...

在tomcat启动时候,会把Connector, Host, Engine, Wrapper等对象都创建出来。当给Connector的protocol配置成Http1.1 时, Connector会调用Http11Protocol,直到servlet.service().



上图是tomcat6中接受和处理socket请求(Http请求)有关的类. Connetor是连接处理的控制类,在配置文件中的<connector/>都是这个类来负责处理。 如果配置的protocol是Http/1.1, Connector则会启动Http11Protocol. Http11Protocol会创建JIoEdpoint来正在接受socket并且分配到具体的线程中处理。 Acceptor用来接受Socket请求,WorkerStack是Tomcat自己内部的线程池。Tomcat6同时也支持Executor线程池。 在每个线程中会调用Http11ConnectionHandler来处理每个socket,如果是servlet请求,则会调用CoyoteAdapter.process去执行servlet.service().

在JIoEndpint中通过createWorkerThread()方法来将创建好的woker, offer()到WorkerStack中。Woker在创建的同时start(),但run()方法会block直到assin()方法被调用,Woker才处理socket对象。 Acceptor接受到socket后,通过调用getWorkerThread().assign(socket); 来激活worker.run()方法:

public void run() {

// Process requests until we receive a shutdown signal
while (running) {

// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null)
continue;

// Process the request from this socket
if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
try {
socket.close();
} catch (IOException e) {
}
}

// Finish up this request
socket = null;
recycleWorkerThread(this);

}

}

从以上代码可以看出socket的真正的处理是在HttpConnectionHandler中。 HttpConnectionHandler是Http11Protocol中的static class,它由一个Http11Protocol创建。在HttpConnectionHandler中有Http11processor队列,Http11Processor可以设置Adapato。在创建 Http11processor时,会将Http11protocol上的CoyoteAdapter设置给Http11Processor。在运行HttpConnectionHandler.process()时会调用Http11Processor的process(socket)方法,这个方法最终会调用CoyoteAdpater.service()方法,从而完成调用serlvet上的service方法。在这几个类中,用的最多的就是线程池和对象池。Tomcat中对于需要为每个线程创建的对象都会建立对象池来循环使用,这样可以节省创建对象的时间:

protected static class Http11ConnectionHandler implements Handler {

protected Http11Protocol proto;
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();

protected ConcurrentLinkedQueue recycledProcessors =
new ConcurrentLinkedQueue() {
protected AtomicInteger size = new AtomicInteger(0);
public boolean offer(Http11Processor processor) {
boolean offer = (proto.processorCache == -1) ? true : (size.get() < proto.processorCache);
//avoid over growing our cache or add after we have stopped
boolean result = false;
if ( offer ) {
result = super.offer(processor);
if ( result ) {
size.incrementAndGet();
}
}
if (!result) unregister(processor);
return result;
}