多线程处理mq消息_实现多线程有几种方式

多线程处理mq消息_实现多线程有几种方式何为CMQ?腾讯云消息队列(CloudMessageQueue,CMQ)是一种分布式消息队列服务,它能够提供可靠的基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)之间的收发消息,存储在可靠有效的CMQ队列中,防止消息丢失。CMQ支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。——来源以及更多内容推荐看官方文档。…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

何为CMQ?

        腾讯云消息队列(Cloud Message Queue,CMQ)是一种分布式消息队列服务,它能够提供可靠的基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)之间的收发消息,存储在可靠有效的 CMQ 队列中,防止消息丢失。 CMQ 支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。——来源以及更多内容推荐看官方文档

       之前公司内部使用rabbitMQ,但是运维调整部署全部迁移到腾讯云上,如果继续使用rabbitMQ,还需要运维自主去搭建环境,维护之类,而且经考察对rabbitMQ维护成本相比直接使用腾讯云的CQM高很多,所以最近技术部门对CMQ进行研究发现基本可以替代rabbitMQ,但是同时也发现一个比较严重的问题,使用cmq的mq功能,无法实现完全实现自动触发消息消费,因为cmq的消息监听基于长连接的,长时间没有消息推送会造成长连接断开,无法实现自动触发消息消费了。本文目的主要解决CQM自动触发消息消费。

利用spring中可以根据注解获取bean,调用对应通知方法,实现多线程自动拉取消息。

自定义注解Queue

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface IzkQueue {
    String value() default "";

    String queueName() default "";
}

Jetbrains全家桶1年46,售后保障稳定

消息处理器抽象统一接口

/**
* 消息处理器抽象统一接口
*/
public interface IBaseCmqHandler {

    /**
     * 处理从cmq中获取的消息
     *
     * @param queueName : 队列名
     * @param message   : 消息体
     * @return
     */
    boolean onMessage(String queueName, Message message);
}

CMQ消息监听类

@Slf4j
@Component
public class CmqListener implements ApplicationContextAware, ApplicationListener<ApplicationEvent> {

    @Setter
    private ApplicationContext applicationContext;
    @Autowired
    private TaskExecutor taskExecutor;

    private boolean isStart = false;

    /**
     * 获取所有的需要监听mq的类,以及注册的mq
     * @param applicationEvent
     */
    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        HashMap<String, IBaseCmqHandler> map = new HashMap<>(16);
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(IzkQueue.class);
        beanMap.forEach((key, value) -> {
            IzkQueue annotation = value.getClass().getAnnotation(IzkQueue.class);
            String queue = annotation.queueName();
            map.put(queue, (IBaseCmqHandler) value);
        });
        if (!isStart) {
            isStart = true;
            if (!CollectionUtils.isEmpty(map)) {
                taskExecutor.execute(() -> executeQueueHandler(map));
            }
        }
    }

    private void executeQueueHandler(HashMap<String, IBaseCmqHandler> map) {
        map.forEach((queueName, bean) -> {
            taskExecutor.execute(() -> receiveCmqMessage(queueName, bean));
        });

    }

    /**
     * 功能描述 : 将队列与对应的消息处理器进行匹配,并进行消息消费
     *
     * @param queueName  : queue name
     * @param cmqHandler : 具体的消息处理器
     * @return
     * @created 2019-07-14 16:55
     */
    private void receiveCmqMessage(String queueName, IBaseCmqHandler cmqHandler) {
        try {
            while (true) {
                // 睡眠 释放cpu资源
                Thread.sleep(10);
                CmqQueue cmqQueue = applicationContext.getBean(queueName, CmqQueue.class);
                Message message = cmqQueue.receiveMsg();

                if (null != message) {
                    log.info("时间:{}, 队列:{}, 收到消息:{}", LocalDateTime.now(), queueName, message.msgBody);
                    if (!StringUtils.isEmpty(message.msgBody) && !StringUtils.isEmpty(message.receiptHandle)) {
                        taskExecutor.execute(() -> {
                            try {
                                // 处理消息
                                if (cmqHandler.onMessage(queueName, message)) {
                                    // 消费成功 删除消息
                                    cmqQueue.deleteMsg(message.receiptHandle);
                                } else {
                                    taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
                                }
                            } catch (Exception e) {
                                log.error("消息处理失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);
                                taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
                            }
                        });
                    }
                }
            }
        } catch (Exception e) {
            log.error("消息执行失败 --> 队列名:{}, 已进行自动补偿,Exception:", queueName, e);
            taskExecutor.execute(() -> receiveCmqMessage(queueName, cmqHandler));
        }
    }

}

关于上述涉及到类CmqQueue是公司内部封装类,将queue队列和cmq的账号绑定,只是大概展示一下,仅供参考。

账号信息类

@Data
public class MqAccount {
    private String host;
    private String port;
    private String username;
    private String password;
    private String vhost;
    private String secretId;
    private String secretKey;
    private String endpoint;
    private String queueEndpoint;
}

CmqQueue的信息类

public class CmqQueue extends AbstractMq {
    private static final Logger LOGGER = LoggerFactory.getLogger(CmqQueue.class);
    private Account account;
    private Queue queue;

    public CmqQueue(MqAccount mqAccount, String queueName) {
        mqAccount = (MqAccount)Preconditions.checkNotNull(mqAccount);
        Preconditions.checkNotNull(queueName);
        queueName = this.getNameWithSuffix(queueName);
        this.init(mqAccount, queueName);
    }

