再次研究消息队列记的笔记——activemq

再次研究消息队列记的笔记——activemq

分布式事务–消息队列

1.思考

sso服务

用户服务

日志服务

购物服务(购物车合并)

短信服务

订单服务

库存服务

物流服务

如何让这么多的服务并行执行?【涉及到分布式事务:为了保证数据的一致性】

2.分布式事务

分布式事务:在分布式环境下,如何保证数据一致性

分布式事务会涉及到性能太低的一个通病。

方案:

  • xa协议下的两段式提交

  • xa两段式提交的进阶版:tcc

  • 基于消息,采用最终一致性策略的分布式事务

LNC 分布式框架.

分布式事务理论基础:CPA理论、BASE理论

3.XA协议

XA协议:数据库与事务管理器的一个标准。

在xa协议下,提交一个事务需要经过两个阶段

阶段一:预备提交

阶段二:提交

再次研究消息队列记的笔记------activemq

4.TCC

需要在业务层实现,try,confirm,和cancle的接口。

再次研究消息队列记的笔记------activemq

5.消息队列

在一个事务正在进行的同时,发出消息给其他的业务,如果消息发送失败,或者消息的执行失败,则回滚消息,重复执行,反复执行失败后,记录失败信息,后期补充性的处理;在消息系统中开启事务,消息的事务是指,保证消息被正常消费,否则回滚的一种机制

补偿机制:日志记录,定时器在某个时间再执行(重试执行)

重复执行,需要考虑幂等性处理逻辑。

6.疑问

  • 如何确保消息发送成功? 消息应答模式?

  • 消息发送失败如何处理?

  • 消息事务?

  • 消息幂等性如何处理?

  • 消息阻塞?死信队列。

消息队列

1.消息产品

RabbitMQ 、 Kafka、ActiveMQ

  • RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco);AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。RabbitMQ用Erlang开发

  • ActiveMQ使用的是JMS(Java Messaging Service )协议,JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。ActiveMQ也支持AMQP协议。

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好;Kafka在传输过程中可能会出现消息重复的情况,不保证发送顺序,没有消息事务功能;一般使用kafka处理大数据日志。

再次研究消息队列记的笔记------activemq

2.ActiveMQ

2.1 整合activemq

只需要加入整合依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

2.2 队列消息

Queue 队列模式

Topic 发布订阅模式

Consumer 使用监听器监听MQ上是否有消息。

2.2.1 生产者


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

public class Product {
   
    public static void main(String[] args) {
   
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //Session.SESSION.TRASACTED 开启消息事务
            Queue testqueue = session.createQueue("TEST1");

            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("今天天气真好!我想出去走一走");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化
            producer.send(textMessage);
            session.commit();
            connection.close();
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}

2.2.2 消费者

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
   
    public static void main(String[] args) {
   
        ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.1.7:61616");
        try {
   
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination testqueue = session.createQueue("TEST1");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
   
                @Override
                public void onMessage(Message message) {
   
                    if(message instanceof TextMessage){
   
                        try {
   
                            String text = ((TextMessage) message).getText();
                            System.out.println(text);

                           // session.rollback();
                        } catch (JMSException e) {
   
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                             session.rollback();
                        }
                    }
                }
            });

        }catch (Exception e){
   
            e.printStackTrace();;
        }
    }
}

2.3 消息事务

producer提交时的事务 事务开启 只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。
事务不开启 只要执行send,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写Session.SESSION_TRANSACTED 收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

2.4 消息持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/100743.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Http.sys远程代码执行漏洞处理「建议收藏」

    Http.sys远程代码执行漏洞处理「建议收藏」项目的相关漏扫报告,级别紧急。处理方式也很简单,打上相应补丁即可.官网补丁下载路径:找到对应的系统版本补丁安装重启就可修改该漏洞https://docs.microsoft.com/zh-cn/security-updates/securitybulletins/2015/ms15-034漏洞描述:漏洞确认:用牛刀Metasploit确认存在相关漏洞安装补丁重启后复测已确认修复…

  • 【OpenCV入门教程之九】 非线性滤波专场:中值滤波、双边滤波[通俗易懂]

    正如我们上一篇文章中讲到的,线性滤波可以实现很多种不同的图像变换。然而非线性滤波,如中值滤波器和双边滤波器,有时可以达到更好的实现效果。邻域算子的其他一些例子还有对二值图像进行操作的形态学算子,用于计算距离变换和寻找连通量的半全局算子。先上一张截图:一、理论与概念讲解——从现象到本质1.1非线性滤波概述之前的那篇文章里,我们所考虑的滤波器都是线性的,即两个信号之和的响应和他们各自响应之和相等。换句话说,每个像素的输出值是一些输入像素的加权和,线性滤波器易于构造,并且易于从频率响应角度来进行分

  • k8s(七)Pod调度[通俗易懂]

    k8s(七)Pod调度[通俗易懂]k8s概述定向调度亲和性调度污点和容忍Pod的调度概述在默认情况下,一个Pod在哪个Node节点上运行,是由Scheduler组件采用相应的算法计算出来的,这个过程是不受人工控制的。但是在实际使用中,这并不满足需求,因为很多情况下,我们想控制某些Pod到达某些节点上,那么应该怎么做?这就要求了解kubernetes对Pod的调度规则,kubernetes提供了四大类调度方式。自动调度:运行在哪个Node节点上完全由Scheduler经过一系列的算法计算得出。定向调度:NodeName、NodeS

  • mysql 触发器介绍「建议收藏」

    mysql 触发器介绍「建议收藏」触发器(Trigger)是MySQL中非常实用的一个功能,它可以在操作者对表进行「增删改」之前(或之后)被触发,自动执行一段事先写好的SQL代码。本教程带领大家在实践中学习,你将学到触发器

  • python初级:基础知识学习-变量、数据类型、运算符、选择结构

    python初级:基础知识学习-变量、数据类型、运算符、选择结构

  • C++常量const建议收藏

    常量折叠概念常量折叠表面上的效果和宏替换是一样的,只是“效果上是一样的”,而两者真正的区别在于,宏是字符常量,在预编译宏替换完成后,该宏名字会消失,所有对宏的引用已经全部被替换为它所对应的值,编译器

    2021年12月19日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号