Spring Cloud Bus中的事件的订阅与发布(二)

Spring Cloud Bus中的事件的订阅与发布(二)

大家好,又见面了,我是全栈君。

在之前的文章Spring Cloud Bus中的事件的订阅与发布(一)介绍了消息总线的相关事件。 本文主要介绍消息总线的事件监听器以及消息的订阅与发布。

事件监听器

Spring Cloud Bus中,事件监听器的定义可以是实现ApplicationListener接口,或者是使用@EventListener注解的形式。我们看一下事件监听器的类图。

监听器

ApplicationListener接口实现有两个:刷新监听器
RefreshListener和环境变更监听器
EnvironmentChangeListener

RefreshListener

RefreshListener对应的事件是RefreshRemoteApplicationEvent

public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {
	private ContextRefresher contextRefresher;

	public RefreshListener(ContextRefresher contextRefresher) {
		this.contextRefresher = contextRefresher;
	}

	@Override
	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
		Set<String> keys = contextRefresher.refresh();
		log.info("Received remote refresh request. Keys refreshed " + keys);
	}
}
复制代码

对于刷新时间的处理,调用ContextRefresherrefresh()方法,而定义在Spring Cloud Context中的ContextRefresher用于提供上下文刷新的功能。我们具体看一下refresh()方法。

	public synchronized Set<String> refresh() {
		Map<String, Object> before = extract(
				this.context.getEnvironment().getPropertySources());
		addConfigFilesToEnvironment();
		Set<String> keys = changes(before,
				extract(this.context.getEnvironment().getPropertySources())).keySet();
		this.context.publishEvent(new EnvironmentChangeEvent(keys));
		this.scope.refreshAll();
		return keys;
	}
复制代码

实现很简单,先获取之前环境变量的key-value,然后重新加载新的配置环境文件,通过比对新旧环境变量的map集合,然后发布新的环境变更EnvironmentChangeEvent的事件。this.scope.refreshAll()销毁了在这个范围内,当前实例的所有bean并在下次方法的执行时强制刷新。

EnvironmentChangeListener

EnvironmentChangeListener对应的事件类是EnvironmentChangeRemoteApplicationEvent

public class EnvironmentChangeListener implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
	@Autowired
	private EnvironmentManager env;

	@Override
	public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
		Map<String, String> values = event.getValues();
		for (Map.Entry<String, String> entry : values.entrySet()) {
			env.setProperty(entry.getKey(), entry.getValue());
		}
	}
}
复制代码

RefreshListener的实现中,可以知道该事件的实现最终又发布了一个新的事件EnvironmentChangeListener。在刷新监听器中,构造了变更了的环境变量的map,交给环境变更监听器。上面对环境变更事件的处理,遍历变更了的配置环境属性,并在本地应用程序的环境中将新的属性值设置到对应的键。

TraceListener

TraceListener的实现是通过注解@EventListener的形式,监听的事件为:确认事件AckRemoteApplicationEvent和发送事件SentApplicationEvent

@EventListener
	public void onAck(AckRemoteApplicationEvent event) {
		this.repository.add(getReceivedTrace(event));
	}

	@EventListener
	public void onSend(SentApplicationEvent event) {
		this.repository.add(getSentTrace(event));
	}

	protected Map<String, Object> getSentTrace(SentApplicationEvent event) {
		Map<String, Object> map = new LinkedHashMap<String, Object>();
		map.put("signal", "spring.cloud.bus.sent");
		map.put("type", event.getType().getSimpleName());
		map.put("id", event.getId());
		map.put("origin", event.getOriginService());
		map.put("destination", event.getDestinationService());
		if (log.isDebugEnabled()) {
			log.debug(map);
		}
		return map;
	}

	protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) {
		Map<String, Object> map = new LinkedHashMap<String, Object>();
		map.put("signal", "spring.cloud.bus.ack");
		map.put("event", event.getEvent().getSimpleName());
		map.put("id", event.getAckId());
		map.put("origin", event.getOriginService());
		map.put("destination", event.getAckDestinationService());
		if (log.isDebugEnabled()) {
			log.debug(map);
		}
		return map;
	}
复制代码

在SentTrace中,主要记录了signal、事件类型type、id、源服务origin和目的服务destination的属性值。而在ReceivedTrace中,表示对事件的确认,主要记录了signal、事件类型event、id、源服务origin和目的服务destination的属性值。这些信息默认存储于内存中,可以通过/trace端点获取最近的事件信息,如下图所示:

