记一次生产者消费者读取200w数据, 写入2000w数据的过程
前情提要: 由于开发阶段,数据量的不充足,同时线上的产品数据需要使用到经纬度这一块数据. 就有数据部门的同事爬去了100w条数据,给的也是csv文件, 由此产生一个问题, 如何来大批量的读取这些数据呢
这里我们想到了两种方式, csv文件读取采用文件行读取, 读取excel 采用 phpexcel
为了兼容以后可能导入的xls文件,我这里就采用了 读取excel 的方法
我们需要考虑到两个问题
1) phpexcel 是读取到内存中的, 100w数据虽然不多,但如果不去设置响应的参数,极有可能导致内存溢出
2) 读取的这是商圈信息, 可是还需要进行 写入假数据 : 店铺 折扣 等, 这些都是应该在读取的时候同时进行写入mysql的, 怎么解决读写产生的发生异常问题
通过搜寻资料, 找到了可能比较适合现阶段的做法的解决办法
通过生产者 - 消费者模式 进行
何为 生产者 - 消费者模式
见文章: 聊聊并发——生产者消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
依照现有的问题自我解释一遍: 读数据的方法 便是生产者, 写数据的操作便是消费者, 通过该种方式,将读 写隔离开, 互不影响
以上也会产生几个问题
1) 生产者速度大于消费者速度
2) 消费者速度大于生产者速度
最简单的解决办法,便是增加生产者 或 增加消费者 提高速度. 这也是其灵魂所在吧
两者之间的通信方式当然多种多样, 这里我们采用 redis 队列作为通信介质进行
安装相关的包工具
excel 套餐包 : phpoffice/phpspreadsheet
假数据包 : fzaninotto/faker
实现
use PhpOffice\PhpSpreadsheet\Reader\IReadFilter;
# 参考来自于
# https://phpspreadsheet.readthedocs.io/en/latest/topics/reading-files/
// 这是一个指定读取某些行的过滤器, 来自于官方文档写法
class ChunkReadFilter implements IReadFilter
{
private $startRow = 0;
private $endRow = 0;
/**
* Set the list of rows that we want to read.
*
* @param mixed $startRow
* @param mixed $chunkSize
*/
public function setRows($startRow, $chunkSize)
{
$this->startRow = $startRow;
$this->endRow = $startRow + $chunkSize;
}
public function readCell($column, $row, $worksheetName = '')
{
// Only read the heading row, and the rows that are configured in $this->_startRow and $this->_endRow
if (($row == 1) || ($row >= $this->startRow && $row < $this->endRow)) {
return true;
}
return false;
}
}
-----------
/**
* 数据转换
* Class Transform
* @package app\literacy
*/
class Transform
{
protected $inputFileType = 'Csv';
protected $inputFileName = __DIR__ . '/excels/d88_info.map_position.csv';
protected $list_key = "excel_discount";
protected $end = 1;
/**
* 使用cli 命令执行
* php think psysh
* (new \app\literacy\Transform())->write_excel()
* 执行了这个命令之后,再次执行
* (new \app\literacy\Transform())->consumer();
*
* write_excel 是生产者, 获取数据,写入到redis队列中
* consumer 是消费者, 从redis中获取到数据, 写入到数据库中
*/
public function write_excel()
{
$reader = IOFactory::createReader($this->inputFileType);
// 设置每个循环读取的行数
$chunkSize = 100;
$chunkFilter = new ChunkReadFilter();
# 设置过滤器
$reader->setReadFilter($chunkFilter);
// Loop to read our worksheet in "chunk size" blocks
for ($startRow = 2; $startRow <= 1000000; $startRow += $chunkSize) {
$chunkFilter->setRows($startRow, $chunkSize);
$spreadsheet = $reader->load($this->inputFileName);
$sheetData = $spreadsheet->getActiveSheet()->toArray(null, true, true, true);
$this->producer($sheetData);
unset($sheetData);
unset($spreadsheet);
}
}
.....
$sheetData
就是我们读取的数据,他是循环方式读取的, 每次读取的设置的是100行, 我测试了一下,读取100行的速度大约在1.5s
生产者
创建一个生产者,将数据写入到 redis队列中
中
....
/**
* 生产者
*/
public function producer($data)
{
if($data) {
$redis = redis_connect(2);
$data = serialize($data);
$key = md5(microtime(true));
$redis->set($key, $data, 3600);
$redis->lPush($this->list_key, $key);
}else{
echo "未读取到数据";
}
}
这里我们采用key-value的形式写入数据, 将key写入到队列中, 同理, 获取数据的时候,去list - key数据后再获取value数据, 并写入数据库
消费者
/**
* 消费者
* 这个需要进行异步执行, 例如使用命令行
* php think psysh
* (new \app\literacy\Transform())->consumer();
*/
public function consumer()
{
$redis = redis_connect(2);
while ($this->end < 10) {
$keys = $redis->blPop([$this->list_key], 50);
if(!empty($keys)) {
$key = $keys[1];
$data = $redis->get($key);
if($data) {
$redis->del($key);
$this->write_mysql(unserialize($data));
}
}else{
$this->end = $this->end + 1;
}
}
}
/**
* 把数据写入数据库
* @param $data
*/
public function write_mysql($origin_data)
{
$data = [];
$superStoreIds = Superstore::field('id')->all();
foreach ($origin_data as $k => $item) {
if(!is_null($item['A'])) {
$data['superstore_name'] = random_int(0,10000) . "店";
$data['longitude'] = $item['C'];
$data['latitude'] = $item['D'];
$data['type'] = random_int(0,2);
$data['create_time'] = time();
$data['update_time'] = time();
$data['creator_id'] = 1;
$data['store_num'] = random_int(2,20);
$sid = Db::connect('localhost_connect')->table('d88_superstore')->insertGetId($data);
echo "商圈 +1";
$this->store($sid, $data['store_num']);
}
}
}
如上, 我们能看到 blPop
: 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止. 也就是说,获取不到数据的时候, 会进行阻塞, 同时使用while也会为程序阻塞一段时间
不过由于我们需要在创建商圈的同时创建店铺折扣, 导致, 读取200w条数据, 实际加上假数据要写入1000 多万数据.
所以我们需要开启多个线程来跑这个消费者的命令
采用的方法
1) workman 来读取
2) cli 执行, 多窗口
因为的这里的内置了 psysh 所以, 直接使用
php think psysh
(new \app\literacy\Transform())->write_excel()
执行了这个命令之后,再次执行
(new \app\literacy\Transform())->consumer();
我们则开启多窗口信息跑命令
本文由 邓尘锋 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: May 29, 2019 at 04:27 pm