Okio原理分析之简介

Okio原理分析之简介Okio是一个实现java.io和java.nio的库,更方便访问、存储和处理数据。作为OkHttp组件的一部分,在Android中引入支持HTTP的客户端Okio一些关键概念介绍先看一下类图,对整体框架有个大概的了解ByteString和Buffer保存数据ByteString代表一个不可变的字节序列。对于char数据,String是基础类型。Buffer可变的字节序列,像ArrayList,读写Buffer的操作与queue类似,从尾部写,从头部读,不需要管理position/limi

大家好,又见面了,我是你们的朋友全栈君。

Okio是一个实现java.io和java.nio的库,更方便访问、存储和处理数据。作为OkHttp组件的一部分,在Android中引入支持HTTP的客户端

Okio一些关键概念介绍

先看一下类图,对整体框架有个大概的了解
在这里插入图片描述

ByteString和Buffer保存数据
  • ByteString 代表一个不可变的字节序列。对于char数据,String是基础类型。
  • Buffer 可变的字节序列,像ArrayList,读写Buffer的操作与queue类似,从尾部写,从头部读,不需要管理position/limit/capacities

在内部实现,ByteString和Buffer做了一些优化来节约CPU和内存,如果把一个UTF-8字符串编码为ByteString,保存了一个引用,后面如果需要解码的时候可以直接使用,在encodeUtf8方法里面

Buffer被实现为一个segment链表,当你从一个Buffer移动数据到另外一个Buffer的时候,重新设置了segment的所属Buffer,没有直接copy一份数据,在多线程场景下很有帮助

class Buffer implements BufferedSink,BufferedSource
Source and Sink 移动数据

在okio里面,有自己的stream类型

  • Source:提供字节流,使用此接口可以从任意地方读取数据,如network,storage,或者是内存中的buffer.可以对源进行分层以转换提供的数据,例如解压缩、解密或删除协议帧,大部分应用不会直接操作一个Source,而是使用BufferSource,更简单高效,可以使用Okio.buffer(Source)把一个Source封装为BufferedSource。此接口功能类似于InputStream,
    • Source里面没有提供类似available()方法,对应的调用者可以用require方法指定需要多个byte
  • Sink:接收字节流,使用此接口可以写数据,往network、storage、或内存中的Buffer.Sink可以分层以转换接收的数据,例如压缩、加密、限制或添加协议帧。大部分应用不会直接操作一个Sink,而是使用BufferSink,更简单高效,可以使用Okio.buffer(Sink)把一个Sink封装为BufferedSink。使用方式类似于OutputStream

读的时候:先从InputStream里面读取8192大小的字节到一个Segment里面,head节点指向这个Segment,如果要读取的字节数大于Buffer里面的大小,则继续读取一个8192大小,直到Buffer大小超过要读取的字节数

写的时候:在2个buffer之间移动segment,情况要复杂一些:

  • 不浪费CPU
    • 复制大量数据是很消耗资源的操作,相反,在Okio里面,会把整个segment重新设置所属关系,从source buffer到target buffer
  • 不浪费内存
    • 作为不变变量,Buffer中相邻的Segment对应容量至少满50%,head和tail节点除外。
    • head segment不能维持不变性,因为应用会从消费这个segment里面的数据,降低容量
    • tail segment不能维持不变性,因为应用会从这个segment里面新增数据,可能需要一个完整的空Segment作为tail添加

和Java I/O的主要差异:

  • 提供了IO语义层面的超时
  • 容易实现 ,容易使用
  • 没有人为区分字符流和字节流,都作为数据处理,读写都按字节来操作
  • 容易测试
Segment

Segment 在Buffer里面是环形双链表,在SegmentPool里面是单链表

保存在Segment里面的byte数组可能被buffer和bytestring共享,当shared为true时,当前的segment既不能被回收,也不能被改变。唯一的例外是,当前Segment的owner可以在Segment里面添加数据,写入数据到limit或超出这个位置

  • 对于每个byte数组,只有唯一的所属的segment。
  • position/limits/prev/next 是不共享的