{
    "timestamp": 1517229555629,
    "info": {
        "signal": "spring.cloud.bus.sent",
        "type": "RefreshRemoteApplicationEvent",
        "id": "c73a9792-9409-47af-993c-65526edf0070",
        "origin": "config-server:8888",
        "destination": "config-client:8000:**"
    }
},
{
	"timestamp": 1517227659384,
	"info": {
	    "signal": "spring.cloud.bus.ack",
	    "event": "RefreshRemoteApplicationEvent",
	    "id": "846f3a17-c344-4d29-93f3-01b73c5bf58f",
	    "origin": "config-client:8000",
	    "destination": "config-client:8000:**"
	}
}
复制代码

至于事件的发起,我们将在下一节结合消息的订阅与发布一起讲解。

消息的订阅与发布

Spring Cloud Bus基于Spring Cloud Stream,对特定主题的消息进行订阅与发布,事件以消息的形式传递到其他服务实例。

通道定义

既然是基于stream,我们首先看一下input和output的通道定义。

public interface SpringCloudBusClient {

String INPUT = "springCloudBusInput";

String OUTPUT = "springCloudBusOutput";

@Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();

@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();
}
复制代码

可以看到,bus中定义了springCloudBusInputspringCloudBusOutput两个通道,分别用于定于订阅与发布springCloudBus的消息。

bus属性定义

其次,我们看一下bus中关于stream的属性定义。在基础应用中我们就知道bus订阅的话题是springCloudBus,下面看一下在bus中的其他属性的定义。

@ConfigurationProperties("spring.cloud.bus")
public class BusProperties {

//环境变更相关的属性
private Env env = new Env();
// 刷新事件相关的属性
private Refresh refresh = new Refresh();
//与ack相关的属性
private Ack ack = new Ack();
//与追踪ack相关的属性
private Trace trace = new Trace();
//Spring Cloud Stream消息的话题
private String destination = "springCloudBus";

//标志位,bus是否可用
private boolean enabled = true;

...
}
复制代码

上面的bus属性,设置了一些默认值,正好与事实也是相符的,我们没有进行任何spring.cloud.bus配置也能够进行正常运行。通过在配置文件中修改相应的属性,实现bus的更多功能扩展。env、refresh、ack和trace分别对应不同的事件,在配置文件中有一个开关属性,默认都是开启的,我们可以根据需要进行关闭。

消息的监听与发送

上面两部分讲了stream通道和基本属性的定义,最后我们看下bus中对指定主题的消息如何发送与监听处理。在META-INF/spring.factories配置了EnableAutoConfiguration配置项为BusAutoConfiguration,在服务启动时会自动加载到Spring容器中,其中对于指定主题的消息如何发送与监听处理如下:

@Configuration
@ConditionalOnBusEnabled //bus启用的开关
@EnableBinding(SpringCloudBusClient.class) //绑定通道
@EnableConfigurationProperties(BusProperties.class)
public class BusAutoConfiguration implements ApplicationEventPublisherAware {

//注入source接口,用于发送消息
@Autowired
@Output(SpringCloudBusClient.OUTPUT)
private MessageChannel cloudBusOutboundChannel;

// 监听RemoteApplicationEvent事件
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) {
        //当事件是来自自己的并且不是ack事件,则发送消息
    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
    }
}
//消息的消费,也是事件的发起
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        //ack事件
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) {
            //当开启bus追踪且不是自己的ack事件,则通知所有的注册该事件的监听者,否则直接返回
            this.applicationEventPublisher.publishEvent(event);
        }
        return;
    }
    //消费消息,该消息属于自己
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) {
        //不是自己发布的事件,正常处理
        if (!this.serviceMatcher.isFromSelf(event)) {
            this.applicationEventPublisher.publishEvent(event);
        }
        //消费之后,需要发送ack确认事件
        if (this.bus.getAck().isEnabled()) {
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    //事件追踪相关,若是开启追踪事件则执行
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
        // 不论其来源,准备发送事件,发布了之后供本地消费
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}

//...
}
复制代码

@ConditionalOnBusEnabled注解是bus的开关,默认开启。@EnableBinding绑定了SpringCloudBusClient中定义的通道。在应用服务启动时,自动化配置类加载了bus的API端点、刷新、ACK追踪以及bus环境变量的配置等beans。 @Output表示输出output绑定目标将由框架创建,由该通道发送消息。 还涉及到上面列出来的两个主要方法:acceptLocalacceptRemote

acceptLocal是一个基于注解实现的事件监听器,监听的事件类型是RemoteApplicationEvent,对于该事件的处理方法是,当事件是来自自己的并且不是ack事件,则发送消息。

