關閉流程:
包括了bossGroup和workGroup的關閉
開始源碼,按照下面的方式完成操作,啟動
EchoServer
1. 進入 workerGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
不斷跟進源碼,進入到了
SingleThreadEventExecutor
的
shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
方法中
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
ObjectUtil.checkNotNull(unit, "unit");
if (isShuttingDown()) {
return terminationFuture();
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
wakeup = true;
oldState = state;
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED://如果原來是啟動的狀态
newState = ST_SHUTTING_DOWN;//設定為正在關閉狀态
break;
default:
newState = oldState;
wakeup = false;
}
}
// CAS更改狀态
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}
if (wakeup) {
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
return terminationFuture();
}
進入
NioEventLoop
的
run()
方法中,給下面代碼打上斷點
1. isShuttingDown()
方法源碼
isShuttingDown()
如果我們更改了狀态為
ST_SHUTTING_DOWN
,
run()
方法在selector阻塞以後就一定會進入該代碼中。
2. closeAll()
方法
closeAll()
3. confirmShutdown()
方法
confirmShutdown()
跟蹤源碼,進入
SingleThreadEventExecutor
的
confirmShutdown()
方法
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
// 取消所有的定時任務
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
// 計算關閉的任務開始的時間
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 如果有task/hook則不關閉,
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
// terminate if the quiet period is 0.
// See https://github.com/netty/netty/issues/4241
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
taskQueue.offer(WAKEUP_TASK);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
// 比較是否超過了可以容忍的關閉的最大時間
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
// 關閉任務的靜默期内,通過lastExecutionTime判斷是否有任務執行
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
taskQueue.offer(WAKEUP_TASK);
try {
Thread.sleep(100);// 如果有任務就sleep,讓其不關閉
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// 靜默期内沒有任務執行,馬上關閉
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
3.1 進入 runAllTasks()
方法
runAllTasks()
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 關閉的時候會有很多unregister操作的task,會先運作他們
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
// 擷取我們最後執行的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
最後還有一件很重要的事,
SingleThreadEventExecutor
開啟線程去執行人的時候會在這個方法裡面阻塞,裡面有很多空循環的for,除非shundown才能break
到最後會執行
cleanup();
來實作對selector的關閉。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
// 關閉selector
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}