netty 自定义协议_自定义annotation

netty 自定义协议_自定义annotationNetty实现自定义协议

大家好,又见面了,我是你们的朋友全栈君。

关于协议,使用最为广泛的是HTTP协议,但是在一些服务交互领域,其使用则相对较少,主要原因有三方面:

HTTP协议会携带诸如header和cookie等信息,其本身对字节的利用率也较低,这使得HTTP协议比较臃肿,在承载相同信息的情况下,HTTP协议将需要发送更多的数据包;
HTTP协议是基于TCP的短连接,其在每次请求和响应的时候都需要进行三次握手和四次挥手,由于服务的交互设计一般都要求能够承载高并发的请求,因而HTTP协议这种频繁的握手和挥手动作会极大的影响服务之间交互的效率;
服务之间往往有一些根据其自身业务特性所独有的需求,而HTTP协议无法很好的服务于这些业务需求。
基于上面的原因,一般的服务之间进行交互时都会使用自定义协议,常见的框架,诸如dubbo,kafka,zookeeper都实现了符合其自身业务需求的协议,本文主要讲解如何使用Netty实现一款自定义的协议。

  1. 协议规定
    所谓协议,其本质其实就是定义了一个将数据转换为字节,或者将字节转换为数据的一个规范。一款自定义协议,其一般包含两个部分:消息头和消息体。消息头的长度一般是固定的,或者说是可确定的,其定义了此次消息的一些公有信息,比如当前服务的版本,消息的sessionId,消息的类型等等;消息体则主要是此次消息所需要发送的内容,一般在消息头的最后一定的字节中保存了当前消息的消息体的长度。下面是我们为当前自定义协议所做的一些规定:

名称 字段 字节数 描述
魔数新OA平台出租 magicNumber 4 一个固定的数字Q-166848365,一般用于指定当前字节序列是当前类型的协议,比如Java生成的class文件起始就使用0xCAFEBABE作为其标识符,对于本服务,这里将其定义为0x1314
主版本号 mainVersion 1 当前服务器版本代码的主版本号
次版本号 subVersion 1 当前服务器版本的次版本号
修订版本号 modifyVersion 1 当前服务器版本的修订版本号
会话id sessionId 8 当前请求的会话id,用于将请求和响应串联到一起
消息类型 messageType 1 请求:1,表示当前是一个请求消息;响应:2,表示当前是一个响应消息;Ping:3,表示当前是一个Ping消息;Pong:4,表示当前是一个Pong消息;Empty:5,表示当前是一个空消息,该消息不会写入数据管道中;
附加数据 attachments 不定 附加消息是字符串类型的键值对来表示的,这里首先使用2个字节记录键值对的个数,然后对于每个键和值,都首先使用4个字节记录其长度,然后是具体的数据,其形式如:键值对个数+键长度+键数据+值长度+值数据…
消息体长度 length 4字节 记录了消息体的长度
消息体 body 不定 消息体,服务之间交互所发送或接收的数据,其长度有前面的length指定
上述协议定义中,我们除了定义常用的请求和响应消息类型以外,还定义了Ping和Pong消息。Ping和Pong消息的作用一般是,在服务处于闲置状态达到一定时长,比如2s时,客户端服务会向服务端发送一个Ping消息,则会返回一个Pong消息,这样才表示客户端与服务端的连接是完好的。如果服务端没有返回相应的消息,客户端就会关闭与服务端的连接或者是重新建立与服务端的连接。这样的优点在于可以防止突然会产生的客户端与服务端的大量交互。

  1. 协议实现
    通过上面的定义其实我们可以发现,所谓协议,就是定义了一个规范,基于这个规范,我们可以将消息转换为相应的字节流,然后经由TCP传输到目标服务,目标服务则也基于该规范将字节流转换为相应的消息,这样就达到了相互交流的目的。这里面最重要的主要是如何基于该规范将消息转换为字节流或者将字节流转换为消息。这一方面,Netty为我们提供了ByteToMessageDecoder和MessageToByteEncoder用于进行消息和字节流的相互转换。首先我们定义了如下消息实体:

public class Message {

private int magicNumber;
private byte mainVersion;
private byte subVersion;
private byte modifyVersion;
private String sessionId;

private MessageTypeEnum messageType;
private Map<String, String> attachments = new HashMap<>();
private String body;

public Map<String, String> getAttachments() {

return Collections.unmodifiableMap(attachments);
}

public void setAttachments(Map<String, String> attachments) {

this.attachments.clear();
if (null != attachments) {

this.attachments.putAll(attachments);
}
}

public void addAttachment(String key, String value) {

attachments.put(key, value);
}

// getter and setter…
}
上述消息中,我们将协议中所规定的各个字段都进行了定义,并且定义了一个标志消息类型的枚举MessageTypeEnum,如下是该枚举的源码:

