使用Go进行io_uring的动手实践( 三 )


io_uring_queue_(init|exit)io_uring_get_sqeio_uring_prep_(readv|writev|other)io_uring_sqe_set_dataio_uring_(wait|peek)_cqeio_uring_cqe_get_dataio_uring_cqe_seen在Go中包装io_uring有很多理论需要消化 。为了简洁起见 , 我特意跳过了更多内容 。现在 , 让我们回到用Go语言编写一些代码 , 并尝试一下 。
为了简单和安全起见 , 我们将使用该 liburing 库 , 这意味着我们将需要使用CGo 。很好 , 因为这只是一个玩具 , 正确的方法是 获得 本机支持 在Go运行时中。结果 , 不幸的是 , 我们将不得不使用回调 。在本机Go中 , 正在运行的goroutine将在运行时进入睡眠状态 , 然后在完成队列中的数据可用时被唤醒 。
让我们给程序包命名 frodo (就像这样 , 我淘汰了计算机科学中两个最困难的问题之一) 。我们将只有一个非常简单的API来读写文件 。还有另外两个功能可在完成后设置和清理环 。
我们的主要力量将是单个goroutine , 它将接受提交请求并将其推送到SQ 。然后从C中使用CQE条目对Go进行回调 。我们将使用 fd 一旦获得数据 ,文件的来知道要执行哪个回调 。但是 , 我们还需要确定何时将队列实际提交给内核 。我们维持一个队列阈值 , 如果超过了未决请求的阈值 , 我们将提交 。而且 , 我们向用户提供了另一个功能 , 允许他们自己进行提交 , 以使他们可以更好地控制应用程序的行为 。
再次注意 , 这是一种低效的处理方式 。由于CQ和SQ完全分开 , 因此它们根本不需要任何锁定 , 因此提交和完成可以从不同的线程中自由进行 。理想情况下 , 我们只需将一个条目推送到SQ并让一个单独的goroutine监听等待完成的时间 , 每当看到一个条目时 , 我们都会进行回调并回到等待状态 。还记得我们可以用来 io_uring_enter 完成工作吗? 这是一个这样的例子! 这仍然使每个CQE条目只有一个系统调用 , 我们甚至可以通过指定要等待的CQE条目数来进一步优化它 。
回到我们的简化模型 , 这是它的样子的伪代码:
// ReadFile reads a file from the given path and returns the result as a byte slice// in the passed callback function.func ReadFile(path string, cb func(buf []byte)) error {f, err := os.Open(path)// handle errorfi, err := f.Stat()// handle errorsubmitChan <---tt-darkmode-bgcolor: #BEBEBF;">submitChan 将请求发送给我们的主要工作人员 , 由他们负责提交 。这是伪代码:
queueSize := 0for {select {case sqe := <-submitChan:switch sqe.code {case opCodeRead:// We store the fd in our cbMap to be called later from the callback from C.cbMap[sqe.f.Fd()] = cbInfo{readCb: sqe.readCb,close:sqe.f.Close,}C.push_read_request(C.int(sqe.f.Fd()), C.long(sqe.size))case opCodeWrite:cbMap[sqe.f.Fd()] = cbInfo{writeCb: sqe.writeCb,close:sqe.f.Close,}C.push_write_request(C.int(sqe.f.Fd()), ptr, C.long(len(sqe.buf)))}queueSize++if queueSize > queueThreshold { // if queue_size > threshold, then pop all.submitAndPop(queueSize)queueSize = 0}case <-pollChan:if queueSize > 0 {submitAndPop(queueSize)queueSize = 0}case <-quitChan:// possibly drain channel.// pop_request till everything is done.return}}cbMap 将文件描述符映射到要调用的实际回调函数 。当CGo代码调用Go代码来表示事件完成 代码 submitAndPop 调用时 ,io_uring_submit_and_wait 使用此 queueSize, 然后从CQ弹出条目 。
让我们来看看成什么 C.push_read_request 和 C.push_write_request 做 。他们实际上所做的只是向SQ推送读/写请求 。
他们看起来像这样:
int push_read_request(int file_fd, off_t file_sz) {// Create a file_info structstruct file_info *fi;// Populate the struct with the vectors and some metadata// like the file size, fd and the opcode IORING_OP_READV.// Get an SQE.struct io_uring_sqe *sqe = io_uring_get_sqe(?);// Mark the operation to be readv.io_uring_prep_readv(sqe, file_fd, fi->iovecs, total_blocks, 0);// Set the user data section.io_uring_sqe_set_data(sqe, fi);return 0;}int push_write_request(int file_fd, void *data, off_t file_sz) {// Create a file_info structstruct file_info *fi;// Populate the struct with the vectors and some metadata// like the file size, fd and the opcode IORING_OP_WRITEV.// Get an SQE.struct io_uring_sqe *sqe = io_uring_get_sqe(?);// Mark the operation to be writev.io_uring_prep_writev(sqe, file_fd, fi->iovecs, 1, 0);// Set the user data section.io_uring_sqe_set_data(sqe, fi);return 0;}当 submitAndPop 尝试从CQ弹出条目时 , 将执行以下命令:
int pop_request() {struct io_uring_cqe *cqe;// Get an element from CQ without waiting.int ret = io_uring_peek_cqe(?,// some error handling// Get the user data set in the set_data call.struct file_info *fi = io_uring_cqe_get_data(cqe);if (fi->opcode == IORING_OP_READV) {// Calculate the number of blocks read.// Call read_callback to Go.read_callback(fi->iovecs, total_blocks, fi->file_fd);} else if (fi->opcode == IORING_OP_WRITEV) {// Call write_callback to Go.write_callback(cqe->res, fi->file_fd);}// Mark the queue item as seen.io_uring_cqe_seen(?, cqe);return 0;}