在PHP中使用协程实现多任务调度

2017-03-25

这篇文章是根据鸟叔的博文- 在PHP中使用协程实现多任务调度,做了一些易于理解的注释,修改了其中的一处错误。

生成器为可中断的函数

​  要从生成器认识协程,理解它内部是如何工作是非常重要的:生成器是一种可中断的函数,在它里面的yield构成了中断点。

​  以前一篇文章的为例,调用 xrange(1,1000000) 的时候,xrange() 函数里代码其实并没有真正地运行。 它只是返回了一个迭代器,而这个迭代器实现了 Iterator 接口:

1
2
3
4
$range = xrange(0100000000);
var_dump($range); //object(Generator)[1]
var_dump($range instanceof Iterator); //boolean(true) ,
// note : instanceof 用于确定一个 PHP 变量是否属于某一类 class 的实例

Generator 类的主要方法:

1
2
3
4
5
6
7
8
Generator::current — 返回当前产生的值
Generator::key — 返回当前产生的键
Generator::next — 生成器继续执行
Generator::rewind — 重置迭代器
Generator::send — 向生成器中传入一个值,并且当做 yield 表达式的结果,然后继续执行生成器。
Generator::throw — 向生成器中抛入一个异常
Generator::valid — 检查迭代器是否被关闭
Generator::__wakeup — 序列化回调

​  调用迭代器的方法一次,其中的代码运行一次。例如,如果你调用$range->rewind(),那么 xrange() 里的代码就会运行到控制流第一次出现 yield 的地方。而函数内传递给 yield 语句的返回值可以通过$range->current()获取。

​  为了继续执行生成器中 yield 后的代码,你就需要调$range->next()方法。这将再次启动生成器,直到下一次 yield 语句出现。 因此,连续调用 next() 和 current() 方法,你就能从生成器里获得所有的值,直到再没有 yield 语句出现。

​  对 xrange() 来说,这种情形出现在$i 超过$end时。在这中情况下, 控制流将到达函数的终点,因此将不执行任何代码。一旦这种情况发生,vaild() 方法将返回假,这时迭代结束。

协程

  协程的支持是在迭代生成器的基础上,增加了可以回送数据给生成器的功能(调用者发送数据给被调用的生成器函数)。这就把生成器到调用者的单向通信转变为两者之间的双向通信。
  传递数据的功能是通过迭代器的send()方法实现的。下面的 logger() 协程是这种通信如何运行的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
function logger($fileName){

fileHandle = fopen(fileName,'a'); //'a' 写入方式打开,将文件指针指向文件末尾。

while (true) {
fwrite($fileHandle, yield"\n");
}
}

$logger = logger(DIR。'/log。txt');

$logger -> send('Foo');

$logger -> send('Bar');

  正如你能看到,这儿 yield 没有作为一个语句来使用,而是用作一个表达式,即它能被演化成一个值。这个值就是调用者传递给 send() 方法的值。在这个例子里,yield 表达式将首先被 ”Foo” 替代写入 Log,然后被 ”Bar” 替代写入 Log。

传入生成器的值。这个值将会被作为生成器当前所在的 yield 的返回值

  上面的例子里演示了yield作为接受者, 接下来我们看如何同时进行接收和发送的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function gen(){

$ret = (yield 'yield_1');

var_dump($ret);

$ret = (yield 'yield_2');

var_dump($ret);

}

var_dump($gen->current()); // 当前 yield 指向 "yield1"

var_dump($gen->send('ret1')); // "ret1" gen 方法中的 var_dump 返回
// "yield2" $gen->return() 返回
var_dump($gen->send('ret2')); // "ret2" gen 方法中的 var_dump 返回
// NULL $gen->return() 返回

  要很快的理解输出的精确顺序可能稍微有点困难,但你确定要搞清楚为什按照这种方式输出,以便后续继续阅读。

另外,我要特别指出的有两点:

  • yield表达式两边的括号在PHP7以前不是可选的,也就是说在 PHP5.5 和 PHP5.6 中圆括号是必须的。
  • 你可能已经注意到调用 current() 之前没有调用 rewind()。这是因为生成迭代对象的时候已经隐含地执行了 rewind 操作.

这个是错的,文档中这么写的:send:如果当这个方法被调用时,生成器不在 yield 表达式,那么在传入值之前,它会先运行到第一个 yield 表达式

send 方法返回的值都指向下一个 yield。

