EventLoop Source

eventLoop

之前也有过章节
node整理
Node.js

有看到石墨技术文档

cnode技术文档,作者:youth7

记录以下知识点:

  • nodejs的event是基于libuv浏览器的event loop则在html5的规范中明确定义,两个事物有明显的区别
  • process.nextTick()在6个阶段结束的时候都会执行

eventLoop

timers 执行setTimeout()setInterval()中到期的callback
I/O callbacks 上一轮循环中有少数的I/Ocallback会被延迟到这一轮的这一阶段执行
idle, prepare 仅内部使用
poll 最为重要的阶段,执行I/O callback,在适当的条件下会阻塞在这个阶段
check 执行setImmediate的callback
close callbacks 执行close事件的callback,例如socket.on("close",func)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   ┌───────────────────────┐
┌─>│ timers │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
│ │ I/O callbacks │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
│ │ idle, prepare │
│ └──────────┬────────────┘ ┌───────────────┐
│ ┌──────────┴────────────┐ │ incoming: │
│ │ poll │<─────┤ connections, │
│ └──────────┬────────────┘ │ data, etc. │
│ ┌──────────┴────────────┐ └───────────────┘
│ │ check │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
└──┤ close callbacks │
└───────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# /deps/uv/src/unix/core.c
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;

r = uv__loop_alive(loop);
// if(uv_has_active_hanles||uv_has_active_reqs || lop->closing_handles!=NULL)retrun true
if (!r)
uv__update_time(loop);

while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
// main
uv__run_timers(loop);//timer phase
ran_pending = uv__run_pending(loop);//IO callback pharse
uv__run_idle(loop);//idle phase
uv__run_prepare(loop);// prepare phase
// main end

timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);

uv__io_poll(loop, timeout);//poll phase
uv__run_check(loop);//check phase
uv__run_closing_handles(loop);//closing pharse

if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
// UV_RUN_ONCE 至少有一个回调执行,不然该循环就空转了,满足前进要求
// 这也是[文章](https://zehai.info/2020/04/10/2020-04-10-eventloop/)中写到:
// poll为空,eventloop将检查timer是否有快到的,如果需要执行,eventloop将要进入timers阶段来顺序执行timer callback
uv__update_time(loop);
uv__run_timers(loop);
}

r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}

/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;

return r;
}

timers phase

执行setTimeout()setInterval()中到期的callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;

for (;;) {
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
break;
// 取出堆中最快要被执行的timer
// #define container_of(ptr, type, member)
// ((type *) ((char *) (ptr) - offsetof(type, member)))
// 没看懂 handle是怎么生成的
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)//执行时间大于eventloop循环一次时间,退出phase下次再说
break;

uv_timer_stop(handle);// remove handle
uv_timer_again(handle);// 多次重复的timer再塞进去
handle->timer_cb(handle);// invoke callback
}
}

I/O callbacks

上一轮循环中有少数的I/Ocallback会被延迟到这一轮的这一阶段执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//deps/uv/src/unix/core.c
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;

if (QUEUE_EMPTY(&loop->pending_queue))//isEmpty
return 0;

QUEUE_MOVE(&loop->pending_queue, &pq);//move

while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);//find
QUEUE_REMOVE(q);//pop
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);//unitl queue empty
}

return 1;
}

Idle and prepare phase

/ loop /

void uv__run_idle(uv_loop_t* loop);

void uv__run_check(uv_loop_t* loop);

void uv__run_prepare(uv_loop_t* loop);

