线程池

废话不多说,先来实现个最简单的 HTTP 服务:

use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::thread::sleep;
use std::time::Duration;

fn main() {
	// 监听 9090 端口
    let listener = TcpListener::bind("127.0.0.1:9090").unwrap();

	// 轮询连接请求
    for stream in listener.incoming() {
        let stream = stream.unwrap();
		// 处理该请求
        process_stream(stream);
    }
}

fn process_stream(mut stream: TcpStream) {
	// 必须要读取 stream 的内容,否则多个请求同时发送会触发 ERR_CONNECTION_RESET 错误
	// 我也不知道为啥
	let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap()

	// 睡眠 200 ms
    sleep(Duration::from_millis(200));

    let content = "hello world";

    let response = format!(
        "HTTP/1.1 200 Ok\r\nContent-Length: {}\r\n\r\n{}",
        content.len(),
        content
    );

	// 写入返回信息
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap()
}

一个极其简陋的 HTTP 服务就搭建好了, 此时浏览器打开 http://127.0.0.1:9090 就能看到 hello world 了。

这里面需要注意的是:

  1. for stream in listener.incoming() 是在不断监听 9090 端口是否有连接进来。
  2. process_stream 固定的睡眠 200ms 是为了模拟正常 http 请求的消耗

我们对这个最朴素的 HTTP 服务做下性能测试看看:

ab -n 100 -c 20 http://127.0.0.1:9090/

该命令测试 20 个并发下,一共发送 100 个请求的情况

测得的 QPS 是 4.9 左右,这个不难理解,每个请求因为固定睡眠 200ms 所以 1s 能处理的请求个数也只能在 4-5 之间了。

很明显现在的处理效率是不能容忍的, 而问题其实也很明显,那就是我们这个服务只有当前的一个线程来处理请求,并且在处理过程中并不能接收新的请求进来,因为 process_stream 是同步的,必须等该函数执行完才能读取下一个请求。

如何能够增加服务的吞吐量呢,最简单的方式应该就是使用多线程了。

或许我们可以这么修改:

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:9090").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(|| {
            process_stream(stream);
        });
    }
}

fn process_stream(mut stream: TcpStream) {
	// 保持不变
}

我们对于每个请求都开启一个新的线程来处理,这样 process_stream 就不会阻塞新的请求了。

同样的我们做下性能测试:

ab -n 100 -c 20 http://127.0.0.1:9090/

显示的 QPS 为 78 左右,其实理论上应该是提高 20 倍才对,毕竟我们测试的是 20 的并发,也就是说同时有 20 个线程在处理请求,但考虑到线程间切换的开销,真实提高低于 20 倍也是可以理解的。但即使是这样提升的性能也是很可观了。要知道对于现在的处理方式,并发越高,吞吐可是就越高的。

先别着急沉浸于成功的喜悦,该方法其实是有致命缺陷的,那就是我们没有对线程的个数做控制,当并发数很大时,比如十万或者百万,我们不问青红皂白的直接就开启十万或是百万的线程来处理显然是不合适的,先不考虑系统允不允许开启这么多的线程,即使只是考虑线程间的切换成本,我们也是浪费了大量的 CPU 资源。

如果我们能够限制线程并发的最大数量,是不是就能在可接受的范围内提高服务的吞吐呢?这个思路当然是没问题的,那如何才能限制线程的最大并发数呢?

答案就是线程池。

线程池只是一个形象的说法,就好比我们能够使用的线程都在池子里,想用线程了就从池子里捞一个出来用,捞完了就没了。当然,用完了还是要放回池子里的。

而我们能用的最大线程个数就是线程池初始化时所拥有的线程数。

接下来我们自己实现一个线程池看看。

