PHP借用Redis消息队列实现高并发下发送邮件功能

PHP借用Redis消息队列实现高并发下发送邮件功能

大家好,又见面了,我是全栈君。

参考:

我目前的做法是,借用redis的队列,把要发送的消息,全部放到里面,然后就不管了
有一个后台发送进程,来处理队列里面的数据
1.如果需要重发,则把发送失败的消息放到一个备份的队列里,每次循环开始前,都把备份队列里的数据放到发送的队列里。
2.php进程不建议常驻,因此,可以把一个进程的生命周期设置为1min,再借用cron来实现进程的重启

对接一个消息队列,把你要处理的任务放入消息队列,简单的可以用redis,复杂点的可以beanstalkd, rabbitmq等
如果坚持用PHP实现,写CLI脚本去这个消息队列拿消息,拿到消息之后处理你的耗时任务
亦可使用其它技术实现,python,java,看你们团队的实际情况和技术栈

PS: PHP有多任务的解决方案,用pthread扩展实现多线程或者pcntl扩展实现多进程,但也不要在web端做这个事情

这个可以借用消息队列做异步处理, 你web后台只是做个触发, 真正的邮件之类的通知,要到异步处理, 这样不会堵塞你web后台的操作,消息队列的话,有很多种方案, 简单点的就是利用redis自己实现一个,或者网上有类似的。
队列处理发送消息的动作的时候, 你可以根据你业务的重要, 比如, 我发送一次,不管成功不成功,无所谓,还是必须把消息发送成功, 必须发送成功的话, 你可以把失败的, 写到另外队列,做处理,或者做log记录之类的, 这个跟公司业务比较接近了。

1.正常的编写邮件发送代码

2.把最后的send改一下,变成存入redis队列的函数

3.编写一个取出redis队列内容的函数3,然后按个进行发送

4.在command或者shell模块编写一个函数4,进行调用步骤3的函数

5.在crontab进行指定php执行步骤4的函数,进行异步发送邮件

总结:

这个是因为php没有异步的功能,导致只能依靠linux的crontab进行异步

现在php的扩展swoole已经有了异步task,可以用来异步发送邮件!

那么如何实现异步消息队列发送邮件呢??

传统的操作方法是这样的:

  1. 用户输入邮件信息

  2. 服务器获取用户输入的数据,提交到第三方的邮件服务器

  3. 第三方邮件服务器发送邮件,返回处理结果

异步的处理邮件发送:

  1. 用户输入邮件相关信息

  2. 将注册信息存储在内存队列,通知用户发送成功

  3. 服务器端监听内存队列,将内存队列中的邮件数据依次发送 用户感知不到

两者的区别在哪?

异步相对于同步来说,页面非阻塞,减少了用户等待的时间体验相对来说比较好

Redis 应用-异步消息队列与延时队列

异步消息队列
说道消息队列,你肯定会想到 Kafka、Rabbitmq 等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。如果你对消息队列没那么高要求,想要轻量级的,使用 Redis 就没错啦。

Redis 通过 list 数据结构来实现消息队列。主要使用到如下命令:

lpush 和 rpush 入队列
lpop 和 rpop 出队列
blpop 和 brpop 阻塞式出队列

废话补不多说上代码:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

//发送消息
$redis->lPush($list, $value);

