Dcatadmin中用异步队列以及chunk解决大数据量/大计算量数据导出问题

公司业务经常有几万几十万甚至上百万+的数据要导出,系统是Dcatadmin做的,目前当前页导出以及选中导出没问题,但有一些业务逻辑导出时有多层嵌套计算,所以会造成任务超时,故而使用异步队列来解决问题

2024-06-30 02:00:31   2025-02-09 10:46:48   PHP   168 views  

  Laravel   导出  

所需扩展包

1.Laravel Horizon

2.Laravel Maatwebsite\Excel

可替代Laravel Maatwebsite\Excel的更高性能的php扩展php:xlswriter

因为时间匆忙急着导出使用数据所以使用了Maatwebsite\Excel,当然如果项目前期或者时间足够的话还是建议使用phpxlswriter,这里也因为时间原因以及考虑后期数据的增量、时间等需求选择避开xls以及xlsx直接导出没有格式的csv文件。这里的例子均为导出某个数据表的全部数据后加工处理再导出。

配置Redis队列(.env文件)

注意根据自身项目自行选择是否需要使用php artisan config:clear清除缓存

REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
QUEUE_CONNECTION=redis

使用代码生成器生成控制器、数据迁移文件、模型、数据仓库、数据表等文件

因为几乎没有后期查询需求,就是随时导出随时下载取用,所以索引没有加。而根据时间、业务以及mysql的大数据量存储优势选择均为string类型(牺牲存储空间换取运行效率)。

file

ExportTasks控制器(ExportTaskController)

<?php

namespace App\Admin\Controllers;

use App\Admin\Repositories\ExportTask;
use App\Jobs\ProcessExportTask;
use Dcat\Admin\Admin;
use Dcat\Admin\Form;
use Dcat\Admin\Grid;
use Dcat\Admin\Show;
use Dcat\Admin\Http\Controllers\AdminController;
use Illuminate\Support\Facades\Storage;

class ExportTaskController extends AdminController
{
    /**
     * Make a grid builder.
     *
     * @return Grid
     */
    protected function grid()
    {
        $grid = new Grid(new ExportTask());

        $grid->model()->orderBy('id','desc');
        $grid->column('id')->sortable();
        $grid->column('method','导出数据')->display(function ($value){
            switch ($value){
                case 'exportOrders':
                    return '全部订单';
                case 'exportCards':
                    return '全部卡板';
                default:
                    return '未知数据';
            }
        });
        $grid->column('status')->display(function ($value) {
            $statusStyleMap = [
                '未开始'    => 'background-color: #777; color: white;',
                '进行中' => 'background-color: #ffc107; color: black;',
                '已完成'    => 'background-color: #28a745; color: white;',
                '导出失败'     => 'background-color: #dc3545; color: white;',
                '已删除'    => 'background-color: #FF9933; color: white;'
            ];
            $style = $statusStyleMap[$value] ?? 'background-color: #777; color: white;';
            return "<span class='label label-primary' style='padding: 5px 10px; border-radius: 4px; {$style}'>" . $value . "</span>";
        });
        $grid->column('file_path','文件名称')->display(function ($filePath) {
            return $filePath ? basename($filePath) : 'N/A';
        });
        $grid->column('created_at','导出开始时间');
        $grid->disableRowSelector();
        $grid->disableViewButton();
        $grid->disableEditButton();
        $grid->disableDeleteButton();

        $grid->actions(function (Grid\Displayers\Actions $actions) {
            $task = $actions->row;
            if ($task->file_path) {
                $actions->append('<a href="' . admin_url('export/export_tasks/download', ['id' => $task->id]) . '">文件下载</a>');
                $actions->append('<a href="javascript:void(0);" class="grid-row-delete" data-id="' . $task->id . '">文件删除</a>');
            }
        });

        // 注入自定义 JavaScript
        Admin::script($this->deleteScript());

        return $grid;
    }

