swoole协程通信

之前一直看文档说协程之间通信可以用通道实现,但是都只是说可以用通道推数据和读数据,没有提到怎么同步保存数据

花了很多时间调试修改,实现了一下使用一个数据处理协程对数据进行读写,保存到协程上下文

<?php
use Swoole\Coroutine;

use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;



run(function() {
    //写通道,供生产者写入数据
    $channelWrite = new Coroutine\Channel(1);
    //读通道,供消费者读取数据
    $channelRead = new Coroutine\Channel(1);

    //数据处理协程
    go(function () use ($channelWrite, $channelRead) {
        //用这个协程的上下文存储数据
        $pcid =Coroutine::getCid();
        $context = Coroutine::getContext($pcid);
        $context['count'] = [];
        go(function() use ($channelWrite, $pcid) {
            //用父协程id获取上下文,直接传数据数据修改不会同步
            $context = Coroutine::getContext($pcid);
            //从写通道读取数据,放入父协程上下文保存
            while(true) {
                $data = $channelWrite->pop();
                if (!$data) {
                    break;
                }
                $context['count'] = $data;
            }
        });
        go(function() use ($channelRead, $pcid) {
            //用父协程id获取上下文,直接传数据数据修改不会同步
            $context = Coroutine::getContext($pcid);
            //从父协程上下文获取实时数据,传入读通道,供消费协程使用
            while(true) {
                $res = $channelRead->push($context['count']);
                if (!$res) {
                    break;
                }
            }
        });
    });

    // 多个消费者协程和数据生产协程
    go (function() use ($channelRead, $channelWrite) {
        for ($i = 0; $i < 100; $i++) {
            Coroutine::sleep(1);
            if ($i % 2 == 0) {
                $data = $channelRead->pop();
                echo '读数据:', PHP_EOL;
                print_r($data);
            } else {
                $channelWrite->push(array_fill(0, 5, $i));
                echo '写入数据:'.$i, PHP_EOL;
            }
        }
        $channelRead->close();
        $channelWrite->close();
        echo '关闭通道';
    });
});

还需要对读写协程上下文做一个读写锁,应该就可以保证数据一致性

<?php
use Swoole\Coroutine;
use Swoole\Lock;
use Swoole\Timer;

use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;



run(function() {
    $channelWrite = new Coroutine\Channel(1);
    $channelRead = new Coroutine\Channel(1);

    //数据处理协程
    go(function () use ($channelWrite, $channelRead) {
        //用这个协程的上下文存储数据
        $pcid =Coroutine::getCid();
        $context = Coroutine::getContext($pcid);
        $context['count'] = 0;

        //读写锁
        $lock = new Lock(SWOOLE_RWLOCK);


        go(function() use ($channelWrite, $pcid, $lock) {
            //用父协程id获取上下文,直接传数据数据不一致
            $context = Coroutine::getContext($pcid);
            //从写通道读取数据,放入父协程上下文保存
            while(true) {
                $data = $channelWrite->pop();
                if (!$data) {
                    break;
                }
                // echo '写入数据(等写锁)', PHP_EOL;
                $lock->lock();
                $context['count'] = $data;
                // echo '写入数据', PHP_EOL;
                $lock->unlock();
            }
        });
        go(function() use ($channelRead, $pcid, $lock) {
            //用父协程id获取上下文,直接传数据数据不一致
            $context = Coroutine::getContext($pcid);
            //从父协程上下文获取实时数据,传入读通道,供消费协程使用
            while(true) {
                // echo '读取数据(等读锁)', PHP_EOL;
                $lock->lock_read();
                $data = $context['count'];
                // echo '读取数据完成', PHP_EOL;
                $lock->unlock();
                $res = $channelRead->push($data);
                if (!$res) {
                    break;
                }
            }
        });
    });

    // 多个消费者协程和数据生产协程
    go (function() use ($channelRead, $channelWrite) {
        Timer::tick(1000, function() use ($channelRead) {
            $data = $channelRead->pop();
            echo date('[i:s]') . '读数据:' . $data, PHP_EOL;
        });
        for ($i = 1; $i < 10; $i++) {
            Coroutine::sleep(1.5);
            $channelWrite->push($i);
            echo date('[i:s]') . '写入数据:'.$i, PHP_EOL;
        }
    });
});

输出

[00:11]读数据:0
[00:11]写入数据:1
[00:12]读数据:0
[00:13]读数据:0
[00:13]写入数据:2
[00:14]读数据:1
[00:14]写入数据:3
[00:15]读数据:1
[00:16]读数据:2
[00:16]写入数据:4
[00:17]读数据:3
[00:17]写入数据:5
[00:18]读数据:3
[00:19]读数据:4
[00:19]写入数据:6
[00:20]读数据:5
[00:20]写入数据:7
[00:21]读数据:5
[00:22]读数据:6
[00:22]写入数据:8
[00:23]读数据:7
[00:23]写入数据:9
[00:24]读数据:7
[00:25]读数据:8
[00:26]读数据:9
[00:27]读数据:9
[00:28]读数据:9
[00:29]读数据:9
[00:30]读数据:9
[00:31]读数据:9
[00:32]读数据:9

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注