io_uring echo-server

io_uring

io_uring是Linux特有的异步I/O API。io_uring的名称来自用户空间和内核空间之间共享的环形缓冲区。在内核和用户空间之间进行通信使用环形缓冲区作为主要的通信模式。这种思想在业务系统中还是挺常见的: 比如用MQ、Kafka消息队列推送信息。一个接收队列, 一个发送队列,另外设计上也有Actor模型的影子, 应用和kernel, 分别是两个actor, 而actor的邮箱就是 SQ, CQ两个队列, 对于应用程序, 消息发送到SQ, 从CQ中获取结果,对于kernel是从SQ中获取信息, 将处理结果发送到CQ中。

 

传统echoserver

做一个简单echoserver。

#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define BUF_SIZE 512

/*
 * Blocking, single-threaded TCP echo server.
 *
 * Listens on the port given as argv[1] (default 1234). Each accepted
 * connection gets exactly one read of up to BUF_SIZE bytes; whatever was read
 * is written back and the connection is closed. Any socket error terminates
 * the process with a distinct exit code (1 setup, 4 accept, 5 read, 6 write).
 */
int main(int argc, char const* argv[])
{
    int                serverfd, clientfd;
    struct sockaddr_in server, client;
    /* accept() takes a socklen_t*, not an int*. */
    socklen_t          len;
    int                port = 1234;
    char               buffer[BUF_SIZE];

    if (argc == 2) {
        port = atoi(argv[1]);
    }

    serverfd = socket(AF_INET, SOCK_STREAM, 0);
    if (serverfd < 0) {
        perror("create socket");
        exit(1);
    }

    /* Clear the whole address so the padding bytes are not stack garbage. */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons(port);

    len = sizeof(server);
    if (bind(serverfd, (struct sockaddr*)&server, len) < 0) {
        perror("bind socket");
        exit(1);
    }

    if (listen(serverfd, 2048) < 0) {
        perror("listen socket");
        exit(1);
    }

    while (1) {
        ssize_t size;

        /* len is in/out: reset it to the buffer size before every accept. */
        len = sizeof(client);
        clientfd = accept(serverfd, (struct sockaddr*)&client, &len);
        if (clientfd < 0) {
            perror("accept error");
            exit(4);
        }

        memset(buffer, 0, sizeof(buffer));
        size = read(clientfd, buffer, sizeof(buffer));

        if (size < 0) {
            perror("read error");
            exit(5);
        }

        if (size == 0) {
            /* Peer closed without sending anything. */
            close(clientfd);
            continue;
        }

        if (write(clientfd, buffer, (size_t)size) < 0) {
            perror("write error");
            exit(6);
        }
        close(clientfd);
    }
    close(serverfd);
    return 0;
}

使用python写一个客户端

import asyncio
import socket

