深入探索 java.io 和 nio
FileInputStream/FileOutputStream
FileInputStream
和 FileOutputStream
都属于“流式” API,就像流水(Stream
)一样只能朝着一个方向读写,不能后退
FileInputStream
相当于以只读模式读文件:open(O_RDONLY) -> read -> close(fd)
FileOutputStream
相当于以只写模式写文件:open(O_WRONLY | O_CREAT | (append ? O_APPEND : O_TRUNC)) -> write -> close(fd)
// open
FileInputStream(File file)
IoBridge.open(name, O_RDONLY)
Libcore.os.open(path, flags, 0666)
Linux.open(String path, int flags, int mode)
Linux_open(JNIEnv* env, jobject, jstring javaPath, jint flags, jint mode) // libcore_io_Linux.cpp
// 系统调用 open 的参数 flags 必须包含三个访问模式(access modes)其中之一:
// O_CREAT: 如果文件不存在则创建之
// O_APPEND:以 APPEND 的模式打开文件(附加)
// O_TRUNC: 如果文件存在且以写模式打开,则把文件长度置为 0
open(pathname, flags, modes)
// read
FileInputStream.read(byte b[], int off, int len)
IoBridge.read(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Libcore.os.read(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux.readBytes(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux_readBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount) // libcore_io_Linux.cpp
// 系统调用,fd 有个成员属性 offset,read 从 offset 开始读取 count 个字节的数据到 buf,offset 也会随着增长 count
read(int fd, void *buf, size_t count)
// write
FileOutputStream.write(byte b[], int off, int len)
IoBridge.write(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Libcore.os.write(FileDescriptor fd, byte[] bytes, int byteOffset, int byteCount)
Linux.writeBytes(FileDescriptor fd, Object buffer, int offset, int byteCount)
Linux_writeBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount) // libcore_io_Linux.cpp
write(int fd, const void* buf, size_t count) // 系统调用
// close
FileInputStream.close()
IoBridge.closeAndSignalBlockedThreads(fd)
Libcore.os.close(fd)
Linux.close(fd)
Linux_close(JNIEnv* env, jobject, jobject javaFd) // libcore_io_Linux.cpp
close(fd) // 系统调用
RandomAccessFile
RandomAccessFile
提供了读写操作,相当于 FileInputStream
和 FileOutputStream
的组合
// open,其中 mode 与 imode 之间的映射关系为
// r - O_RDONLY - 只读
// rw - O_RDWR | O_CREAT - 读写
RandomAccessFile(File file, String mode)
IoBridge.open(name, imode)
Libcore.os.open(path, flags, 0666)
Linux.open(String path, int flags, int mode)
Linux_open(JNIEnv* env, jobject, jstring javaPath, jint flags, jint mode) // libcore_io_Linux.cpp
// 系统调用 open 的参数 flags 必须包含三个访问模式(access modes)其中之一:
// O_CREAT: 如果文件不存在则创建之
// O_APPEND:以 APPEND 的模式打开文件(附加)
// O_TRUNC: 如果文件存在且以写模式打开,则把文件长度置为 0
open(pathname, flags, modes)
// read & write
read(byte b[], int off, int len) -> 系统调用 read
write(byte b[], int off, int len) -> 系统调用 write
// seek
RandomAccessFile.seek(long pos)
Libcore.os.lseek(fd, pos, SEEK_SET)
Linux.lseek(FileDescriptor fd, long offset, int whence)
Linux_lseek(JNIEnv* env, jobject, jobject javaFd, jlong offset, jint whence) // libcore_io_Linux.cpp
// 系统调用,改变已打开的文件描述的文件偏移(fd.offset,指示下一次读写的位置),其中 whence 的取值有:
// SEEK_SET - fd.offset = offset
// SEEK_CUR - fd.offset += offset
// SEEK_END - fd.offset = fd.end + offset
lseek(int fd, off_t offset, int whence)
总之,传统的 java.io
都是基于系统调用 open
, read
, write
, lseek
和 close
FileInputStream
包装了read
FileOutputStream
包装了write
RandomAccessFile
包装了read
和write
FileChannel
FileChannel
的读写操作最终是执行系统调用 pread/pwrite
(区别于 read/write
的是它们不改变 fd.offset
)
pread()
reads up to count bytes from file descriptor fd at offset offset (from the start of the file) into the buffer starting at buf. The file offset is not changed.
pwrite()
writes up to count bytes from the buffer starting at buf to the file descriptor fd at offset offset. The file offset is not changed.
read()
attempts to read up to count bytes from file descriptor fd into the buffer starting at buf. On files that support seeking, the read operation commences at the file offset, and the file offset is incremented by the number of bytes read.
read()
attempts to read up to count bytes from file descriptor fd into the buffer starting at buf. On files that support seeking, the read operation commences at the file offset, and the file offset is incremented by the number of bytes read.
// read
FileChannel.read(ByteBuffer dst, long position)
FileChannelImpl.read(ByteBuffer dst, long position)
IOUtil.read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd)
IOUtil.readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd)
FileDispatcherImpl.pread0(FileDescriptor fd, long address, int len, long position)
FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset) // FileDispatcherImpl.c
pread64(fd, buf, len, offset) // 系统调用
// write
FileChannel.write(ByteBuffer src, long position)
FileChannelImpl.write(ByteBuffer src, long position)
IOUtil.write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd)
IOUtil.writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd)
FileDispatcherImpl.pwrite0(FileDescriptor fd, long address, int len, long position)
FileDispatcherImpl_pwrite0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset) // FileDispatcherImpl.c
pwrite64(int __fd, const void* __buf, size_t __count, off64_t __offset) // 系统调用
数组的两次复制
咋一看,FileInputStream/FileOutputStream
和 FileChannel
最终都是通过系统调用 read/write
完成文件的读写操作,那 NIO 的优势体现在哪呢?
其实在 C 层的入口点就可以看出来了,看下面的读操作,Byte[]
是作为 java object 传给 C 层的,对它的读写操作需要包裹在 GetByteArrayElements/ReleaseByteArrayElements
之间(通过 ScopedBytesRW
的构造函数和析构函数),GetByteArrayElements
会从 Byte[]
复制一份数据出来,ReleaseByteArrayElements
会回写数据到 Byte[]
,也就是说 Byte[]
在 C 层走一圈会有两次额外的复制操作,数组越大越耗资源
static jint Linux_readBytes(JNIEnv* env, jobject, jobject javaFd, jobject javaBytes, jint byteOffset, jint byteCount) {
ScopedBytesRW bytes(env, javaBytes);
if (bytes.get() == NULL) {
return -1;
}
return IO_FAILURE_RETRY(env, ssize_t, read, javaFd, bytes.get() + byteOffset, byteCount);
}
// 看下 ScopedBytesRW 是什么
class ScopedBytesRW : public ScopedBytes<false> {
public:
ScopedBytesRW(JNIEnv* env, jobject object) : ScopedBytes<false>(env, object) {}
jbyte* get() {
return mPtr;
}
};
// 貌似在 ScopedBytes 的构造函数和析构函数里做了些手脚
template<bool readOnly>
class ScopedBytes {
public:
ScopedBytes(JNIEnv* env, jobject object)
: mEnv(env), mObject(object), mByteArray(nullptr), mPtr(nullptr)
{
if (mObject == nullptr) {
jniThrowNullPointerException(mEnv);
} else {
jclass byteArrayClass = JniConstants::GetPrimitiveByteArrayClass(env);
if (mEnv->IsInstanceOf(mObject, byteArrayClass)) {
mByteArray = reinterpret_cast<jbyteArray>(mObject);
mPtr = mEnv->GetByteArrayElements(mByteArray, nullptr); // 字节数组
} else {
mPtr = reinterpret_cast<jbyte*>(mEnv->GetDirectBufferAddress(mObject)); // DirectBuffer
}
}
}
~ScopedBytes() {
if (mByteArray != nullptr) {
mEnv->ReleaseByteArrayElements(mByteArray, mPtr, readOnly ? JNI_ABORT : 0);
}
}
};
Get<PrimitiveType>ArrayElements(JNIEnv *env, ArrayType array, jboolean *isCopy)
A family of functions that returns the body of the primitive array. The result is valid until the corresponding Release
ArrayElements() function is called. Since the returned array may be a copy of the Java array, changes made to the returned array will not necessarily be reflected in the original array until Release ArrayElements() is called. If isCopy is not NULL, then *isCopy is set to JNI_TRUE if a copy is made; or it is set to JNI_FALSE if no copy is made.
Release<PrimitiveType>ArrayElements(JNIEnv *env, ArrayType array, NativeType *elems, jint mode)
A family of functions that informs the VM that the native code no longer needs access to elems. The elems argument is a pointer derived from array using the corresponding Get
ArrayElements() function. If necessary, this function copies back all changes made to elems to the original array. The mode argument provides information on how the array buffer should be released. mode has no effect if elems is not a copy of the elements in array.
0 - copy back the content and free the elems buffer
JNI_COMMIT - copy back the content but do not free the elems buffer
JNI_ABORT - free the buffer without copying back the possible changes
In most cases, programmers pass “0” to the mode argument to ensure consistent behavior for both pinned and copied arrays. The other options give the programmer more control over memory management and should be used with extreme care.
NDK: Does GetByteArrayElements copy data from Java to C++?
Get
ArrayElements may or may not copy the data as it sees fit. The isCopy output parameter will tell you whether it has been copied. If data is not copied, then you have obtained a pointer to the data directly in the Dalvik heap. Read more here. You always need to call the corresponding Release
ArrayElements, regardless of whether a copy was made. Copying data back to the VM array isn’t the only cleanup that might need to be done, although (according to the JNI documentation already linked) it is feasible that changes can be seen on the Java side before Release… has been called (iff data has not been copied). I don’t believe the VM is going to allow you to make the conversions that would be necessary to do what you are thinking. As I see it, either way you go, you will need to convert a byte array to a float or a float to a byte array in Java, which you cannot accomplish by type casting. The data is going to be copied at some point.
为啥会有数组的复制和回写?参考 老罗的 Android 之旅阅读笔记(Dalvik/ART 虚拟机篇) 我猜有两个原因:
- GC 在标记阶段需要
Stop The World
以标记需要回收的对象,但很明显 VM 没法中断 native thread - 还有
Compacting GC
会通过两个Bump Pointer Space
来回捣鼓堆上的对象以实现内存的整理和压缩,释放小块的内存碎片,这会导致堆上对象的地址发生变动,VM 可以找到堆上所有对此对象的引用并修改它们的地址,但 VM 没法修改 native 代码对此对象的引用
DirectByteBuffer
看 FileChannel
读操作的 C 层入口,不再是 Byte[]
对象而是 C 代码可以直接操作的虚拟地址 address
,系统调用 pread64
可以直接把数据写到 address
上,省去了两次复制数据的操作
FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len, jlong offset)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
为啥 FileChannel
可以直接拿到并操作虚拟地址呢?那还得由 ByteBuffer
说起
ByteBuffer 有两种:
- ByteBuffer.allocateDirect -
DirectByteBuffer
- ByteBuffer.allocate/ByteBuffer.wrap -
HeapByteBuffer
HeapByteBuffer
就是对 Byte[]
的包装,主要看下 DirectByteBuffer
DirectByteBuffer
其实也持有一个字节数组 DirectByteBuffer.MemoryRef.buffer
,不同于普通的 Byte[]
它是直接在 non_moving_space 上分配的,也就说它不会被 GC 整理和移动所以它的地址是固定不变的,native 代码可以直接在这块内存上进行读写操作
ByteBuffer.allocateDirect(int capacity)
DirectByteBuffer.MemoryRef(capacity)
VMRuntime.newNonMovableArray(componentType, length)
VMRuntime_newNonMovableArray(env, jobject, javaElementClass, length)
Array::Alloc(Thread* self, ObjPtr<Class> array_class, int32_t component_count, size_t component_size_shift, gc::AllocatorType allocator_type)
Heap::AllocObjectWithAllocator(Thread* self, ObjPtr<mirror::Class> klass, size_t byte_count, AllocatorType allocator, const PreFenceVisitor& pre_fence_visitor)
Heap::TryToAllocate(Thread* self, AllocatorType allocator_type, size_t alloc_size, size_t* bytes_allocated, size_t* usable_size, size_t* bytes_tl_bulk_allocated)
Heap->non_moving_space->Alloc(Thread* self, size_t num_bytes, size_t* bytes_allocated, size_t* usable_size, size_t* bytes_tl_bulk_allocated)
看下文档 NIO Support 是怎么说的
The NIO-related entry points allow native code to access java.nio direct buffers. The contents of a direct buffer can, potentially, reside in native memory outside of the ordinary garbage-collected heap.
jobject NewDirectByteBuffer(JNIEnv* env, void* address, jlong capacity)
Allocates and returns a direct java.nio.ByteBuffer referring to the block of memory starting at the memory address address and extending capacity bytes.
Native code that calls this function and returns the resulting byte-buffer object to Java-level code should ensure that the buffer refers to a valid region of memory that is accessible for reading and, if appropriate, writing. An attempt to access an invalid memory location from Java code will either return an arbitrary value, have no visible effect, or cause an unspecified exception to be thrown.
void* GetDirectBufferAddress(JNIEnv* env, jobject buf)
Fetches and returns the starting address of the memory region referenced by the given direct java.nio.Buffer.
This function allows native code to access the same memory region that is accessible to Java code via the buffer object.
FileChannel
在进行读写操作时,也即 java 层和 native 层之间进行数据传递时,总是会使用 DirectByteBuffer
避免数组的两次拷贝
FileChannelImpl.read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException {
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute a native buffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
dst.put(bb);
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
FileChannelImpl.write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException {
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
/**
* Returns a temporary buffer of at least the given size
*/
public static ByteBuffer getTemporaryDirectBuffer(int size) {
// If a buffer of this size is too large for the cache, there
// should not be a buffer in the cache that is at least as
// large. So we'll just create a new one. Also, we don't have
// to remove the buffer from the cache (as this method does
// below) given that we won't put the new buffer in the cache.
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
}
File Lock Supporting
FileChannel.lock
通过 fcntl(F_SETLKW)
获得一个文件锁(阻塞,tryLock
是 fcntl(F_SETLK)
非阻塞),FileLock.release
则通过 F_UNLCK
释放锁
// 获得锁
FileChannel.lock()
FileChannel.lock(0, Long.MAX_VALUE, false)
FileChannelImpl.lock(long position, long size, boolean shared)
FileDispatcherImpl.lock(FileDescriptor fd, boolean blocking, long pos, long size, boolean shared)
FileDispatcherImpl.lock0(FileDescriptor fd, boolean blocking, long pos, long size, boolean shared)
FileDispatcherImpl_lock0(JNIEnv *env, jobject this, jobject fdo, jboolean block, jlong pos, jlong size, jboolean shared)
fcntl(int fd, int cmd, ... /* arg */ ) // 系统调用
// 释放锁
FileLock.release()
FileLockImpl.release()
FileChannelImpl.release(FileLockImpl fli)
FileDispatcherImpl.release(FileDescriptor fd, long pos, long size)
FileDispatcherImpl_release0(JNIEnv *env, jobject this, jobject fdo, jlong pos, jlong size)
fcntl(int __fd, int __cmd, ...) // 系统调用
fcntl - manipulate file descriptor
int fcntl(int fd, int cmd, ... /* arg */ )
Linux implements traditional (“process-associated”) UNIX record locks, as standardized by POSIX. For a Linux-specific alternative with better semantics, see the discussion of open file description locks below.
F_SETLK, F_SETLKW, and F_GETLK are used to acquire, release, and test for the existence of record locks (also known as byte-range, file-segment, or file-region locks). The third argument, lock, is a pointer to a structure that has at least the following fields (in unspecified order).
struct flock { ... short l_type; /* Type of lock: F_RDLCK, F_WRLCK, F_UNLCK */ short l_whence; /* How to interpret l_start: SEEK_SET, SEEK_CUR, SEEK_END */ off_t l_start; /* Starting offset for lock */ off_t l_len; /* Number of bytes to lock */ pid_t l_pid; /* PID of process blocking our lock (set by F_GETLK and F_OFD_GETLK) */ ... };
如果锁已被别的进程持有,F_SETLK
返回 -1 而 F_SETLKW
会阻塞直到锁被释放
F_RDLCK 读锁,共享锁,可以被多个进程持有
F_WRLCK 写锁,排它锁(包括读锁),只能被一个进程持有
F_UNLCK 释放锁
SEEK_SET 锁的 offset 由 l_start 决定
SEEK_CUR 锁的 offset = fd.offset + l_start
SEEK_END 锁的 offset = fd.end + l_start
Buffer 的基本概念
概念 | 描述 |
---|---|
capacity | 容量,固定不变的,在构造的时候就确定了 |
position | 指示器的位置,指示下一次读/写的位置 |
limit | 读模式表示 Buffer 里内容的大小(正常情况下 position <= limit <= capacity),写模式表示 Buffer 的容量(正常情况下 limit == capacity) |
remaining | limit - position |
read mode | position - 下一次读的位置,limit - Buffer 内的数据量,capacity - Buffer 容量 |
write mode | position - 下一次写的位置,limit/capacity - Buffer 容量 |
flip | limit, position = position, 0,用来把 Buffer 从写模式切换为读模式 |
rewind | position = 0 |
clear | position, limit = 0, capacity |
compat | 将 [position, limit] 这一块数据拷贝到 [0, limit - position],然后 position, limit = limit - positon, capacity |