//消费消息
while (true) {
    try {
        $msg = $redis->rPop($list);
        if (!$msg) {
            sleep(1);
        }
        //业务处理

    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

上面代码会有个问题,如果队列长时间是空的,那 rpop 操作就会不断的循环执行,这样会导致 Redis 的 QPS 升高,影响性能。所以我们使用 sleep 来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是 sleep 会导致消息的处理延迟增加。这个问题我们可以通过 blpop/brpop 来阻塞读取队列。

blpop/brpop 在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解决了上面的问题。

还有一个需要注意的点是我们需要是用 try/catch 来进行异常捕获,如果一直阻塞在那里,Redis 服务器一般会主动断开掉空链接,来减少闲置资源的占用。

延迟队列
你是否在做电商项目的时候会遇到如下场景:

订单下单后超过一小时用户未支付,需要关闭订单
订单的评论如果 7 天未评价,系统需要自动产生一条评论
这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行。Redis 可通过 zset 来实现。我们可以将有序集合的 value 设置为我们的消息任务,把 value 的 score 设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。

实现代码如下:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$redis->zAdd($delayQueue,$tts, $value);

while(true) {
    try{
        $msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
        if(!$msg){
            continue;
        }
        //删除消息
        $ok = $redis.zrem($delayQueue,$msg);
        if($ok){
            //业务处理
        }
    } catch(\Exception $e) {

    }
}

这里又产生了一个问题,同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。解决办法:将 zrangebyscore 和 zrem 使用 lua 脚本进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

Redis可靠队列

前一篇文章《Redis消息队列》介绍了一种简单的FIFO队列的实现。

FIFO队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。也就是说,FIFO队列不能保证消息会传递成功

究其原因,在于FIFO队列缺乏消息确认机制,即消费者向队列报告消息已收到或已处理的机制。可靠队列便是加入了这一机制的消息队列。

Redis在RPOPLPUSH命令的文档中提供了一种利用这一命令实现可靠队列的方式。RPOPLPUSH命令可以在从一个list中获取消息的同时把这条消息复制到另一个list里,并且这个过程是原子的

Image for post

利用RPOPLPUSH实现的可靠队列由两个列表组成,一个存储待处理的消息(pending list),另一个存储处理中的消息(processing list)。

生产者通过LPUSH将消息发送到待处理列表:

127.0.0.1:6379> LPUSH queue:pending "message"

消费者使用RPOPLPUSH从待处理列表获取消息,同时将它加入处理中列表:

127.0.0.1:6379> RPOPLPUSH queue:pending queue:processing
"message"

此时这条消息已经从待处理列表中删除,并且复制到了处理中列表:

127.0.0.1:6379> LRANGE queue:pending 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queue:processing 0 -1
1) "message"

消费者在收到消息或者处理完消息后,使用LREM命令从处理中列表删除这条消息,即完成了消息确认

127.0.0.1:6379> LREM queue:processing 1 "message"

使用LREM而不是RPOP的原因在于,在并发时,不能保证处理中的消息能按加入列表的先后顺序被确认;而RPOP会按顺序删除消息。

没有被确认的消息会一直存储在处理中列表。如果一个消息在处理中列表呆的时间过长,那么可以认为这个消息的传递或处理失败了。我们可以设定一个超时时间,定时扫描处理中列表,将超时的消息重新放回待处理列表等待重新传递。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/111606.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 作业2 分析TGA文件「建议收藏」

    一、TGA文件格式解析二、文件格式文件头(TgaFileHeader):由图像描述信息字段长度、颜色表类型、图像类型、颜色表说明和图像说明五个字段组成,总计18字节,描述了图像存储的基本信息,应用程序可依据该部分字段值读写图像数据。图像/颜色表数据(Image/ColorMapData):由图像描述信息(可选)、颜色表数据和图像数据三部分组成,用于存储图片的图像信息。开发者自定义区域(DeveloperArea):包含开发者定义字段列表和开发者字典(用于存储开发者定义字段的值),该区域为

  • java字符串类型转换为int_java中double转int类型

    java字符串类型转换为int_java中double转int类型标题Java类型转换:int转double由于double的范围比int数据类型大,所以当int值被赋给double时,java会自动将int值转换为double。隐式转换:由于double数据类型的范围和内存大小都比int大,因此从int到double的转换是隐式的。并不需要像doubletoint转换那样进行类型转换;使用Double.valueOf()方法/***Ja…

  • 表达式1&&2&&3&&4_矩阵化表达式求值

    表达式1&&2&&3&&4_矩阵化表达式求值这道题一看就跟正常表达式求值差不多,我们需要一步一步保存前一部分的得0和得1的方案数,然后再与后面的进行运算,求出下个状态的方法总数,看到这里,很多朋友肯定想到了动态规划,然而如何保存运算顺序呢,那就要用到中缀表达式转后缀表达式,再按照运算顺序进行计算,下面是代码:#include<cstdio>#include<cstring>#include<…

  • linux vim :E325 解决办法[通俗易懂]

    linux vim :E325 解决办法[通俗易懂]在终端中输入ls-la找到.swp文件(这个文件是个隐藏文件,所以删除时要加上.),然后rm.你的swp文件。

  • SQL 游标使用示例

    SQL 游标使用示例SQL游标(cursor)详细说明及内部循环使用示例定义    游标(cursor)是系统为用户开设的一个数据缓冲区,存放SQL语句的执行结果。每个游标区都有一个名字,用户可以用SQL语句逐一从游标中获取记录,并赋给主变量,交由主语言进一步处理。    游标是处理结果集的一种机制吧,它可以定位到结果集中的某一行,多数据进行读写,也可以移动游标定位到你所需要的行中进行操作数据。一般复杂的存储过程,都会有游标的出现,他的用处主

  • JAVA异常处理实战心得

    JAVA异常处理实战心得1.异常分类异常Exception是Java中非常常用的功能,它可以简化代码,并且增强代码的安全性。尤其是在各种服务相关的代码中,可能正常业务逻辑的代码量很少,大部分都是各种trycatch处理各种异常的代码,因为实际中异常情况很多,为了保证服务的健壮与稳定性,要尽可能考虑与处理掉各种异常情况。所以在java中遇到大段大段的trycatch也就不足为奇。(图片来自网络)从上面的图可以看…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号