当我用Channels.newChannel(is)在InputStream之外创建一个频道时,java标准库返回一个ReadableByteChannelImpl,它是:
private static class ReadableByteChannelImpl
extends AbstractInterruptibleChannel // Not really interruptible
implements ReadableByteChannel
{
InputStream in;
private static final int TRANSFER_SIZE = 8192;
private byte buf[] = new byte[0];
private boolean open = true;
private Object readLock = new Object();
ReadableByteChannelImpl(InputStream in) {
this.in = in;
}
public int read(ByteBuffer dst) throws IOException {
int len = dst.remaining();
int totalRead = 0;
int bytesRead = 0;
synchronized (readLock) {
while (totalRead < len) {
int bytesToRead = Math.min((len - totalRead),
TRANSFER_SIZE);
if (buf.length < bytesToRead)
buf = new byte[bytesToRead];
if ((totalRead > 0) && !(in.available() > 0))
break; // block at most once
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
} finally {
end(bytesRead > 0);
}
if (bytesRead < 0)
break;
else
totalRead += bytesRead;
dst.put(buf, 0, bytesRead);
}
if ((bytesRead < 0) && (totalRead == 0))
return -1;
return totalRead;
}
}
protected void implCloseChannel() throws IOException {
in.close();
open = false;
}
}正如您所看到的,它在第一次调用read(ByteBuffer dst)时阻塞,并且再也不会阻塞。请参见:
if ((totalRead > 0) && !(in.available() > 0))
break; // block at most once这种奇怪的行为背后的原因是什么?
另外,在不真正使此通道可中断的情况下扩展AbstractInterruptibleChannel的动机是什么?
发布于 2009-12-13 06:02:31
如果它已经读取了至少一个字节,并且底层流声明没有字节为,那么它就不会阻塞。请注意,即使有一些字节可用,InputStream#available()也可以返回零,但它不应该承诺超过在没有阻塞的情况下可以读取的字节数。因此,这个ReadableByteChannel努力读取至少一个字节--假设所提供的ByteBuffer有至少一个字节的空间--并且在这样做之后,将不会再次尝试读取底层流,除非流承诺有更多的字节可用而不会阻塞。
至于为什么ReadableByteChannelImpl扩展AbstractInterruptibleChannel,我怀疑这是为了确保包装的InputStream在调用Channel#close()时将被正确关闭,InterruptibleChannel#close()进一步细化了它的契约。扩展AbstractInterruptibleChannel允许ReadableByteChannelImpl借用其线程安全的、打开-关闭的状态保护。
正如您所说,这有点虚假的广告,不是真正的可中断的,但它可以容忍从单独的线程关闭,并使这样做是幂等的。
https://stackoverflow.com/questions/1894727
复制相似问题