关键属性

  • int SIZE = 8192 segment里面byte数据的默认大小
  • int SHARE_MINIMUM = 1024 当segment里面的byte数据大小超过这个值时,segment会变成共享的,来避免复制数据
  • final byte[] data segment里面保存的数据,初始化后不能改变大小
  • int limit 指向segment的可写的起始位置
  • int pos 指向segment可读的起始位置
  • boolean shared 是否和其它segment或bytestring共享data数组,为true时表示共享
  • boolean owner 是否拥有byte数组,为true时表示这个segment拥有数组,可以进行写入
  • Segment next; 当前segment的下一个segment
  • Segment pre; 当前segment的前一个segment

在读写数据的时候具体流程,在下一篇里面介绍

TimeOut 和 AsyncTimeout

Okio在设计的时候,就考虑了超时的问题,在创建Source或Sink的时候,都会传入一个Timeout参数

  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }
  
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }

Timeout 定义了执行某项任务时的策略,超过多长时间后会放弃任务。当一个任务超时,任务处于未定义状态,应该被放弃。比如说,如果从source里面读取超时,source应该被关闭,read操作应该稍后重试;如果写入sink超时,处理策略也一样:关闭sink,然后稍后重试

Timeout类里面提供了2种管理超时的策略:

  • long deadlineNanoTime 等待**某个任务(包含一个或多个操作)**完成的最长时间。使用deadline来设置一个上限时间去完成革项任务。比如说电池管理app应该限制每次加载内容花费的时间
  • long timeoutNanos 等待单个任务完成的最长时间,Timeouts通常用来处理网络操作的问题,比如说在网络请求中,服务端在10s内没有返回任何数据,可以假设服务端当前不可用
  • boolean hasDeadline 和deadlineNanoTime配合使用,当为true时,表示有定义

默认情况下,都为0

AsyncTimeout 继承自Timeout,当超时发生时,使用一个后台watchdog线程来处理相应的动作,使用此类可以给那些原生不支持超时的操作添加超时功能,如在socket里面阻塞在写操作

类的关键属性有

  • int TIMEOUT_WRITE_SIZE = 64 * 1024 一次写入的大小不能超过64KB,因为SegmentPool的最大缓存是64KB,超过这个大小会超时
  • long IDLE_TIMEOUT_MILLIS 在watchdog线程关闭之前,处于IDLE状态的时间,为60s
  • AsyncTimeout head 在watchdog线程里面会处理一个AsyncTimeout队列(有序队列)里面的每个超时事件,head表示首节点,即最先到达超时的节点,或者为null,当队列为空时
  • boolean inQueue 为true表示节点在当前的队列里面
  • AsyncTimeout next 队列里面指向下一个节点
  • long timeoutAt 如果被触发,此时间watchdog应该等待的时间,然后执行此AsyncTimeout

Watchdog的实现比较简单,循环读取队列里面的AsyncTimeout,并执行相应的timedOut操作,当队列为空时退出

  private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            // 返回链表的head节点
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

在scheduleTimeout里面,如果head节点为空,则启动watchdog,然后设置当前节点的timeoutAt,并按时间顺序插入有序队列里面

Hash和加解密

Okio提供了2种压缩方式

  • InflaterSource/DeflaterSink 使用 默认压缩算法 压缩解压
  • GZipSink/GZipSource 使用 Gzip算法 压缩解压
File file = // ...
Sink fileSink = Okio.sink(file);
Sink gzipSink = new GzipSink(fileSink);
BufferedSink bufferedSink = Okio.buffer(gzipSink);

HashingSink 提供了支持各种算法计算Hash

    static void hashing() throws IOException {
        BufferedSource bufferedSource = Okio.buffer(Okio.source(new File(READ)));
        ByteString byteString = bufferedSource.readByteString();
        System.out.println("   md5: " + byteString.md5().hex());
        System.out.println("  sha1: " + byteString.sha1().hex());
        System.out.println("sha256: " + byteString.sha256().hex());
        System.out.println("sha512: " + byteString.sha512().hex());

        Buffer buffer = bufferedSource.getBuffer();
        System.out.println("   md5: " + buffer.md5().hex());
        System.out.println("  sha1: " + buffer.sha1().hex());
        System.out.println("sha256: " + buffer.sha256().hex());
        System.out.println("sha512: " + buffer.sha512().hex());
        bufferedSource.close();
    }

使用Okio.cipherSink(Sink, Cipher) 或 Okio.cipherSource(Source, Cipher) 来加密、解密stream