    protected function deleteScript()
    {
        return <<<SCRIPT
$(document).off('click', '.grid-row-delete').on('click', '.grid-row-delete', function() {
    var id = $(this).data('id');
    if (confirm('您确定要删除这个文件吗?')) {
        $.ajax({
            method: 'delete',
            url: '/admin/export/export_tasks/' + id + '/delete',
            headers: {
                'X-CSRF-TOKEN': Dcat.token
            },
            success: function (data) {
                if (data.status) {
                    Dcat.success(data.message);
                } else {
                    Dcat.error(data.message);
                }
                Dcat.reload();
            },
            error: function (xhr, textStatus, errorThrown) {
                Dcat.error('操作失败: ' + xhr.responseText);
                Dcat.reload();
            }
        });
    }
});
SCRIPT;
    }

    /**
     * Make a show builder.
     *
     * @param mixed $id
     *
     * @return Show
     */
    protected function detail($id)
    {
        return Show::make($id, new ExportTask(), function (Show $show) {
            $show->field('id');
            $show->field('method');
            $show->field('status');
            $show->field('file_path');
            $show->field('created_at');
            $show->field('updated_at');
        });
    }

    /**
     * Make a form builder.
     *
     * @return Form
     */
    protected function form()
    {
        return Form::make(new ExportTask(), function (Form $form) {
            $form->select('method','导出数据')
                ->options([
                    'exportOrders' => '全部订单',
                    'exportCards' => '全部卡板',
                ])
                ->required();

            $form->saved(function (Form $form) {
                // 获取模型的实际ID
                $exportTaskId = $form->repository()->model()->id;
                // 从数据库中重新获取模型实例
                $exportTask = \App\Models\ExportTask::find($exportTaskId);

                ProcessExportTask::dispatch($exportTask)->onQueue('exports');
                admin_toastr('导出任务创建成功');
            });
        });
    }

    public function download($id)
    {
        $task = \App\Models\ExportTask::findOrFail($id);

        if (!$task->file_path || !Storage::exists('public/'.$task->file_path)) {
            // 使用 Dcat Admin 的 toastr 来显示错误消息
            admin_toastr('文件不存在', 'error');

            // 重定向回原页面或其他适当页面
            return redirect()->back();
        }

        return response()->download(storage_path('app/public/' . $task->file_path));
    }

    public function delete($id)
    {
        $task = \App\Models\ExportTask::findOrFail($id);

        if ($task->file_path && Storage::exists('public/'.$task->file_path)) {
            Storage::delete('public/'.$task->file_path);
            $task->file_path = null;
            $task->status = \App\Models\ExportTask::STATUS_DELETE;
            $task->save();  // 确保保存模型的更改

            return response()->json(['status' => true, 'message' => '文件删除成功']);
        }

        return response()->json(['status' => false, 'message' => '文件删除失败']);
    }
}

ExportTask模型定义(ExportTask)

正常来说 startExport()需要放在服务层,但个人规划服务层只做处理,所以直接在模型层定义进入服务层找对应方法。

<?php

namespace App\Models;

use Dcat\Admin\Traits\HasDateTimeFormatter;
use Illuminate\Database\Eloquent\Model;
use App\Services\ExportService;

class ExportTask extends Model
{
    use HasDateTimeFormatter;
    protected $fillable = ['method', 'status', 'file_path'];

    const STATUS_PENDING = '未开始';
    const STATUS_IN_PROGRESS = '进行中';
    const STATUS_COMPLETED = '已完成';
    const STATUS_ERROR = '导出失败';
    const STATUS_DELETE = '已删除';

    public function startExport()
    {
        $exportService = new ExportService();

        $this->update(['status' => self::STATUS_IN_PROGRESS]);

        // 调用服务层中的方法来处理导出逻辑
        if (method_exists($exportService, $this->method)) {
            $filePath = $exportService->{$this->method}();
            $this->update([
                'status' => self::STATUS_COMPLETED,
                'file_path' => $filePath,
            ]);
        } else {
            // 处理方法不存在时的错误处理
            $this->update(['status' => '错误']);
        }
    }
}

创建导出业务层:后期表的导出以及数据处理逻辑均在这里添加(ExportService)

下面是一些订单以及卡片的导出以及文件创建逻辑和数据处理逻辑,Logics内的均为业务深层嵌套的计算逻辑不便展示。


