springboot整合rabbitmq,动态创建queue和监听queue

springboot整合rabbitmq,动态创建queue和监听queue1、pom.xml添加如下依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>…

大家好,又见面了,我是你们的朋友全栈君。

一、pom.xml添加如下依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

         <!-- mq的依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
   
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

二、整合rabbitmq

   (1)在application.properties中添加mq信息

#mq的连接信息,可直接多host连接和单host连接
mq.rabbit.address=192.168.1.1:5672,192.168.1.2:5672
mq.rabbit.virtualHost=/
mq.rabbit.username=guest
mq.rabbit.password=guest
mq.rabbit.exchange.name=mq.direct

#创建queue的数量
mq.rabbit.size=2

#消费者数量
mq.concurrent.consumers=4

#每个消费者获取的最大的消息投递数量
mq.prefetch.count=100

    (2)rabbitmqConfig工具类

@Configuration
public class RabbitConfig {

    @Value("${mq.rabbit.address}")
    String address;
    @Value("${mq.rabbit.username}")
    String username;
    @Value("${mq.rabbit.password}")
    String password;
    @Value("${mq.rabbit.virtualHost}")
    String mqRabbitVirtualHost;
    @Value("${mq.rabbit.exchange.name}")
    String exchangeName;
    @Value("${mq.rabbit.size}")

    int queueSize;

    @Value("${mq.concurrent.consumers}")
    int concurrentConsumers;
    @Value("${mq.prefetch.count}")
    int prefetchCount;

    //创建mq连接
    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();

        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(mqRabbitVirtualHost);

        connectionFactory.setPublisherConfirms(true);

        //该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host
        connectionFactory.setAddresses(address);
        return connectionFactory;
    }

   //监听处理类
    @Bean
    @Scope("prototype")
    public HandleService handleService() {
        return new HandleService();
    }

     //动态创建queue,命名为:hostName.queue1【192.168.1.1.queue1】,并返回数组queue名称
    @Bean
    public String[] mqMsgQueues() throws AmqpException, IOException {
        String[] queueNames = new String[queueSize];
        String hostName = OsUtil.getHostNameForLiunx();//获取hostName
        for (int i = 1; i <= queueSize; i++) {
            String queueName = String.format("%s.queue%d", hostName, i);
            connectionFactory().createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
            connectionFactory().createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
            queueNames[i - 1] = queueName;
        }
        return queueNames;
    }

    //创建监听器,监听队列
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(HandleService handleService) throws AmqpException, IOException {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueueNames(mqMsgQueues());
        container.setExposeListenerChannel(true);
        container.setPrefetchCount(prefetchCount);//设置每个消费者获取的最大的消息数量
        container.setConcurrentConsumers(concurrentConsumers);//消费者个数
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认
        container.setMessageListener(handleService);//监听处理类
        return container;
    }

}

(3)消费者

@Service
public class HandleService implements ChannelAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(HandleService.class);

    /**
     * @param
     * 1、处理成功,这种时候用basicAck确认消息;
     * 2、可重试的处理失败,这时候用basicNack将消息重新入列;
     * 3、不可重试的处理失败,这时候使用basicNack将消息丢弃。
     *
     *  basicNack(long deliveryTag, boolean multiple, boolean requeue)
     *   deliveryTag:该消息的index
     *  multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
     * requeue:被拒绝的是否重新入队列
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        logger.info("接收到消息:" + new String(body));
        JSONObject jsonObject = null;
        try {
            jsonObject = JSONObject.parseObject(new String(body));
            if (消费成功) {
               logger.info("消息消费成功");
               channel.basicAck(message.getMessagePropertites().getDeliveryTag(),false);//确认消息消费成功     
            }else if(可重试的失败处理){
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
          } else {          //消费失败             
               channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);             
        } catch (JSONException e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//消息丢弃
            logger.error("This message:" + jsonObject + " conversion JSON error ");
        }
    }

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/146685.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • GridLayout的使用

    GridLayout的使用GridLayout的使用:GridLayout的类层次结构图:java.lang.Object–java.awt.GridLayoutGridLayout比FlowLayout多了行和列的设置,也就是说你要先设置GridLayout共有几行几列,就如同二维平面一般,然后你加进去的组件会先填第一行的格子,然后再从第二行开始填,依此类扒,就像是一个个的格子一般。而

  • 计划任务 SchedulerFactoryBean 配置

    计划任务 SchedulerFactoryBean 配置Quartz是开源任务调度框架中的翘首,它提供了强大任务调度机制,同时保持了使用的简单性。Quartz允许开发人员灵活地定义触发器的调度时间表,并可以对触发器和任务进行关联映射。此外,Quartz提供了调度运行环境的持久化机制,可以保存并恢复调度现场,即使系统因故障关闭,任务调度现场数据并不会丢失。此外,Quartz还提供了组件式的侦听器、各种插件、线程池等功能。Spring为创建Quart…

  • 【实用】网页内容监控并实时推送百度解决方案「建议收藏」

    【实用】网页内容监控并实时推送百度解决方案「建议收藏」将网站最新内容实时推送百度是有利于内容原创保护和收录的,避免小站内容刚上线就被大站搞去,做了他人的嫁衣。但是网站天天手动去提交百度的话也是很浪费时间的,那么有没有什么方法可以自动将新内容推送百度呢?答案肯定是有的,实现网页内容监控就行了,然后将最新产出内容推送给百度。WEB视界网页内容监控原理将一批网站列表加入一个定时任务中,将所有属于本网站的URL提取出来并存储起来。然后定时任务每次…

  • NTP时间服务器搭建「建议收藏」

    1.yuminstallntpntpdate安装NTP服务器2.NTP服务器配置:修改配置文件vi/etc/ntp.conf3./etc/init.d/ntpdrestart重启服务4.ntpq-p查看状态5.date查看当前时间6.客户机同步时间ntpdatepool.ntp.org(pool.ntp.org为服务机ip地址,pool.ntp.o…

  • 指针函数到函数指针作为函数的返回值

    指针函数到函数指针作为函数的返回值转载自:https://www.cnblogs.com/yangjiquan/p/11465376.html首先说一下指针函数:1.指针函数的定义顾名思义,指针函数即返回指针的函数。其一般定义形式如下:类型名*函数名(函数参数表列);其中,后缀运算符括号”()”表示这是一个函数,其前缀运算符星号”*”表示此函数为指针型函数,其函数值为指针,即它带回来的值的类型为指针,当调用这个函数后,将得到一个”指向返回值为…的指针(地址),”类型名”表示函数返回的指针指向的类…

  • Java配置方式读取外部的资源配置文件

    Java配置方式读取外部的资源配置文件

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号