1
2
3
4
5
6
7
8
9
10
11
12
13
void uv__run_##name(uv_loop_t* loop) {
uv_##name##_t* h;
QUEUE queue;
QUEUE* q;
QUEUE_MOVE(&loop->name##_handles, &queue);//QUEUE_MOVE
while (!QUEUE_EMPTY(&queue)) {//util empty
q = QUEUE_HEAD(&queue);//pop
h = QUEUE_DATA(q, uv_##name##_t, queue);//element->handle
QUEUE_REMOVE(q);//remove
QUEUE_INSERT_TAIL(&loop->name##_handles, q);//insert tail
h->name##_cb(h);//callback
}
}

!!!poll phase!!!

最为重要的阶段,执行I/O callback,在适当的条件下会阻塞在这个阶段

可见poll阶段的任务就是阻塞等待监听的事件来临,然后执行对应的callback,其中阻塞是带有超时时间的,以下几种情况都会使得超时时间为0

  • uv_run处于UV_RUN_NOWAIT模式下
  • uv_stop()被调用
  • 没有活跃的handles和request
  • 有活跃的idle handles
  • 有等待关闭的handles

如果上述都不符合,则超时时间为距离现在最近的timer;如果没有timer则poll阶段会一直阻塞下去

个人理解nodejs的服务,大部分时间会被阻塞在这个阶段,而不去执行closing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// 不行了,看不懂了
void uv__io_poll(uv_loop_t* loop, int timeout) {
struct pollfd events[1024];
struct pollfd pqry;
struct pollfd* pe;
struct poll_ctl pc;
QUEUE* q;
uv__io_t* w;
uint64_t base;
uint64_t diff;
int have_signals;
int nevents;
int count;
int nfds;
int i;
int rc;
int add_failed;

if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
return;
}

while (!QUEUE_EMPTY(&loop->watcher_queue)) {//until watcher queue empty
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);

w = QUEUE_DATA(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);

pc.events = w->pevents;
pc.fd = w->fd;

add_failed = 0;
if (w->events == 0) {
pc.cmd = PS_ADD;
if (pollset_ctl(loop->backend_fd, &pc, 1)) {
if (errno != EINVAL) {
assert(0 && "Failed to add file descriptor (pc.fd) to pollset");
abort();
}
/* Check if the fd is already in the pollset */
pqry.fd = pc.fd;
rc = pollset_query(loop->backend_fd, &pqry);
switch (rc) {
case -1:
assert(0 && "Failed to query pollset for file descriptor");
abort();
case 0:
assert(0 && "Pollset does not contain file descriptor");
abort();
}
/* If we got here then the pollset already contained the file descriptor even though
* we didn't think it should. This probably shouldn't happen, but we can continue. */
add_failed = 1;
}
}
if (w->events != 0 || add_failed) {
/* Modify, potentially removing events -- need to delete then add.
* Could maybe mod if we knew for sure no events are removed, but
* content of w->events is handled above as not reliable (falls back)
* so may require a pollset_query() which would have to be pretty cheap
* compared to a PS_DELETE to be worth optimizing. Alternatively, could
* lazily remove events, squelching them in the mean time. */
pc.cmd = PS_DELETE;
if (pollset_ctl(loop->backend_fd, &pc, 1)) {
assert(0 && "Failed to delete file descriptor (pc.fd) from pollset");
abort();
}
pc.cmd = PS_ADD;
if (pollset_ctl(loop->backend_fd, &pc, 1)) {
assert(0 && "Failed to add file descriptor (pc.fd) to pollset");
abort();
}
}

w->events = w->pevents;
}

assert(timeout >= -1);
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */

for (;;) {
nfds = pollset_poll(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);

/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
* operating system didn't reschedule our process while in the syscall.
*/
SAVE_ERRNO(uv__update_time(loop));

if (nfds == 0) {
assert(timeout != -1);
return;
}

if (nfds == -1) {
if (errno != EINTR) {
abort();
}

if (timeout == -1)
continue;

if (timeout == 0)
return;

/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
}

have_signals = 0;
nevents = 0;

assert(loop->watchers != NULL);
loop->watchers[loop->nwatchers] = (void*) events;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;

for (i = 0; i < nfds; i++) {
pe = events + i;
pc.cmd = PS_DELETE;
pc.fd = pe->fd;

/* Skip invalidated events, see uv__platform_invalidate_fd */
if (pc.fd == -1)
continue;

assert(pc.fd >= 0);
assert((unsigned) pc.fd < loop->nwatchers);

w = loop->watchers[pc.fd];

if (w == NULL) {
/* File descriptor that we've stopped watching, disarm it.
*
* Ignore all errors because we may be racing with another thread
* when the file descriptor is closed.
*/
pollset_ctl(loop->backend_fd, &pc, 1);
continue;
}

/* Run signal watchers last. This also affects child process watchers
* because those are implemented in terms of signal watchers.
*/
if (w == &loop->signal_io_watcher)
have_signals = 1;
else
w->cb(loop, w, pe->revents);

nevents++;
}

if (have_signals != 0)
loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);

loop->watchers[loop->nwatchers] = NULL;
loop->watchers[loop->nwatchers + 1] = NULL;

if (have_signals != 0)
return; /* Event loop should cycle now so don't poll again. */

if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}

if (timeout == 0)
return;

if (timeout == -1)
continue;

update_timeout:
assert(timeout > 0);

diff = loop->time - base;
if (diff >= (uint64_t) timeout)
return;

timeout -= diff;
}
}

check phase

见idle prepare

close

关闭handle

1
2
3
4
5
6
7
8
9
10
11
12
13
static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

p = loop->closing_handles;
loop->closing_handles = NULL;

while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}

