io_uring
io_uring是Linux特有的异步I/O API。io_uring的名称来自用户空间和内核空间之间共享的环形缓冲区。在内核和用户空间之间进行通信使用环形缓冲区作为主要的通信模式。这种思想在业务系统中还是挺常见的: 比如用MQ、Kafka消息队列推送信息。一个接收队列, 一个发送队列,另外设计上也有Actor模型的影子, 应用和kernel, 分别是两个actor, 而actor的邮箱就是 SQ, CQ两个队列, 对于应用程序, 消息发送到SQ, 从CQ中获取结果,对于kernel是从SQ中获取信息, 将处理结果发送到CQ中。
传统echoserver
做一个简单echoserver。
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define BUF_SIZE 512
int main(int argc, char const* argv[])
{
int serverfd, clientfd;
struct sockaddr_in server, client;
int len;
int port = 1234;
char buffer[BUF_SIZE];
if (argc == 2) {
port = atoi(argv[1]);
}
serverfd = socket(AF_INET, SOCK_STREAM, 0);
if (serverfd < 0) {
perror("create socket");
exit(1);
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(port);
len = sizeof(server);
if (bind(serverfd, (struct sockaddr*)&server, len) < 0) {
perror("bind socket");
exit(1);
}
if (listen(serverfd, 2048) < 0) {
perror("listen socket");
exit(1);
}
while (1) {
len = sizeof(client);
if ((clientfd = accept(serverfd, (struct sockaddr*)&client, &len)) <
0) {
perror("accept error");
exit(4);
}
memset(buffer, 0, sizeof(buffer));
int size = read(clientfd, buffer, sizeof(buffer));
if (size < 0) {
perror("read error");
exit(5);
}
if (size == 0) {
close(clientfd);
continue;
}
if (write(clientfd, buffer, size) < 0) {
perror("write error");
exit(6);
}
close(clientfd);
}
close(serverfd);
return 0;
}
使用python写一个客户端
import asyncio
import socket
async def echo_client(host, port, message, client_id):
reader, writer = await asyncio.open_connection(host, port)
# Send the message
writer.write(message.encode())
await writer.drain()
# Receive the echoed message
data = await reader.read(512)
print(f"Client {client_id}: Received: {data.decode()}")
# Close the connection
writer.close()
async def main():
host = 'localhost' # Replace with the actual server host
port = 1234 # Replace with the actual server port
message = "Hello, server!"
# Create a list of tasks for 1000 clients
tasks = [echo_client(host, port, message, i) for i in range(1000)]
# Execute the tasks concurrently
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
验证一下系统调用情况, 服务端启动 strace -c ./echoserver
garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./echoserver c^Cstrace: Process 75155 detached % time seconds usecs/call calls errors syscall ------ ----------- ----------- --------- --------- ---------------- 32.66 0.049918 49 1002 close 30.00 0.045855 45 1000 write 19.81 0.030282 30 1000 accept 16.41 0.025088 25 1001 read 0.39 0.000590 590 1 execve 0.25 0.000375 46 8 mmap 0.08 0.000118 39 3 mprotect 0.06 0.000098 49 2 newfstatat 0.05 0.000080 40 2 openat 0.04 0.000057 28 2 pread64 0.04 0.000055 27 2 1 arch_prctl 0.03 0.000049 49 1 munmap 0.03 0.000045 45 1 socket 0.03 0.000045 45 1 bind 0.02 0.000038 38 1 1 access 0.02 0.000030 30 1 listen 0.02 0.000029 29 1 brk 0.02 0.000026 26 1 prlimit64 0.02 0.000025 25 1 set_tid_address 0.02 0.000025 25 1 set_robust_list 0.02 0.000025 25 1 rseq ------ ----------- ----------- --------- --------- ---------------- 100.00 0.152853 37 4033 2 total
客户端执行 python3 echo-client.py
[garlic@dev bin]$ python3 echo-client.py Client 0: Received: Hello, server! Client 2: Received: Hello, server! Client 1: Received: Hello, server! Client 3: Received: Hello, server! 。。。io
io_uring版本
如果使用io_uring libcuring实现一个echo_server, 相对于直接使用io_uring 基础api,libcuring更简单一些
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define CLOSE 0
#define READ 1
#define WRITE 2
#define ACCEPT 3
#define BUF_SIZE 4096
#define QUEUE_DEPTH 2048
struct request {
int type;
int client_socket;
struct iovec iov;
};
struct io_uring ring;
int add_accept_request(int server_socket, struct sockaddr_in* client_addr,
socklen_t* client_addr_len)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_socket, (struct sockaddr*)client_addr,
client_addr_len, 0);
struct request* req = malloc(sizeof(*req));
req->type = ACCEPT;
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_read_request(int client_socket)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
struct request* req = malloc(sizeof(*req));
req->iov.iov_base = malloc(BUF_SIZE);
req->iov.iov_len = BUF_SIZE;
req->type = READ;
req->client_socket = client_socket;
memset(req->iov.iov_base, 0, BUF_SIZE);
io_uring_prep_readv(sqe, client_socket, &req->iov, 1, 0);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_close_request(struct request* req)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
req->type = CLOSE;
io_uring_prep_close(sqe, req->client_socket);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_write_request(struct request* req)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
req->type = WRITE;
io_uring_prep_writev(sqe, req->client_socket, &req->iov, 1, 0);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int free_request(struct request* req)
{
free(req->iov.iov_base);
free(req);
return 0;
}
void sigint_handler(int signo)
{
(void)signo;
printf("^C pressed. Shutting down.\n");
io_uring_queue_exit(&ring);
exit(0);
}
int main(int argc, char const* argv[])
{
int serverfd, clientfd;
struct sockaddr_in server, client;
int len;
int port = 1234;
struct io_uring_cqe* cqe;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
if (argc == 2) {
port = atoi(argv[1]);
}
serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (serverfd < 0) {
perror("create socket");
exit(1);
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(port);
len = sizeof(server);
if (bind(serverfd, (struct sockaddr*)&server, len) < 0) {
perror("bind socket");
exit(1);
}
if (listen(serverfd, 10) < 0) {
perror("listen socket");
exit(1);
}
signal(SIGINT, sigint_handler);
io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
add_accept_request(serverfd, &client_addr, &client_addr_len);
io_uring_submit(&ring);
while (1) {
int ret = io_uring_wait_cqe(&ring, &cqe);
struct request* req = (struct request*)cqe->user_data;
switch (req->type) {
case ACCEPT:
add_accept_request(serverfd, &client_addr, &client_addr_len);
add_read_request(cqe->res);
io_uring_submit(&ring);
free(req);
break;
case READ:
if (cqe->res <= 0) {
add_close_request(req);
} else {
add_write_request(req);
}
io_uring_submit(&ring);
break;
case WRITE:
add_read_request(req->client_socket);
io_uring_submit(&ring);
break;
case CLOSE:
free_request(req);
break;
default:
fprintf(stderr, "Unexpected req type %d\n", req->type);
break;
}
io_uring_cqe_seen(&ring, cqe);
}
io_uring_queue_exit(&ring);
close(serverfd);
fprintf(stderr, "close\n");
return 0;
}
看一下系统调用情况,看样子没有多大变化。
garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./io_uring ^Cstrace: Process 75168 detached % time seconds usecs/call calls errors syscall ^C pressed. Shutting down. ------ ----------- ----------- --------- --------- ---------------- 96.89 0.130784 27 4748 io_uring_enter 2.03 0.002746 42 64 brk 0.38 0.000516 36 14 mmap 0.10 0.000131 32 4 mprotect 0.08 0.000111 111 1 io_uring_setup 0.06 0.000087 29 3 openat 0.06 0.000079 39 2 pread64 0.05 0.000071 23 3 newfstatat 0.04 0.000060 20 3 close 0.04 0.000057 28 2 1 arch_prctl 0.03 0.000045 22 2 read 0.03 0.000041 41 1 munmap 0.03 0.000040 40 1 socket 0.02 0.000031 31 1 set_tid_address 0.02 0.000029 29 1 1 access 0.02 0.000025 25 1 listen 0.02 0.000025 25 1 getrandom 0.02 0.000022 22 1 prlimit64 0.02 0.000021 21 1 rt_sigaction 0.02 0.000021 21 1 rseq 0.01 0.000020 20 1 set_robust_list 0.01 0.000016 16 1 bind 0.00 0.000000 0 1 execve ------ ----------- ----------- --------- --------- ---------------- 100.00 0.134978 27 4858 2 total
io_uring优化版本
通过多个处理合并提交中。调用io_uring_peek_cqe检查完成队列避免在完成队列为空时空转,同时尽可能快地排空完成队列。
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define CLOSE 0
#define READ 1
#define WRITE 2
#define ACCEPT 3
#define BUF_SIZE 4096
#define QUEUE_DEPTH 1024
#define MAX_SQE_PER_LOOP 128
struct request {
int type;
int client_socket;
struct iovec iov;
};
struct io_uring ring;
int count = 0;
int add_accept_request(int server_socket, struct sockaddr_in* client_addr,
socklen_t* client_addr_len)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_socket, (struct sockaddr*)client_addr,
client_addr_len, 0);
struct request* req = calloc(1, sizeof(struct request));
req->type = ACCEPT;
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_read_request(int client_socket)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
struct request* req = calloc(1, sizeof(struct request));
req->iov.iov_base = calloc(1, BUF_SIZE);
req->iov.iov_len = BUF_SIZE;
req->type = READ;
req->client_socket = client_socket;
io_uring_prep_readv(sqe, client_socket, &req->iov, 1, 0);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_close_request(struct request* req)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
req->type = CLOSE;
io_uring_prep_close(sqe, req->client_socket);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int add_write_request(struct request* req)
{
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
req->type = WRITE;
io_uring_prep_writev(sqe, req->client_socket, &req->iov, 1, 0);
io_uring_sqe_set_data(sqe, req);
return 0;
}
int free_request(struct request* req)
{
free(req->iov.iov_base);
free(req);
return 0;
}
void sigint_handler(int signo)
{
(void)signo;
printf("%d\n", count);
printf("^C pressed. Shutting down.\n");
io_uring_queue_exit(&ring);
exit(0);
}
int main(int argc, char const* argv[])
{
int serverfd, clientfd;
struct sockaddr_in server, client;
int len;
int port = 1234;
struct io_uring_cqe* cqe;
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
if (argc == 2) {
port = atoi(argv[1]);
}
serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (serverfd < 0) {
perror("create socket");
exit(1);
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(port);
len = sizeof(server);
if (bind(serverfd, (struct sockaddr*)&server, len) < 0) {
perror("bind socket");
exit(1);
}
if (listen(serverfd, 2048) < 0) {
perror("listen socket");
exit(1);
}
signal(SIGINT, sigint_handler);
io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
add_accept_request(serverfd, &client_addr, &client_addr_len);
io_uring_submit(&ring);
while (1) {
int submissions = 0;
int ret = io_uring_wait_cqe(&ring, &cqe);
while (1) {
struct request* req = (struct request*)cqe->user_data;
switch (req->type) {
case ACCEPT:
add_accept_request(serverfd, &client_addr,
&client_addr_len);
submissions += 1;
add_read_request(cqe->res);
free_request(req);
submissions += 1;
break;
case READ:
if (cqe->res <= 0) {
add_close_request(req);
submissions += 1;
} else {
add_write_request(req);
add_read_request(req->client_socket);
submissions += 2;
}
break;
case WRITE:
break;
case CLOSE:
free_request(req);
break;
default:
fprintf(stderr, "Unexpected req type %d\n", req->type);
break;
}
io_uring_cqe_seen(&ring, cqe);
if (io_uring_sq_space_left(&ring) < MAX_SQE_PER_LOOP) {
break; // the submission queue is full
}
ret = io_uring_peek_cqe(&ring, &cqe);
if (ret == -EAGAIN) {
break; // no remaining work in completion queue
}
}
if (submissions > 0) {
count++;
io_uring_submit(&ring);
}
}
io_uring_queue_exit(&ring);
close(serverfd);
fprintf(stderr, "close\n");
return 0;
}
看下系统调用情况
garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./io_uring2 ^Cstrace: Process 75230 detached % time seconds usecs/call calls errors syscall ------ ----------- ----------- --------- --------- ---------------- 98.28 0.103546 52 1962 io_uring_enter 1.00 0.001056 17 62 brk 0.15 0.000162 40 4 mprotect 0.11 0.000117 8 14 mmap 0.10 0.000109 109 1 io_uring_setup 0.05 0.000052 52 1 munmap 0.04 0.000046 46 1 socket 0.04 0.000046 46 1 rseq 0.03 0.000033 33 1 bind 0.03 0.000029 29 1 listen 0.03 0.000028 28 1 getrandom 0.03 0.000027 27 1 prlimit64 0.02 0.000026 26 1 rt_sigaction 0.02 0.000026 26 1 set_robust_list 0.02 0.000025 12 2 1 arch_prctl 0.02 0.000025 25 1 set_tid_address 0.00 0.000000 0 2 read 0.00 0.000000 0 3 close 0.00 0.000000 0 2 pread64 0.00 0.000000 0 1 1 access 0.00 0.000000 0 1 execve 0.00 0.000000 0 3 openat 0.00 0.000000 0 3 newfstatat ------ ----------- ----------- --------- --------- ---------------- 100.00 0.105353 50 2070 2 total
源码:https://github.com/weida/io_uring_echo_server
代码是根据 Why you should use io_uring for network I/O 文章编写的, 文章中优化代码后能减少到70个左右系统调用, 后来查看了作者github (https://github.com/donaldh/io_uring_playground),应该是在卡其async情况下测试的。
static const struct argp_option opts[] = {
{"async", 'a', 0, 0, "Submit async requests"},
{"batch", 'b', 0, 0, "Batch available work into single submission"},
{"fixed", 'f', 0, 0, "Use pre-allocated fixed files"},
{"multishot", 'm', 0, 0, "Use multishot accept requests"},
{"sqpoll", 'p', 0, 0, "Use submission queue polling in the kernel"},
{"debug", 'd', 0, 0, "Provide debug output, repeat for more verbose debug"},
{},
};
还有这个项目 https://github.com/frevib/io_uring-echo-server。还有与epoll的对应测试。可以参考一下。
参考及应用
Why you should use io_uring for network I/O (https://developers.redhat.com/articles/2023/04/12/why-you-should-use-iouring-network-io#:~:text=The%20main%20benefit%20of%20io_uring,reducing%20the%20number%20of%20syscalls.)
图片from 曾彥博


Comments are closed.