大家好,又见面了,我是你们的朋友全栈君。
Okio是一个实现java.io和java.nio的库,更方便访问、存储和处理数据。作为OkHttp组件的一部分,在Android中引入支持HTTP的客户端
Okio一些关键概念介绍
先看一下类图,对整体框架有个大概的了解
ByteString和Buffer保存数据
- ByteString 代表一个不可变的字节序列。对于char数据,String是基础类型。
- Buffer 可变的字节序列,像ArrayList,读写Buffer的操作与queue类似,从尾部写,从头部读,不需要管理position/limit/capacities
在内部实现,ByteString和Buffer做了一些优化来节约CPU和内存,如果把一个UTF-8字符串编码为ByteString,保存了一个引用,后面如果需要解码的时候可以直接使用,在encodeUtf8方法里面
Buffer被实现为一个segment链表,当你从一个Buffer移动数据到另外一个Buffer的时候,重新设置了segment的所属Buffer,没有直接copy一份数据,在多线程场景下很有帮助
class Buffer implements BufferedSink,BufferedSource
Source and Sink 移动数据
在okio里面,有自己的stream类型
- Source:提供字节流,使用此接口可以从任意地方读取数据,如network,storage,或者是内存中的buffer.可以对源进行分层以转换提供的数据,例如解压缩、解密或删除协议帧,大部分应用不会直接操作一个Source,而是使用BufferSource,更简单高效,可以使用Okio.buffer(Source)把一个Source封装为BufferedSource。此接口功能类似于InputStream,
- Source里面没有提供类似available()方法,对应的调用者可以用require方法指定需要多个byte
- Sink:接收字节流,使用此接口可以写数据,往network、storage、或内存中的Buffer.Sink可以分层以转换接收的数据,例如压缩、加密、限制或添加协议帧。大部分应用不会直接操作一个Sink,而是使用BufferSink,更简单高效,可以使用Okio.buffer(Sink)把一个Sink封装为BufferedSink。使用方式类似于OutputStream
读的时候:先从InputStream里面读取8192大小的字节到一个Segment里面,head节点指向这个Segment,如果要读取的字节数大于Buffer里面的大小,则继续读取一个8192大小,直到Buffer大小超过要读取的字节数
写的时候:在2个buffer之间移动segment,情况要复杂一些:
- 不浪费CPU
- 复制大量数据是很消耗资源的操作,相反,在Okio里面,会把整个segment重新设置所属关系,从source buffer到target buffer
- 不浪费内存
- 作为不变变量,Buffer中相邻的Segment对应容量至少满50%,head和tail节点除外。
- head segment不能维持不变性,因为应用会从消费这个segment里面的数据,降低容量
- tail segment不能维持不变性,因为应用会从这个segment里面新增数据,可能需要一个完整的空Segment作为tail添加
和Java I/O的主要差异:
- 提供了IO语义层面的超时
- 容易实现 ,容易使用
- 没有人为区分字符流和字节流,都作为数据处理,读写都按字节来操作
- 容易测试
Segment
Segment 在Buffer里面是环形双链表,在SegmentPool里面是单链表
保存在Segment里面的byte数组可能被buffer和bytestring共享,当shared为true时,当前的segment既不能被回收,也不能被改变。唯一的例外是,当前Segment的owner可以在Segment里面添加数据,写入数据到limit或超出这个位置
- 对于每个byte数组,只有唯一的所属的segment。
- position/limits/prev/next 是不共享的
关键属性
- int SIZE = 8192 segment里面byte数据的默认大小
- int SHARE_MINIMUM = 1024 当segment里面的byte数据大小超过这个值时,segment会变成共享的,来避免复制数据
- final byte[] data segment里面保存的数据,初始化后不能改变大小
- int limit 指向segment的可写的起始位置
- int pos 指向segment可读的起始位置
- boolean shared 是否和其它segment或bytestring共享data数组,为true时表示共享
- boolean owner 是否拥有byte数组,为true时表示这个segment拥有数组,可以进行写入
- Segment next; 当前segment的下一个segment
- Segment pre; 当前segment的前一个segment
在读写数据的时候具体流程,在下一篇里面介绍
TimeOut 和 AsyncTimeout
Okio在设计的时候,就考虑了超时的问题,在创建Source或Sink的时候,都会传入一个Timeout参数
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
public static Source source(InputStream in) {
return source(in, new Timeout());
}
Timeout 定义了执行某项任务时的策略,超过多长时间后会放弃任务。当一个任务超时,任务处于未定义状态,应该被放弃。比如说,如果从source里面读取超时,source应该被关闭,read操作应该稍后重试;如果写入sink超时,处理策略也一样:关闭sink,然后稍后重试
Timeout类里面提供了2种管理超时的策略:
- long deadlineNanoTime 等待**某个任务(包含一个或多个操作)**完成的最长时间。使用deadline来设置一个上限时间去完成革项任务。比如说电池管理app应该限制每次加载内容花费的时间
- long timeoutNanos 等待单个任务完成的最长时间,Timeouts通常用来处理网络操作的问题,比如说在网络请求中,服务端在10s内没有返回任何数据,可以假设服务端当前不可用
- boolean hasDeadline 和deadlineNanoTime配合使用,当为true时,表示有定义
默认情况下,都为0
AsyncTimeout 继承自Timeout,当超时发生时,使用一个后台watchdog线程来处理相应的动作,使用此类可以给那些原生不支持超时的操作添加超时功能,如在socket里面阻塞在写操作
类的关键属性有
- int TIMEOUT_WRITE_SIZE = 64 * 1024 一次写入的大小不能超过64KB,因为SegmentPool的最大缓存是64KB,超过这个大小会超时
- long IDLE_TIMEOUT_MILLIS 在watchdog线程关闭之前,处于IDLE状态的时间,为60s
- AsyncTimeout head 在watchdog线程里面会处理一个AsyncTimeout队列(有序队列)里面的每个超时事件,head表示首节点,即最先到达超时的节点,或者为null,当队列为空时
- boolean inQueue 为true表示节点在当前的队列里面
- AsyncTimeout next 队列里面指向下一个节点
- long timeoutAt 如果被触发,此时间watchdog应该等待的时间,然后执行此AsyncTimeout
Watchdog的实现比较简单,循环读取队列里面的AsyncTimeout,并执行相应的timedOut操作,当队列为空时退出
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
// 返回链表的head节点
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
在scheduleTimeout里面,如果head节点为空,则启动watchdog,然后设置当前节点的timeoutAt,并按时间顺序插入有序队列里面
Hash和加解密
Okio提供了2种压缩方式
- InflaterSource/DeflaterSink 使用 默认压缩算法 压缩解压
- GZipSink/GZipSource 使用 Gzip算法 压缩解压
File file = // ...
Sink fileSink = Okio.sink(file);
Sink gzipSink = new GzipSink(fileSink);
BufferedSink bufferedSink = Okio.buffer(gzipSink);
HashingSink 提供了支持各种算法计算Hash
static void hashing() throws IOException {
BufferedSource bufferedSource = Okio.buffer(Okio.source(new File(READ)));
ByteString byteString = bufferedSource.readByteString();
System.out.println(" md5: " + byteString.md5().hex());
System.out.println(" sha1: " + byteString.sha1().hex());
System.out.println("sha256: " + byteString.sha256().hex());
System.out.println("sha512: " + byteString.sha512().hex());
Buffer buffer = bufferedSource.getBuffer();
System.out.println(" md5: " + buffer.md5().hex());
System.out.println(" sha1: " + buffer.sha1().hex());
System.out.println("sha256: " + buffer.sha256().hex());
System.out.println("sha512: " + buffer.sha512().hex());
bufferedSource.close();
}
使用Okio.cipherSink(Sink, Cipher) 或 Okio.cipherSource(Source, Cipher) 来加密、解密stream
void encryptAes(ByteString bytes, File file, byte[] key, byte[] iv)
throws GeneralSecurityException, IOException {
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
try (BufferedSink sink = Okio.buffer(Okio.cipherSink(Okio.sink(file), cipher))) {
sink.write(bytes);
}
}
ByteString decryptAesToByteString(File file, byte[] key, byte[] iv)
throws GeneralSecurityException, IOException {
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
try (BufferedSource source = Okio.buffer(Okio.cipherSource(Okio.source(file), cipher))) {
return source.readByteString();
}
}
Throttler 流量控制
使用Throttler可以控制Source和Sink的流量,给Source或Sink设置一个Throttler,然后通过bytesPerSecond来设置期望的值
类里面有3个关键变量:
bytesPerSecond
: 允许的最大流量,使用0表示没有限制waitByteCount
: 当所请求的字节数大于该字节数且无法立即使用时,只需等到我们至少可以分配这么多字节。 使用此设置来设置一个理想字节数,保持在此区间的持续吞吐量。maxByteCount
: 每次调用允许申请的最大容量,也是在任何等待前返回的byte数量。
简单使用如下:
Socket socket = //...
Source socketSource = Okio.source(socket);
Throttler throttler = new Throttler();
Source throttledSource = throttler.throttle(socketSource);
BufferedSource bufferedSource = Okio.buffer(throttledSource);
bufferedSource.readUtf8(10 * 1024);
throttler.bytesPerTimePeriod(50, 1, SECONDS);
bufferedSource.readUtf8(10 * 1024);
Okio的简单使用
Okio里面的Source支持从text文件、binary文件,socket里面进行读取
从文本文件里面读写:
static final String READ = System.getProperty("user.dir") + File.separator + "read.txt";
static final String WRITE = System.getProperty("user.dir") + File.separator + "write.txt";
public static void main(String[] args) throws IOException {
readLines(new File(READ));
writeEnv(new File(WRITE));
exploreCharset();
hashing();
}
static void readLines(File file) throws IOException {
// The try-with-resources statement is a try statement that declares one or more resources.
// A resource is an object that must be closed after the program is finished with it.
// The try-with-resources statement ensures that each resource is closed at the end of the statement.
// Any object that implements java.lang.AutoCloseable,
// which includes all objects which implement java.io.Closeable, can be used as a resource.
try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
while (true) {
String line = bufferedSource.readUtf8Line();
if (line == null) break;
if (line.contains("chadm")) {
System.out.println(line);
}
}
}
}
static void writeEnv(File file) throws IOException {
try (BufferedSink bufferedSink = Okio.buffer(Okio.sink(file))) {
for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
bufferedSink.writeUtf8((String) entry.getKey())
.writeUtf8("=")
.writeUtf8((String) entry.getValue())
.writeUtf8("\n");
}
}
}
从二进制文件里面读取
void encode(Bitmap bitmap, BufferedSink sink) throws IOException {
int height = bitmap.height();
int width = bitmap.width();
int bytesPerPixel = 3;
int rowByteCountWithoutPadding = (bytesPerPixel * width);
int rowByteCount = ((rowByteCountWithoutPadding + 3) / 4) * 4;
int pixelDataSize = rowByteCount * height;
int bmpHeaderSize = 14;
int dibHeaderSize = 40;
// BMP Header
sink.writeUtf8("BM"); // ID.
sink.writeIntLe(bmpHeaderSize + dibHeaderSize + pixelDataSize); // File size.
sink.writeShortLe(0); // Unused.
sink.writeShortLe(0); // Unused.
sink.writeIntLe(bmpHeaderSize + dibHeaderSize); // Offset of pixel data.
// DIB Header
sink.writeIntLe(dibHeaderSize);
sink.writeIntLe(width);
sink.writeIntLe(height);
sink.writeShortLe(1); // Color plane count.
sink.writeShortLe(bytesPerPixel * Byte.SIZE);
sink.writeIntLe(0); // No compression.
sink.writeIntLe(16); // Size of bitmap data including padding.
sink.writeIntLe(2835); // Horizontal print resolution in pixels/meter. (72 dpi).
sink.writeIntLe(2835); // Vertical print resolution in pixels/meter. (72 dpi).
sink.writeIntLe(0); // Palette color count.
sink.writeIntLe(0); // 0 important colors.
// Pixel data.
for (int y = height - 1; y >= 0; y--) {
for (int x = 0; x < width; x++) {
sink.writeByte(bitmap.blue(x, y));
sink.writeByte(bitmap.green(x, y));
sink.writeByte(bitmap.red(x, y));
}
// Padding for 4-byte alignment.
for (int p = rowByteCountWithoutPadding; p < rowByteCount; p++) {
sink.writeByte(0);
}
}
}
从Socket里读写
private void transfer(Socket sourceSocket, Source source, Sink sink) {
try {
Buffer buffer = new Buffer();
for (long byteCount; (byteCount = source.read(buffer, 8192L)) != -1; ) {
sink.write(buffer, byteCount);
sink.flush();
}
} catch (IOException e) {
System.out.println("transfer failed from " + sourceSocket + ": " + e);
} finally {
closeQuietly(sink);
closeQuietly(source);
closeQuietly(sourceSocket);
openSockets.remove(sourceSocket);
}
}
public static void main(String[] args) throws IOException {
SocksProxyServer proxyServer = new SocksProxyServer();
proxyServer.start();
URL url = new URL("https://publicobject.com/helloworld.txt");
URLConnection connection = url.openConnection(proxyServer.proxy());
try (BufferedSource source = Okio.buffer(Okio.source(connection.getInputStream()))) {
for (String line; (line = source.readUtf8Line()) != null; ) {
System.out.println(line);
}
}
proxyServer.shutdown();
}
更多例子,可以参考官方给的 Okio Demo
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145129.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...