@StreamListener注解是Spring Cloud Stream中提供的,用来标识一个方法作为@EnableBinding绑定的input通道的监听器。acceptRemote方法, 传递的参数RemoteApplicationEvent就是stream中的消息。如果是确认类事件,当开启了事件追踪且事件不是来自于自身,则发布该事件,对于确认类事件,处理已经完成; 如果自身需要处理该事件且该事件不是来自自身,则发布该事件。需要注意的是,当开启事件追踪时,构造一个确认事件并将该事件发布;最后,当开启了事件追踪,这边的处理是注册已发送的事件,以便发布供本地消费,而不论其来源。

总结

本文在上一篇介绍Spring Cloud Bus中的事件基础上,结合源码继续介绍事件的监听器以及事件的订阅与发布是如何在消息总线中实现的。 消息总线常用于传播状态的变更和管理指令的发布。而消息总线最常用的场景就是更新应用服务的配置信息,需要结合Config Server使用,当然消息总线的实现其实是基于Spring Cloud Stream,Stream封装了各种不同的MQ中间件,产生的消息实则是推送配置信息的变更。

订阅最新文章,欢迎关注我的公众号

参考

Spring Cloud Bus-v1.3.3

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/107670.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • java静态全局变量和全局变量的区别_java静态全局变量

    java静态全局变量和全局变量的区别_java静态全局变量Java的面向对象的代码结构会使在多个位置引用变量更加困难。有时也很难确定给定变量应属于哪个类,尤其是当它是一个广泛使用的值(例如数据库连接器或数学常数)时。Java全局变量怎么定义?在许多语言中,当遇到这样的问题时,我们可以声明一个全局变量。但是,不幸的是,Java从技术上不允许在全局范围内创建变量。在本文中,我们将介绍如何在Java中模拟和使用全局变量。什么是全局变量?全局变量是可以从任何范围访问的变量。许多编程语言都具有用于声明全局变量的特殊语法,例如,Python使我们可以使

  • socket常用函数_socket是可重入函数吗

    socket常用函数_socket是可重入函数吗前言socketpair是Linux下的函数,其主要作用是创建一对套节字来进行进程间通信,其与匿名管道(PIPE)的作用相似,这两个套节字均可读可写.具体介绍见本博客另一篇文章:https://blog.csdn.net/wufuhuai/article/details/79747912实现我们都知道socket不仅能够进行跨进程通信,而且socket是可以双向通信的,即是…

    2022年10月14日
  • Sublime Text 3 全程详细图文使用教程

    Sublime Text 3 全程详细图文使用教程转自:http://www.cnblogs.com/wind128/p/4409422.html一、&nbsp;前言&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;使用SublimeText也有几个年头了,版本也从2升级到3了,但犹如寒天饮冰水,冷暖尽自知。最初也是不知道从何下手,满世界…

  • [025] 微信公众帐号开发教程第1篇-引言

    [025] 微信公众帐号开发教程第1篇-引言接触微信公众帐号已经有两个多月的时间了,在这期间,除了陆续完善个人公众帐号xiaoqrobot以外,还带领团队为公司开发了两个企业应用:一个是普通类型的公众帐号,另一个是会议类型的公众帐号。经过这3个公众帐号的开发,对目前微信公众平台开放的api算是比较熟悉了,像文本消息、图文消息、音乐消息、语音消息、位置消息等全部用到过,菜单也使用过。所以,就有了写微信公众帐号开发教程的想法,将学习到的技术经验

  • 数据分析常见方法及模型分类[通俗易懂]

    数据分析常见方法及模型分类[通俗易懂]今天跟大家分享一下比较常见的数据分析方法以及模型分类。在工作中,有很多的数据分析方法和模型,但是对于新入门的人来说,可能不能够一下子就找到合适的数据分析方法以及模型,进而影响到工作的进度。所以今天小白就来给大家介绍一些比较常见的数据分析方法以及模型的分类。一般来说,我们可以将数据分析方法分为对比分析、相关分析、分类分析以及综合分析四类,其中前三类主要是以定性的数据分析方法和模型为主,而对于…

  • apache 虚拟主机配置详解_linux 配置虚拟主机

    apache 虚拟主机配置详解_linux 配置虚拟主机文章目录Apache虚拟主机企业应用部署一个端口不同域名调试worker工作模式进行压力测试权限设置Apache配置文件详解ApacheRewrite规则讲解Apache配置文件权限操作总结Apache虚拟主机企业应用企业真实环境中,一台WEB服务器发布单个网站会非常浪费资源,所以一台WEB服务器上会发布多个网站,少则3-5个,多则2-30。在一台服务器上发布多哥网站,也称之为部署多个虚拟主机,WEB虚拟主机配置方法有三种:基于单个IP地址多个Socket端口基于

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号