okio 的使用及源码分析

2022-09-05 16:31:20 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

文章目录

  • 简介
  • 一些重要的类
    • ByteString 和 Buffer
    • Source 和 Sink
    • RealBufferedSource 和 RealBufferedSink
  • 简单使用
    • 读取文本
    • 写入文本
  • 源码分析
    • Segment
      • 成员变量
      • 构造方法
      • pop
      • push
      • writeTo
      • compact(压缩机制)
      • split(共享机制)
    • SegmentPool
      • take
      • recycle
    • Buffer
      • readByte
      • writeByte
    • 超时机制
      • Timeout
      • AsyncTimeout
        • AsyncTimeout#sink
        • AsyncTimeout#enter
        • AsyncTimeout#Watchdog
        • AsyncTimeout#awaitTimeout
        • AsyncTimeout#exit
        • 小结
  • okio 的优点
  • 参考

简介

okio 补充了 java.io 和 java.nio 的内容,使得数据访问、存储和处理更加便捷。本文将简单介绍一下 okio 的使用以及基于 okio-1.17.0 版本,对 okio 的源码进行分析,最后总结一下 okio 的优点。

一些重要的类

ByteString 和 Buffer

ByteString 和 Buffer 是 okio 的两个核心基础类

  • ByteString 是一个不可变的字节序列。String 提供了对于字符数据的各种操作,但对于二进制数据,却没有这样的存在。ByteString 的出现填补了这个空缺,它提供了对二进制数据的各种操作,例如提取子串、判等、获取位置等,也能将数据编解码为十六进制、base64 和 UTF-8 格式。
  • Buffer 是一个可变的字节序列,就像 ArrayList 一样,不需要考虑它的容量。在写入和读取元素的时候,就像队列一样,从它的头部读取数据,尾部写入数据。Buffer 实现了 BufferSource 和 BufferSink,提供了访问数据缓冲区所需的一切 API。

其他:

  • 把一个 String 编码为 utf8 时,会引用原 String,后面解码时就可以直接返回了

Source 和 Sink

Source 和 Sink 类似于 InputStream 和 OutputStream,都是 IO 操作的顶级接口。其中 Source 为输入流、Sink 为输出流。它们还有一些新特性:

  1. 提供超时机制
  2. API 更加简洁,易于实现,例如 Source 仅仅声明了 read, close, timeout 方法
  3. 为了更方便地处理数据,还提供了 BufferedSource 和 BufferedSink 接口
  4. 不再区分字节流和字符流,它们都是数据,可以按照各种类型进行读写
  5. 便于测试,Buffer 同时实现了 BufferedSource 和 BufferedSink,便于编写测试代码

RealBufferedSource 和 RealBufferedSink

RealBufferedSource 和 RealBufferedSink 分别实现了 BufferedSource 和 BufferedSink 接口。

它们都有一个 Buffer 成员变量,在执行方法时,它们并没有做什么,实际调用的是 Buffer 的该方法。这体现了装饰模式。所以这也说明了 BufferedSource 和 BufferedSink 接口的真正实现都在 Buffer 中

为什么不各自实现呢?

因为 BufferedSource 和 BufferedSink 的高效实现有很大部分是共通的,为了避免同样的逻辑重复两遍,就把读写操作都在 Buffer 中实现,这样逻辑更加紧凑,也方便修改。 另一方面,Buffer 的实现可以满足“两用数据缓冲区”的需求,即可以从头部读取数据,也可以向尾部添加数据。对于单独的读和写,也提供了两个委托类:RealBufferedSource 和 RealBufferedSink。

简单使用

读取文本

以官方 demo 为例,这里读取的是一个 File 文件:

代码语言:javascript复制
public void readLines(File file) throws IOException { 
   
  try (Source fileSource = Okio.source(file);
       BufferedSource bufferedSource = Okio.buffer(fileSource)) { 
   

    while (true) { 
   
      String line = bufferedSource.readUtf8Line();
      if (line == null) break;

      if (line.contains("square")) { 
   
        System.out.println(line);
      }
    }

  }
}

可以看到,主要步骤如下:

