PHP消息队列

1、消息队列概念

从本质上说消息对列就是一个队列结构的中间件,也就是说消息放入这个中间件之后就可以直接返回,并不需要系统立即处理,而另外会有一个程序读取这些数据,并按顺序进行逐次处理。

2、核心结构

业务系统 --(入队)--> 消息队列 --(出队)--> 队列处理系统

3、应用场景

1、数据冗余:比如订单系统,后续需要严格的进行数据转换和记录,消息队列可以把这些数据持久化的存储在队列中,然后有订单,后续处理程序进行获取,后续处理完之后在把这条记录进行删除来保证每一条记录都能够处理完成。
2、系统解耦:使用消息系统之后,入队系统和出队系统是分开的,也就说其中一个崩溃之后不会影响另外一个的正常运行。
3、异步通信:消息本身使用入队之后可以直接返回。
4、扩展性:例如订单队列,不仅可以处理订单,还可以给其他业务使用。
5、排序保证:有些场景需要按照产品的顺序进行处理比如单进单出从而保证数据按照一定的顺序处理,使用消息队列是可以的。
6、流量削峰:就是秒杀和抢购的时候,会出现明显的流量剧增,对服务器的压力非常大。
7、消息通讯:消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

4、队列介质

1、数据库,例如mysql(可靠性高,易实现,速度慢)
2、缓存, 例如redis(速度快,单个消息报包过大时效率低)
3、消息系统,专业性强,可靠,学习成本高(例如rabbitMq是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。)
4、Beanstalkd (一个高性能、轻量级的分布式内存队列系统)

TP6使用消息队列

1、composer安装think-queue包

1
composer require topthink/think-queue

2、配置文件修改

配置文件位于 config/queue.php

公共配置
'default'=>'redis' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//我这里使用的是redis

3、简单封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
<?php
namespace app\http;

use think\queue\Job;
use think\facade\Queue;

class QueueServer
{
/**
* 队列任务类型
* @param string $taskType [任务类型]
* @param array $jobDataArr [业务数据]
* @param integer $execType [执行类型 1:即时执行 2:延迟执行]
* @param integer $waitTime [延迟执行时间,单位秒]
* @param string $jobQueueName [任务名称,默认为NULL]
*
* 命令执行:
* 有任务名称 php think queue:listen --queue [任务名称]
* 无任务名称 php think queue:listen
* @return [type] [description]
*/
public static function taskList($taskType = '', $jobDataArr = [], $execType = 1, $waitTime = 60, $jobQueueName = null){

switch ($taskType) {
case 'taskA':
$jobHandlerClassName = 'app\http\QueueServer@taskA';//多任务:任务的类名@方法名
break;
default:
return false;
break;
}

if($execType == 1){
$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
}else{
$isPushed = Queue::later($waitTime,$jobHandlerClassName, $jobDataArr, $jobQueueName);
}

if($isPushed !== false){
return true;
}else{
return false;
}
}

/**
* 任务A
*/
public function taskA(Job $job,$data){

$isJobDone = $this->_doTaskA($data);

if ($isJobDone) {
//成功执行并删除任务
$job->delete();
print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
}else{
//任务轮询4次后删除
if ($job->attempts() > 3) {
// 第1种处理方式:重新发布任务,该任务延迟10秒后再执行
//$job->release(10);
// 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
//$job->failed();
// 第3种处理方式:删除任务
$job->delete();
}
}
}

/**
* 执行任务A
* @param [type] $data [description]
* @return [type] [description]
*/
private function _doTaskA($data) {
// 业务逻辑
// ...

print("数据:".json_encode($data)."\n");
print("Info: doing TaskA of Job MultiTask "."\n");
return true;
}

}

4、监听任务并执行

1
2
php think queue:listen
php think queue:work

两种,具体的可选参数可以输入命令加 —help 查看

可配合supervisor使用,保证进程常驻