void encryptAes(ByteString bytes, File file, byte[] key, byte[] iv)
    throws GeneralSecurityException, IOException {
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
  cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
  try (BufferedSink sink = Okio.buffer(Okio.cipherSink(Okio.sink(file), cipher))) {
    sink.write(bytes);
  }
}

ByteString decryptAesToByteString(File file, byte[] key, byte[] iv)
    throws GeneralSecurityException, IOException {
  Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
  cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
  try (BufferedSource source = Okio.buffer(Okio.cipherSource(Okio.source(file), cipher))) {
    return source.readByteString();
  }
}
Throttler 流量控制

使用Throttler可以控制Source和Sink的流量,给Source或Sink设置一个Throttler,然后通过bytesPerSecond来设置期望的值

类里面有3个关键变量:

  • bytesPerSecond: 允许的最大流量,使用0表示没有限制
  • waitByteCount: 当所请求的字节数大于该字节数且无法立即使用时,只需等到我们至少可以分配这么多字节。 使用此设置来设置一个理想字节数,保持在此区间的持续吞吐量。
  • maxByteCount: 每次调用允许申请的最大容量,也是在任何等待前返回的byte数量。

简单使用如下:

Socket socket = //...
Source socketSource = Okio.source(socket);
Throttler throttler = new Throttler();
Source throttledSource = throttler.throttle(socketSource);
BufferedSource bufferedSource = Okio.buffer(throttledSource);
bufferedSource.readUtf8(10 * 1024);
throttler.bytesPerTimePeriod(50, 1, SECONDS);
bufferedSource.readUtf8(10 * 1024);

Okio的简单使用

Okio里面的Source支持从text文件、binary文件,socket里面进行读取

从文本文件里面读写:

    static final String READ = System.getProperty("user.dir") + File.separator + "read.txt";
    static final String WRITE = System.getProperty("user.dir") + File.separator + "write.txt";

    public static void main(String[] args) throws IOException {
        readLines(new File(READ));
        writeEnv(new File(WRITE));
        exploreCharset();
        hashing();
    }

    static void readLines(File file) throws IOException {
        // The try-with-resources statement is a try statement that declares one or more resources.
        // A resource is an object that must be closed after the program is finished with it.
        // The try-with-resources statement ensures that each resource is closed at the end of the statement.
        // Any object that implements java.lang.AutoCloseable,
        // which includes all objects which implement java.io.Closeable, can be used as a resource.
        try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
            while (true) {
                String line = bufferedSource.readUtf8Line();
                if (line == null) break;
                if (line.contains("chadm")) {
                    System.out.println(line);
                }
            }
        }
    }

    static void writeEnv(File file) throws IOException {
        try (BufferedSink bufferedSink = Okio.buffer(Okio.sink(file))) {
            for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
                bufferedSink.writeUtf8((String) entry.getKey())
                        .writeUtf8("=")
                        .writeUtf8((String) entry.getValue())
                        .writeUtf8("\n");
            }
        }
    }

从二进制文件里面读取

  void encode(Bitmap bitmap, BufferedSink sink) throws IOException {
    int height = bitmap.height();
    int width = bitmap.width();

    int bytesPerPixel = 3;
    int rowByteCountWithoutPadding = (bytesPerPixel * width);
    int rowByteCount = ((rowByteCountWithoutPadding + 3) / 4) * 4;
    int pixelDataSize = rowByteCount * height;
    int bmpHeaderSize = 14;
    int dibHeaderSize = 40;

    // BMP Header
    sink.writeUtf8("BM"); // ID.
    sink.writeIntLe(bmpHeaderSize + dibHeaderSize + pixelDataSize); // File size.
    sink.writeShortLe(0); // Unused.
    sink.writeShortLe(0); // Unused.
    sink.writeIntLe(bmpHeaderSize + dibHeaderSize); // Offset of pixel data.

    // DIB Header
    sink.writeIntLe(dibHeaderSize);
    sink.writeIntLe(width);
    sink.writeIntLe(height);
    sink.writeShortLe(1);  // Color plane count.
    sink.writeShortLe(bytesPerPixel * Byte.SIZE);
    sink.writeIntLe(0);    // No compression.
    sink.writeIntLe(16);   // Size of bitmap data including padding.
    sink.writeIntLe(2835); // Horizontal print resolution in pixels/meter. (72 dpi).
    sink.writeIntLe(2835); // Vertical print resolution in pixels/meter. (72 dpi).
    sink.writeIntLe(0);    // Palette color count.
    sink.writeIntLe(0);    // 0 important colors.

    // Pixel data.
    for (int y = height - 1; y >= 0; y--) {
      for (int x = 0; x < width; x++) {
        sink.writeByte(bitmap.blue(x, y));
        sink.writeByte(bitmap.green(x, y));
        sink.writeByte(bitmap.red(x, y));
      }

      // Padding for 4-byte alignment.
      for (int p = rowByteCountWithoutPadding; p < rowByteCount; p++) {
        sink.writeByte(0);
      }
    }
  }

