深入探索 java.io 和 nio

FileInputStream/FileOutputStream

FileInputStreamFileOutputStream 都属于“流式” API,就像流水(Stream)一样只能朝着一个方向读写,不能后退

FileInputStream 相当于以只读模式读文件:open(O_RDONLY) -> read -> close(fd)

FileOutputStream 相当于以只写模式写文件:open(O_WRONLY | O_CREAT | (append ? O_APPEND : O_TRUNC)) -> write -> close(fd)

// open
FileInputStream(File file)
IoBridge.open(name, O_RDONLY)
Libcore.os.open(path, flags, 0666)
Linux.open(String path, int flags, int mode)
Linux_open(JNIEnv* env, jobject, jstring javaPath, jint flags, jint mode)    // libcore_io_Linux.cpp
// 系统调用 open 的参数 flags 必须包含三个访问模式(access modes)其中之一:
// O_CREAT: 如果文件不存在则创建之
// O_APPEND:以 APPEND 的模式打开文件(附加)
// O_TRUNC: 如果文件存在且以写模式打开,则把文件长度置为 0
open(pathname, flags, modes)


// read
FileInputStream.read(byte b[], int off, int len)
IoBridge.read(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Libcore.os.read(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux.readBytes(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux_readBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount)    // libcore_io_Linux.cpp
// 系统调用,fd 有个成员属性 offset,read 从 offset 开始读取 count 个字节的数据到 buf,offset 也会随着增长 count
read(int fd, void *buf, size_t count)


// write
FileOutputStream.write(byte b[], int off, int len)
IoBridge.write(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Libcore.os.write(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux.writeBytes(FileDescriptor fd, Object buffer, int offset, int byteCount)
Linux_writeBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount)    // libcore_io_Linux.cpp
write(int fd, const void* buf, size_t count)    // 系统调用


// close
FileInputStream.close()
IoBridge.closeAndSignalBlockedThreads(fd)
Libcore.os.close(fd)
Linux.close(fd)
Linux_close(JNIEnv* env, jobject, jobject javaFd)    // libcore_io_Linux.cpp
close(fd)    // 系统调用

RandomAccessFile

RandomAccessFile 提供了读写操作,相当于 FileInputStreamFileOutputStream 的组合

// open,其中 mode 与 imode 之间的映射关系为 
// r  - O_RDONLY         - 只读
// rw - O_RDWR | O_CREAT - 读写
RandomAccessFile(File file, String mode)
IoBridge.open(name, imode)
Libcore.os.open(path, flags, 0666)
Linux.open(String path, int flags, int mode)
Linux_open(JNIEnv* env, jobject, jstring javaPath, jint flags, jint mode)    // libcore_io_Linux.cpp
// 系统调用 open 的参数 flags 必须包含三个访问模式(access modes)其中之一:
// O_CREAT: 如果文件不存在则创建之
// O_APPEND:以 APPEND 的模式打开文件(附加)
// O_TRUNC: 如果文件存在且以写模式打开,则把文件长度置为 0
open(pathname, flags, modes)


// read & write
read(byte b[], int off, int len) -> 系统调用 read
write(byte b[], int off, int len) -> 系统调用 write


// seek
RandomAccessFile.seek(long pos)
Libcore.os.lseek(fd, pos, SEEK_SET)
Linux.lseek(FileDescriptor fd, long offset, int whence)
Linux_lseek(JNIEnv* env, jobject, jobject javaFd, jlong offset, jint whence)    // libcore_io_Linux.cpp
// 系统调用,改变已打开的文件描述的文件偏移(fd.offset,指示下一次读写的位置),其中 whence 的取值有:
// SEEK_SET - fd.offset = offset
// SEEK_CUR - fd.offset += offset
// SEEK_END - fd.offset = fd.end + offset
lseek(int fd, off_t offset, int whence)

总之,传统的 java.io 都是基于系统调用 open, read, write, lseekclose

  • FileInputStream 包装了 read
  • FileOutputStream 包装了 write
  • RandomAccessFile 包装了 readwrite

FileChannel

FileChannel 的读写操作最终是执行系统调用 pread/pwrite(区别于 read/write 的是它们不改变 fd.offset

pread() reads up to count bytes from file descriptor fd at offset offset (from the start of the file) into the buffer starting at buf. The file offset is not changed.

pwrite() writes up to count bytes from the buffer starting at buf to the file descriptor fd at offset offset. The file offset is not changed.

read() attempts to read up to count bytes from file descriptor fd into the buffer starting at buf. On files that support seeking, the read operation commences at the file offset, and the file offset is incremented by the number of bytes read.

read() attempts to read up to count bytes from file descriptor fd into the buffer starting at buf. On files that support seeking, the read operation commences at the file offset, and the file offset is incremented by the number of bytes read.

// read
FileChannel.read(ByteBuffer dst, long position)
FileChannelImpl.read(ByteBuffer dst, long position)
IOUtil.read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd)
IOUtil.readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd)
FileDispatcherImpl.pread0(FileDescriptor fd, long address, int len, long position)
FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset)    // FileDispatcherImpl.c
pread64(fd, buf, len, offset)    // 系统调用


// write
FileChannel.write(ByteBuffer src, long position)
FileChannelImpl.write(ByteBuffer src, long position)
IOUtil.write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd)
IOUtil.writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd)
FileDispatcherImpl.pwrite0(FileDescriptor fd, long address, int len, long position)
FileDispatcherImpl_pwrite0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset)    // FileDispatcherImpl.c
pwrite64(int __fd, const void* __buf, size_t __count, off64_t __offset)    // 系统调用