先不考虑代码实现的细节,我们先来畅想下最终想要达到的效果。比如我创建一个线程池,初始化的线程个数是 4。也就是说,服务启动的时候就有 4 个线程可以处理 process_stream 工作。我们的主线程绑定端口并监听请求的到来,每来一个请求就从线程池中找一个空闲的线程来执行工作。如果线程池中所有的线程都在执行工作,则将该请求暂存下来等到有空闲的线程时再分配给它。

一图胜千言:

如图所示,我们的线程池初始时有 4 个线程可以用来处理请求。线程池中除了这四个工作线程外,还有一个单独的线程 (thread0) 用来获取需要处理的请求和空闲的工作线程,该线程同时持有一个 job_channel 和 worker_channel 的接收端,job_channel 用来从主线程接收需要处理的请求,worker_channel 用来接收当前的空闲工作线程。

我们可以看到,当前一共有 3 个请求需要处理,还有一个请求正在接收,同时 4 个工作线程都是空闲状态。

接下来, 线程0 同时拿到了请求1线程1,搭配成功,并将请求1提交给线程1处理,此时线程1ready 状态转变成running 状态。同时上一步的新请求(请求4)也进入job_channel 等待发送给线程池处理。

再然后,线程0 继续配对 请求2线程2线程2变成running 状态。

同上,线程0配对成功 请求3线程4线程4变成running状态。同时线程1的请求处理完毕并立刻通过worker_channel通知线程0

代码实现如下:

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread::sleep;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:9090").unwrap();

    let pool = thread_poll::ThreadPool::new(20);
    for stream in listener.incoming() {
        let stream = stream.unwrap();
		// 此处改成由线程池处理请求
        pool.add_job(|| {
            process_stream(stream);
        });
    }
}

fn process_stream(mut stream: TcpStream) {
	// 不变
}

mod thread_poll {
    use std::collections::HashMap;
    use std::sync::mpsc;
    use std::thread;

    pub type Job = Box<dyn FnOnce() + Send + 'static>;

    pub struct ThreadPool {
        job_sender: mpsc::Sender<Job>,
    }

    impl ThreadPool {
        pub fn new(size: usize) -> Self {
			// worker_channel
            let (worker_sender, worker_receiver) = mpsc::channel();
			// job_channel
            let (job_sender, job_receiver) = mpsc::channel();

            let mut workers = HashMap::new();
            for id in 0..size {
				// 初始化 size 个工作线程
                let worker = Worker::new(id, worker_sender.clone());
                workers.insert(id, worker);
            }

			// 线程0
			thread::spawn(move || loop {
				// 获取可用工作线程的 id
				let worker_id = worker_receiver.recv().unwrap();
				// 根据 id 获取对应的工作线程
				let worker = workers.get(&worker_id).unwrap();
				// 获取需要处理的请求
				let job = job_receiver.recv().unwrap();
				// 通知线程处理该请求
				worker.handle_job(job);
			})

            ThreadPool { job_sender }
        }

        pub fn add_job<F>(&self, job: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(job);
			// 主线程发送过来的请求,放到 job_channel 中
            self.job_sender.send(job).unwrap();
        }
    }

    pub struct Worker {
        pub id: usize,
        pub job_sender: mpsc::Sender<Job>,
    }
    impl Worker {
        pub fn new(id: usize, worker_sender: mpsc::Sender<usize>) -> Worker {
			// worker 内部创建一个 job_channel,用以通知内部线程处理请求
            let (job_sender, job_receiver) = mpsc::channel::<Job>();

            thread::spawn(move || loop {
				// 默认是空闲状态,因此把工作线程的 id 放到 worker_channel 中
                worker_sender.send(id.clone()).unwrap();
				// 接收需要处理的请求
                let job = job_receiver.recv().unwrap();
				// 处理请求
                job();
            });
            Worker { id, job_sender }
        }

        pub fn handle_job(&self, job: Job) {
			// 把需要处理的请求放到
            self.job_sender.send(job).unwrap();
        }
    }
}

性能测试(size = 20):

ab -n 100 -c 20 http://127.0.0.1:9090/

QPS 同样为 78 左右。