Okio原理分析之简介

Okio原理分析之简介Okio是一个实现java.io和java.nio的库,更方便访问、存储和处理数据。作为OkHttp组件的一部分,在Android中引入支持HTTP的客户端Okio一些关键概念介绍先看一下类图,对整体框架有个大概的了解ByteString和Buffer保存数据ByteString代表一个不可变的字节序列。对于char数据,String是基础类型。Buffer可变的字节序列,像ArrayList,读写Buffer的操作与queue类似,从尾部写,从头部读,不需要管理position/limi

大家好,又见面了,我是你们的朋友全栈君。

Okio是一个实现java.io和java.nio的库,更方便访问、存储和处理数据。作为OkHttp组件的一部分,在Android中引入支持HTTP的客户端

Okio一些关键概念介绍

先看一下类图,对整体框架有个大概的了解
在这里插入图片描述

ByteString和Buffer保存数据
  • ByteString 代表一个不可变的字节序列。对于char数据,String是基础类型。
  • Buffer 可变的字节序列,像ArrayList,读写Buffer的操作与queue类似,从尾部写,从头部读,不需要管理position/limit/capacities

在内部实现,ByteString和Buffer做了一些优化来节约CPU和内存,如果把一个UTF-8字符串编码为ByteString,保存了一个引用,后面如果需要解码的时候可以直接使用,在encodeUtf8方法里面

Buffer被实现为一个segment链表,当你从一个Buffer移动数据到另外一个Buffer的时候,重新设置了segment的所属Buffer,没有直接copy一份数据,在多线程场景下很有帮助

class Buffer implements BufferedSink,BufferedSource
Source and Sink 移动数据

在okio里面,有自己的stream类型

  • Source:提供字节流,使用此接口可以从任意地方读取数据,如network,storage,或者是内存中的buffer.可以对源进行分层以转换提供的数据,例如解压缩、解密或删除协议帧,大部分应用不会直接操作一个Source,而是使用BufferSource,更简单高效,可以使用Okio.buffer(Source)把一个Source封装为BufferedSource。此接口功能类似于InputStream,
    • Source里面没有提供类似available()方法,对应的调用者可以用require方法指定需要多个byte
  • Sink:接收字节流,使用此接口可以写数据,往network、storage、或内存中的Buffer.Sink可以分层以转换接收的数据,例如压缩、加密、限制或添加协议帧。大部分应用不会直接操作一个Sink,而是使用BufferSink,更简单高效,可以使用Okio.buffer(Sink)把一个Sink封装为BufferedSink。使用方式类似于OutputStream

读的时候:先从InputStream里面读取8192大小的字节到一个Segment里面,head节点指向这个Segment,如果要读取的字节数大于Buffer里面的大小,则继续读取一个8192大小,直到Buffer大小超过要读取的字节数

写的时候:在2个buffer之间移动segment,情况要复杂一些:

  • 不浪费CPU
    • 复制大量数据是很消耗资源的操作,相反,在Okio里面,会把整个segment重新设置所属关系,从source buffer到target buffer
  • 不浪费内存
    • 作为不变变量,Buffer中相邻的Segment对应容量至少满50%,head和tail节点除外。
    • head segment不能维持不变性,因为应用会从消费这个segment里面的数据,降低容量
    • tail segment不能维持不变性,因为应用会从这个segment里面新增数据,可能需要一个完整的空Segment作为tail添加

和Java I/O的主要差异:

  • 提供了IO语义层面的超时
  • 容易实现 ,容易使用
  • 没有人为区分字符流和字节流,都作为数据处理,读写都按字节来操作
  • 容易测试
Segment

Segment 在Buffer里面是环形双链表,在SegmentPool里面是单链表

保存在Segment里面的byte数组可能被buffer和bytestring共享,当shared为true时,当前的segment既不能被回收,也不能被改变。唯一的例外是,当前Segment的owner可以在Segment里面添加数据,写入数据到limit或超出这个位置

  • 对于每个byte数组,只有唯一的所属的segment。
  • position/limits/prev/next 是不共享的

关键属性

  • int SIZE = 8192 segment里面byte数据的默认大小
  • int SHARE_MINIMUM = 1024 当segment里面的byte数据大小超过这个值时,segment会变成共享的,来避免复制数据
  • final byte[] data segment里面保存的数据,初始化后不能改变大小
  • int limit 指向segment的可写的起始位置
  • int pos 指向segment可读的起始位置
  • boolean shared 是否和其它segment或bytestring共享data数组,为true时表示共享
  • boolean owner 是否拥有byte数组,为true时表示这个segment拥有数组,可以进行写入
  • Segment next; 当前segment的下一个segment
  • Segment pre; 当前segment的前一个segment

在读写数据的时候具体流程,在下一篇里面介绍

TimeOut 和 AsyncTimeout

Okio在设计的时候,就考虑了超时的问题,在创建Source或Sink的时候,都会传入一个Timeout参数

  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }
  
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }

Timeout 定义了执行某项任务时的策略,超过多长时间后会放弃任务。当一个任务超时,任务处于未定义状态,应该被放弃。比如说,如果从source里面读取超时,source应该被关闭,read操作应该稍后重试;如果写入sink超时,处理策略也一样:关闭sink,然后稍后重试

Timeout类里面提供了2种管理超时的策略:

  • long deadlineNanoTime 等待**某个任务(包含一个或多个操作)**完成的最长时间。使用deadline来设置一个上限时间去完成革项任务。比如说电池管理app应该限制每次加载内容花费的时间
  • long timeoutNanos 等待单个任务完成的最长时间,Timeouts通常用来处理网络操作的问题,比如说在网络请求中,服务端在10s内没有返回任何数据,可以假设服务端当前不可用
  • boolean hasDeadline 和deadlineNanoTime配合使用,当为true时,表示有定义

默认情况下,都为0

AsyncTimeout 继承自Timeout,当超时发生时,使用一个后台watchdog线程来处理相应的动作,使用此类可以给那些原生不支持超时的操作添加超时功能,如在socket里面阻塞在写操作

类的关键属性有

  • int TIMEOUT_WRITE_SIZE = 64 * 1024 一次写入的大小不能超过64KB,因为SegmentPool的最大缓存是64KB,超过这个大小会超时
  • long IDLE_TIMEOUT_MILLIS 在watchdog线程关闭之前,处于IDLE状态的时间,为60s
  • AsyncTimeout head 在watchdog线程里面会处理一个AsyncTimeout队列(有序队列)里面的每个超时事件,head表示首节点,即最先到达超时的节点,或者为null,当队列为空时
  • boolean inQueue 为true表示节点在当前的队列里面
  • AsyncTimeout next 队列里面指向下一个节点
  • long timeoutAt 如果被触发,此时间watchdog应该等待的时间,然后执行此AsyncTimeout

Watchdog的实现比较简单,循环读取队列里面的AsyncTimeout,并执行相应的timedOut操作,当队列为空时退出

  private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            // 返回链表的head节点
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

在scheduleTimeout里面,如果head节点为空,则启动watchdog,然后设置当前节点的timeoutAt,并按时间顺序插入有序队列里面

Hash和加解密

Okio提供了2种压缩方式

  • InflaterSource/DeflaterSink 使用 默认压缩算法 压缩解压
  • GZipSink/GZipSource 使用 Gzip算法 压缩解压
File file = // ...
Sink fileSink = Okio.sink(file);
Sink gzipSink = new GzipSink(fileSink);
BufferedSink bufferedSink = Okio.buffer(gzipSink);

HashingSink 提供了支持各种算法计算Hash

    static void hashing() throws IOException {
        BufferedSource bufferedSource = Okio.buffer(Okio.source(new File(READ)));
        ByteString byteString = bufferedSource.readByteString();
        System.out.println("   md5: " + byteString.md5().hex());
        System.out.println("  sha1: " + byteString.sha1().hex());
        System.out.println("sha256: " + byteString.sha256().hex());
        System.out.println("sha512: " + byteString.sha512().hex());

        Buffer buffer = bufferedSource.getBuffer();
        System.out.println("   md5: " + buffer.md5().hex());
        System.out.println("  sha1: " + buffer.sha1().hex());
        System.out.println("sha256: " + buffer.sha256().hex());
        System.out.println("sha512: " + buffer.sha512().hex());
        bufferedSource.close();
    }

使用Okio.cipherSink(Sink, Cipher) 或 Okio.cipherSource(Source, Cipher) 来加密、解密stream

void encryptAes(ByteString bytes, File file, byte[] key, byte[] iv)
    throws GeneralSecurityException, IOException {
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
  cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
  try (BufferedSink sink = Okio.buffer(Okio.cipherSink(Okio.sink(file), cipher))) {
    sink.write(bytes);
  }
}

ByteString decryptAesToByteString(File file, byte[] key, byte[] iv)
    throws GeneralSecurityException, IOException {
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
  cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
  try (BufferedSource source = Okio.buffer(Okio.cipherSource(Okio.source(file), cipher))) {
    return source.readByteString();
  }
}
Throttler 流量控制

使用Throttler可以控制Source和Sink的流量,给Source或Sink设置一个Throttler,然后通过bytesPerSecond来设置期望的值

类里面有3个关键变量:

  • bytesPerSecond: 允许的最大流量,使用0表示没有限制
  • waitByteCount: 当所请求的字节数大于该字节数且无法立即使用时,只需等到我们至少可以分配这么多字节。 使用此设置来设置一个理想字节数,保持在此区间的持续吞吐量。
  • maxByteCount: 每次调用允许申请的最大容量,也是在任何等待前返回的byte数量。

简单使用如下:

Socket socket = //...
Source socketSource = Okio.source(socket);
Throttler throttler = new Throttler();
Source throttledSource = throttler.throttle(socketSource);
BufferedSource bufferedSource = Okio.buffer(throttledSource);
bufferedSource.readUtf8(10 * 1024);
throttler.bytesPerTimePeriod(50, 1, SECONDS);
bufferedSource.readUtf8(10 * 1024);

Okio的简单使用

Okio里面的Source支持从text文件、binary文件,socket里面进行读取

从文本文件里面读写:

    static final String READ = System.getProperty("user.dir") + File.separator + "read.txt";
    static final String WRITE = System.getProperty("user.dir") + File.separator + "write.txt";

    public static void main(String[] args) throws IOException {
        readLines(new File(READ));
        writeEnv(new File(WRITE));
        exploreCharset();
        hashing();
    }

    static void readLines(File file) throws IOException {
        // The try-with-resources statement is a try statement that declares one or more resources.
        // A resource is an object that must be closed after the program is finished with it.
        // The try-with-resources statement ensures that each resource is closed at the end of the statement.
        // Any object that implements java.lang.AutoCloseable,
        // which includes all objects which implement java.io.Closeable, can be used as a resource.
        try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
            while (true) {
                String line = bufferedSource.readUtf8Line();
                if (line == null) break;
                if (line.contains("chadm")) {
                    System.out.println(line);
                }
            }
        }
    }

    static void writeEnv(File file) throws IOException {
        try (BufferedSink bufferedSink = Okio.buffer(Okio.sink(file))) {
            for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
                bufferedSink.writeUtf8((String) entry.getKey())
                        .writeUtf8("=")
                        .writeUtf8((String) entry.getValue())
                        .writeUtf8("\n");
            }
        }
    }

从二进制文件里面读取

  void encode(Bitmap bitmap, BufferedSink sink) throws IOException {
    int height = bitmap.height();
    int width = bitmap.width();

    int bytesPerPixel = 3;
    int rowByteCountWithoutPadding = (bytesPerPixel * width);
    int rowByteCount = ((rowByteCountWithoutPadding + 3) / 4) * 4;
    int pixelDataSize = rowByteCount * height;
    int bmpHeaderSize = 14;
    int dibHeaderSize = 40;

    // BMP Header
    sink.writeUtf8("BM"); // ID.
    sink.writeIntLe(bmpHeaderSize + dibHeaderSize + pixelDataSize); // File size.
    sink.writeShortLe(0); // Unused.
    sink.writeShortLe(0); // Unused.
    sink.writeIntLe(bmpHeaderSize + dibHeaderSize); // Offset of pixel data.

    // DIB Header
    sink.writeIntLe(dibHeaderSize);
    sink.writeIntLe(width);
    sink.writeIntLe(height);
    sink.writeShortLe(1);  // Color plane count.
    sink.writeShortLe(bytesPerPixel * Byte.SIZE);
    sink.writeIntLe(0);    // No compression.
    sink.writeIntLe(16);   // Size of bitmap data including padding.
    sink.writeIntLe(2835); // Horizontal print resolution in pixels/meter. (72 dpi).
    sink.writeIntLe(2835); // Vertical print resolution in pixels/meter. (72 dpi).
    sink.writeIntLe(0);    // Palette color count.
    sink.writeIntLe(0);    // 0 important colors.

    // Pixel data.
    for (int y = height - 1; y >= 0; y--) {
      for (int x = 0; x < width; x++) {
        sink.writeByte(bitmap.blue(x, y));
        sink.writeByte(bitmap.green(x, y));
        sink.writeByte(bitmap.red(x, y));
      }

      // Padding for 4-byte alignment.
      for (int p = rowByteCountWithoutPadding; p < rowByteCount; p++) {
        sink.writeByte(0);
      }
    }
  }

从Socket里读写

  private void transfer(Socket sourceSocket, Source source, Sink sink) {
    try {
      Buffer buffer = new Buffer();
      for (long byteCount; (byteCount = source.read(buffer, 8192L)) != -1; ) {
        sink.write(buffer, byteCount);
        sink.flush();
      }
    } catch (IOException e) {
      System.out.println("transfer failed from " + sourceSocket + ": " + e);
    } finally {
      closeQuietly(sink);
      closeQuietly(source);
      closeQuietly(sourceSocket);
      openSockets.remove(sourceSocket);
    }
  }
  
  public static void main(String[] args) throws IOException {
        SocksProxyServer proxyServer = new SocksProxyServer();
        proxyServer.start();

        URL url = new URL("https://publicobject.com/helloworld.txt");
        URLConnection connection = url.openConnection(proxyServer.proxy());
        try (BufferedSource source = Okio.buffer(Okio.source(connection.getInputStream()))) {
            for (String line; (line = source.readUtf8Line()) != null; ) {
                System.out.println(line);
            }
        }

        proxyServer.shutdown();
    }

更多例子,可以参考官方给的 Okio Demo

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145129.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号