<?php

namespace App\Services;

use App\Http\Logics\CardLogic;
use App\Models\Operator;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Str;
use App\Models\Order;
use App\Http\Logics\PackageLogic;
use App\Models\Card;

class ExportService
{
    const CHUNK_SIZE = 1000; // 每个批次处理的记录数

    //全部订单
    public function exportOrders()
    {
        $date = now()->format('Ymd');
        $directory = 'public/exports/' . $date;
        $this->ensureDirectoryExists($directory); // 确保目录存在

        $fileName = $this->generateFileName('全部订单', 'csv');
        $filePath = $directory . '/' . $fileName;

        $this->exportOrdersToCSV($filePath);

        // 文件路径用于返回
        return 'exports/' . $date . '/' . $fileName;
    }

    //全部卡板
    public function exportCards()
    {
        $date = now()->format('Ymd');
        $directory = 'public/exports/' . $date;
        $this->ensureDirectoryExists($directory); // 确保目录存在

        $fileName = $this->generateFileName('全部卡板', 'csv');
        $filePath = $directory . '/' . $fileName;

        $this->exportCardsToCSV($filePath);

        return 'exports/' . $date . '/' . $fileName;
    }

    protected function exportOrdersToCSV($filePath)
    {
        $filePath = storage_path('app/' . $filePath);
        $file = fopen($filePath, 'w');

        fputcsv($file, ['ID', '单号', 'ICCID', '类型', '套餐', '价格', '状态', '存赠金额', '存赠状态', '生效月份', '生效月份(新)', '订单来源', '创建时间', '更新时间']);

        DB::table('orders')->orderBy('id','desc')->chunk(self::CHUNK_SIZE, function ($orders) use ($file) {
            foreach ($orders as $order) {
                $processedOrder = $this->processOrderData($order);
                fputcsv($file, [
                    $processedOrder->id,
                    "\t" . $processedOrder->order_no,
                    "\t" . $processedOrder->iccid,
                    $processedOrder->type,
                    $processedOrder->package_name,
                    $processedOrder->amount,
                    $processedOrder->status,
                    $processedOrder->gift_limit,
                    $processedOrder->gift_limit_status,
                    $processedOrder->month_type,
                    $processedOrder->recharge_date,
                    $processedOrder->source,
                    $processedOrder->created_at,
                    $processedOrder->updated_at,
                ]);
            }
        });

        fclose($file);
    }

    protected function exportCardsToCSV($filePath)
    {
        $filePath = storage_path('app/' . $filePath);
        $file = fopen($filePath, 'w');

        fputcsv($file, ['ID', '我方卡号', '已用量', '剩余量', '状态', '任务状态', '实名状态', 'iccid', '上游卡号', '运营商', '通道', '代理', '余额', '首充状态']);

        DB::table('cards')->orderBy('id','desc')->chunk(self::CHUNK_SIZE, function ($cards) use ($file) {
            foreach ($cards as $card) {
                $processedCard = $this->processCardData($card);
                fputcsv($file, [
                    $processedCard->id,
                    "\t" . $processedCard->card_no,
                    $processedCard->used,
                    $processedCard->remain,
                    $processedCard->status,
                    $processedCard->task_status,
                    $processedCard->real_name_status,
                    "\t" . $processedCard->iccid,
                    "\t" . $processedCard->up_card_no,
                    $processedCard->operator,
                    $processedCard->channel_name,
                    $processedCard->agent_name,
                    $processedCard->balance,
                    $processedCard->first_charge_status
                ]);
            }
        });

        fclose($file);
    }

