This article exists because of my own past carelessness. While writing the Reactor and WebFlux volume of my Java programming methodology series, I assumed the existing blog posts on Tomcat's connector were good enough, skimmed them, and never dug into the source code myself. When I later recorded the companion videos, I discovered that what my article described did not match the source, and the Chinese blog posts I found through search engines were not much better. So I decided to rework the whole Tomcat connector story directly from the latest source code, and to remind myself: always check, then check again!
Source code reference: https://github.com/apache/tomcat
## The Tomcat Startup Process in Detail
### Starting and Stopping Tomcat
On Linux, Tomcat is started and stopped from the command line. Change into Tomcat's `bin` directory:

```shell
cd /java/tomcat/bin
```

Start Tomcat:

```shell
./startup.sh
```

Stop Tomcat:

```shell
./shutdown.sh
```

After running `./shutdown.sh`, the Tomcat service is no longer reachable, yet `ps -ef | grep tomcat` may still show the corresponding Java process: it did not die when the web container shut down, leaving a zombie Java process behind. A common cause is that some non-daemon thread (a user thread) is still alive, in which case the JVM will not exit. The JVM can only exit once every thread left in it is a daemon thread; as long as one or more user threads remain, it stays up. Check whether the Tomcat process has really ended with:

```shell
ps -ef | grep tomcat
```

If a user thread is keeping the process alive, just kill the process, e.g. with `kill -9 <pid>`.
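The daemon-thread rule above is easy to check with plain JDK APIs: threads are user (non-daemon) threads by default, and the JVM only exits once no user thread is left running. A minimal illustration:

```java
public class DaemonDemo {
    public static void main(String[] args) {
        // Threads are user (non-daemon) threads by default; while one is
        // running, the JVM cannot exit - this is exactly what leaves a
        // lingering Tomcat java process behind after shutdown.sh.
        Thread user = new Thread(() -> {});

        // A daemon thread never blocks JVM exit; as we will see later,
        // Tomcat marks its Poller and Acceptor threads as daemons.
        Thread daemon = new Thread(() -> {});
        daemon.setDaemon(true);

        System.out.println(user.isDaemon());   // false
        System.out.println(daemon.isDaemon()); // true
    }
}
```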
### The Startup Entry Point: Bootstrap
Starting from the startup.sh shell script, we can see that it ultimately invokes `catalina.sh start`. In catalina.sh, following the `elif [ "$1" = "start" ]` branch downward, we find that it ends up calling the `start()` method of the class `org.apache.catalina.startup.Bootstrap`:
```java
/**
 * org.apache.catalina.startup.Bootstrap
 * Start the Catalina daemon.
 * @throws Exception Fatal start error
 */
public void start() throws Exception {
    if (catalinaDaemon == null) {
        init();
    }
    Method method = catalinaDaemon.getClass().getMethod("start", (Class[]) null);
    method.invoke(catalinaDaemon, (Object[]) null);
}
```
On the server's very first start, `init()` is called here. Its main job is to create an instance of `org.apache.catalina.startup.Catalina`:
```java
/**
 * org.apache.catalina.startup.Bootstrap
 * Initialize daemon.
 * @throws Exception Fatal initialization error
 */
public void init() throws Exception {
    initClassLoaders();
    Thread.currentThread().setContextClassLoader(catalinaLoader);
    SecurityClassLoad.securityClassLoad(catalinaLoader);

    // Load our startup class and call its process() method
    if (log.isDebugEnabled()) {
        log.debug("Loading startup class");
    }
    Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
    Object startupInstance = startupClass.getConstructor().newInstance();

    // Set the shared extensions class loader
    if (log.isDebugEnabled()) {
        log.debug("Setting startup class properties");
    }
    String methodName = "setParentClassLoader";
    Class<?> paramTypes[] = new Class[1];
    paramTypes[0] = Class.forName("java.lang.ClassLoader");
    Object paramValues[] = new Object[1];
    paramValues[0] = sharedLoader;
    Method method = startupInstance.getClass().getMethod(methodName, paramTypes);
    method.invoke(startupInstance, paramValues);

    catalinaDaemon = startupInstance;
}
```
### The Startup Process in Catalina
#### Catalina's start()
Next, `Bootstrap.start()` invokes the `start` method of the Catalina instance:
```java
/**
 * org.apache.catalina.startup.Catalina
 * Start a new server instance.
 */
public void start() {
    if (getServer() == null) {
        load();
    }
    if (getServer() == null) {
        log.fatal(sm.getString("catalina.noServer"));
        return;
    }
    long t1 = System.nanoTime();

    // Start the new server
    try {
        getServer().start();
    } catch (LifecycleException e) {
        log.fatal(sm.getString("catalina.serverStartFail"), e);
        try {
            getServer().destroy();
        } catch (LifecycleException e1) {
            log.debug("destroy() failed for failed Server", e1);
        }
        return;
    }

    long t2 = System.nanoTime();
    if (log.isInfoEnabled()) {
        log.info(sm.getString("catalina.startup", Long.valueOf((t2 - t1) / 1000000)));
    }

    // Register shutdown hook
    if (useShutdownHook) {
        if (shutdownHook == null) {
            shutdownHook = new CatalinaShutdownHook();
        }
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // If JULI is being used, disable JULI's shutdown hook since
        // shutdown hooks run in parallel and log messages may be lost
        // if JULI's hook completes before the CatalinaShutdownHook()
        LogManager logManager = LogManager.getLogManager();
        if (logManager instanceof ClassLoaderLogManager) {
            ((ClassLoaderLogManager) logManager).setUseShutdownHook(false);
        }
    }

    if (await) {
        await();
        stop();
    }
}
```
The two calls we mainly care about here are `load()` and `getServer().start()`. Around the latter you can see the startup time being measured; this is where the familiar startup-duration log line comes from when Tomcat boots. The rest of the method I will leave aside.
#### Catalina's load()
Let's start with `load()`. It creates and configures the Digester used for startup via `createStartDigester()`, then obtains the server.xml file we configured, applies the attributes defined there one by one, and finally calls `getServer().init()`:
```java
/**
 * org.apache.catalina.startup.Catalina
 * Start a new server instance.
 */
public void load() {
    if (loaded) {
        return;
    }
    loaded = true;

    long t1 = System.nanoTime();

    initDirs();
    // Before digester - it may be needed
    initNaming();

    // Set configuration source
    ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(Bootstrap.getCatalinaBaseFile(), getConfigFile()));
    File file = configFile();

    // Create and execute our Digester
    Digester digester = createStartDigester();

    try (ConfigurationSource.Resource resource = ConfigFileLoader.getSource().getServerXml()) {
        InputStream inputStream = resource.getInputStream();
        InputSource inputSource = new InputSource(resource.getURI().toURL().toString());
        inputSource.setByteStream(inputStream);
        digester.push(this);
        digester.parse(inputSource);
    } catch (Exception e) {
        if (file == null) {
            log.warn(sm.getString("catalina.configFail", getConfigFile() + "] or [server-embed.xml"), e);
        } else {
            log.warn(sm.getString("catalina.configFail", file.getAbsolutePath()), e);
            if (file.exists() && !file.canRead()) {
                log.warn(sm.getString("catalina.incorrectPermissions"));
            }
        }
        return;
    }

    getServer().setCatalina(this);
    getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
    getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());

    // Stream redirection
    initStreams();

    // Start the new server
    try {
        getServer().init();
    } catch (LifecycleException e) {
        if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
            throw new java.lang.Error(e);
        } else {
            log.error(sm.getString("catalina.initError"), e);
        }
    }

    long t2 = System.nanoTime();
    if (log.isInfoEnabled()) {
        log.info(sm.getString("catalina.init", Long.valueOf((t2 - t1) / 1000000)));
    }
}
```
So where does this server come from? From `digester.addObjectCreate("Server", "org.apache.catalina.core.StandardServer", "className");` we know that an instance of that class is used. Going back to the two lines `digester.push(this); digester.parse(inputSource);`, we see that before parsing begins, `Digester.push(this)` is called, so the object on top of the stack is the Catalina instance; this is what the parsed Server gets attached to. A few words on how the Digester parse works:

When the parser reaches `<Server>`, it creates a StandardServer instance and reflectively calls a setter on the object currently at the top of the Digester's stack (which setter to call is determined by the name that was passed in).

Internally the Digester uses `IntrospectionUtils.setProperty(top, name, value)`, where `top` is the stack-top object, `name` is the property to set on that object, and `value` is the value to set.

At the very start the stack top is the Catalina instance, so `Catalina.setServer(Server object)` is invoked, preparing for the later call to `Server.start()`; the StandardServer instance is then pushed onto the Digester's object stack.
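To make the reflection step concrete, here is a minimal sketch of how a Digester-style rule attaches the freshly created object to the stack-top instance. `MiniCatalina` and `MiniServer` are illustrative stand-ins, not Tomcat classes; the reflective call mirrors what `IntrospectionUtils.setProperty`-style code does:

```java
import java.lang.reflect.Method;

// Illustrative stand-ins for Catalina and StandardServer (not Tomcat classes).
class MiniServer {}

class MiniCatalina {
    private MiniServer server;
    public void setServer(MiniServer s) { this.server = s; }
    public MiniServer getServer() { return server; }
}

public class DigesterSketch {
    public static void main(String[] args) throws Exception {
        // The stack top is the Catalina instance pushed by digester.push(this).
        MiniCatalina top = new MiniCatalina();

        // When <Server> is parsed, the rule instantiates the configured class
        // and reflectively invokes the matching setter on the stack top.
        Object child = MiniServer.class.getConstructor().newInstance();
        Method setter = top.getClass().getMethod("setServer", MiniServer.class);
        setter.invoke(top, child);

        System.out.println(top.getServer() == child); // true
    }
}
```

The important point is that the target method is looked up by name on whatever object happens to be on top of the Digester stack, which at the start of parsing is the Catalina instance.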
#### getServer().init()
Next up is `getServer().init()`. From the above we know to look at `org.apache.catalina.core.StandardServer`, which extends LifecycleMBeanBase and implements Server. Extending LifecycleMBeanBase means this StandardServer is lifecycle-managed: its `init()` comes from LifecycleBase, the parent class of LifecycleMBeanBase:
```java
// org.apache.catalina.util.LifecycleBase.java
@Override
public final synchronized void init() throws LifecycleException {
    if (!state.equals(LifecycleState.NEW)) {
        invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
    }

    try {
        setStateInternal(LifecycleState.INITIALIZING, null, false);
        initInternal();
        setStateInternal(LifecycleState.INITIALIZED, null, false);
    } catch (Throwable t) {
        handleSubClassException(t, "lifecycleBase.initFail", toString());
    }
}
```
So we turn to the `initInternal()` implementation in StandardServer. The method is long, so here is an outline of what it does:

1. Call `org.apache.catalina.util.LifecycleMBeanBase#initInternal` in the parent class to register the MBean.
2. Register MBeans for the other attributes of this class.
3. Initialize the NamingResources: `globalNamingResources.init();`
4. Starting from the common class loader and walking up to the system class loader, traverse each class loader's search path, find the files ending in `.jar`, read their Manifest information, and add it to `ExtensionValidator#containerManifestResources`.
5. Initialize the services; the default implementation is StandardService:
   i) call `super.initInternal()`;
   ii) initialize the container, which here is a StandardEngine instance;
   iii) initialize the Executors;
   iv) initialize the Connectors:
      - `org.apache.catalina.connector.Connector` as `Connector[HTTP/1.1-8080]`
      - `org.apache.catalina.connector.Connector` as `Connector[AJP/1.3-8009]`
#### getServer().start() inside Catalina's start()
Here again we look at the implementation in StandardServer's ancestor, `org.apache.catalina.util.LifecycleBase`:
```java
@Override
public final synchronized void start() throws LifecycleException {

    if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) ||
            LifecycleState.STARTED.equals(state)) {
        if (log.isDebugEnabled()) {
            Exception e = new LifecycleException();
            log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e);
        } else if (log.isInfoEnabled()) {
            log.info(sm.getString("lifecycleBase.alreadyStarted", toString()));
        }
        return;
    }

    if (state.equals(LifecycleState.NEW)) {
        init();
    } else if (state.equals(LifecycleState.FAILED)) {
        stop();
    } else if (!state.equals(LifecycleState.INITIALIZED) &&
            !state.equals(LifecycleState.STOPPED)) {
        invalidTransition(Lifecycle.BEFORE_START_EVENT);
    }

    try {
        setStateInternal(LifecycleState.STARTING_PREP, null, false);
        startInternal();
        if (state.equals(LifecycleState.FAILED)) {
            // This is a 'controlled' failure. The component put itself into the
            // FAILED state so call stop() to complete the clean-up.
            stop();
        } else if (!state.equals(LifecycleState.STARTING)) {
            // Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            invalidTransition(Lifecycle.AFTER_START_EVENT);
        } else {
            setStateInternal(LifecycleState.STARTED, null, false);
        }
    } catch (Throwable t) {
        // This is an 'uncontrolled' failure so put the component into the
        // FAILED state and throw an exception.
        handleSubClassException(t, "lifecycleBase.startFail", toString());
    }
}
```
For StandardServer, what matters is its implementation of `startInternal()`. I will spare you the source; the process is:

1. Fire the CONFIGURE_START_EVENT event.
2. Set this object's state to STARTING.
3. Start the NamingResources: `globalNamingResources.start();`
4. Start the StandardService:
   i) set its state to STARTING;
   ii) start the container, i.e. the StandardEngine;
   iii) start the Executors;
   iv) start the Connectors:
      - `org.apache.catalina.connector.Connector` as `Connector[HTTP/1.1-8080]`
      - `org.apache.catalina.connector.Connector` as `Connector[AJP/1.3-8009]`

At last we have reached the protagonist of this article: the Connector.
## The Connector in Detail
### The Connector Constructor
From the Connector configuration in server.xml under the apache-tomcat-9.0.14\conf directory (download the corresponding Tomcat version yourself), we can see that the default connector on port 8080 is configured with protocol HTTP/1.1:
```xml
<Connector port="8080" protocol="HTTP/1.1"
           connectionTimeout="20000"
           redirectPort="8443" />
<!-- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
```
With that in mind, let's look at the implementation in code:
```java
public Connector() {
    this("org.apache.coyote.http11.Http11NioProtocol");
}

public Connector(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
        } else {
            protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
        }
    } else {
        protocolHandlerClassName = protocol;
    }

    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    // Default for Connector depends on this system property
    setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
```
From Tomcat 8.5 onwards the default is the Http11NioProtocol. Here we configured HTTP/1.1, but as the if statement above shows, that string maps to the same handler, so we still end up with Http11NioProtocol.
### Connector Initialization and Startup
As the previous section showed, the Connector gets initialized too. It also extends LifecycleMBeanBase, so let's look at its `initInternal()` implementation:
```java
@Override
protected void initInternal() throws LifecycleException {

    super.initInternal();

    if (protocolHandler == null) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
    }

    // Initialize adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);
    if (service != null) {
        protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
    }

    // Make sure parseBodyMethodsSet has a default
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}
```
The process here is:

1. Register the MBean.
2. Instantiate the CoyoteAdapter, which is the entry point for requests. When a request arrives, the CoyoteAdapter handles the state and recycles the request at the end; the processing in between is delegated to the pipeline.
3. Initialize the protocolHandler (here `org.apache.coyote.http11.Http11NioProtocol`). This step also completes the initialization of the endpoint.

Startup needs little comment: it sets this object's state to STARTING and calls `protocolHandler.start();`. After that, we get to the heart of the matter.
```java
@Override
protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPortWithOffset() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}
```
## The Protocol in Detail
Here we go straight to the abstract implementation, `org.apache.coyote.AbstractProtocol`. It follows the same lifecycle, so it too extends LifecycleMBeanBase and implements its own `init()`, `start()`, and the other lifecycle methods, all of which delegate the concrete work to the endpoint implementation it holds:
```java
// org.apache.coyote.AbstractProtocol.java
@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
        logPortOffset();
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length() - 1));
    endpoint.setDomain(domain);

    endpoint.init();
}

@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        logPortOffset();
    }

    endpoint.start();

    monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
            new Runnable() {
                @Override
                public void run() {
                    if (!isPaused()) {
                        startAsyncTimeout();
                    }
                }
            }, 0, 60, TimeUnit.SECONDS);
}
```
Take `org.apache.coyote.http11.Http11NioProtocol` as an example: its constructor passes a NioEndpoint to the superclass, and the concrete logic of its methods is implemented through that NioEndpoint:
```java
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {

    private static final Log log = LogFactory.getLog(Http11NioProtocol.class);

    public Http11NioProtocol() {
        super(new NioEndpoint());
    }

    @Override
    protected Log getLog() { return log; }

    // -------------------- Pool setup --------------------

    public void setPollerThreadCount(int count) {
        ((NioEndpoint) getEndpoint()).setPollerThreadCount(count);
    }

    public int getPollerThreadCount() {
        return ((NioEndpoint) getEndpoint()).getPollerThreadCount();
    }

    public void setSelectorTimeout(long timeout) {
        ((NioEndpoint) getEndpoint()).setSelectorTimeout(timeout);
    }

    public long getSelectorTimeout() {
        return ((NioEndpoint) getEndpoint()).getSelectorTimeout();
    }

    public void setPollerThreadPriority(int threadPriority) {
        ((NioEndpoint) getEndpoint()).setPollerThreadPriority(threadPriority);
    }

    public int getPollerThreadPriority() {
        return ((NioEndpoint) getEndpoint()).getPollerThreadPriority();
    }

    // ----------------------------------------------------- JMX related methods

    @Override
    protected String getNamePrefix() {
        if (isSSLEnabled()) {
            return "https-" + getSslImplementationShortName() + "-nio";
        } else {
            return "http-nio";
        }
    }
}
```
## The Endpoint in Detail
The endpoint handles the concrete connections and data transfer: it implements the network connection and its control, and is the entry point for the server's external I/O. Its main tasks are to manage the inbound socket connections and to hand each established socket over to a suitable worker thread.
The main classes involved are Acceptor, Poller, and SocketProcessor.
We take NioEndpoint as our example for how requests are processed internally.
Picking up from the end of the previous section, we mainly care about its implementations of the lifecycle methods the Protocol relies on:
```java
// org.apache.tomcat.util.net.AbstractEndpoint.java
public final void init() throws Exception {
    if (bindOnInit) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

// org.apache.tomcat.util.net.AbstractEndpoint.java
private void bindWithCleanup() throws Exception {
    try {
        bind();
    } catch (Throwable t) {
        // Ensure open sockets etc. are cleaned up if something goes
        // wrong during bind
        ExceptionUtils.handleThrowable(t);
        unbind();
        throw t;
    }
}
```
These two methods mainly call `bind` (see the `bindWithCleanup()` implementation above for where) and `startInternal`. Both are template methods that subclasses implement as needed. Here is the `bind` implementation in NioEndpoint:
```java
// org.apache.tomcat.util.net.NioEndpoint.java
@Override
public void bind() throws Exception {
    initServerSocket();

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        // minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}
```
The `bind` method first initializes the ServerSocket (familiar from JDK network programming, so I won't dwell on it; here it is wrapped in a helper method, shown below), then checks `acceptorThreadCount` and `pollerThreadCount`, the numbers of threads with which the Acceptor and Poller will be initialized; both are forced to be at least 1.
```java
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr, getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); // mimic APR behavior
}
```
The Acceptor accepts requests and hands each accepted connection to a Poller; both run on threads of their own. The method also initializes SSL and related pieces. NioEndpoint's `startInternal` looks like this:
```java
/**
 * The socket pollers.
 */
private Poller[] pollers = null;

/**
 * Start the NIO endpoint, creating acceptor, poller threads.
 */
@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }

        initializeConnectionLatch();

        // Start poller threads
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        startAcceptorThreads();
    }
}
```
This first initializes a few fields. Among them, `processorCache` is of type `SynchronizedStack<SocketProcessor>`; SocketProcessor is an inner class of NioEndpoint to which the Poller hands requests, and SocketProcessor in turn passes them on to the Handler.
Then the Poller and Acceptor threads are started to handle requests. Note that `pollers` is an array managing a collection of Runnables; as we saw earlier, if we configure nothing, its length is 1, so polling is single-threaded by default. Each poller thread is created as a daemon thread, so it ends together with the Tomcat container.
If we want to tune this, we can configure it accordingly in server.xml:
```xml
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
           connectionTimeout="20000"
           maxHeaderCount="64"
           maxParameterCount="64"
           maxHttpHeaderSize="8192"
           URIEncoding="UTF-8"
           useBodyEncodingForURI="false"
           maxThreads="128"
           minSpareThreads="12"
           acceptCount="1024"
           connectionLinger="-1"
           keepAliveTimeout="60"
           maxKeepAliveRequests="32"
           maxConnections="10000"
           acceptorThreadCount="1"
           pollerThreadCount="2"
           selectorTimeout="1000"
           useSendfile="true"
           selectorPool.maxSelectors="128"
           redirectPort="8443" />
```
The `startAcceptorThreads` method that starts the Acceptors lives in AbstractEndpoint:
```java
protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
```
Here `getAcceptorThreadCount()` returns the `acceptorThreadCount` value prepared in the `init` method, and that many Acceptor threads are started to accept requests. The default is again 1, and the threads are created the same way as the Poller threads, so I won't repeat that.
Now look at the documentation in webapps/docs/config/http.xml:
```xml
<attribute name="acceptorThreadCount" required="false">
  <p>The number of threads to be used to accept connections. Increase this
  value on a multi CPU machine, although you would never really need more
  than <code>2</code>. Also, with a lot of non keep alive connections, you
  might want to increase this value as well. Default value is
  <code>1</code>.</p>
</attribute>
<attribute name="pollerThreadCount" required="false">
  <p>(int)The number of threads to be used to run for the polling events.
  Default value is <code>1</code> per processor but not more than 2.<br/>
  When accepting a socket, the operating system holds a global lock. So the benefit of
  going above 2 threads diminishes rapidly. Having more than one thread is for
  system that need to accept connections very rapidly. However usually just
  increasing <code>acceptCount</code> will solve that problem.
  Increasing this value may also be beneficial when a large amount of send file
  operations are going on.
  </p>
</attribute>
```
So `acceptorThreadCount` is the number of threads used to accept connections. You can increase it on a multi-CPU machine, although you would rarely need more than 2; with many non-keep-alive connections you might also want to raise it. The default is 1.
`pollerThreadCount` is the number of threads used to run the polling events. The default is 1 per processor, but no more than 2 (the tuned configuration above sets it to 2). When accepting a socket, the operating system holds a global lock, so the benefit of going above 2 threads falls off quickly. Having more than one such thread is for systems that must accept connections very rapidly, although usually just increasing `acceptCount` solves that problem. Raising this value can also help when a large number of sendfile operations are in flight.
## How the Acceptor and Poller Work
Let's first look at a sequence diagram of NioEndpoint's request processing:
### How the Acceptor Works
As noted earlier, both Acceptor and Poller implement the Runnable interface, so their main workflow lives in their `run` methods. Let's start with the Acceptor's `run` implementation:
```java
// org.apache.tomcat.util.net.NioEndpoint.java
@Override
protected SocketChannel serverSocketAccept() throws Exception {
    return serverSock.accept();
}

// org.apache.tomcat.util.net.Acceptor.java
public class Acceptor<U> implements Runnable {

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private final AbstractEndpoint<?,U> endpoint;
    private String threadName;
    protected volatile AcceptorState state = AcceptorState.NEW;

    public Acceptor(AbstractEndpoint<?,U> endpoint) {
        this.endpoint = endpoint;
    }

    public final AcceptorState getState() {
        return state;
    }

    final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }

    final String getThreadName() {
        return threadName;
    }

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (endpoint.isRunning()) {

            // Loop if endpoint is paused
            while (endpoint.isPaused() && endpoint.isRunning()) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!endpoint.isRunning()) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                // If we have reached max connections, wait
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket, creating a new SocketChannel
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket, provided the endpoint is still
                // running and has not been paused
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                    log.error(msg, t);
                }
            }
        }
        state = AcceptorState.ENDED;
    }

    // ...

    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }
}
```
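One detail of the loop worth isolating is `endpoint.countUpOrAwaitConnection()`: once `maxConnections` is reached, the Acceptor parks until a connection is released. Tomcat implements this with its own `LimitLatch`; purely as an illustration of the idea (this is not Tomcat's actual code), a `java.util.concurrent.Semaphore` behaves similarly:

```java
import java.util.concurrent.Semaphore;

public class ConnectionLimitSketch {
    public static void main(String[] args) throws Exception {
        int maxConnections = 2; // stand-in for the connector's maxConnections
        Semaphore connections = new Semaphore(maxConnections);

        // countUpOrAwaitConnection(): take a slot, blocking when none is free.
        connections.acquire();
        connections.acquire();
        System.out.println(connections.availablePermits()); // 0 - a third accept would block

        // countDownConnection(): a connection closed, so free its slot.
        connections.release();
        System.out.println(connections.availablePermits()); // 1
    }
}
```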
As the `run` method shows, the Acceptor blocks in `serverSock.accept()` listening on the port. When a connection comes in, we get a socket, and if the endpoint is running normally, NioEndpoint's `setSocketOptions` method is called. In short, `setSocketOptions` builds a NioChannel around the socket and registers that NioChannel in a Poller's event list, where it waits for the Poller to poll it:
```java
/**
 * org.apache.tomcat.util.net.NioEndpoint.java
 * Process the specified connection.
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured
 *  and processing may continue, <code>false</code> if the socket needs to be
 *  close immediately
 */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        // Disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        // Take a NioChannel from the cache, or create one if none is
        // available, and hand the socket to it
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // Take a Poller from the pollers array and register this NioChannel
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
```
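The round-robin arithmetic in `getPoller0()` is worth seeing in isolation. With the default single poller, the modulo always yields index 0; with two pollers, the index alternates:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    public static void main(String[] args) {
        AtomicInteger pollerRotater = new AtomicInteger(0);
        int pollerCount = 2; // stands in for pollers.length

        StringBuilder picked = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            // Math.abs guards against the counter overflowing to negative values.
            int idx = Math.abs(pollerRotater.incrementAndGet()) % pollerCount;
            picked.append(idx);
        }
        System.out.println(picked); // 10101
    }
}
```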
About `getPoller0()`: as noted before, the `pollers` array holds only one element by default, which is worth keeping in mind. Now let's look at the `register` method implemented by NioEndpoint's Poller, whose job is to register the newly created socket with the Poller.
```java
/**
 * Registers a newly created socket with the poller.
 *
 * @param socket The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    // Take a PollerEvent from the cache, or create one if none is
    // available, and set the socket and the NioSocketWrapper on it
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
    if (r == null) {
        r = new PollerEvent(socket, ka, OP_REGISTER);
    } else {
        r.reset(socket, ka, OP_REGISTER);
    }
    // Add it to this Poller's event list
    addEvent(r);
}
```
To summarize the process so far. When the Acceptor receives a request, it does the following:

- If the maximum number of connections has been reached, wait. Otherwise, block listening on the port.
- When a connection arrives, accept it into a SocketChannel. If the service is running normally, hand the socket to an appropriate processor; if that hand-off fails, close the socket.

At this point NioEndpoint's `setSocketOptions` method is called to process the connection:

- set the socket to non-blocking;
- take a NioChannel from the cache, or create one if none is available, and put the socket into it;
- take a Poller from the `pollers` array and register the NioChannel with it.

The final registration step calls the Poller's `register()` method, which:

- creates a NioSocketWrapper wrapping the socket, configures the related properties, and sets its interestOps to `SelectionKey.OP_READ`;
- takes a PollerEvent from the cache, or creates one if none is available, and initializes or resets it, setting its interestOps to `OP_REGISTER` (used later when the Poller polls);
- adds the new PollerEvent to that Poller's event list `events`, where it waits for the Poller thread to pick it up.
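Both the "take a NioChannel from the cache or create one" step and the "take a PollerEvent from the cache or create one" step follow the same pop-or-create, reset, reuse pattern that `SynchronizedStack` supports. A plain `ArrayDeque` sketch of that pattern (illustrative only, not Tomcat's `SynchronizedStack`):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolSketch {
    public static void main(String[] args) {
        Deque<StringBuilder> cache = new ArrayDeque<>();

        // pop-or-create, as in "NioChannel channel = nioChannels.pop();"
        StringBuilder obj = cache.poll();
        if (obj == null) {
            obj = new StringBuilder(); // cache miss: allocate a new object
        } else {
            obj.setLength(0);          // cache hit: reset it, like channel.reset()
        }

        obj.append("request data");
        // ... after use, clear and return the object so the next request reuses it:
        obj.setLength(0);
        cache.push(obj);
        System.out.println(cache.size()); // 1
    }
}
```

Pooling these per-connection objects avoids allocating (and garbage collecting) a fresh channel and event for every accepted socket.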
### How the Poller Works
As we saw, the Poller also implements Runnable, and its worker thread is created and started during the `start` phase of the lifecycle, so its `run` method shows how it works.
We have already seen how a Poller event is registered into the event queue; what the Poller thread does next is process those events.
In `run`, the Poller drains the event queue `events`, registering the interestOps of each PollerEvent's SocketChannel with the Selector and removing the PollerEvent from the queue. From then on, the SocketChannel is read and written without blocking, scheduled through the Selector.
```java
/**
 * Poller class.
 */
public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

    private volatile boolean close = false;
    private long nextExpiration = 0; //optimize expiration handling

    private AtomicLong wakeupCounter = new AtomicLong(0);

    private volatile int keyCount = 0;

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public int getKeyCount() { return keyCount; }

    public Selector getSelector() { return selector; }

    /**
     * The background thread that adds sockets to the Poller, checks the
     * poller for triggered events and hands the associated socket off to an
     * appropriate processor as events occur.
     */
    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                    // Walk events, registering each event's channel
                    // interestOps with the Selector
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        // If we are here, it means we have other stuff to do:
                        // do a non-blocking select, which immediately returns
                        // the number of ready channels
                        keyCount = selector.selectNow();
                    } else {
                        // Block until the OS reports channels whose data is
                        // ready, or until the timeout expires
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                continue;
            }
            // Either we timed out or we woke up; process events first, i.e.
            // register the channels queued in events with the Selector
            if (keyCount == 0) hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event: processKey handles each socket's IO
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // Create a SocketProcessor and hand it to the Tomcat
                    // thread pool for execution
                    processKey(sk, attachment);
                }
            }
            // Process timeouts
            timeout(keyCount, hasEvents);
        }
        getStopLatch().countDown();
    }
    // ...
}
```
The part above that reads the ready channels is a very common Java NIO pattern: the Selector's selectedKeys() returns the channels whose IO data is ready, and we iterate over them, calling processKey to handle each channel's ready event. processKey creates a SocketProcessor and submits it to the Tomcat thread pool for execution.
Another point worth noting here is the events() method, which processes the queued PollerEvent objects: it executes PollerEvent.run(), then resets each PollerEvent and returns it to the cache so the object can be reused.
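The ready-key loop described above can be reproduced with plain JDK NIO, outside Tomcat. The sketch below is a minimal single-threaded version in which one thread plays both the Acceptor and the Poller; the class name MiniPoller and the loopback "ping" payload are made up for the example:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class MiniPoller {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        // A loopback client standing in for a remote browser.
        SocketChannel client = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", server.socket().getLocalPort()));
        client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

        String received = null;
        while (received == null) {
            selector.select(1000); // block until a channel is ready (or timeout)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove(); // same removal discipline as Poller.run()
                if (key.isAcceptable()) {
                    // Acceptor step: take the connection, make it non-blocking,
                    // register it for reads (what OP_REGISTER turns into).
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // processKey step: read the data from the ready channel.
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    ((SocketChannel) key.channel()).read(buf);
                    buf.flip();
                    received = StandardCharsets.UTF_8.decode(buf).toString();
                }
            }
        }
        System.out.println(received); // prints "ping"
        client.close();
        server.close();
        selector.close();
    }
}
```

Tomcat layers object pooling, wakeup counting, and a worker pool on top of this loop, but the Selector mechanics are exactly these.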
```java
/**
 * Processes events in the event queue of the Poller.
 *
 * @return <code>true</code> if some events were processed,
 *   <code>false</code> if queue was empty
 */
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
        result = true;
        try {
            // Register the SocketChannel's interestOps with the Selector
            pe.run();
            // Reset the event and return it to the cache for reuse
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch (Throwable x) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return result;
}
```
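One small detail of events() worth calling out: the loop snapshots events.size() up front, so events enqueued while the queue is being drained wait for the next call. A minimal reproduction (names hypothetical), using a plain ConcurrentLinkedQueue in place of Tomcat's SynchronizedQueue:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class DrainSketch {
    public static void main(String[] args) {
        Queue<String> events = new ConcurrentLinkedQueue<>();
        events.offer("a");
        events.offer("b");
        int processed = 0;
        // Snapshot the size up front, as events() does: anything enqueued
        // mid-drain (here, "c") is left for the next pass.
        for (int i = 0, size = events.size(); i < size && events.poll() != null; i++) {
            processed++;
            if (i == 0) events.offer("c"); // a concurrent register() arriving mid-drain
        }
        System.out.println(processed + " " + events.size()); // prints "2 1"
    }
}
```

This bounds each pass of the loop, so a steady stream of incoming registrations cannot starve the select() call that follows.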
So PollerEvent.run() is where the real work happens:
```java
/**
 * PollerEvent, cacheable object for poller events to avoid GC
 */
public static class PollerEvent implements Runnable {

    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        // When the Acceptor calls Poller.register(), the PollerEvent is
        // created with interestOps OP_REGISTER, so this branch is taken
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    // The key was cancelled (e.g. due to socket closure)
                    // and removed from the selector while it was being
                    // processed. Count down the connections at this point
                    // since it won't have been counted down when the socket
                    // closed.
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        // We are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

    @Override
    public String toString() {
        return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
                "], interestOps [" + interestOps + "]";
    }
}
```
At this point the Poller thread's role is clear:
Requests accepted by the Acceptor are registered into the Poller's event queue.
The Poller polls the event queue and, for each arriving event, registers the channel held by the PollerEvent with the Poller's Selector.
It then polls the ready channels, creating a SocketProcessor for each one and handing it off to the Tomcat thread pool.
What remains is how the SocketProcessor adapts the request data sent by the client and passes it on to the Servlet container.
That is, the processKey(sk, attachment) call at the end of the Poller's run method:
```java
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if (close) {
            cancelledKey(sk);
        } else if (sk.isValid() && attachment != null) {
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            // Invalid key
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
```
The processSocket method called there is where the SocketProcessor is used to handle the request:
```java
/**
 * Process the given SocketWrapper with the given status. Used to trigger
 * processing as if the Poller (for those endpoints that have one)
 * selected the socket.
 *
 * @param socketWrapper The socket wrapper to process
 * @param event         The socket event to be processed
 * @param dispatch      Should the processing be performed on a new
 *                      container thread
 *
 * @return if processing was triggered successfully
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
```
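The shape of processSocket (pop a processor from a cache or create one, then either dispatch it to the executor or run it inline) can be sketched with a plain ExecutorService. Task and DispatchSketch below are hypothetical simplifications of SocketProcessorBase and the endpoint, not Tomcat code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {
    // Hypothetical stand-in for SocketProcessorBase: a reusable, resettable task.
    static class Task implements Runnable {
        String event;
        CountDownLatch done;
        Task(String event, CountDownLatch done) { reset(event, done); }
        void reset(String event, CountDownLatch done) {
            this.event = event;
            this.done = done;
        }
        @Override public void run() {
            // Real Tomcat would drive the Http11Processor here.
            done.countDown();
        }
    }

    private final Deque<Task> processorCache = new ArrayDeque<>();
    final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Mirrors processSocket(): reuse or create, then dispatch or run inline.
    boolean process(String event, boolean dispatch, CountDownLatch done) {
        Task t = processorCache.pollFirst();
        if (t == null) t = new Task(event, done);
        else t.reset(event, done);
        if (dispatch) executor.execute(t);
        else t.run();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        DispatchSketch sketch = new DispatchSketch();
        CountDownLatch done = new CountDownLatch(2);
        sketch.process("OPEN_READ", true, done);   // runs on a pool thread
        sketch.process("OPEN_WRITE", false, done); // runs inline on this thread
        System.out.println(done.await(5, TimeUnit.SECONDS)); // prints "true"
        sketch.executor.shutdown();
    }
}
```

The dispatch flag matters in Tomcat because some events must not block the Poller thread, while others are already running on a container thread and can be handled inline.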
### SocketProcessor Request Handling
Here is a brief look at how the SocketProcessor handles a request, to connect the dots to the Servlet container. As we saw above, a request is actually processed by a SocketProcessor executing on the thread pool; the sequence diagram of a single request looks like this:
As the diagram shows, the SocketProcessor obtains an Http11Processor through the Http11ConnectionHandler. The Http11Processor calls prepareRequest to prepare the request data, then calls CoyoteAdapter's service method to adapt the request and response, after which the request is handed to the Tomcat container for processing.
The process can be expressed as the following call chain:

```java
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
```

Here we first obtain the Service from the Connector (the Connector set itself on the CoyoteAdapter when it created it in initInternal), then get the Container from the Service, then that container's pipeline, then the pipeline's first Valve, and finally call its invoke method to execute the request. The Service holds the topmost container; when invoke is called on the topmost container's pipeline, the pipelines call the invoke methods of each container's Valves layer by layer, until the basic Valve of the Wrapper's pipeline (StandardWrapperValve) handles the Filters and the Servlet.
After the request is handed to the Tomcat container, it is passed down layer by layer through Engine, Host, Context, and Wrapper, then through a chain of Filters, and finally reaches the Servlet, where our own application logic runs. The pipeline-valve design is what carries the data between the containers.
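The pipeline-valve design mentioned above can be sketched as a linked chain of valves, where invoking the first valve walks the whole chain and a basic valve at the tail does the terminal work. All names below are simplified stand-ins for Tomcat's Pipeline/Valve contracts, not the real API:

```java
// Minimal sketch of the pipeline-valve design: each container owns a pipeline
// of valves; invoking the first valve walks the whole chain until the basic
// valve at the tail does the real work.
public class PipelineSketch {
    interface Valve {
        void invoke(StringBuilder trace);
    }

    // A pass-through valve (in the spirit of StandardEngineValve/StandardHostValve).
    static class NamedValve implements Valve {
        private final String name;
        private final Valve next;
        NamedValve(String name, Valve next) { this.name = name; this.next = next; }
        @Override public void invoke(StringBuilder trace) {
            trace.append(name).append(" -> ");
            next.invoke(trace); // hand the request to the next container's valve
        }
    }

    // The basic valve at the tail (in the spirit of StandardWrapperValve):
    // this is where Filters and the Servlet would run.
    static class BasicValve implements Valve {
        @Override public void invoke(StringBuilder trace) {
            trace.append("servlet");
        }
    }

    public static void main(String[] args) {
        // Build the chain Engine -> Host -> Context -> Wrapper(basic).
        Valve first = new NamedValve("engine",
                new NamedValve("host",
                        new NamedValve("context", new BasicValve())));
        StringBuilder trace = new StringBuilder();
        first.invoke(trace); // like getPipeline().getFirst().invoke(request, response)
        System.out.println(trace); // prints "engine -> host -> context -> servlet"
    }
}
```

The design choice here is that each container can insert extra valves (access logging, authentication) into its own pipeline without the other layers knowing.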
That about covers the Connector. If I have the energy later I will dig in further; next up is the Webflux walkthrough.
Source: http://www.tuicool.com/articles/73yIBza