大家好,又见面了,我是你们的朋友全栈君。
本文是的前一篇文章 Okhttp IO 之 Segment & SegmentPool 的基础上写的,如果你没看懂前面的文章,那么看本文会相当的吃力,因为很多关键的代码都是在前面这篇文章中剖析的。
ByteString
okio
中添加一个类 ByteString
,顾名思义就是字节串,这里做一个概要的讲解,具体的实现大家可以去看源码。
既然是字节串,它内部就是用一个字节数组支持的。
final byte[] data;
既然用字节数组支持的,那么就可以用一个字节数组来构造,当然还可以用 String
,甚至还可以用 NIO
的 ByteBuffer
和 InputStream
来构造。
既然名字与 String
沾边,也可以像 String
那样进行比较和查询。
当然它的功能不止于此,ByteString
还可以把字节进行编码,例如 md5()
,还可以为 URL
进行 URL-safe Base64
转换。
public String base64Url() {
return Base64.encodeUrl(data);
}
Source
Source
与 InputStream
对应,都代表字节输入流。
public interface Source extends Closeable {
long read(Buffer sink, long byteCount) throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
Source
接口比较简单,只定义了如何从 Buffer
中读取字节。
Buffer
是okio
的类,既可以当作输入源,也可以当作输出源,后面会详细说明。
Source
接口还加入了一个特色的方法 timeout()
,用来规定从输入源读取超时的时间。
Okio
的设计者为了支持 Java IO
,Java NIO
和 Socket
,提供了一个工具类 Okio
来把它们转化为 Source
public static Source source(final InputStream in) {}
public static Source source(File file) throws FileNotFoundException {}
public static Source source(Path path, OpenOption... options) throws IOException {}
public static Source source(final Socket socket) throws IOException {}
BufferedSource
BufferedSource
接口继承于 Source
接口。
public interface BufferedSource extends Source, ReadableByteChannel {
}
从继承关系,它还继承了 NIO
的 ReadableByteChannel
,也就是说它支持 ByteBuffer
传输数据。
从命名看,它提供了缓存功能,但是这个缓存并不像传统的 Java IO
一样,它用 Buffer
类来代替传统的字节数组。
/** Returns this source's internal buffer. */
Buffer buffer();
Buffer
为何能当作缓存用,后面会说到。
如果你以为 BufferedSource
只是像 Java IO
的 BufferedInputStream
一样提供了单一的缓存功能,那你就错了。
- 提供了
ByteArrayInputStream
读取字节数组的方法read(byte[] sink)
- 提供了
DataInputStream
的读取基本类型和String
的方法,例如readInt()
,readString()
,readUtf8()
。 - 提供了
BufferedReader
特有的readLine()
方法,只不过在BufferedSource
中,它的方法名为readUtf8Line()
,readUtf8LineStrict()
,readUtf8LineStrict()
。 - 还提供了读取
okio
中ByteString
的方法,readByteString()
,读取okio
的输出流Sink
的方法readFully(Buffer sink, long byteCount)
和readAll(Sink sink)
- 还提供了转化为
InputStream
的接口。
Sink
public interface Sink extends Closeable, Flushable {
void write(Buffer source, long byteCount) throws IOException;
@Override void flush() throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
Sink
的 write()
方法指定了输出源只能是 okio
的 Buffer
类。
BufferedSink
BufferedSink
接口继承自 Sink
接口,它也是用 okio
的 Buffer
类实现缓存
public interface BufferedSink extends Sink, WritableByteChannel {
Buffer buffer();
}
BufferedSink
与 BufferedSource
提供的功能是对应的,这里就不细述了。
这里我们需要注意一点, BufferedSink
还继承了 WritableByteChannel
,因此它支持 ByteBuffer
操作。
Buffer
重点来了,Buffer
是 okio
的集大成者,为何这么说呢?
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
}
Buffer
居然同时实现了 BufferedSource
和 BufferedSink
。
Buffer 接收数据
首先,我们把 Buffer
当作是输出源,先看下最基本的方法,如何写入字节数组。
@Override
public Buffer write(byte[] source) {
if (source == null) throw new IllegalArgumentException("source == null");
return write(source, 0, source.length);
}
@Override
public Buffer write(byte[] source, int offset, int byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
// 检测参数的合法性
checkOffsetAndCount(source.length, offset, byteCount);
// 计算 source 要写入的最后一个字节的 index 值
int limit = offset + byteCount;
while (offset < limit) {
// 获取循环链表尾部的一个 Segment
Segment tail = writableSegment(1);
// 计算最多可写入的字节
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
// 把 source 复制到 data 中
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
// 调整写入的起始位置
offset += toCopy;
// 调整尾部Segment 的 limit 位置
tail.limit += toCopy;
}
// 调整 Buffer 的 size 大小
size += byteCount;
return this;
}
在上篇文章中说过 Buffer
是会形成一个循环双向链表的,那么这个写字节数组的原理就很清楚了,循环地获取尾部结点 Segment
,然后向其中写入数据,直到数据写完为止。
看下 writableSegment()
是如何获取链表尾部的 Segment
的。
/** * Returns a tail segment that we can write at least {@code minimumCapacity} * bytes to, creating it if necessary. */
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
// 如果链表的头指针为null,就会SegmentPool中取出一个
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
// 获取前驱结点,也就是尾部结点
Segment tail = head.prev;
// 如果一个字节也不能读,或者不是拥有者
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
// 从SegmentPool中获取一个Segment,插入到循环双链表当前结点的后面
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
参数int minimumCapacity
代表获取的 Segment
最少要能写多少个字节进去。
首先判断链表的头指针 head
是否为 null
,如果为 null
就从 SegmentPool
中获取一个,然后形成循环双链表。
如果 head
不为 null
,就获取前驱结点,也就是尾部结点。 为何要获取尾部结点?因为要写入数据嘛,肯定使用后入式。
获取到尾部 Segment
后会有两个判断
1. 是否能写入参数中规定的最少的字节数
2. 这个 Segment
是底层数组的拥有者。 只有是拥有者,才有权力修改底层数组的值。
如果不满足这两个条件,证明获取到这个尾部Segment
不合格,就要调用 SegmentPool.take()
再次获取一个 Segment
,然后插入到循环链表的尾部,怎么插入的? 调用尾部 Segment
的 push()
方法,这个方法我在前面文章中讲述了原理。
当你了解了 writableSegment()
获取链表尾部结点的原理后,通过源码就很容易理解很多向Buffer
写入数据的方法,例如 write(ByteBuffer source)
.
然而,有两个方法,我看了后很不舒服
@Override public long writeAll(Source source) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null");
long totalBytesRead = 0;
for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
totalBytesRead += readCount;
}
return totalBytesRead;
}
@Override public BufferedSink write(Source source, long byteCount) throws IOException {
while (byteCount > 0) {
long read = source.read(this, byteCount);
if (read == -1) throw new EOFException();
byteCount -= read;
}
return this;
}
write
类的方法,指的是向当前的 Buffer
中写入数据,而这两个方法从实现的角度看,明明是写出数据好吧? 这样命名我真没看懂。
最后我们来看一个方法,这个方法被人们传的很神,搞得我开始还以为 okio
可以替代 Java IO
来使用。这个方法就是用来Buffer
之间数据传递的 write(Buffer source, long byteCount)
@Override
public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
// 如果 Source Buffer 的头结点可用字节数大于要写出的字节数
if (byteCount < (source.head.limit - source.head.pos)) {
// 获取当前 Buffer 的尾部结点
Segment tail = head != null ? head.prev : null;
// 如果尾部结点有足够空间可以写数据,并且这个结点是底层数组的拥有者,就直接向尾部结点中写数据,然后就结束了
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else { // 如果不满足前面的情况,就把 Source Buffer 的头结点分割为两个 Segment,然后头指针指向分割后的第一个Segment
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head.split((int) byteCount);
}
}
// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
// 头结点从 Source Buffer 的链表中移除
source.head = segmentToMove.pop();
// 如果头结点为 null, 直接改变指针位置即可
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else { // 如果头指针不为 null,那就把 Source Buffer 的 head 加入到 Sink Buffer 的链表
Segment tail = head.prev;
tail = tail.push(segmentToMove);
// 加入链表后,尝试合并尾部的两个结点
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
这个方法在源码中有大量的注释,因为是 okio
的核心所在。 现在我对这个注释进行下翻译,为后面分析代码作准备。
write(Buffer source, long byteCount)
是把 Buffer source
中的数据移动到当前 Buffer
的链表尾部的 Segment
中。 注意,是移动,不是复制,就是这一点,经常被外界夸大。
用移动而不是复制,是为了平衡两个冲突点:CPU 和 内存。 我们往往会为了性能牺牲内存,或者为了内存牺牲性能。
复制大量数据是一个很昂贵(expensive
)的操作,而移动数据,就只是修改修改指针而已,所以这就避免浪费了CPU。
为了节约内存,规定相邻的两个Segment
,它们各自的数据填充度至少应该为 50%,如果都少于 50%,会合并这两个结点。 当然,由于是循环链表,头结点和尾结点在理论上说是相邻的,但是它们不能参与合并,因为头结点是为了读数据,尾结点是为了写数据,如果参与合并就乱套了。
那么怎么移动数据呢?有三种情况
1. 假如说 Source Buffer
的链表为 [100%, 2%]
,而 Sink Buffer
的链表为 [99%, 3%]
,那么移动进行链表的移动后,Sink Buffer
就变为了 [100%, 2%, 99%, 3%]
。
2. 假如说 Source Buffer
的链表为 [100%, 40%]
,而 Sink Buffer
的链表为 [30%, 80%]
,那么移动后的结果为 [100%, 70%, 80%]
。 第三个结点 [30%]
被合并到了前驱结点 [40%]
中去了,然后 [30%]
这个结点被回收了。
3. 假如说从 Source Buffer
中的头结点的 Segment
的填充度是 [100%]
,我现在只想复制 30%
的数据出去,而 Sink Buffer
的尾部结点不能写(因为不是拥有者)或者空间紧张而不够写,那怎么办呢? 可以把这个 Segment
切分为两个 Segemnt
,填充度分为另 [30%]
和 [70%]
,然后把这个 [30%]
移动到 Sink Buffer
的链表中。
那么有人会问,第一种情况下,为何不选择合并 [2%] 和 [99%]
呢,因为就算合并了,也不能合并为一个,这样就不能回收结点,也就不能起到节约内存的目的。 那么有人可能又会问,那我后面的数据一直往前移动,总能回收几个结点吧? 理论是没错,但是这样一样,大量的复制数据岂不是过度浪费CPU了?
现在理解了原理,敢不敢跟着我的注释,去挑战下源码呢?
Buffer 读数据
把 Buffer
当作输入源,就可以读数据,首先看下把数据读到字节数组中
public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);
Segment s = head;
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);
System.arraycopy(s.data, s.pos, sink, offset, toCopy);
s.pos += toCopy;
size -= toCopy;
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return toCopy;
}
原理就是字节数组之间的复制。
最后看一个 Buffer
之间的数据读取
@Override
public long read(Buffer sink, long byteCount) {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
sink.write(this, byteCount);
return byteCount;
}
原来,它就是复用前面讲到的 Buffer
之间写数据方法来完成 Buffer
之间数据的读取。
结束
网上大量文章一直传着 okio
的 IO
操作是数据的移动而不是复制,看完本文你搞清楚了吗? 它其实指的是在 Buffer
之间传输数据。 而其它的操作,其实都只是建立在 Java IO
, Java NIO
和 Socket
之上的。 okio
比 Java IO/NIO
好吗? 彼此彼此吧,关键看用到哪了。
本文把最基础的东西剖析了下,但是 okio
并不止于此。本篇文章是为了后面分析 Okhttp
源码中的 okio
操作做准备的。
最后附上一张 okio
的关系图
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/133201.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...