代码语言:javascript复制
    // 1. 构建 Source
    Source fileSource = Okio.source(file);
    // 2. 构建 BufferedSource(RealBufferedSource)
    BufferedSource bufferedSource = Okio.buffer(fileSource);
    // 3. 从 BufferedSource 中读取文本
    // 在 dmeo 中是按 utf8 格式逐行读取
    String line = bufferedSource.readUtf8Line();
  1. 根据 InputStream、File、Path 或 Socket(最终都转化为 InputStream)构建 Source
  2. 根据 Source 构建 BufferedSource(RealBufferedSource)
  3. 从 BufferedSource 中读取文本,demo 中是按 utf8 格式逐行读取。如此之外,还可以按字节读取。如果文件是自动定义的特殊结构,还可以使用 readInt、readLong 等方法。

写入文本

继续看官方 demo:

代码语言:javascript复制
public void writeEnv(File file) throws IOException { 
   
  try (Sink fileSink = Okio.sink(file);
       BufferedSink bufferedSink = Okio.buffer(fileSink)) { 
   

    for (Map.Entry<String, String> entry : System.getenv().entrySet()) { 
   
      bufferedSink.writeUtf8(entry.getKey());
      bufferedSink.writeUtf8("=");
      bufferedSink.writeUtf8(entry.getValue());
      bufferedSink.writeUtf8("n");
    }

  }

可以看到,主要步骤和读取文本时类似:

代码语言:javascript复制
    // 1. 构建 Sink
    Sink fileSink = Okio.sink(file);
    // 2. 构建 BufferedSink
    BufferedSink bufferedSink = Okio.buffer(fileSink);
    // 3. 向 BufferedSink 写入文本
    bufferedSink.writeUtf8("要写入的文本");
  1. 根据 OutputStream、File、Path 或 Socket(最终都转化为 OutputStream)构建 Sink
  2. 根据 Sink 构建 BufferedSink(RealBufferedSink)
  3. 向 BufferedSink 写入文本,除了写入 Stirng,还可以写入 byte[]、int、long。

源码分析

Segment

Segment 的字面意思是片段。okio 将 Buffer 分割成一个个 Segment,Segment 内部维护着固定长度的 byte 数组,数据存储在 byte 数组中,同时 Segment 拥有前面节点和后面节点的引用,是一个双向链表。

成员变量

代码语言:javascript复制
  // 每个 Segment 所含的字节数量
  static final int SIZE = 8192;

  // 要进行分割时,如果要分割的字节数达到该值,才进行 Segment 共享
  static final int SHARE_MINIMUM = 1024;

  // 存储数据
  final byte[] data;

  // 下一可读位置
  int pos;

  // 第一个可写位置
  int limit;

  // 当前 Segment 存储的数据是否被其他 Segment 或 ByteString 使用
  boolean shared;

  // 是否为当前数据的拥有者并可以继续写入数据
  boolean owner;

  // 指向下一个 Segment
  Segment next;

  // 指向前一个 Segment
  Segment prev;

构造方法

代码语言:javascript复制
  Segment() { 
   
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) { 
   
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.shared = shared;
    this.owner = owner;
  }

有两个构造方法,其中无参构造方法 owner 为 true、shared 为 false,说明数据的拥有者只有自己,没有被共享。

另一构造方法,则可以根据需要,返回相应的 Segment:

代码语言:javascript复制
  /** * Returns a new segment that shares the underlying byte array with this. Adjusting pos and limit * are safe but writes are forbidden. This also marks the current segment as shared, which * prevents it from being pooled. */
  final Segment sharedCopy() { 
   
    shared = true;
    return new Segment(data, pos, limit, true, false);
  }
代码语言:javascript复制
  /** Returns a new segment that its own private copy of the underlying byte array. */
  final Segment unsharedCopy() { 
   
    return new Segment(data.clone(), pos, limit, false, true);
  }

下面来看下 Segment 的几个方法:

pop

代码语言:javascript复制
  public final @Nullable Segment pop() { 
   
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

pop 方法将自己移除出链表,并将自己的前后两个节点连接起来,最后返回下一个 Segment。

既然有 pop 方法,那当然还有 push 方法:

push

代码语言:javascript复制
  public final Segment push(Segment segment) { 
   
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

push 方法传入一个 Segment,将该 Segment 插入到自己后面并返回插入的 Segment。

writeTo

代码语言:javascript复制
  public final void writeTo(Segment sink, int byteCount) { 
   
	// 不能对无法写入数据的 Segment 操作
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit   byteCount > SIZE) { 
   
      // 正在共享的 Segment
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit   byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
	  // 可能由于前面使用了 read 方法取出数据时导致 pos 后移(pos > 0)
	  // 这里先将从 pos 开始的数据移回到开头,即索引为 0 处,并更新 pos 和 limit
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }
	// 本 Segment 从 pos 开始取 byteCount 个字节写入到 sink 
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
	// 更新 sink 的可写位置和自己的下一读取位置
    sink.limit  = byteCount;
    pos  = byteCount;
  }

该方法从自己的 pos 位置开始,读取一定数量的字节并写入到另一 Segment 中。这是一种数据的转移,主要用于 Segment 的压缩。

compact(压缩机制)

因为每个 Segment 的存储的数据长度是固定的,如果经过一段时间后,每个 Segment 的数据长度不一,可能有些 Segment 只有很小的数据。这时就可以通过 Segment 的压缩机制进行优化,该机制通过 compact 方法实现:

代码语言:javascript复制
  public final void compact() { 
   
	// 上一节点就是自己,说明只有一个节点,无法压缩
    if (prev == this) throw new IllegalStateException();
	// 如果上一节点的数据不只自己拥有,那么不能压缩
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
	// 当前自己拥有的数据量
    int byteCount = limit - pos;
	// 计算上一节点的可写入数据量
    int availableByteCount = SIZE - prev.limit   (prev.shared ? 0 : prev.pos);
	// 上一节点不足以容纳自己的数据,无法压缩
    if (byteCount > availableByteCount) return;
	// 将自己的数据写入到上一节点
    writeTo(prev, byteCount);
	// 将自己从链表中删除
    pop();
	// 回收
    SegmentPool.recycle(this);
  }

在进行压缩时,如果上一个 Segment 可以写入数据并足以容纳自己的数据,就将自己的数据写入到上一节点,然后将自己从链表中删除并回收。

split(共享机制)

split 方法可以从某个 Segment 中分割出一个新的 Segment。其中新 Segment 包含 [pos, pos byteCount) 的数据,而原 Segment 只剩下 [pos byteCount, limit) 的数据。最后新 Segment 会插入到原 Segment 的前面。具体看下该方法:

代码语言:javascript复制
  public final Segment split(int byteCount) { 
   
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    
    Segment prefix;
	
	// 只有当数据量比较大时,才共享当前 Segment
    if (byteCount >= SHARE_MINIMUM) { 
   
      prefix = sharedCopy();
    } 
	// 数据量较少时不共享,将数据复制到新的 Segment
	else { 
   
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

	// 更新新 Segment 的可写位置
    prefix.limit = prefix.pos   byteCount;
	// 更新本 Segment 的下一读取位置
    pos  = byteCount;
	// 将新 Segment 插入到自己前面
    prev.push(prefix);
	// 返回新的 Segment
    return prefix;
  }

在进行分割时,如果要分割的数据量比较巨大,那么将进行数据共享而不是数据复制,也就是说新 Segment 和原 Segment 引用同一个 byte 数组。而数据量比较小时,不会进行共享,因为太多的共享会导致 Segment 链变长,这时会进行复制。

如果是进行复制的话,新 Segment 的实例是通过 SegmentPool.take() 获得的。下面就分析一些 SegmentPool:

SegmentPool

SegmentPool 主要复制 Segment 的回收和闲置 Segment 的管理。Buffer 使用的 Segment 是从 SegmentPool 中获取的,这样可以避免频繁地创建和销毁 Segment。

这个类比较简单,只有三个成员变量,分别如下:

代码语言:javascript复制
  // SegmentPool 的最大存储字节数
  // 一个 Segment 存储 8 * 1024 个字节,所以 SegmentPool 只能存 8 个 Segment
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  // 一个存储回收 Segment 的单向链表
  static @Nullable Segment next;

  // 当前所有 Segment 存储的总字节数
  static long byteCount;

SegmentPool 只有两个方法,一个是回收 Segment,一个是获取 Segment。

take

take 方法用于获取 Segment:

代码语言:javascript复制
  static Segment take() { 
   
    synchronized (SegmentPool.class) { 
   
      if (next != null) { 
   
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); 
  }

如果当前池里没有 Segment 就新建一个 Segment 返回,否则就从单向链表中取一个 Segment 返回。

recycle

recycle 方法用于回收 Segment:

代码语言:javascript复制
  static void recycle(Segment segment) { 
   
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
	// 不能回收共享的 Segment
    if (segment.shared) return; 
    synchronized (SegmentPool.class) { 
   
      if (byteCount   Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount  = Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }

共享状态的 Segment 不能回收,如果池没满就将回收该 Segment,将其加入单向链表中。

Buffer

Buffer 存储的是可变字节序列,内部使用 Segment 来存储元素,Segment 是一个双向循环链表,它的内部有一个 byte 数组用于存储元素。

Buffer 使用 Segment 的好处是:当元素要从一个 Buffer 移到另一个 Buffer 的时候,并不用进行数组元素的拷贝,只要改变 Segment 的所有者即可。

Buffer 既可以读,也可以写,下面分别调两个方法看一下读写的过程。

首先看 readByte 方法,该方法读取一个字节:

readByte

代码语言:javascript复制
  @Override 
  public byte readByte() { 
   
    if (size == 0) throw new IllegalStateException("size == 0");

	// 从头开始读
    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;

    byte[] data = segment.data;
    byte b = data[pos  ];
    size -= 1;

	// 读取元素后,若头 Segment 没有元素了,就从链表中删除并回收该 Segment
    if (pos == limit) { 
   
      head = segment.pop();
      SegmentPool.recycle(segment);
    } else { 
   
      segment.pos = pos;
    }

    return b;
  }

可以看到,读取元素的时候是从头开始读的。如果读取元素后,头 Segment 没有元素了,就从链表中删除并回收该 Segment。

继续看一个写的方法,这里看 writeByte:

writeByte

代码语言:javascript复制
  @Override 
  public Buffer writeByte(int b) { 
   
	// 返回一个满足所需容量的尾 Segment
    Segment tail = writableSegment(1);
	// 写入字节
    tail.data[tail.limit  ] = (byte) b;
    size  = 1;
    return this;
  }
代码语言:javascript复制
  Segment writableSegment(int minimumCapacity) { 
   
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

	// 头 Segment 为空,则先创建一个双向循环链表
    if (head == null) { 
   
      head = SegmentPool.take(); 
      return head.next = head.prev = head;
    }

	// 找到尾 Segment,从尾部写入
    Segment tail = head.prev;
	// 若不满足容量,或者不能写入数据,就在尾部添加一个 Segment
    if (tail.limit   minimumCapacity > Segment.SIZE || !tail.owner) { 
   
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

可以看到,写入时是从尾部写入的,如果此时头结点为空,就会新建一个双向循环链表。如果尾 Segment 容量不足或不能写入数据,就会在尾部添加一个 Segment。

超时机制

Timeout

以输出流 Sink 为例,当我们用下面的方法包装输出流的时候:

代码语言:javascript复制
  public static Sink sink(OutputStream out) { 
   
    return sink(out, new Timeout());
  }

真正调用的是带两个参数的 sink 方法,第二个参数就是 TimeOut:

代码语言:javascript复制
  private static Sink sink(final OutputStream out, final Timeout timeout) { 
   
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() { 
   
      @Override public void write(Buffer source, long byteCount) throws IOException { 
   
      
        while (byteCount > 0) { 
   
		  // 进行了超时判断
          timeout.throwIfReached();
            
          // ...
        }
      }

      @Override public Timeout timeout() { 
   
        return timeout;
      }
      
      // ...

    };
  }

在写入数据的时候,通过调用 Timeout 的 throwIfReached 方法进行了超时判断,该方法如下:

代码语言:javascript复制
  public void throwIfReached() throws IOException { 
   
	// 线程被 interrupted
    if (Thread.interrupted()) { 
   
      Thread.currentThread().interrupt(); // Retain interrupted status.
      throw new InterruptedIOException("interrupted");
    }

	// 到达超时时间
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { 
   
      throw new InterruptedIOException("deadline reached");
    }
  }

除了 Timeout 外,还有一个 AsyncTimeout。

AsyncTimeout

AsyncTimeout 继承于 Timeout,TimeOut 用于同步计时,即在同一个线程中执行 IO 操作和计时,而 AsyncTimeout 提供了异步计时的功能。

首先看一下 AsyncTimeOut 的成员变量:

代码语言:javascript复制
  // 一次最多可以写入 64K 数据,超过该容量可能会导致在慢连接中超时,故做出限制
  private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
  
  // AsyncTimeout 单链表的头结点
  static @Nullable AsyncTimeout head;
  // 下一节点
  private @Nullable AsyncTimeout next;
  
  // 判断当前节点是否已经入队
  private boolean inQueue;
  
  // 超时时间
  private long timeoutAt;

可以看到,AsyncTimeout 是一个单链表。

什么时候会使用到 AsyncTimeout 呢?

当我们对 Socket 进行包装时就引入了异步计时机制,之所以在对 Socket 写操作时采取异步超时,是由 Socket 自身的性质决定的,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。

该过程从 Okio 的sink方法看起:

代码语言:javascript复制
  public static Sink sink(Socket socket) throws IOException { 
   
    if (socket == null) throw new IllegalArgumentException("socket == null");
    if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
    // 把 Socket 封装到 AsyncTimeout 中
    AsyncTimeout timeout = timeout(socket);
    // 得到 Sink
    Sink sink = sink(socket.getOutputStream(), timeout);
    // AsyncTimeout 对 Sink 进行包装
    return timeout.sink(sink);
  }

该方法中,首先把 Socket 封装到 AsyncTimeout 中,得到 Sink 后,AsyncTimeout 会对 Sink 进行包装。

AsyncTimeout#sink

包装过程调用的是 AsyncTimeout 的 sink 方法:

代码语言:javascript复制
  public final Sink sink(final Sink sink) { 
   
    return new Sink() { 
   
      @Override public void write(Buffer source, long byteCount) throws IOException { 
   
          boolean throwOnTimeout = false;
          enter();
          try { 
   
            sink.write(source, toWrite);
            byteCount -= toWrite;
            throwOnTimeout = true;
          } catch (IOException e) { 
   
            throw exit(e);
          } finally { 
   
            exit(throwOnTimeout);
          }
        }
      }
    
      // ... 

    };
  }

这里只分析 write 过程,首先看 enter 方法:

AsyncTimeout#enter
代码语言:javascript复制
  public final void enter() { 
   
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) { 
   
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
  }

先判断了一下入队状态,真正的调用是在 scheduleTimeout 方法中:

代码语言:javascript复制
  private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { 
   
	
	// 还未开启 Watchdog 线程,先开启 Watchdog 线程
    if (head == null) { 
   
      head = new AsyncTimeout();
      new Watchdog().start();
    }

	// 设置超时时间
    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) { 
   
      node.timeoutAt = now   Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) { 
   
      node.timeoutAt = now   timeoutNanos;
    } else if (hasDeadline) { 
   
      node.timeoutAt = node.deadlineNanoTime();
    } else { 
   
      throw new AssertionError();
    }
	
	// 当前 AsyncTimeout 的剩余时间
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) { 
   
	  // 找到插入位置(如果某节点的下一个节点为 null或下一节点的剩余时间更长,就插入到该节点后面)
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { 
   
        node.next = prev.next;
        prev.next = node;
		// 如果插入到了最前面
        if (prev == head) { 
   
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
  }

该方法中,如果还未开启 Watchdog 线程,先开启 Watchdog 线程。然后设定超时时间以及将当前 AsyncTimeout 插入到链表的合适位置。

先看一下 Watchdog 线程:

AsyncTimeout#Watchdog
代码语言:javascript复制
  private static final class Watchdog extends Thread { 
   
  
    public void run() { 
   
      while (true) { 
   
        try { 
   
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) { 
   
			// 得到超时的节点
            timedOut = awaitTimeout();

            // 未找到超时节点,但链表还有节点,再次尝试
            if (timedOut == null) continue;
            
            // 链表没有节点了,将 head 置空,并退出线程
            if (timedOut == head) { 
   
              head = null;
              return;
            }
          }
          
          // 回调,外部重新改方法进行超时处理,例如关闭 Socket
          timedOut.timedOut();
        } catch (InterruptedException ignored) { 
   
        }
      }
    }
  }

可以看到,真正进行超时判断的是 awaitTimeout 方法:

AsyncTimeout#awaitTimeout
代码语言:javascript复制
  static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { 
   

	// 拿到下一节点
    AsyncTimeout node = head.next;

    if (node == null) { 
   
      long startNanos = System.nanoTime();
	  // 等待一段时间
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
	  // 经过一段时间后还没有节点,就会返回 head,否则返回 null
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
          ? head  // The idle timeout elapsed.
          : null; // The situation has changed.
    }

	// 该节点剩余的时间
    long waitNanos = node.remainingNanos(System.nanoTime());

	// 如果该节点还未超时,继续等待,等待时间结束后返回 null
    if (waitNanos > 0) { 
   
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }
	
	// 移除并返回超时的节点
    head.next = node.next;
    node.next = null;
    return node;
  }

该方法主要是为了得到超时的节点,并将超时的节点移除出链表。

下面回到 sink,继续看 write 方法,在调用完 enter 方法后,会调用 Buffer 的 write 方法进行写入,无论是写入过程抛出了异常,还是最后正常结束,都会在 finally 块中调用 exit(boolean) 方法:

AsyncTimeout#exit
代码语言:javascript复制
  final void exit(boolean throwOnTimeout) throws IOException { 
   
    // 是否超时
    boolean timedOut = exit();
    if (timedOut && throwOnTimeout) throw newTimeoutException(null);
  }
  
  public final boolean exit() { 
   
    if (!inQueue) return false;
    inQueue = false;
    return cancelScheduledTimeout(this);
  }
  
  /** Returns true if the timeout occurred. */
  private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { 
   
    // 从链表找到该节点并移除,能找到说明未超时
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) { 
   
      if (prev.next == node) { 
   
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }

    // 未找到该节点,说明发生了超时,在 Watchdog 线程中删除了该节点
    return true;
  }

在该方法中,如果发现超时的话,就会抛出异常,没有超时的话,就将 AsyncTimeout 移除出单链表。

小结

AsyncTimeout 用于异步计时,采取异步是为了应对某些读写时会发生阻塞的情况,例如对 Socket 进行写操作时,Socket 经常会阻塞自己,无法同步计时,只能采取异步方式。

在开始读写操作前,会给 AsyncTimeout 设置超时时间,并将其加入到一个单链表中,单链表中的节点按照剩余时间由短到长排列。在另一个线程里,会对这条链表进行监控,找到超时的节点,找到后回调给外界,外界就可以进行相应的回收处理,例如关闭 Socket。在读写操作完成后,如果没有发生超时,就将该 AsyncTimeout 从单链表中删除;如果发生超时,就会抛出异常。

okio 的优点

  1. 使用方便
  • 不再区分字节流和字符流,只有 Source 和 Sink,分别对应输入流和输出流。
  • Buffer 实现了 BufferedSource 和 BufferedSink,同时具有读和写的功能,提供了读写所需的一切 API。
  • 提供了 ByteString,可以很方便地对二进制数据进行各种操作和进行各种值的转换。
  1. 速度快
  • okio 对 Buffer 存储的数据进行了分块处理,每个 Segment 用一个双向循环链表连接起来,这样在进行 IO 操作时就可以以块为单位进行操作,提高吞吐量。
  • 由于 Buffer 是以 Segment 链表来管理元素,所以在 Buffer 间要转移元素的时候,并不用进行数组元素的拷贝,只要改变链表头指针的所有者即可。
  • ByteString 会保留一份对原来 String 的引用,这样当你把 UTF-8 的 String 转换为 ByteString 后,下次再要解码出 String 时就直接返回之前的 String,速度非常快。
  1. 提供超时机制
  • 内部会根据不同情况进行同步计时或异步计时,超时后会抛出异常,不会阻塞线程。而且不仅在 IO 操作有超时判断,在 flush、close 等方法也有超时判断。
  1. 内存复用
  • okio 有一个 SegmentPool,可以用来回收和获取空闲 Segment。在高频率通信时,可以有效避免频繁的 GC。

参考

  • square/okio
  • Okio深入分析——基础使用部分
  • 拆轮子系列:拆 Okio
  • OKHttp源码解析(五)–OKIO简介及FileSystem
  • Okio源码分析
  • 关于Okio的优点缓存byte[],避免频繁GC

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/135438.html原文链接:https://javaforall.cn

0 人点赞