记一次生产者消费者读取200w数据, 写入2000w数据的过程
in PHP with 0 条评论

记一次生产者消费者读取200w数据, 写入2000w数据的过程

in PHP with 0 comment

记一次生产者消费者读取200w数据, 写入2000w数据的过程

前情提要: 由于开发阶段,数据量的不充足,同时线上的产品数据需要使用到经纬度这一块数据. 就有数据部门的同事爬去了100w条数据,给的也是csv文件, 由此产生一个问题, 如何来大批量的读取这些数据呢

这里我们想到了两种方式, csv文件读取采用文件行读取, 读取excel 采用 phpexcel

为了兼容以后可能导入的xls文件,我这里就采用了 读取excel 的方法

我们需要考虑到两个问题

1) phpexcel 是读取到内存中的, 100w数据虽然不多,但如果不去设置响应的参数,极有可能导致内存溢出

2) 读取的这是商圈信息, 可是还需要进行 写入假数据 : 店铺 折扣 等, 这些都是应该在读取的时候同时进行写入mysql的, 怎么解决读写产生的发生异常问题

通过搜寻资料, 找到了可能比较适合现阶段的做法的解决办法

通过生产者 - 消费者模式 进行

何为 生产者 - 消费者模式

见文章: 聊聊并发——生产者消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

依照现有的问题自我解释一遍: 读数据的方法 便是生产者, 写数据的操作便是消费者, 通过该种方式,将读 写隔离开, 互不影响

以上也会产生几个问题

1) 生产者速度大于消费者速度
2) 消费者速度大于生产者速度

最简单的解决办法,便是增加生产者 或 增加消费者 提高速度. 这也是其灵魂所在吧

两者之间的通信方式当然多种多样, 这里我们采用 redis 队列作为通信介质进行

请输入图片描述

安装相关的包工具

excel 套餐包 : phpoffice/phpspreadsheet

假数据包 : fzaninotto/faker

实现

use PhpOffice\PhpSpreadsheet\Reader\IReadFilter;

# 参考来自于
# https://phpspreadsheet.readthedocs.io/en/latest/topics/reading-files/

// 这是一个指定读取某些行的过滤器, 来自于官方文档写法
class ChunkReadFilter implements IReadFilter
{
    private $startRow = 0;

    private $endRow = 0;

    /**
    * Set the list of rows that we want to read.
    *
    * @param mixed $startRow
    * @param mixed $chunkSize
    */
    public function setRows($startRow, $chunkSize)
    {
        $this->startRow = $startRow;
        $this->endRow = $startRow + $chunkSize;
    }

    public function readCell($column, $row, $worksheetName = '')
    {
        //  Only read the heading row, and the rows that are configured in $this->_startRow and $this->_endRow
        if (($row == 1) || ($row >= $this->startRow && $row < $this->endRow)) {
            return true;
        }

        return false;
    }
}

-----------

/**
* 数据转换
* Class Transform
* @package app\literacy
*/
class Transform
{
    protected $inputFileType = 'Csv';
    protected $inputFileName = __DIR__ . '/excels/d88_info.map_position.csv';
    protected $list_key = "excel_discount";
    protected $end = 1;

    /**
    * 使用cli 命令执行
    * php think psysh
    * (new \app\literacy\Transform())->write_excel()
    * 执行了这个命令之后,再次执行
    * (new \app\literacy\Transform())->consumer();
    *
    * write_excel 是生产者, 获取数据,写入到redis队列中
    * consumer 是消费者, 从redis中获取到数据, 写入到数据库中
    */
    public function write_excel()
    {
        $reader = IOFactory::createReader($this->inputFileType);

        // 设置每个循环读取的行数
        $chunkSize = 100;

        $chunkFilter = new ChunkReadFilter();

        # 设置过滤器
        $reader->setReadFilter($chunkFilter);

        // Loop to read our worksheet in "chunk size" blocks
        for ($startRow = 2; $startRow <= 1000000; $startRow += $chunkSize) {
            $chunkFilter->setRows($startRow, $chunkSize);
            $spreadsheet = $reader->load($this->inputFileName);
            $sheetData = $spreadsheet->getActiveSheet()->toArray(null, true, true, true);
            $this->producer($sheetData);


            unset($sheetData);
            unset($spreadsheet);
        }
    }

    .....

$sheetData 就是我们读取的数据,他是循环方式读取的, 每次读取的设置的是100行, 我测试了一下,读取100行的速度大约在1.5s

生产者

创建一个生产者,将数据写入到 redis队列中

    ....

    /**
    * 生产者
    */
    public function producer($data)
    {
        if($data) {
            $redis = redis_connect(2);
            $data = serialize($data);
            $key = md5(microtime(true));
            $redis->set($key, $data, 3600);
            $redis->lPush($this->list_key, $key);
        }else{
            echo "未读取到数据";
        }
    }

这里我们采用key-value的形式写入数据, 将key写入到队列中, 同理, 获取数据的时候,去list - key数据后再获取value数据, 并写入数据库

消费者

    /**
    * 消费者
    * 这个需要进行异步执行, 例如使用命令行
    * php think psysh
    * (new \app\literacy\Transform())->consumer();
    */
    public function consumer()
    {
        $redis = redis_connect(2);
        while ($this->end < 10) {
            $keys = $redis->blPop([$this->list_key], 50);
            if(!empty($keys)) {
                $key = $keys[1];
                $data = $redis->get($key);
                if($data) {
                    $redis->del($key);
                    $this->write_mysql(unserialize($data));
                }
            }else{
                $this->end = $this->end + 1;
            }
        }
    }

    /**
    * 把数据写入数据库
    * @param $data
    */
    public function write_mysql($origin_data)
    {
        $data = [];
        $superStoreIds = Superstore::field('id')->all();
        foreach ($origin_data as $k => $item) {
            if(!is_null($item['A'])) {
                $data['superstore_name'] = random_int(0,10000) . "店";
                $data['longitude'] = $item['C'];
                $data['latitude'] = $item['D'];
                $data['type'] = random_int(0,2);
                $data['create_time'] = time();
                $data['update_time'] = time();
                $data['creator_id'] = 1;
                $data['store_num'] = random_int(2,20);
                $sid = Db::connect('localhost_connect')->table('d88_superstore')->insertGetId($data);

                echo "商圈 +1";
                $this->store($sid, $data['store_num']);
            }

        }
    }

如上, 我们能看到 blPop: 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止. 也就是说,获取不到数据的时候, 会进行阻塞, 同时使用while也会为程序阻塞一段时间

不过由于我们需要在创建商圈的同时创建店铺折扣, 导致, 读取200w条数据, 实际加上假数据要写入1000 多万数据.

所以我们需要开启多个线程来跑这个消费者的命令

采用的方法

1) workman 来读取

2) cli 执行, 多窗口

因为的这里的内置了 psysh 所以, 直接使用

    php think psysh
    (new \app\literacy\Transform())->write_excel()
    执行了这个命令之后,再次执行
    (new \app\literacy\Transform())->consumer();

我们则开启多窗口信息跑命令

截图

Responses
备案号: 赣ICP备17004055号-2