数组的两次复制

咋一看,FileInputStream/FileOutputStreamFileChannel 最终都是通过系统调用 read/write 完成文件的读写操作,那 NIO 的优势体现在哪呢?

其实在 C 层的入口点就可以看出来了,看下面的读操作,Byte[] 是作为 java object 传给 C 层的,对它的读写操作需要包裹在 GetByteArrayElements/ReleaseByteArrayElements 之间(通过 ScopedBytesRW 的构造函数和析构函数),GetByteArrayElements 会从 Byte[] 复制一份数据出来,ReleaseByteArrayElements 会回写数据到 Byte[],也就是说 Byte[] 在 C 层走一圈会有两次额外的复制操作,数组越大越耗资源

static jint Linux_readBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount) {
    ScopedBytesRW bytes(env, javaBytes);
    if (bytes.get() == NULL) {
        return -1;
    }
    return IO_FAILURE_RETRY(env, ssize_t, read, javaFd, bytes.get() + byteOffset, byteCount);
}

// 看下 ScopedBytesRW 是什么
class ScopedBytesRW : public ScopedBytes<false> {
public:
    ScopedBytesRW(JNIEnv* env, jobject object) : ScopedBytes<false>(env, object) {}
    jbyte* get() {
        return mPtr;
    }
};

// 貌似在 ScopedBytes 的构造函数和析构函数里做了些手脚
template<bool readOnly>
class ScopedBytes {
public:
    ScopedBytes(JNIEnv* env, jobject object)
    : mEnv(env), mObject(object), mByteArray(nullptr), mPtr(nullptr)
    {
        if (mObject == nullptr) {
            jniThrowNullPointerException(mEnv);
        } else {
            jclass byteArrayClass = JniConstants::GetPrimitiveByteArrayClass(env);
            if (mEnv->IsInstanceOf(mObject, byteArrayClass)) {
                mByteArray = reinterpret_cast<jbyteArray>(mObject);
                mPtr = mEnv->GetByteArrayElements(mByteArray, nullptr);                 // 字节数组
            } else {
                mPtr = reinterpret_cast<jbyte*>(mEnv->GetDirectBufferAddress(mObject)); // DirectBuffer
            }
        }
    }

    ~ScopedBytes() {
        if (mByteArray != nullptr) {
            mEnv->ReleaseByteArrayElements(mByteArray, mPtr, readOnly ? JNI_ABORT : 0);
        }
    }
};

JNI Functions

Get<PrimitiveType>ArrayElements(JNIEnv *env, ArrayType array, jboolean *isCopy)

A family of functions that returns the body of the primitive array. The result is valid until the corresponding ReleaseArrayElements() function is called. Since the returned array may be a copy of the Java array, changes made to the returned array will not necessarily be reflected in the original array until ReleaseArrayElements() is called.

If isCopy is not NULL, then *isCopy is set to JNI_TRUE if a copy is made; or it is set to JNI_FALSE if no copy is made.

Release<PrimitiveType>ArrayElements(JNIEnv *env, ArrayType array, NativeType *elems, jint mode)

A family of functions that informs the VM that the native code no longer needs access to elems. The elems argument is a pointer derived from array using the corresponding GetArrayElements() function. If necessary, this function copies back all changes made to elems to the original array.