从Socket里读写

  private void transfer(Socket sourceSocket, Source source, Sink sink) {
    try {
      Buffer buffer = new Buffer();
      for (long byteCount; (byteCount = source.read(buffer, 8192L)) != -1; ) {
        sink.write(buffer, byteCount);
        sink.flush();
      }
    } catch (IOException e) {
      System.out.println("transfer failed from " + sourceSocket + ": " + e);
    } finally {
      closeQuietly(sink);
      closeQuietly(source);
      closeQuietly(sourceSocket);
      openSockets.remove(sourceSocket);
    }
  }
  
  public static void main(String[] args) throws IOException {
        SocksProxyServer proxyServer = new SocksProxyServer();
        proxyServer.start();

        URL url = new URL("https://publicobject.com/helloworld.txt");
        URLConnection connection = url.openConnection(proxyServer.proxy());
        try (BufferedSource source = Okio.buffer(Okio.source(connection.getInputStream()))) {
            for (String line; (line = source.readUtf8Line()) != null; ) {
                System.out.println(line);
            }
        }

        proxyServer.shutdown();
    }

更多例子,可以参考官方给的 Okio Demo

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145129.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • PhpStorm 2021.5.2 有效激活码(最新序列号破解)

    PhpStorm 2021.5.2 有效激活码(最新序列号破解),https://javaforall.cn/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

  • 2020-2021 北森智鼎行测题库解题思路[通俗易懂]

    2020-2021 北森智鼎行测题库解题思路[通俗易懂]做了几套题,感觉差不多了,买了几个题库,大概看了下,题目不同,思路差不多。有需要的可以多看看解析,然后刷刷题,剩下的交给运气就好了。石墨文档地址https://shimo.im/docs/WdY3

  • Linux环境下MySql卸载[通俗易懂]

    Linux环境下MySql卸载[通俗易懂]MySQL的安装方法有很多种,常见的有yum、rpm和源码安装,那么针对不同的安装方法,也存在不同的卸载方法,其中yum和rpm安装的卸载方法一样。本节主要介绍Linux下如何彻底卸载已安装过的mysql,以便能顺利安装下一个版本的mysql。1、源码安装卸载虽然源码安装时相对复杂,但是它的卸载却很简单。只要在安装目录下直接执行makeuninstall这个命令,就可以卸载源码安装的mysql,前提是你在这之前没有执行过makeclean。如果执行过makeclean,也没关系,那就直

  • C++:无法解析的外部符号问题 与 头文件包含注意要点

    C++:无法解析的外部符号问题 与 头文件包含注意要点无法解析的外部符号

  • 数据仓库ods是什么意思_数据仓库ODS全拼

    数据仓库ods是什么意思_数据仓库ODS全拼1.引言本篇主要讲述操作数据存储(ODS)系统产生的背景、定义、特点,以及它与数据仓库的区别。在前两篇,笔者介绍了什么是数据仓库?为什么需要数据仓库?数据仓库系统的体系结构是什么?因此可能在读者心里已经形成了企业数据存储的DB~DW两层体系结构的概念,但在实际应用中,并不总是这样,有时候我们可能需要ODS这一系统来搭建DB~ODS~DW三层数据体系,那么什么是ODS?为什么需要ODS?ODS与D…

  • matlab fmincon 精度,fmincon与quadprog误差

    matlab fmincon 精度,fmincon与quadprog误差该楼层疑似违规已被系统折叠隐藏此楼查看此楼有没有人能帮我看看这两个程序的运行结果为什么会不一样啊??困扰了好久……程序一:%text1.mclearall;clc;a=[0.058,0.075,0.092,0.111,0.136,0.092]’;b=[0.062,0.085,0.128,0.149,0.164,0.148]’;alpha=[0.054,0.075,0.096,0.1…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号