public enum MessageTypeEnum {

REQUEST((byte)1), RESPONSE((byte)2), PING((byte)3), PONG((byte)4), EMPTY((byte)5);

private byte type;

MessageTypeEnum(byte type) {

this.type = type;
}

public int getType() {

return type;
}

public static MessageTypeEnum get(byte type) {

for (MessageTypeEnum value : values()) {

if (value.type == type) {

return value;
}
}

throw new RuntimeException("unsupported type: " + type);

}
}
上述主要是定义了描述自定义协议相关的实体属性,对于消息的编码,本质就是依据上述协议方式将消息实体转换为字节流,如下是转换字节流的代码:

public class MessageEncoder extends MessageToByteEncoder<Message> {

@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) {

// 这里会判断消息类型是不是EMPTY类型,如果是EMPTY类型,则表示当前消息不需要写入到管道中
if (message.getMessageType() != MessageTypeEnum.EMPTY) {

out.writeInt(Constants.MAGIC_NUMBER); // 写入当前的魔数
out.writeByte(Constants.MAIN_VERSION); // 写入当前的主版本号
out.writeByte(Constants.SUB_VERSION); // 写入当前的次版本号
out.writeByte(Constants.MODIFY_VERSION); // 写入当前的修订版本号
if (!StringUtils.hasText(message.getSessionId())) {

// 生成一个sessionId,并将其写入到字节序列中
String sessionId = SessionIdGenerator.generate();
message.setSessionId(sessionId);
out.writeCharSequence(sessionId, Charset.defaultCharset());
}

  out.writeByte(message.getMessageType().getType());    // 写入当前消息的类型
  out.writeShort(message.getAttachments().size());  // 写入当前消息的附加参数数量
  message.getAttachments().forEach((key, value) -> {
    Charset charset = Charset.defaultCharset();
    out.writeInt(key.length()); // 写入键的长度
    out.writeCharSequence(key, charset);    // 写入键数据
    out.writeInt(value.length());   // 希尔值的长度
    out.writeCharSequence(value, charset);  // 写入值数据
  });

  if (null == message.getBody()) {
    out.writeInt(0);    // 如果消息体为空,则写入0,表示消息体长度为0
  } else {
    out.writeInt(message.getBody().length());
    out.writeCharSequence(message.getBody(), Charset.defaultCharset());
  }
}

}
}
对于消息的解码,其过程与上面的消息编码方式基本一致,主要是基于协议所规定的将字节流数据转换为消息实体数据。如下是其转换过程:

public class MessageDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {

Message message = new Message();
message.setMagicNumber(byteBuf.readInt()); // 读取魔数
message.setMainVersion(byteBuf.readByte()); // 读取主版本号
message.setSubVersion(byteBuf.readByte()); // 读取次版本号
message.setModifyVersion(byteBuf.readByte()); // 读取修订版本号
CharSequence sessionId = byteBuf.readCharSequence(
Constants.SESSION_ID_LENGTH, Charset.defaultCharset()); // 读取sessionId
message.setSessionId((String)sessionId);

message.setMessageType(MessageTypeEnum.get(byteBuf.readByte()));    // 读取当前的消息类型
short attachmentSize = byteBuf.readShort(); // 读取附件长度
for (short i = 0; i < attachmentSize; i++) {
  int keyLength = byteBuf.readInt();    // 读取键长度和数据
  CharSequence key = byteBuf.readCharSequence(keyLength, Charset.defaultCharset());
  int valueLength = byteBuf.readInt();  // 读取值长度和数据
  CharSequence value = byteBuf.readCharSequence(valueLength, Charset.defaultCharset());
  message.addAttachment(key.toString(), value.toString());
}

int bodyLength = byteBuf.readInt(); // 读取消息体长度和数据
CharSequence body = byteBuf.readCharSequence(bodyLength, Charset.defaultCharset());
message.setBody(body.toString());
out.add(message);

}
}
如此,我们自定义消息与字节流的相互转换工作已经完成。对于消息的处理,主要是要根据消息的不同类型,对消息进行相应的处理,比如对于request类型消息,要写入响应数据,对于ping消息,要写入pong消息作为回应。下面我们通过定义Netty handler的方式实现对消息的处理:

// 服务端消息处理器
public class ServerMessageHandler extends SimpleChannelInboundHandler<Message> {

// 获取一个消息处理器工厂类实例
private MessageResolverFactory resolverFactory = MessageResolverFactory.getInstance();

@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {

Resolver resolver = resolverFactory.getMessageResolver(message); // 获取消息处理器
Message result = resolver.resolve(message); // 对消息进行处理并获取响应数据
ctx.writeAndFlush(result); // 将响应数据写入到处理器中
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

resolverFactory.registerResolver(new RequestMessageResolver()); // 注册request消息处理器
resolverFactory.registerResolver(new ResponseMessageResolver());// 注册response消息处理器
resolverFactory.registerResolver(new PingMessageResolver()); // 注册ping消息处理器
resolverFactory.registerResolver(new PongMessageResolver()); // 注册pong消息处理器
}
}
// 客户端消息处理器
public class ClientMessageHandler extends ServerMessageHandler {

// 创建一个线程,模拟用户发送消息
private ExecutorService executor = Executors.newSingleThreadExecutor();

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

// 对于客户端,在建立连接之后,在一个独立线程中模拟用户发送数据给服务端
executor.execute(new MessageSender(ctx));
}

/**

public final class MessageResolverFactory {

// 创建一个工厂类实例
private static final MessageResolverFactory resolverFactory = new MessageResolverFactory();
private static final List<Resolver> resolvers = new CopyOnWriteArrayList<>();

private MessageResolverFactory() {}

// 使用单例模式实例化当前工厂类实例
public static MessageResolverFactory getInstance() {

return resolverFactory;
}

public void registerResolver(Resolver resolver) {

resolvers.add(resolver);
}

// 根据解码后的消息,在工厂类处理器中查找可以处理当前消息的处理器
public Resolver getMessageResolver(Message message) {

for (Resolver resolver : resolvers) {

if (resolver.support(message)) {

return resolver;
}
}

throw new RuntimeException("cannot find resolver, message type: " + message.getMessageType());

}

}
上述工厂类比较简单,主要就是通过单例模式获取一个工厂类实例,然后提供一个根据具体消息来查找其对应的处理器的方法。下面我们来看看各个消息处理器的代码:

// request类型的消息
public class RequestMessageResolver implements Resolver {

private static final AtomicInteger counter = new AtomicInteger(1);

@Override
public boolean support(Message message) {

return message.getMessageType() == MessageTypeEnum.REQUEST;
}

@Override
public Message resolve(Message message) {

// 接收到request消息之后,对消息进行处理,这里主要是将其打印出来
int index = counter.getAndIncrement();
System.out.println(“[trx: ” + message.getSessionId() + “]”

  • index + “. receive request: ” + message.getBody());
    System.out.println(“[trx: ” + message.getSessionId() + “]”
  • index + “. attachments: ” + message.getAttachments());

    // 处理完成后,生成一个响应消息返回
    Message response = new Message();
    response.setMessageType(MessageTypeEnum.RESPONSE);
    response.setBody(“nice to meet you too!”);
    response.addAttachment(“name”, “xufeng”);
    response.addAttachment(“hometown”, “wuhan”);
    return response;
    }
    }
    // 响应消息处理器
    public class ResponseMessageResolver implements Resolver {

    private static final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public boolean support(Message message) {

    return message.getMessageType() == MessageTypeEnum.RESPONSE;
    }

    @Override
    public Message resolve(Message message) {

    // 接收到对方服务的响应消息之后,对响应消息进行处理,这里主要是将其打印出来
    int index = counter.getAndIncrement();
    System.out.println(“[trx: ” + message.getSessionId() + “]”

  • index + “. receive response: ” + message.getBody());
    System.out.println(“[trx: ” + message.getSessionId() + “]”
  • index + “. attachments: ” + message.getAttachments());

    // 响应消息不需要向对方服务再发送响应,因而这里写入一个空消息
    Message empty = new Message();
    empty.setMessageType(MessageTypeEnum.EMPTY);
    return empty;
    }
    }
    // ping消息处理器
    public class PingMessageResolver implements Resolver {

    @Override
    public boolean support(Message message) {

    return message.getMessageType() == MessageTypeEnum.PING;
    }

    @Override
    public Message resolve(Message message) {

    // 接收到ping消息后,返回一个pong消息返回
    System.out.println(“receive ping message: ” + System.currentTimeMillis());
    Message pong = new Message();
    pong.setMessageType(MessageTypeEnum.PONG);
    return pong;
    }
    }
    // pong消息处理器
    public class PongMessageResolver implements Resolver {

    @Override
    public boolean support(Message message) {

    return message.getMessageType() == MessageTypeEnum.PONG;
    }

    @Override
    public Message resolve(Message message) {

    // 接收到pong消息后,不需要进行处理,直接返回一个空的message
    System.out.println(“receive pong message: ” + System.currentTimeMillis());
    Message empty = new Message();
    empty.setMessageType(MessageTypeEnum.EMPTY);
    return empty;
    }
    }
    如此,对于自定义协议的消息处理过程已经完成,下面则是使用用Netty实现的客户端与服务端代码:

// 服务端
public class Server {

public static void main(String[] args) {

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
br/>@Override
protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();
// 添加用于处理粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 添加自定义协议消息的编码和解码处理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 添加具体的消息处理器
pipeline.addLast(new ServerMessageHandler());
}
});

  ChannelFuture future = bootstrap.bind(8585).sync();
  future.channel().closeFuture().sync();
} catch (InterruptedException e) {
  e.printStackTrace();
} finally {
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
}

}
}
public class Client {

public static void main(String[] args) {

NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {

bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.handler(new ChannelInitializer<SocketChannel>() {
br/>@Override
protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();
// 添加用于解决粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 添加用于进行心跳检测的处理器
pipeline.addLast(new IdleStateHandler(1, 2, 0));
// 添加用于根据自定义协议将消息与字节流进行相互转换的处理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 添加客户端消息处理器
pipeline.addLast(new ClientMessageHandler());
}
});

  ChannelFuture future = bootstrap.connect("127.0.0.1", 8585).sync();
  future.channel().closeFuture().sync();
} catch (InterruptedException e) {
  e.printStackTrace();
} finally {
  group.shutdownGracefully();
}

}
}
运行上述代码之后,我们可以看到客户端和服务器分别打印了如下数据:

// 客户端
receive pong message: 1555123429356
[trx: d05024d2]1. receive response: nice to meet you too!
[trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng}
[trx: 66ee1438]2. receive response: nice to meet you too!
// 服务器
receive ping message: 1555123432279
[trx: f582444f]4. receive request: this is my 4 message.
[trx: f582444f]4. attachments: {name=xufeng}

  1. 小结
    本文首先将自定义协议与HTTP协议进行了对比,阐述了自定义协议的一些优点。然后定义了一份自定义协议,并且讲解了协议中各个字节的含义。最后通过Netty对自定义协议进行了实现,并且实现了基于自定义协议的心跳功能。

转载于:https://blog.51cto.com/14311280/2387060

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/106844.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 图形数字推理1000题及答案_小学奥数图形推理题

    图形数字推理1000题及答案_小学奥数图形推理题事情是这样滴!一个小伙伴在这两天提出一个问题如下:考虑到数字推理是浙江省考每年的必考题,图形题在去年的浙江省考中考查了四题。而图形题相较于分数数列、递推数列、多级数列等常见纯数字数列来说,在没有掌握一些常见技巧的前提下确实无从下手。这两天经过对图形题的系统性梳理发现其中有一些可操作的技巧与方法,希望能够帮助到即将踏入战场的浙江的小伙伴。当然,除了浙江的小伙伴之外,一些自主命题省份,如江苏、广东、吉…

  • JavaScript中window.open()和Window Location href的区别「建议收藏」

    JavaScript中window.open()和Window Location href的区别「建议收藏」目录1:window.location.href的用法:2:window.open()的用法3:window.open和window.location.href的区别1:区别2.window.open不一定是打开一个新窗口!!!!!!!!3:关于重新定位4.:在框架内指定页面打开连接5:是否打开其他网站地址6:window.open()经过设置后的弹…

  • GCC命令编译

    GCC命令编译GCC命令

    2022年10月13日
  • MySQL 获得当前日期时间(以及时间的转换)。[通俗易懂]

    MySQL 获得当前日期时间(以及时间的转换)。[通俗易懂]获取当前日期函数获得当前日期+时间(date+time)函数:now() 除了now()函数能获得当前的日期时间外,MySQL中还有下面的函数:current_timestamp()  current_timestamplocaltime()  localtimelocaltimestamp()  localtimestamp    这些日期时间函数,都等同…

  • adb 安装并运行 apk[通俗易懂]

    adb 安装并运行 apk[通俗易懂]1、安装apk命令:adbinstallapk路径即可,例如:adbinstallE:\filetestapplication.apk2、运行apkadbshellamstart-n apk包名/apk包名.活动名例如:AndroidManifest.xml

  • java中遍历数组_java遍历object数组

    java中遍历数组_java遍历object数组遍历数组目录遍历数组三种方式:for循环遍历foreach语句遍历Arrays工具类中toString静态方法遍历Arrays.deepToString()与Arrays.toString()的区别Java中对Array数组的常用操作(了解即可)三种方式: for for-each, toString 题目描述给一个数组:intArr={{5,7,15},{8,4,11},{3,6,13}};for循环遍历通常遍历数组都是使用f

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号