async def echo_client(host: str, port: int, message: str, client_id: int) -> None:
    """Send *message* to an echo server and print what comes back.

    Opens one TCP connection to ``host:port``, writes the encoded message,
    waits for a single response of up to 512 bytes, prints it tagged with
    ``client_id`` and closes the connection.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # Send the message
        writer.write(message.encode())
        await writer.drain()

        # Receive the echoed message
        data = await reader.read(512)
        print(f"Client {client_id}: Received: {data.decode()}")
    finally:
        # Close the connection even when the exchange above failed, and wait
        # until the transport is really closed so the socket is released
        # before this coroutine returns.
        writer.close()
        try:
            await writer.wait_closed()
        except ConnectionError:
            # The peer already tore the connection down; the socket is gone
            # either way, so there is nothing left to clean up.
            pass

async def main():
    """Run 1000 echo clients concurrently against localhost:1234."""
    server_host = 'localhost'  # Replace with the actual server host
    server_port = 1234         # Replace with the actual server port
    payload = "Hello, server!"

    # One coroutine per client id, all awaited together.
    await asyncio.gather(
        *(
            echo_client(server_host, server_port, payload, client_id)
            for client_id in range(1000)
        )
    )


if __name__ == "__main__":
    asyncio.run(main())

 

验证一下系统调用情况, 服务端启动 strace -c ./echoserver

garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./echoserver
c^Cstrace: Process 75155 detached
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 32.66    0.049918          49      1002           close
 30.00    0.045855          45      1000           write
 19.81    0.030282          30      1000           accept
 16.41    0.025088          25      1001           read
  0.39    0.000590         590         1           execve
  0.25    0.000375          46         8           mmap
  0.08    0.000118          39         3           mprotect
  0.06    0.000098          49         2           newfstatat
  0.05    0.000080          40         2           openat
  0.04    0.000057          28         2           pread64
  0.04    0.000055          27         2         1 arch_prctl
  0.03    0.000049          49         1           munmap
  0.03    0.000045          45         1           socket
  0.03    0.000045          45         1           bind
  0.02    0.000038          38         1         1 access
  0.02    0.000030          30         1           listen
  0.02    0.000029          29         1           brk
  0.02    0.000026          26         1           prlimit64
  0.02    0.000025          25         1           set_tid_address
  0.02    0.000025          25         1           set_robust_list
  0.02    0.000025          25         1           rseq
------ ----------- ----------- --------- --------- ----------------
100.00    0.152853          37      4033         2 total

客户端执行 python3 echo-client.py

[garlic@dev bin]$ python3 echo-client.py
Client 0: Received: Hello, server!
Client 2: Received: Hello, server!
Client 1: Received: Hello, server!
Client 3: Received: Hello, server!
……(其余输出省略)

 

io_uring版本

如果使用io_uring的封装库liburing实现一个echo_server, 相对于直接使用io_uring 基础api,liburing更简单一些

#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Operation tags stored in struct request.type; main() switches on them to
 * decide how a completion is handled. */
#define CLOSE 0
#define READ 1
#define WRITE 2
#define ACCEPT 3
/* Size in bytes of the buffer allocated for every read request. */
#define BUF_SIZE 4096

/* Number of entries requested from io_uring_queue_init(). */
#define QUEUE_DEPTH 2048

/* Per-operation context: attached to an SQE as user data and recovered from
 * the matching CQE in main(). */
struct request {
    /* One of CLOSE / READ / WRITE / ACCEPT. */
    int          type;
    /* Connection fd; add_accept_request() does not set it. */
    int          client_socket;
    /* Buffer handed to readv/writev; allocated by add_read_request(). */
    struct iovec iov;
};

/* The single ring used by every function of this program. */
struct io_uring ring;

/*
 * Queue an accept on server_socket.
 *
 * client_addr and client_addr_len are handed to the kernel, so they must stay
 * valid until the accept completes. The request is only queued; the caller
 * still has to call io_uring_submit().
 *
 * Returns 0 on success, -1 when memory or a submission-queue slot is not
 * available (nothing is queued in that case).
 */
int add_accept_request(int server_socket, struct sockaddr_in* client_addr,
                       socklen_t* client_addr_len)
{
    struct io_uring_sqe* sqe;
    /* calloc so the fields an accept does not use are well defined. */
    struct request*      req = calloc(1, sizeof(*req));

    if (req == NULL) {
        return -1;
    }

    /* Allocate first: a failure must never leave a half-prepared SQE behind. */
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        /* Submission queue is full. */
        free(req);
        return -1;
    }

    req->type = ACCEPT;
    io_uring_prep_accept(sqe, server_socket, (struct sockaddr*)client_addr,
                         client_addr_len, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a readv of up to BUF_SIZE bytes from client_socket.
 *
 * A fresh request and a zero-filled BUF_SIZE buffer are allocated for the
 * read; both are owned by the request until free_request() is called on it.
 * The request is only queued; the caller still has to call io_uring_submit().
 *
 * Returns 0 on success, -1 when memory or a submission-queue slot is not
 * available (nothing is queued and nothing stays allocated in that case).
 */
int add_read_request(int client_socket)
{
    struct io_uring_sqe* sqe;
    struct request*      req = malloc(sizeof(*req));

    if (req == NULL) {
        return -1;
    }

    req->iov.iov_base = calloc(1, BUF_SIZE);
    if (req->iov.iov_base == NULL) {
        free(req);
        return -1;
    }
    req->iov.iov_len = BUF_SIZE;
    req->type = READ;
    req->client_socket = client_socket;

    /* Allocate first: a failure must never leave a half-prepared SQE behind. */
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        /* Submission queue is full. */
        free(req->iov.iov_base);
        free(req);
        return -1;
    }

    io_uring_prep_readv(sqe, client_socket, &req->iov, 1, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a close of req->client_socket, reusing req as the completion context.
 *
 * On success req is retagged as CLOSE and stays owned by the ring until the
 * close completes. The request is only queued; the caller still has to call
 * io_uring_submit().
 *
 * Returns 0 on success, -1 when no submission-queue slot is available; req is
 * left untouched in that case and remains the caller's responsibility.
 */
int add_close_request(struct request* req)
{
    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);

    if (sqe == NULL) {
        /* Submission queue is full. */
        return -1;
    }

    req->type = CLOSE;
    io_uring_prep_close(sqe, req->client_socket);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a writev of req->iov to req->client_socket, reusing req as the
 * completion context.
 *
 * Exactly req->iov.iov_len bytes starting at req->iov.iov_base are written.
 * NOTE(review): add_read_request() sets iov_len to BUF_SIZE, so unless the
 * caller first trims iov_len to the byte count reported by the read
 * completion, the whole buffer (payload plus zero padding) is sent.
 *
 * Returns 0 on success, -1 when no submission-queue slot is available; req is
 * left untouched in that case and remains the caller's responsibility.
 */
int add_write_request(struct request* req)
{
    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);

    if (sqe == NULL) {
        /* Submission queue is full. */
        return -1;
    }

    req->type = WRITE;
    io_uring_prep_writev(sqe, req->client_socket, &req->iov, 1, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/* Release a request together with the buffer its iovec points at.
 * Always returns 0. */
int free_request(struct request* req)
{
    void* buffer = req->iov.iov_base;

    free(buffer);
    free(req);
    return 0;
}

/*
 * SIGINT handler: announce the shutdown, tear down the ring and terminate.
 *
 * Output and termination use async-signal-safe calls only: write() instead of
 * printf() and _exit() instead of exit(), because the signal may arrive while
 * the main loop is inside stdio or malloc.
 * NOTE(review): io_uring_queue_exit() is kept from the original; it is not on
 * the POSIX async-signal-safe list - confirm it is acceptable here.
 */
void sigint_handler(int signo)
{
    static const char msg[] = "^C pressed. Shutting down.\n";

    (void)signo;
    if (write(STDOUT_FILENO, msg, sizeof(msg) - 1) < 0) {
        /* Nothing sensible can be done about a failed write at this point. */
    }
    io_uring_queue_exit(&ring);
    _exit(0);
}

/* Close a connection whose request can no longer be serviced. Prefers an
 * asynchronous close (the request is released when that completes) and falls
 * back to close() plus an immediate free when no SQE can be queued. */
static void drop_connection(struct request* req)
{
    if (add_close_request(req) < 0) {
        close(req->client_socket);
        free_request(req);
    }
}

/*
 * io_uring echo server: one SQE submission per completion.
 *
 * Listens on the port given as argv[1] (default 1234) and drives every
 * connection through ACCEPT -> READ -> WRITE -> READ ... -> CLOSE, using the
 * request type stored in each completion's user data to pick the next step.
 */
int main(int argc, char const* argv[])
{
    int                  serverfd;
    struct sockaddr_in   server;
    int                  port = 1234;
    int                  ret;
    struct io_uring_cqe* cqe;
    struct sockaddr_in   client_addr;
    socklen_t            client_addr_len = sizeof(client_addr);

    if (argc == 2) {
        port = atoi(argv[1]);
    }

    serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (serverfd < 0) {
        perror("create socket");
        exit(1);
    }

    /* Clear the whole address so the padding bytes are not stack garbage. */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons(port);

    if (bind(serverfd, (struct sockaddr*)&server, sizeof(server)) < 0) {
        perror("bind socket");
        exit(1);
    }

    /* Same backlog as the other echo servers in this article, so a burst of
     * concurrent clients is not limited by a 10-entry accept queue. */
    if (listen(serverfd, 2048) < 0) {
        perror("listen socket");
        exit(1);
    }

    ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init: %s\n", strerror(-ret));
        exit(1);
    }
    /* Install the handler only once the ring it tears down exists. */
    signal(SIGINT, sigint_handler);

    if (add_accept_request(serverfd, &client_addr, &client_addr_len) < 0) {
        fprintf(stderr, "unable to queue the initial accept\n");
        exit(1);
    }
    io_uring_submit(&ring);

    while (1) {
        struct request* req;
        int             res;

        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret == -EINTR) {
            continue;
        }
        if (ret < 0) {
            /* cqe is not valid after a failed wait; stop instead of reading it. */
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            break;
        }

        req = (struct request*)cqe->user_data;
        res = cqe->res;

        switch (req->type) {
        case ACCEPT:
            /* Re-arm the accept before dealing with the new connection. */
            if (add_accept_request(serverfd, &client_addr, &client_addr_len) <
                0) {
                fprintf(stderr, "unable to re-queue accept\n");
            }
            if (res < 0) {
                /* res is a negative errno, not a file descriptor. */
                fprintf(stderr, "accept: %s\n", strerror(-res));
            } else if (add_read_request(res) < 0) {
                close(res);
            }
            io_uring_submit(&ring);
            /* An accept request carries no buffer, so plain free() suffices. */
            free(req);
            break;
        case READ:
            if (res <= 0) {
                /* EOF or read error: close the connection. */
                drop_connection(req);
            } else {
                /* Echo only the bytes that were read, not the whole buffer. */
                req->iov.iov_len = (size_t)res;
                if (add_write_request(req) < 0) {
                    drop_connection(req);
                }
            }
            io_uring_submit(&ring);
            break;
        case WRITE:
            /* NOTE(review): a short write (res < iov_len) is not retried. */
            if (res < 0 || add_read_request(req->client_socket) < 0) {
                drop_connection(req);
            } else {
                /* The next read has its own request and buffer; release the
                 * finished one, which was previously leaked on every echo. */
                free_request(req);
            }
            io_uring_submit(&ring);
            break;
        case CLOSE:
            free_request(req);
            break;
        default:
            fprintf(stderr, "Unexpected req type %d\n", req->type);
            break;
        }

        io_uring_cqe_seen(&ring, cqe);
    }
    io_uring_queue_exit(&ring);
    close(serverfd);
    fprintf(stderr, "close\n");

    return 0;
}

看一下系统调用情况,看样子没有多大变化。

garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./io_uring
^Cstrace: Process 75168 detached
% time     seconds  usecs/call     calls    errors syscall
^C pressed. Shutting down.
------ ----------- ----------- --------- --------- ----------------
 96.89    0.130784          27      4748           io_uring_enter
  2.03    0.002746          42        64           brk
  0.38    0.000516          36        14           mmap
  0.10    0.000131          32         4           mprotect
  0.08    0.000111         111         1           io_uring_setup
  0.06    0.000087          29         3           openat
  0.06    0.000079          39         2           pread64
  0.05    0.000071          23         3           newfstatat
  0.04    0.000060          20         3           close
  0.04    0.000057          28         2         1 arch_prctl
  0.03    0.000045          22         2           read
  0.03    0.000041          41         1           munmap
  0.03    0.000040          40         1           socket
  0.02    0.000031          31         1           set_tid_address
  0.02    0.000029          29         1         1 access
  0.02    0.000025          25         1           listen
  0.02    0.000025          25         1           getrandom
  0.02    0.000022          22         1           prlimit64
  0.02    0.000021          21         1           rt_sigaction
  0.02    0.000021          21         1           rseq
  0.01    0.000020          20         1           set_robust_list
  0.01    0.000016          16         1           bind
  0.00    0.000000           0         1           execve
------ ----------- ----------- --------- --------- ----------------
100.00    0.134978          27      4858         2 total

 

io_uring优化版本

将多个请求合并后一次提交。调用io_uring_peek_cqe检查完成队列,避免在完成队列为空时空转,同时尽可能快地排空完成队列。

#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Operation tags stored in struct request.type; main() switches on them to
 * decide how a completion is handled. */
#define CLOSE 0
#define READ 1
#define WRITE 2
#define ACCEPT 3
/* Size in bytes of the buffer allocated for every read request. */
#define BUF_SIZE 4096

/* Number of entries requested from io_uring_queue_init(). */
#define QUEUE_DEPTH 1024
/* main() stops draining completions and submits once fewer than this many
 * submission-queue slots remain free. */
#define MAX_SQE_PER_LOOP 128

/* Per-operation context: attached to an SQE as user data and recovered from
 * the matching CQE in main(). */
struct request {
    /* One of CLOSE / READ / WRITE / ACCEPT. */
    int          type;
    /* Connection fd; left zero by add_accept_request(). */
    int          client_socket;
    /* Buffer handed to readv/writev; allocated by add_read_request(). */
    struct iovec iov;
};

/* The single ring used by every function of this program. */
struct io_uring ring;
/* Number of batched io_uring_submit() calls made by main()'s loop; printed by
 * sigint_handler(). */
int             count = 0;

/*
 * Queue an accept on server_socket.
 *
 * client_addr and client_addr_len are handed to the kernel, so they must stay
 * valid until the accept completes. The request is only queued; the caller
 * still has to call io_uring_submit().
 *
 * Returns 0 on success, -1 when memory or a submission-queue slot is not
 * available (nothing is queued in that case).
 */
int add_accept_request(int server_socket, struct sockaddr_in* client_addr,
                       socklen_t* client_addr_len)
{
    struct io_uring_sqe* sqe;
    /* Zeroed, so iov.iov_base is NULL and free_request() is safe on it. */
    struct request*      req = calloc(1, sizeof(struct request));

    if (req == NULL) {
        return -1;
    }

    /* Allocate first: a failure must never leave a half-prepared SQE behind. */
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        /* Submission queue is full. */
        free(req);
        return -1;
    }

    req->type = ACCEPT;
    io_uring_prep_accept(sqe, server_socket, (struct sockaddr*)client_addr,
                         client_addr_len, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a readv of up to BUF_SIZE bytes from client_socket.
 *
 * A fresh request and a zero-filled BUF_SIZE buffer are allocated for the
 * read; both are owned by the request until free_request() is called on it.
 * The request is only queued; the caller still has to call io_uring_submit().
 *
 * Returns 0 on success, -1 when memory or a submission-queue slot is not
 * available (nothing is queued and nothing stays allocated in that case).
 */
int add_read_request(int client_socket)
{
    struct io_uring_sqe* sqe;
    struct request*      req = calloc(1, sizeof(struct request));

    if (req == NULL) {
        return -1;
    }

    req->iov.iov_base = calloc(1, BUF_SIZE);
    if (req->iov.iov_base == NULL) {
        free(req);
        return -1;
    }
    req->iov.iov_len = BUF_SIZE;
    req->type = READ;
    req->client_socket = client_socket;

    /* Allocate first: a failure must never leave a half-prepared SQE behind. */
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        /* Submission queue is full. */
        free(req->iov.iov_base);
        free(req);
        return -1;
    }

    io_uring_prep_readv(sqe, client_socket, &req->iov, 1, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a close of req->client_socket, reusing req as the completion context.
 *
 * On success req is retagged as CLOSE and stays owned by the ring until the
 * close completes. The request is only queued; the caller still has to call
 * io_uring_submit().
 *
 * Returns 0 on success, -1 when no submission-queue slot is available; req is
 * left untouched in that case and remains the caller's responsibility.
 */
int add_close_request(struct request* req)
{
    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);

    if (sqe == NULL) {
        /* Submission queue is full. */
        return -1;
    }

    req->type = CLOSE;
    io_uring_prep_close(sqe, req->client_socket);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/*
 * Queue a writev of req->iov to req->client_socket, reusing req as the
 * completion context.
 *
 * Exactly req->iov.iov_len bytes starting at req->iov.iov_base are written.
 * NOTE(review): add_read_request() sets iov_len to BUF_SIZE, so unless the
 * caller first trims iov_len to the byte count reported by the read
 * completion, the whole buffer (payload plus zero padding) is sent.
 *
 * Returns 0 on success, -1 when no submission-queue slot is available; req is
 * left untouched in that case and remains the caller's responsibility.
 */
int add_write_request(struct request* req)
{
    struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);

    if (sqe == NULL) {
        /* Submission queue is full. */
        return -1;
    }

    req->type = WRITE;
    io_uring_prep_writev(sqe, req->client_socket, &req->iov, 1, 0);
    io_uring_sqe_set_data(sqe, req);
    return 0;
}

/* Release a request together with the buffer its iovec points at.
 * Always returns 0. */
int free_request(struct request* req)
{
    void* buffer = req->iov.iov_base;

    free(buffer);
    free(req);
    return 0;
}

/*
 * SIGINT handler: print the submit counter and a shutdown message, tear down
 * the ring and terminate.
 *
 * Output and termination use async-signal-safe calls only: the counter is
 * formatted by hand and emitted with write() instead of printf(), and the
 * process ends with _exit() instead of exit(), because the signal may arrive
 * while the main loop is inside stdio or malloc.
 * NOTE(review): io_uring_queue_exit() is kept from the original; it is not on
 * the POSIX async-signal-safe list - confirm it is acceptable here.
 */
void sigint_handler(int signo)
{
    static const char msg[] = "^C pressed. Shutting down.\n";
    char              text[16]; /* fits "-2147483648\n" */
    size_t            pos = sizeof(text);
    int               negative = count < 0;
    unsigned int      magnitude =
        negative ? 0u - (unsigned int)count : (unsigned int)count;

    (void)signo;

    /* Build "<count>\n" backwards from the end of the buffer. */
    text[--pos] = '\n';
    do {
        text[--pos] = (char)('0' + magnitude % 10u);
        magnitude /= 10u;
    } while (magnitude != 0u);
    if (negative) {
        text[--pos] = '-';
    }

    if (write(STDOUT_FILENO, text + pos, sizeof(text) - pos) < 0 ||
        write(STDOUT_FILENO, msg, sizeof(msg) - 1) < 0) {
        /* Nothing sensible can be done about a failed write at this point. */
    }
    io_uring_queue_exit(&ring);
    _exit(0);
}

/* Close a connection whose request can no longer be serviced. Prefers an
 * asynchronous close (the request is released when that completes) and falls
 * back to close() plus an immediate free when no SQE can be queued.
 * Returns the number of SQEs queued (1 or 0). */
static int drop_connection(struct request* req)
{
    if (add_close_request(req) < 0) {
        close(req->client_socket);
        free_request(req);
        return 0;
    }
    return 1;
}

/*
 * io_uring echo server with batched submission.
 *
 * Listens on the port given as argv[1] (default 1234). Each pass of the outer
 * loop blocks for one completion, then keeps draining the completion queue
 * with io_uring_peek_cqe() while queuing follow-up SQEs, and finally submits
 * everything queued in that pass with a single io_uring_submit().
 */
int main(int argc, char const* argv[])
{
    int                  serverfd;
    struct sockaddr_in   server;
    int                  port = 1234;
    int                  ret;
    struct io_uring_cqe* cqe;
    struct sockaddr_in   client_addr;
    socklen_t            client_addr_len = sizeof(client_addr);

    if (argc == 2) {
        port = atoi(argv[1]);
    }

    serverfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (serverfd < 0) {
        perror("create socket");
        exit(1);
    }

    /* Clear the whole address so the padding bytes are not stack garbage. */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons(port);

    if (bind(serverfd, (struct sockaddr*)&server, sizeof(server)) < 0) {
        perror("bind socket");
        exit(1);
    }

    if (listen(serverfd, 2048) < 0) {
        perror("listen socket");
        exit(1);
    }

    ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init: %s\n", strerror(-ret));
        exit(1);
    }
    /* Install the handler only once the ring it tears down exists. */
    signal(SIGINT, sigint_handler);

    if (add_accept_request(serverfd, &client_addr, &client_addr_len) < 0) {
        fprintf(stderr, "unable to queue the initial accept\n");
        exit(1);
    }

    io_uring_submit(&ring);

    while (1) {
        int submissions = 0;

        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret == -EINTR) {
            continue;
        }
        if (ret < 0) {
            /* cqe is not valid after a failed wait; stop instead of reading it. */
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            break;
        }

        while (1) {
            struct request* req = (struct request*)cqe->user_data;
            int             res = cqe->res;

            switch (req->type) {
            case ACCEPT:
                /* Re-arm the accept before dealing with the new connection. */
                if (add_accept_request(serverfd, &client_addr,
                                       &client_addr_len) == 0) {
                    submissions += 1;
                } else {
                    fprintf(stderr, "unable to re-queue accept\n");
                }
                if (res < 0) {
                    /* res is a negative errno, not a file descriptor. */
                    fprintf(stderr, "accept: %s\n", strerror(-res));
                } else if (add_read_request(res) == 0) {
                    submissions += 1;
                } else {
                    close(res);
                }
                free_request(req);
                break;
            case READ:
                if (res <= 0) {
                    /* EOF or read error: close the connection. */
                    submissions += drop_connection(req);
                } else {
                    /* Echo only the bytes that were read, not the whole
                     * buffer. */
                    req->iov.iov_len = (size_t)res;
                    if (add_write_request(req) < 0) {
                        submissions += drop_connection(req);
                        break;
                    }
                    submissions += 1;
                    /* Queue the next read alongside the echo. */
                    if (add_read_request(req->client_socket) == 0) {
                        submissions += 1;
                    } else {
                        fprintf(stderr, "unable to queue next read\n");
                    }
                }
                break;
            case WRITE:
                /* The echo is done; release its request and buffer, which
                 * were previously leaked on every write completion.
                 * NOTE(review): a short write (res < iov_len) is not retried. */
                free_request(req);
                break;
            case CLOSE:
                free_request(req);
                break;
            default:
                fprintf(stderr, "Unexpected req type %d\n", req->type);
                break;
            }

            io_uring_cqe_seen(&ring, cqe);

            if (io_uring_sq_space_left(&ring) < MAX_SQE_PER_LOOP) {
                break;  // submission queue is running low on free slots
            }

            ret = io_uring_peek_cqe(&ring, &cqe);
            if (ret != 0) {
                break;  // -EAGAIN: completion queue drained (or peek failed)
            }
        }
        if (submissions > 0) {
            count++;
            io_uring_submit(&ring);
        }
    }
    io_uring_queue_exit(&ring);
    close(serverfd);
    fprintf(stderr, "close\n");

    return 0;
}

看下系统调用情况

garlic@garlic:~/io_uring/echoserver-2/io_uring_echo_server/bin$ strace -c ./io_uring2
^Cstrace: Process 75230 detached
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
 98.28    0.103546          52      1962           io_uring_enter
  1.00    0.001056          17        62           brk
  0.15    0.000162          40         4           mprotect
  0.11    0.000117           8        14           mmap
  0.10    0.000109         109         1           io_uring_setup
  0.05    0.000052          52         1           munmap
  0.04    0.000046          46         1           socket
  0.04    0.000046          46         1           rseq
  0.03    0.000033          33         1           bind
  0.03    0.000029          29         1           listen
  0.03    0.000028          28         1           getrandom
  0.03    0.000027          27         1           prlimit64
  0.02    0.000026          26         1           rt_sigaction
  0.02    0.000026          26         1           set_robust_list
  0.02    0.000025          12         2         1 arch_prctl
  0.02    0.000025          25         1           set_tid_address
  0.00    0.000000           0         2           read
  0.00    0.000000           0         3           close
  0.00    0.000000           0         2           pread64
  0.00    0.000000           0         1         1 access
  0.00    0.000000           0         1           execve
  0.00    0.000000           0         3           openat
  0.00    0.000000           0         3           newfstatat
------ ----------- ----------- --------- --------- ----------------
100.00    0.105353          50      2070         2 total

 

源码:https://github.com/weida/io_uring_echo_server

代码是根据 Why you should use io_uring for network I/O 文章编写的。文章中优化代码后能把系统调用减少到70个左右, 后来查看了作者的github (https://github.com/donaldh/io_uring_playground),应该是在开启async的情况下测试的。

/* Command-line switches of the referenced server, as an argp option table.
 * Every entry is a plain flag: long name, short key, no argument, no option
 * flags, help text. */
static const struct argp_option opts[] = {
    {"async", 'a', 0, 0, "Submit async requests"},
    {"batch", 'b', 0, 0, "Batch available work into single submission"},
    {"fixed", 'f', 0, 0, "Use pre-allocated fixed files"},
    {"multishot", 'm', 0, 0, "Use multishot accept requests"},
    {"sqpoll", 'p', 0, 0, "Use submission queue polling in the kernel"},
    {"debug", 'd', 0, 0, "Provide debug output, repeat for more verbose debug"},
    /* Empty entry marks the end of the table. */
    {},
};

还有这个项目 https://github.com/frevib/io_uring-echo-server。还有与epoll的对应测试。可以参考一下。

 

参考及应用

Why you should use io_uring for network I/O (https://developers.redhat.com/articles/2023/04/12/why-you-should-use-iouring-network-io#:~:text=The%20main%20benefit%20of%20io_uring,reducing%20the%20number%20of%20syscalls.)

图片from 曾彥博

 

Comments are closed.