之前一直看文档说协程之间通信可以用通道实现,但是都只是说可以用通道推数据和读数据,没有提到怎么同步保存数据
花了很多时间调试修改,实现了一下使用一个数据处理协程对数据进行读写,保存到协程上下文
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function() {
//写通道,供生产者写入数据
$channelWrite = new Coroutine\Channel(1);
//读通道,供消费者读取数据
$channelRead = new Coroutine\Channel(1);
//数据处理协程
go(function () use ($channelWrite, $channelRead) {
//用这个协程的上下文存储数据
$pcid =Coroutine::getCid();
$context = Coroutine::getContext($pcid);
$context['count'] = [];
go(function() use ($channelWrite, $pcid) {
//用父协程id获取上下文,直接传数据数据修改不会同步
$context = Coroutine::getContext($pcid);
//从写通道读取数据,放入父协程上下文保存
while(true) {
$data = $channelWrite->pop();
if (!$data) {
break;
}
$context['count'] = $data;
}
});
go(function() use ($channelRead, $pcid) {
//用父协程id获取上下文,直接传数据数据修改不会同步
$context = Coroutine::getContext($pcid);
//从父协程上下文获取实时数据,传入读通道,供消费协程使用
while(true) {
$res = $channelRead->push($context['count']);
if (!$res) {
break;
}
}
});
});
// 多个消费者协程和数据生产协程
go (function() use ($channelRead, $channelWrite) {
for ($i = 0; $i < 100; $i++) {
Coroutine::sleep(1);
if ($i % 2 == 0) {
$data = $channelRead->pop();
echo '读数据:', PHP_EOL;
print_r($data);
} else {
$channelWrite->push(array_fill(0, 5, $i));
echo '写入数据:'.$i, PHP_EOL;
}
}
$channelRead->close();
$channelWrite->close();
echo '关闭通道';
});
});
还需要对读写协程上下文做一个读写锁,应该就可以保证数据一致性
<?php
use Swoole\Coroutine;
use Swoole\Lock;
use Swoole\Timer;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function() {
$channelWrite = new Coroutine\Channel(1);
$channelRead = new Coroutine\Channel(1);
//数据处理协程
go(function () use ($channelWrite, $channelRead) {
//用这个协程的上下文存储数据
$pcid =Coroutine::getCid();
$context = Coroutine::getContext($pcid);
$context['count'] = 0;
//读写锁
$lock = new Lock(SWOOLE_RWLOCK);
go(function() use ($channelWrite, $pcid, $lock) {
//用父协程id获取上下文,直接传数据数据不一致
$context = Coroutine::getContext($pcid);
//从写通道读取数据,放入父协程上下文保存
while(true) {
$data = $channelWrite->pop();
if (!$data) {
break;
}
// echo '写入数据(等写锁)', PHP_EOL;
$lock->lock();
$context['count'] = $data;
// echo '写入数据', PHP_EOL;
$lock->unlock();
}
});
go(function() use ($channelRead, $pcid, $lock) {
//用父协程id获取上下文,直接传数据数据不一致
$context = Coroutine::getContext($pcid);
//从父协程上下文获取实时数据,传入读通道,供消费协程使用
while(true) {
// echo '读取数据(等读锁)', PHP_EOL;
$lock->lock_read();
$data = $context['count'];
// echo '读取数据完成', PHP_EOL;
$lock->unlock();
$res = $channelRead->push($data);
if (!$res) {
break;
}
}
});
});
// 多个消费者协程和数据生产协程
go (function() use ($channelRead, $channelWrite) {
Timer::tick(1000, function() use ($channelRead) {
$data = $channelRead->pop();
echo date('[i:s]') . '读数据:' . $data, PHP_EOL;
});
for ($i = 1; $i < 10; $i++) {
Coroutine::sleep(1.5);
$channelWrite->push($i);
echo date('[i:s]') . '写入数据:'.$i, PHP_EOL;
}
});
});
输出
[00:11]读数据:0
[00:11]写入数据:1
[00:12]读数据:0
[00:13]读数据:0
[00:13]写入数据:2
[00:14]读数据:1
[00:14]写入数据:3
[00:15]读数据:1
[00:16]读数据:2
[00:16]写入数据:4
[00:17]读数据:3
[00:17]写入数据:5
[00:18]读数据:3
[00:19]读数据:4
[00:19]写入数据:6
[00:20]读数据:5
[00:20]写入数据:7
[00:21]读数据:5
[00:22]读数据:6
[00:22]写入数据:8
[00:23]读数据:7
[00:23]写入数据:9
[00:24]读数据:7
[00:25]读数据:8
[00:26]读数据:9
[00:27]读数据:9
[00:28]读数据:9
[00:29]读数据:9
[00:30]读数据:9
[00:31]读数据:9
[00:32]读数据:9