?php/** * 案例标题多进程模型设计 * 说明Hyperf自定义Process实现多进程任务处理每个进程独立消费不同队列 * 需要安装的包 * composer require hyperf/process * composer require hyperf/redis */declare(strict_types1);// app/Process/OrderConsumerProcess.php namespaceApp\Process;useHyperf\Process\AbstractProcess;useHyperf\Process\Annotation\Process;useHyperf\Redis\Redis;usePsr\Container\ContainerInterface;usePsr\Log\LoggerInterface;/** * 订单消费进程专门处理订单相关的异步任务 * 用注解标记这是个Process框架启动时会自动拉起 */#[Process(name:order-consumer,// 进程名ps aux里能看到nums:2,// 启动2个进程并行消费enableCoroutine:true// 进程内开协程单进程可以并发处理)]classOrderConsumerProcessextendsAbstractProcess{privatebool$runningtrue;// 控制循环的开关privateRedis$redis;privateLoggerInterface$logger;publicfunction__construct(ContainerInterface$container){parent::__construct($container);$this-redis$container-get(Redis::class);$this-logger$container-get(LoggerInterface::class);}/** * 进程主逻辑框架启动后会一直跑这个方法 * 不能returnreturn了进程就死了框架会重启它 */publicfunctionhandle():void{$this-logger-info(订单消费进程启动,[pidposix_getpid()]);while($this-running){try{// BLPOP 阻塞式取队列没数据就等超时2秒再循环// 这样不会空转CPU也不会错过数据$data$this-redis-blPop([queue:order:new],2);if(!$data){continue;// 超时了没数据继续等}$taskjson_decode($data[1],true);// 解析任务数据$this-processOrder($task);}catch(\Throwable$e){// 捕获所有异常进程不能因为单个任务失败就挂掉$this-logger-error(订单处理异常: .$e-getMessage(),[trace$e-getTraceAsString(),]);usleep(100000);// 出错了等100ms再继续防止疯狂报错刷日志}}}/** * 处理单个订单任务 */privatefunctionprocessOrder(array$task):void{$orderId$task[order_id]??0;$action$task[action]??unknown;$this-logger-info(处理订单任务,[order_id$orderId,action$action]);// 这里写具体业务逻辑比如发短信、更新状态等switch($action){casesend_sms:// 发短信通知用户$this-logger-info(发送短信通知,[order_id$orderId]);break;caseupdate_inventory:// 扣减库存$this-logger-info(扣减库存,[order_id$orderId]);break;default:$this-logger-warning(未知的订单任务类型:{$action});}}/** * 收到停止信号时把running设为false让循环优雅退出 * 不要强制kill等当前任务处理完再退 */publicfunctionisEnable($server):bool{returntrue;// 返回true表示这个进程需要启动}}// app/Process/ReportProcess.php namespaceApp\Process;useHyperf\Process\AbstractProcess;useHyperf\Process\Annotation\Process;usePsr\Container\ContainerInterface;usePsr\Log\LoggerInterface;/** * 报表生成进程单独一个进程跑不影响Web请求 */#[Process(name:report-generator,nums:1,enableCoroutine:false)]classReportProcessextendsAbstractProcess{privateLoggerInterface$logger;publicfunction__construct(ContainerInterface$container){parent::__construct($container);$this-logger$container-get(LoggerInterface::class);}publicfunctionhandle():void{while(true){$nowdate(H:i);// 每天凌晨2点生成前一天的报表if($now02:00){$this-generateDailyReport();sleep(61);// 睡1分钟多防止这个整点跑两次}sleep(30);// 没到点就30秒检查一次时间}}privatefunctiongenerateDailyReport():void{$this-logger-info(开始生成日报,[datedate(Y-m-d,strtotime(-1 day))]);// 实际业务查数据库、聚合数据、写入报表表、发邮件通知}}// app/Command/SendTaskCommand.php 往队列投任务 namespaceApp\Command;useHyperf\Command\Annotation\Command;useHyperf\Command\CommandasHyperfCommand;useHyperf\Redis\Redis;#[Command]classSendTaskCommandextendsHyperfCommand{protected?string$nametask:send;publicfunction__construct(privateRedis$redis){parent::__construct();}publicfunctionhandle():void{// 往订单队列投一个任务让OrderConsumerProcess去消费$taskjson_encode([order_id12345,actionsend_sms,created_attime(),]);$this-redis-rPush(queue:order:new,$task);// 从右边push进程从左边pop$this-line(任务已投递到队列);}}