where is process.nextTick

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//lib/internal/process/task_queues.js
// `nextTick()` will not enqueue any callback when the process is about to
// exit since the callback would not have a chance to be executed.
// 意思就是nextTick在进程快要结束时不会排队callback,因为没有机会执行
// 你们看引用的文档吧,我看不下去了😭
// 主要的思路是JS执行process.nexTick(),然后将callback交给c++执行
function nextTick(callback) {
if (typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK(callback);

if (process._exiting)
return;

let args;
switch (arguments.length) {
case 1: break;
case 2: args = [arguments[1]]; break;
case 3: args = [arguments[1], arguments[2]]; break;
case 4: args = [arguments[1], arguments[2], arguments[3]]; break;
default:
args = new Array(arguments.length - 1);
for (let i = 1; i < arguments.length; i++)
args[i - 1] = arguments[i];
}

if (queue.isEmpty())
setHasTickScheduled(true);
const asyncId = newAsyncId();
const triggerAsyncId = getDefaultTriggerAsyncId();
const tickObject = {
[async_id_symbol]: asyncId,
[trigger_async_id_symbol]: triggerAsyncId,
callback,
args
};
if (initHooksExist())
emitInit(asyncId, 'TickObject', triggerAsyncId, tickObject);
queue.push(tickObject);//封装callback push
//进入c
}

question

1.setTimeout vs setImmediate

  • phase执行顺序
  • expire设置0是不是立刻执行
1
2
3
4
5
6
7
setTimeout(() => {
console.log('setTimeout')
}, 0)

setImmediate(() => {
console.log('setImmediate')
})
  • setTimeout/setInterval 的第二个参数取值范围是:[1, 2^31 - 1],如果超过这个范围则会初始化为 1,即 setTimeout(fn, 0) === setTimeout(fn, 1)。
  • setTimeout 的回调函数在 timer 阶段执行,setImmediate 的回调函数在 check 阶段执行,event loop 的开始会先检查 timer 阶段,但是在开始之前到 timer 阶段会消耗一定时间,所以就会出现两种情况:
    • timer 前的准备时间超过 1ms,满足 loop->time >= 1,则执行 timer 阶段(setTimeout)的回调函数
    • timer 前的准备时间小于 1ms,则先执行 check 阶段(setImmediate)的回调函数,下一次 event loop 执行 timer 阶段(setTimeout)的回调函数

在举例:

1
2
3
4
5
6
7
8
9
10
setTimeout(() => {
console.log('setTimeout')
}, 0)

setImmediate(() => {
console.log('setImmediate')
})

const start = Date.now()
while (Date.now() - start < 10);//准备时间超过1ms,则直接执行timer

2.setTimeout vs setImmediate 2

1
2
3
4
5
6
7
8
9
10
11
12
13
const fs = require('fs')

fs.readFile(__filename, () => {
setTimeout(() => {
console.log('setTimeout')
}, 0)

setImmediate(() => {
console.log('setImmediate')
})
})
//setImmediate
//setTimeout

在引用一下官方对于check phase的介绍

This phase allows a person to execute callbacks immediately after the poll phase has completed. If the poll phase becomes idle and scripts have been queued with setImmediate(), the event loop may continue to the check phase rather than waiting.

setImmediate() is actually a special timer that runs in a separate phase of the event loop. It uses a libuv API that schedules callbacks to execute after the poll phase has completed.

Generally, as the code is executed, the event loop will eventually hit the poll phase where it will wait for an incoming connection, request, etc. However, if a callback has been scheduled with setImmediate() and the poll phase becomes idle, it will end and continue to the check phase rather than waiting for poll events.

fs.readFile 的回调函数执行完后:

  1. 注册 setTimeout 的回调函数到 timer 阶段
  2. 注册 setImmediate 的回调函数到 check 阶段
  3. event loop 从 pool 阶段出来继续往下一个阶段执行,恰好是 check 阶段,所以 setImmediate 的回调函数先执行
  4. 本次 event loop 结束后,进入下一次 event loop,执行 setTimeout 的回调函数

所以,在 I/O Callbacks 中注册的 setTimeout 和 setImmediate,永远都是 setImmediate 先执行。

3.process.nextTick()

1
2
3
4
5
6
7
8
9
10
11
12
setInterval(() => {
console.log('setInterval')
}, 100)

process.nextTick(function tick () {
process.nextTick(tick)
})
//note
setImmediate(function immediate () {
console.log('111');//会直接打印出很多次111
setImmediate(immediate)
})

运行结果:setInterval 永远不会打印出来

//这个在node官方文档也有相关的描述

//我在这里也进行了笔记记录

//允许用户处理errors,清理不需要的资源,事件循环前 尝试重新连接

//有时有必要在eventloop继续之前,在call stack unwound之后,让callback执行

解释:process.nextTick 会无限循环,将 event loop 阻塞在 microtask 阶段,导致 event loop 上其他 macrotask 阶段的回调函数没有机会执行。//这段解释是前端的,后端是没有microtask的实际队列的

解决方法通常是用 setImmediate 替代 process.nextTick,如下:

1
2
3
4
5
6
7
setInterval(() => {
console.log('setInterval')
}, 100)

setImmediate(function immediate () {
setImmediate(immediate)
})

运行结果:每 100ms 打印一次 setInterval。

解释:process.nextTick 内执行 process.nextTick 仍然将 tick 函数注册到当前 microtask 的尾部,所以导致 microtask 永远执行不完; setImmediate 内执行 setImmediate 会将 immediate 函数注册到下一次 event loop 的 check 阶段,而不是当前正在执行的 check 阶段,所以给了 event loop 上其他 macrotask 执行的机会。

再看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
setImmediate(() => {
console.log('setImmediate1')
setImmediate(() => {
console.log('setImmediate2')
})
process.nextTick(() => {
console.log('nextTick')
})
})

setImmediate(() => {
console.log('setImmediate3')
})

运行结果:

1
2
3
4
setImmediate1
setImmediate3
nextTick
setImmediate2

注意:并不是说 setImmediate 可以完全替代 process.nextTick,process.nextTick 在特定场景下还是无法被替代的,比如我们就想将一些操作放到最近的 microtask 里执行。

4.promise

1
2
3
4
5
const promise = Promise.resolve()
.then(() => {
return promise
})
promise.catch(console.error)

运行结果:

1
2
3
4
5
6
TypeError: Chaining cycle detected for promise #<Promise>
at <anonymous>
at process._tickCallback (internal/process/next_tick.js:188:7)
at Function.Module.runMain (module.js:667:11)
at startup (bootstrap_node.js:187:16)
at bootstrap_node.js:607:3

解释:promise.then 类似于 process.nextTick,都会将回调函数注册到 microtask 阶段。上面代码会导致死循环,类似前面提到的:

1
2
3
process.nextTick(function tick () {
process.nextTick(tick)
})

再看个例子:

1
2
3
4
5
6
7
8
9
const promise = Promise.resolve()

promise.then(() => {
console.log('promise')
})

process.nextTick(() => {
console.log('nextTick')
})

运行结果:

1
2
nextTick
promise

解释:promise.then 虽然和 process.nextTick 一样,都将回调函数注册到 microtask,但优先级不一样。process.nextTick 的 microtask queue 总是优先于 promise 的 microtask queue 执行。

5.promise执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
setTimeout(() => {
console.log(1)
}, 0)

new Promise((resolve, reject) => {
console.log(2)
for (let i = 0; i < 10000; i++) {
i === 9999 && resolve()
}
console.log(3)
}).then(() => {
console.log(4)
})
console.log(5)

运行结果:

1
2
3
4
5
2
3
5
4
1

解释:Promise 构造函数是同步执行的,所以先打印 2、3,然后打印 5,接下来 event loop 进入执行 microtask 阶段,执行 promise.then 的回调函数打印出 4,然后执行下一个 macrotask,恰好是 timer 阶段的 setTimeout 的回调函数,打印出 1。

6.综合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
setImmediate(() => {
console.log(1)
setTimeout(() => {
console.log(2)
}, 100)
setImmediate(() => {
console.log(3)
})
process.nextTick(() => {
console.log(4)
})
})
process.nextTick(() => {
console.log(5)
setTimeout(() => {
console.log(6)
}, 100)
setImmediate(() => {
console.log(7)
})
process.nextTick(() => {
console.log(8)
})
})
console.log(9)

运行结果:

1
2
3
4
5
6
7
8
9
9
5
8
1
7
4
3
6
2

process.nextTick、setTimeout 和 setImmediate 的组合,请读者自己推理吧。

other source code

setTimeout()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//lib/timers/promises.js
//setTimeout(function(){},expire)
function setTimeout(after, value, options = {}) {
const args = value !== undefined ? [value] : value;
if (options == null || typeof options !== 'object') {
return PromiseReject(
new ERR_INVALID_ARG_TYPE(
'options',
'Object',
options));
}
const { signal, ref = true } = options;
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
return PromiseReject(
new ERR_INVALID_ARG_TYPE(
'options.signal',
'AbortSignal',
signal));
}
if (typeof ref !== 'boolean') {
return PromiseReject(
new ERR_INVALID_ARG_TYPE(
'options.ref',
'boolean',
ref));
}
// TODO(@jasnell): If a decision is made that this cannot be backported
// to 12.x, then this can be converted to use optional chaining to
// simplify the check.
if (signal && signal.aborted)
return PromiseReject(lazyDOMException('AbortError'));
return new Promise((resolve, reject) => {
const timeout = new Timeout(resolve, after, args, false, true);
if (!ref) timeout.unref();
insert(timeout, timeout._idleTimeout);
if (signal) {
signal.addEventListener('abort', () => {
if (!timeout._destroyed) {
// eslint-disable-next-line no-undef
clearTimeout(timeout);
reject(lazyDOMException('AbortError'));
}
}, { once: true });
}
});
}