The mode argument provides information on how the array buffer should be released. mode has no effect if elems is not a copy of the elements in array.

0 - copy back the content and free the elems buffer

JNI_COMMIT - copy back the content but do not free the elems buffer

JNI_ABORT - free the buffer without copying back the possible changes

In most cases, programmers pass “0” to the mode argument to ensure consistent behavior for both pinned and copied arrays. The other options give the programmer more control over memory management and should be used with extreme care.

NDK: Does GetByteArrayElements copy data from Java to C++?

GetArrayElements may or may not copy the data as it sees fit. The isCopy output parameter will tell you whether it has been copied. If data is not copied, then you have obtained a pointer to the data directly in the Dalvik heap. Read more here.

You always need to call the corresponding ReleaseArrayElements, regardless of whether a copy was made. Copying data back to the VM array isn’t the only cleanup that might need to be done, although (according to the JNI documentation already linked) it is feasible that changes can be seen on the Java side before Release… has been called (iff data has not been copied).

I don’t believe the VM is going to allow you to make the conversions that would be necessary to do what you are thinking. As I see it, either way you go, you will need to convert a byte array to a float or a float to a byte array in Java, which you cannot accomplish by type casting. The data is going to be copied at some point.

为啥会有数组的复制和回写?参考 老罗的 Android 之旅阅读笔记(Dalvik/ART 虚拟机篇) 我猜有两个原因:

  1. GC 在标记阶段需要 Stop The World 以标记需要回收的对象,但很明显 VM 没法中断 native thread
  2. 还有 Compacting GC 会通过两个 Bump Pointer Space 来回捣鼓堆上的对象以实现内存的整理和压缩,释放小块的内存碎片,这会导致堆上对象的地址发生变动,VM 可以找到堆上所有对此对象的引用并修改它们的地址,但 VM 没法修改 native 代码对此对象的引用

DirectByteBuffer

FileChannel 读操作的 C 层入口,不再是 Byte[] 对象而是 C 代码可以直接操作的虚拟地址 address,系统调用 pread64 可以直接把数据写到 address 上,省去了两次复制数据的操作

FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset)
{
    jint fd = fdval(env, fdo);
    void *buf = (void *)jlong_to_ptr(address);
    return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}

为啥 FileChannel 可以直接拿到并操作虚拟地址呢?那还得由 ByteBuffer 说起

ByteBuffer 有两种:

  1. ByteBuffer.allocateDirect - DirectByteBuffer
  2. ByteBuffer.allocate/ByteBuffer.wrap - HeapByteBuffer

HeapByteBuffer 就是对 Byte[] 的包装,主要看下 DirectByteBuffer

DirectByteBuffer 其实也持有一个字节数组 DirectByteBuffer.MemoryRef.buffer,不同于普通的 Byte[] 它是直接在 non_moving_space 上分配的,也就说它不会被 GC 整理和移动所以它的地址是固定不变的,native 代码可以直接在这块内存上进行读写操作

ByteBuffer.allocateDirect(int capacity)
DirectByteBuffer.MemoryRef(capacity)
VMRuntime.newNonMovableArray(componentType, length)
VMRuntime_newNonMovableArray(env, jobject, javaElementClass, length)
Array::Alloc(Thread* self, ObjPtr<Class> array_class, int32_t component_count, size_t component_size_shift, gc::AllocatorType allocator_type)
Heap::AllocObjectWithAllocator(Thread* self, ObjPtr<mirror::Class> klass, size_t byte_count, AllocatorType allocator, const PreFenceVisitor& pre_fence_visitor)
Heap::TryToAllocate(Thread* self, AllocatorType allocator_type, size_t alloc_size, size_t* bytes_allocated, size_t* usable_size, size_t* bytes_tl_bulk_allocated)
Heap->non_moving_space->Alloc(Thread* self, size_t num_bytes, size_t* bytes_allocated, size_t* usable_size, size_t* bytes_tl_bulk_allocated)

看下文档 NIO Support 是怎么说的

The NIO-related entry points allow native code to access java.nio direct buffers. The contents of a direct buffer can, potentially, reside in native memory outside of the ordinary garbage-collected heap.

jobject NewDirectByteBuffer(JNIEnv* env, void* address, jlong capacity)

Allocates and returns a direct java.nio.ByteBuffer referring to the block of memory starting at the memory address address and extending capacity bytes.