    protected function processOrderData($order)
    {
        $month_typeArr = [
            0 => '加油包或余额充值',
            1 => '当月',
            2 => '次月',
        ];

        $status_labels = Order::STATUS;
        $type_labels = Order::TYPE;

        $order->status = $status_labels[$order->status] ?? '-';
        $order->month_type = $month_typeArr[$order->month_type] ?? '未知';
        $order->type = $type_labels[$order->type] ?? '-';

        // 获取 package 名称
        try {
            $package = PackageLogic::detail($order->package_id);
            $order->package_name = $package['name'] ?? '-';
        } catch (\Exception $e) {
            $order->package_name = 'N/A';
        }

        // 获取卡号
        $card = Card::where('iccid', $order->iccid)->select(['card_no'])->first();
        $order->card_no = $card ? $card->card_no : 'N/A';

        return $order;
    }
    protected function processCardData($card)
    {
        $firstChargeStatusArr = [
            0 => '未首充',
            1 => '已首充',
        ];

        $card->status = \App\Models\Card::STATUS_LABEL[$card->status] ?? '-';
        $card->real_name_status = \App\Models\Card::REAL_NAME_LABEL[$card->real_name_status] ?? '-';
        $card->first_charge_status = $firstChargeStatusArr[$card->first_charge_status] ?? '未知';
        $card->used = CardLogic::used($card->iccid) . " M";
        $card->remain = CardLogic::current($card->iccid) . " M";
        $card->operator = Operator::OPERATOR[$card->operator] ?? '';
        $card->channel_name = Cache::get('channel_id_name' . $card->channel_id) ?? '-';
        $card->agent_name = Cache::get('agent_id_name' . $card->agent_id) ?? '-';
        $card->balance = round($card->balance, 2);

        // 处理任务状态
        $card->task_status = \App\Models\Card::TASK_STATUS[$card->task_status] ?? 'N/A';

        return $card;
    }

    protected function generateFileName($prefix, $extension)
    {
        return Str::random(10) . '_' . now()->format('Ymd_His') . '_' . $prefix . '.' . $extension;
    }

    protected function ensureDirectoryExists($directory)
    {
        $fullPath = storage_path('app/' . $directory);
        if (!file_exists($fullPath)) {
            // 创建目录并设置权限
            mkdir($fullPath, 0755, true);

            // 递归地设置每个父目录的权限和所有者
            $this->setPermissionsAndOwner($fullPath);
        }
    }

    protected function setPermissionsAndOwner($path)
    {
        // 获取路径的每一部分
        $pathParts = explode('/', $path);
        $partialPath = '';

        foreach ($pathParts as $part) {
            if (empty($part)) {
                continue;
            }
            $partialPath .= '/' . $part;

            // 检查目录是否存在,如果存在,则设置权限和所有者
            if (file_exists($partialPath)) {
                chmod($partialPath, 0755); // 设置权限
                chown($partialPath, 'www'); // 更改所有者
            }
        }
    }
}

创建编写导出任务队列

1.创建导出队列文件

php artisan make:job ProcessExportTask

2.编辑导出队列文件(ProcessExportTask)


<?php

namespace App\Jobs;

use App\Models\ExportTask;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessExportTask implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $exportTask;

    public $timeout = null;  // 因为计算逻辑复杂且耗时过大故而不限制超时时间

    public function __construct(ExportTask $exportTask)
    {
        ini_set('memory_limit', '512M');
        $this->exportTask = $exportTask;
    }

    public function handle()
    {
        $this->exportTask->startExport();
    }

    public function failed(\Exception $exception)
    {
        // 当任务失败时,更新状态为 导出失败
        $this->exportTask->update(['status' => ExportTask::STATUS_ERROR]);
    }
}

执行队列以及监听

监听下载以及配置方法在文档中有介绍在这里就不过多赘述。前面控制器的form表单创建时saved过后有ProcessExportTask::dispatch($exportTask)->onQueue('exports');,所以要监听onQueue('exports')中的这个exports指定队列

1.执行监听exports队列(--timeout=0是不设置超时时间,因为深层嵌套运算逻辑所以有时候处理起来很慢所以直接不设置了)

php artisan queue:work redis --queue=exports --timeout=0

2.启动horizon监控

php artisan horizon

访问 域名/horizon即可查看队列状态

遇到的问题

1.在使用supervisor进程维护导出队列时如果执行php artisan horizon只能监控到被推进队列,监控不到成功失败状态,如果不执行则正常监控,此问题可能与supervisor或者horizon配置有关。
2.长时间不启动使用导出后的第一次可能会导出失败,即使redis的timeout设置为0也依旧如此,可能与php等配置有关。

效果展示

file

file

file