多任务协作

  如果阅读了上面的 logger() 例子, 你也许会疑惑“为了双向通信我为什么要使用协程呢?我完全可以使用其他非协程方法实现同样的功能啊?”, 是的,你是对的,但上面的例子只是为了演示了基本用法,这个例子其实并没有真正的展示出使用协程的优点。

  正如上面介绍里提到的,协程是非常强大的概念,不过却应用的很稀少而且常常十分复杂。要给出一些简单而真实的例子很难。

  在这篇文章里,我决定去做的是使用协程实现多任务协作。我们要解决的问题是你想并发地运行多任务(或者“程序”。不过我们都知道 CPU 在一个时刻只能运行一个任务(不考虑多核的情况)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。

  多任务协作这个术语中的“协作”很好的说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在 Windows 的早期版本 windows95 和 Mac OS 中有使用, 不过它们后来都切换到使用抢先多任务了. 理由相当明确:如果你依靠程序自动交出控制的话,那么一些恶意的程序将很容易占用整个 CPU,不与其他任务共享。

  现在你应当明白协程和任务调度之间的关系:yield 指令提供了任务中断自身的一种方法,然后把控制交回给任务调度器,因此协程可以运行多个其他任务。更进一步来说,yield 还可以用来在任务和调度器之间进行通信。

  为了实现我们的多任务调度,首先实现“任务” — 一个用轻量级的包装的协程函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Task {
protected $taskId;
protected $coroutine;//协同
protected $sendValue = null;
//protected $beforeFirstYield = true;

public function __construct($taskId, Generator $coroutine)
{
$this->taskId = $taskId;
$this->coroutine = $coroutine;
}

public function getTaskId()
{
return $this->taskId;
}

public function setSendValue($sendValue)
{
$this->sendValue = $sendValue;
}

public function run()
{
$retval = $this->coroutine->send($this->sendValue);//向生成器中传入一个值
$this->sendValue = null;
return $retval;
}
// 这是原文的 run,因为不需要 beforeFirstYield

public function old_run()
{
if ($this->beforeFirstYield) {
$this->beforeFirstYield = false;
return $this->coroutine->current();
} else {
$retval = $this->coroutine->send($this->sendValue);
$this->sendValue = null;
return $retval;
}
}

public function isFinished()
{
return !$this->coroutine->valid(); //检查迭代器是否被关闭
}
}

  如代码,一个任务就是用任务 ID 标记的一个协程(函数)。使用 setSendValue() 方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个),run() 函数确实没有做什么,除了调用 send() 方法的协同程序,要理解为什么添加了一个 beforeFirstYieldflag 变量,需要考虑下面的代码片段:

由于上文提到的那个错误,这里的代码不展示,有需要可以去原文查看,这里去掉 beforeFirstYieldflag 这个变量,它的作用是,确定第一个yield的值能被正确返回。

  调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:

这里实在突兀,下面是一个调度程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php
class Scheduler {
protected $maxTaskId = 0;
protected $taskMap = []; // taskId => task
protected $taskQueue; //任务队列

public function __construct()
{
$this->taskQueue = new SplQueue(); //SplQueue类提供使用双向链表实现的队列的主要功能。
}

public function newTask(Generator $coroutine)
{
//协同
$tid = ++$this->maxTaskId;
$task = new Task($tid, $coroutine);
$this->taskMap[$tid] = $task;
$this->schedule($task);
return $tid;
}

public function schedule(Task $task)
{
$this->taskQueue->enqueue($task); //加入到队列队尾
}

public function run()
{
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$task->run();

if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}
}

SplQueue 类提供使用双向链表实现的队列的主要功能。主要方法:

  • dequeue:从队列中删除节点
  • enqueue:向队列中添加一个元素。
  • setIteratorMode:设置迭代的模式
  • isEmpty:检查双向链表是否为空。

  newTask() 方法(使用下一个空闲的任务 id)创建一个新任务,然后把这个任务放入任务 map 数组里。 接着它通过把任务放入任务队列里来实现对任务的调度。接着 run() 方法扫描任务队列,运行任务。如果一个任务结束了,那么它将从队列里删除, 否则它将在队列的末尾再次被调度。

  让我们看看下面具有两个简单(没有什么意义)任务的调度器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function task1() {
for ($i = 1; $i <= 10; ++$i) {
echo "This is task 1 iteration $i.\n";
yield;
}
}

function task2() {
for ($i = 1; $i <= 5; ++$i) {
echo "This is task 2 iteration $i.\n";
yield;
}
}

$scheduler = new Scheduler;
$scheduler->newTask(task1());
$scheduler->newTask(task2());
$scheduler->run();

  两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
This is task 1 iteration 1
This is task 1 iteration 2
This is task 2 iteration 1
This is task 2 iteration 2
This is task 1 iteration 3
This is task 2 iteration 3
This is task 1 iteration 4
This is task 2 iteration 4
This is task 1 iteration 5
This is task 2 iteration 5
This is task 1 iteration 6
This is task 1 iteration 7
This is task 1 iteration 8
This is task 1 iteration 9
This is task 1 iteration 10

  对前五个迭代来说,两个任务是交替运行的,而在第二个任务结束后,只有第一个任务继续运行。