Native code that calls this function and returns the resulting byte-buffer object to Java-level code should ensure that the buffer refers to a valid region of memory that is accessible for reading and, if appropriate, writing. An attempt to access an invalid memory location from Java code will either return an arbitrary value, have no visible effect, or cause an unspecified exception to be thrown.

void* GetDirectBufferAddress(JNIEnv* env, jobject buf)

Fetches and returns the starting address of the memory region referenced by the given direct java.nio.Buffer.

This function allows native code to access the same memory region that is accessible to Java code via the buffer object.

FileChannel 在进行读写操作时,也即 java 层和 native 层之间进行数据传递时,总是会使用 DirectByteBuffer 避免数组的两次拷贝

FileChannelImpl.read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException {
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);
    // Substitute a native buffer
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
            dst.put(bb);
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

FileChannelImpl.write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException {
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);
    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);
        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

/**
 * Returns a temporary buffer of at least the given size
 */
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }
    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }
}

File Lock Supporting

FileChannel.lock 通过 fcntl(F_SETLKW) 获得一个文件锁(阻塞,tryLockfcntl(F_SETLK) 非阻塞),FileLock.release 则通过 F_UNLCK 释放锁

// 获得锁
FileChannel.lock()
FileChannel.lock(0, Long.MAX_VALUE, false)
FileChannelImpl.lock(long position, long size, boolean shared)
FileDispatcherImpl.lock(FileDescriptor fd, boolean blocking, long pos, long size, boolean shared)
FileDispatcherImpl.lock0(FileDescriptor fd, boolean blocking, long pos, long size, boolean shared)
FileDispatcherImpl_lock0(JNIEnv *env, jobject this, jobject fdo, jboolean block, jlong pos, jlong size, jboolean shared)
fcntl(int fd, int cmd, ... /* arg */ )    // 系统调用

// 释放锁
FileLock.release()
FileLockImpl.release()
FileChannelImpl.release(FileLockImpl fli)
FileDispatcherImpl.release(FileDescriptor fd, long pos, long size)
FileDispatcherImpl_release0(JNIEnv *env, jobject this, jobject fdo, jlong pos, jlong size)
fcntl(int __fd, int __cmd, ...)    // 系统调用

fcntl - manipulate file descriptor

int fcntl(int fd, int cmd, ... /* arg */ )

Linux implements traditional (“process-associated”) UNIX record locks, as standardized by POSIX. For a Linux-specific alternative with better semantics, see the discussion of open file description locks below.

F_SETLK, F_SETLKW, and F_GETLK are used to acquire, release, and test for the existence of record locks (also known as byte-range, file-segment, or file-region locks). The third argument, lock, is a pointer to a structure that has at least the following fields (in unspecified order).

struct flock {
    ...
    short l_type;    /* Type of lock: F_RDLCK,
                        F_WRLCK, F_UNLCK */
    short l_whence;  /* How to interpret l_start:
                        SEEK_SET, SEEK_CUR, SEEK_END */
    off_t l_start;   /* Starting offset for lock */
    off_t l_len;     /* Number of bytes to lock */
    pid_t l_pid;     /* PID of process blocking our lock
                        (set by F_GETLK and F_OFD_GETLK) */
    ...
};

如果锁已被别的进程持有,F_SETLK 返回 -1 而 F_SETLKW 会阻塞直到锁被释放

  1. F_RDLCK 读锁,共享锁,可以被多个进程持有

  2. F_WRLCK 写锁,排它锁(包括读锁),只能被一个进程持有

  3. F_UNLCK 释放锁

  4. SEEK_SET 锁的 offset 由 l_start 决定

  5. SEEK_CUR 锁的 offset = fd.offset + l_start

  6. SEEK_END 锁的 offset = fd.end + l_start

Buffer 的基本概念

概念 描述
capacity 容量,固定不变的,在构造的时候就确定了
position 指示器的位置,指示下一次读/写的位置
limit 读模式表示 Buffer 里内容的大小(正常情况下 position <= limit <= capacity),写模式表示 Buffer 的容量(正常情况下 limit == capacity)
remaining limit - position
read mode position - 下一次读的位置,limit - Buffer 内的数据量,capacity - Buffer 容量
write mode position - 下一次写的位置,limit/capacity - Buffer 容量
flip limit, position = position, 0,用来把 Buffer 从写模式切换为读模式
rewind position = 0
clear position, limit = 0, capacity
compat 将 [position, limit] 这一块数据拷贝到 [0, limit - position],然后 position, limit = limit - positon, capacity