    private void init(MqAccount mqAccount, String queueName) {
        this.account = new Account(mqAccount.getQueueEndpoint(), mqAccount.getSecretId(), mqAccount.getSecretKey());
        ArrayList list = Lists.newArrayList();

        try {
            this.account.listQueue(queueName, -1, -1, list);
            long count = list.stream().filter((name) -> {
                return queueName.equalsIgnoreCase(name);
            }).count();
            if (count == 0L) {
                QueueMeta meta = new QueueMeta();
                this.account.createQueue(queueName, meta);
            } else {
                LOGGER.warn("cmq queueName  {}  has exist", queueName);
            }

            this.queue = this.account.getQueue(queueName);
        } catch (Exception var7) {
            LOGGER.error("cmq createQueue error", var7);
            throw new RuntimeException(var7);
        }
    }

    public void setQueueAttr(QueueMeta meta) {
        try {
            this.queue.setQueueAttributes(meta);
        } catch (Exception var3) {
            LOGGER.error("cmq setQueueAttr error", var3);
        }

    }

    public String sendMsg(String msg) {
        try {
            return this.queue.sendMessage(msg);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},sendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
            return null;
        }
    }

    public List<String> batchSendMsg(List<String> msgs) {
        try {
            return this.queue.batchSendMessage(msgs);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},batchSendMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
            return null;
        }
    }

    public Message receiveMsg() {
        Message message = null;

        try {
            message = this.queue.receiveMessage(10);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},receiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
        }

        return message;
    }

    public List<Message> batchReceiveMsg(int numOfMsg) {
        try {
            return this.queue.batchReceiveMessage(numOfMsg, 10);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},batchReceiveMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
            return null;
        }
    }

    public void deleteMsg(String receiHandle) {
        try {
            this.queue.deleteMessage(receiHandle);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},deleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
        }

    }

    public void batchDeleteMsg(List<String> receiHandles) {
        try {
            this.queue.batchDeleteMessage(receiHandles);
        } catch (Exception var3) {
            LOGGER.error("cmq queuename:{},batchDeleteMsg errorinfo:{},error:", new Object[]{this.exchangeName, var3.toString(), var3});
        }

    }
}

public abstract class AbstractMq {
    protected String exchangeName;
    protected String exchangeType = "topic";

    public AbstractMq() {
    }

    protected String getExchangeType() {
        return this.exchangeType;
    }

    protected String getNameWithSuffix(String name) {
        return !DeveloperUtil.isLocalDebug() ? name + "_" + Util.runEvn : name + "_local";
    }
}

Demo案例

@IzkQueue(queueName = "queueDemo",value = "demo")
public class MessageDemo implements IBaseCmqHandler {
    @Override
    public boolean onMessage(String queueName, Message message) {
        //todo
        return false;
    }
}

总结

       不将就是发现的原动力,多思考多动手。       

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/219183.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 零基础学Java(10)面向对象-使用LocalDate类完成日历设计

    零基础学Java(10)面向对象-使用LocalDate类完成日历设计前言在我们完成这个日历设计前,需要了解Java中的预定义类LocalDate的一些用法语法LocalDate.now()//2022-07-01会构造一个新对象,表示构造这个对象时的日期。

  • pycharm配置flask环境_pycharm集成Django

    pycharm配置flask环境_pycharm集成Django参考:使用Pycharm+Flask开启DEBUG模式的坑pycharm创建flask应用Flask在Pycharm开启调试模式

    2022年10月29日
  • JAVA CAS实现原理与使用

    JAVA CAS实现原理与使用在JDK5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁(后面的章节还会谈到锁)。锁机制存在以下问题:(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。volatile…

  • 注意Mikrotik ROS Webproxy的“漏洞”

    注意Mikrotik ROS Webproxy的“漏洞”在使用ROSWebproxy做代理时,外网的IP也可以连入,将你的ROS代理服务器当作跳板!这种情况会引起外网网卡流量和内网网卡对不上的情况:如下图于是用Torch检查的时候发现了问题:1080口流量超大,而且在转发连表里没有流量,但是在WAN口的input连表里有高流量。也就是说和这个1080口连接的IP并没有和内网进行通!关闭代理后连接消失!…

  • xshell7怎么连接虚拟机_centos ssh

    xshell7怎么连接虚拟机_centos ssh文章目录一、xshell简介二、xshell安装三、xshell链接虚拟机的centos71.查看虚拟机中centos系统的ip2.利用ip链接一、xshell简介Xshell[1]是一个强大的安全终端模拟软件,它支持SSH1,SSH2,以及MicrosoftWindows平台的TELNET协议。Xshell通过互联网到远程主机的安全连接以及它创新性的设计和特色帮助用户在复…

  • 雷达测距和超声波测距_超声波测距的原理是什么

    雷达测距和超声波测距_超声波测距的原理是什么本实验是基于MSP430利用HC-SR04超声波传感器进行测距,测距范围是3-65cm,讲得到的数据显示在LCD1602液晶屏上。模块工作原理如下(1)采用IO触发测距,给至少10us的高电平信号;(2)模块自动发送8个40khz的方波,自动检测是否有信号返回;(3)有信号返回,通过IO输出一高电平,高电平持续的时间就是超声波从发射到返回的时间(4计算测试距离测试距离=(高电平时间*声速(340M/S))/2;根据工作原理,我们可以选择两种模式驱动1.采用中断+定时器

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号