与调度器之间通信

  既然调度器已经运行了, 那么我们来看下一个问题:任务和调度器之间的通信。

  我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用。

  我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上。因此为了执行特权级别的操作(如杀死另一个进程),就不得不以某种方式把控制传回给内核,这样内核就可以执行所说的操作了。再说一遍,这种行为在内部是通过使用中断指令来实现的。过去使用的是通用的int指令,如今使用的是更特殊并且更快速的 syscall/sysenter 指令。

  我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样就允许它做它想做的任何事),我们将通过给yield表达式传递信息来与系统调用通信。这儿yield即是中断,也是传递信息给调度器(和从调度器传递出信息)的方法。

  为了说明系统调用, 我们对可调用的系统调用做一个小小的封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class SystemCall {

protected $callback;

public function __construct(callable $callback)
{
$this->callback = $callback;
}

public function __invoke(Task $task, Scheduler $scheduler)
{
$callback = $this->callback;
return $callback($task, $scheduler);
}
}

function callback() {
echo 'test';
}

$fun_call = 'callback';
$syscall = new SystemCall($fun_call);
var_dump(is_callable($syscall)); //检测参数是否为合法的可调用结构 true
var_dump($syscall); //object(SystemCall)

  它和其他任何可调用的对象(使用 _invoke )一样的运行, 不过它要求调度器把正在调用的任务和自身传递给这个函数.

  为了解决这个问题我们不得不微微的修改调度器的run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public function run() {
while (!$this->taskQueue->isEmpty()) {
$task = $this->taskQueue->dequeue();
$retval = $task->run();

if ($retval instanceof SystemCall) { // instanceof 用于确定一个 php 变量是会否属于某个类
$retval($task, $this);
continue;
}

if ($task->isFinished()) {
unset($this->taskMap[$task->getTaskId()]);
} else {
$this->schedule($task);
}
}
}

  第一个系统调用除了返回任务ID外什么都没有做:

1
2
3
4
5
6
7
function getTaskId()
{
return new SystemCall(function(Task $task, Scheduler $scheduler) {
$task->setSendValue($task->getTaskId());
$scheduler->schedule($task);
});
}

  这个函数设置任务 id 为下一次发送的值,并再次调度了这个任务。由于使用了系统调用,所以调度器不能自动调用任务,我们需要手工调度任务(稍后你将明白为什么这么做)。要使用这个新的系统调用的话,我们要重新编写以前的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function task($max)
{
$tid = (yield getTaskId()); // <-- here's the syscall!

for ($i = 1; $i <= $max; ++$i) {
echo "This is task $tid iteration $i.\n";
yield;
}
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();
?>

  这段代码将给出与前一个例子相同的输出,请注意系统调用如何同其他任何调用一样正常地运行,只不过预先增加了yield。

  要创建新的任务,然后再杀死它们的话,需要两个以上的系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function newTask(Generator $coroutine) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($coroutine) {
$task->setSendValue($scheduler->newTask($coroutine));
$scheduler->schedule($task);
}
);
}

function killTask($tid) {
return new SystemCall(
function(Task $task, Scheduler $scheduler) use ($tid) {
$task->setSendValue($scheduler->killTask($tid));
$scheduler->schedule($task);
}
);
}

  killTask函数需要在调度器里增加一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?php
public function killTask($tid) {
if (!isset($this->taskMap[$tid])) {
return false;
}

unset($this->taskMap[$tid]);

// This is a bit ugly and could be optimized so it does not have to walk the queue,
// but assuming that killing tasks is rather rare I won't bother with it now
foreach ($this->taskQueue as $i => $task) {
if ($task->getTaskId() === $tid) {
unset($this->taskQueue[$i]);
break;
}
}
return true;
}

  用来测试新功能的微脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function childTask() {
$tid = (yield getTaskId());
while (true) {
echo "Child task $tid still alive!\n";
yield;
}
}

function task() {
$tid = (yield getTaskId());
$childTid = (yield newTask(childTask()));

for ($i = 1; $i <= 6; ++$i) {
echo "Parent task $tid iteration $i.\n";
yield;

if ($i == 3) yield killTask($childTid);
}
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

  这段代码将打印以下信息:

1
2
3
4
5
6
7
8
9
Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

  经过三次迭代以后子任务将被杀死,因此这就是”Child is still alive”消息结束的时候。不过你要明白这还不是真正的父子关系。因为在父任务结束后子任务仍然可以运行,子任务甚至可以杀死父任务。可以修改调度器使它具有更层级化的任务结构,不过这个不是我们这个文章要继续讨论的范围了。

现在你可以实现许多进程管理调用。例如 wait(它一直等待到任务结束运行时),exec(它替代当前任务)和 fork(它创建一个当前任务的克隆)。fork非常酷,而且你可以使用PHP的协程真正地实现它,因为它们都支持克隆。

让我们把这些留给有兴趣的读者吧,我们来看下一个议题。

下面的内容还没有搞特别清楚,弄好了再补充。