藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

异步(一)

异步,简单理解就是单核环境下可以同时处理多个不同的任务。这些任务由于各种各样的原因,需要阻塞等待执行,异步将阻塞线程的时间利用起来去执行其他任务,从而使多个任务能够在单核环境下并发处理,可以类比于内核的时间片调度。

为什么需要异步

这个问题很简单,因为资源是有限的,如果资源是无限的,所有任务都可以有自己独立的线程去执行,这样的代码结构清晰,操作简单,也易于理解和维护。但现实是,无论是什么服务器,资源都是有瓶颈的,如何榨干有限的资源达到最大利用率,这就是异步概念出现的原因之一。

不过要理解的是:异步只对任务的吞吐量(Throughput)友好,对延时(Latency)并没有优化,甚至是劣化了。为什么?当单核处理的任务数急剧增多的时候,无论怎么优化,每个任务的执行必然是要时间的,当两个任务同时达到完成状态,单核只能选择先做其中一个,然后再处理下一个,这时候,时延就产生了,这是无法解决的物理瓶颈。

闹钟

大部分资料都喜欢从网络传输的异步化说起,我偏偏反其道而行,就从时间定时讲起吧。

我们现实里的闹钟,设定时间后,就不需要去关心它是怎么实现的,等到了时间,自然会有铃声想起,提示到了预订的目标时间。这就是所谓的异步,塞一个任务进去,过一段时间之后,有人(物)提示你,这个任务完成了,可以接着设置下一个任务了。

那么有没有人想过这玩意是怎么实现的,或者换个问法,如果是你,你打算怎么实现它。我们从最基础的地方说起:

时间怎么计量

在计算机世界里,时间是通过 晶振 的震动频率来计量时间的,在普通开发者的眼里,能获取时间的唯一手段是调用 os 提供的 syscall 去获取,这个太底层了,我们往上面走一点,通过 libc::gettimeofday 的封装,可以更简单地得到现在是什么时间。将任务时间与当前时间的差值,通过 libc::nanosleep 可以让计算机在指定时间之后唤醒线程,从而达到定点唤醒的目的。现在我们有了最基础的两个函数,可以开始实现自己的闹钟了。

最简单直接的做法
fn set_clock(sec: u64, nsecs: u64, func: Fn()) {
    unsafe {
        while secs > 0 || nsecs > 0 {
            let mut ts = libc::timespec {
                tv_sec: cmp::min(libc::time_t::MAX as u64, secs) as libc::time_t,
                tv_nsec: nsecs,
            };
            secs -= ts.tv_sec as u64;
            let ts_ptr = &mut ts as *mut _;
            /// If we're awoken with a signal then the return value will be -1 and
        // nanosleep will fill in `ts` with the remaining time.
            if libc::nanosleep(ts_ptr, ts_ptr) == -1 {
                assert_eq!(os::errno(), libc::EINTR);
                secs += ts.tv_sec as u64;
                nsecs = ts.tv_nsec;
            } else {
                nsecs = 0;
            }
        }
    }
    func()
}

fn main() {
    let now = unsafe {libc::gettimeofday(...)};
    let target = ...;
    let interval = taget - now;
    set_clock(interval.as_secs(), interval.subsec_nanos(), do_something);
}

以上代码无法编译,但大致意思是可以理解的,这是一次性任务,当计时完成之后,执行一个函数,然后整个程序就结束了,那如果多个计时任务需要执行呢?最简单的办法就是每一个计时任务创建一个线程并执行 set_clock 函数。这样可以完成任务,但消耗实在太大,每个线程有自己独立的栈空间,默认是 2M,当计时任务过多的时候,线程的内存消耗会非常恐怖。

消耗相对小的办法

每个线程对一个任务计时的消耗实在是太恐怖了,那我们能不能用一个线程去计时所有任务呢,这样的消耗可以显著下降。这是可以的,下面做一个简单的堆计时的思路例子:

struct Timer {
    heap:MiniHeap<Instant>,
    task:BtreeMap<Instant,Box<dyn Fn() + Send + 'static>>,
    task_recv_queue: Queue<(Duration, Box<dyn Fn() + Send + 'static>)>,
    mini_duration_limit: Duration,
    now: Instant,
}

impl Timer {
    fn run(&mut self) {
        let now = Instant::now();
        loop {
            for (time, f) in self.task_recv_queue.pop() {
                let target = time - self.now;
                self.heap.push(target);
                self.task.insert(target, f);
            }
            let time = heap.peek().map(|min| min - self.now).unwrap_or(self.mini_duration_limit);

            thread::park_timeout(time);
	    // may unpark by other thread, so can't use `self.now += time` here
            self.now = Instant::now();
            self.heap.split_off(self.now);
            tasks = self.task.split_off(self.now);
            // can run these fn on thread
            for (_, task_fn) in tasks {
                task_fn()
            }
        }
    }
}

fn main() {
    let task_queue = Queue::new();
    let timer = Timer::new(Queue);
    let id = thread::spawn(move {
        timer.run()
    });
    task_queue.push((time, do_something));
    task_queue.push((time, do_something));
    // wakeup timer thread
    id.unpark();
    task_queue.push((time, do_something));
    id.unpark();
    
    /// do other thing...
}

像上面的做法,是一个经典的堆计时法,所有计时任务先排序,取最近(小)的计时间隔进行计时,其他任务的执行时间一定比这个更大,在被唤醒之后,处理当前时间,并将到达执行时间的任务执行完成,然后读取新的任务,循环往复。

这样的做法,可以并发执行近乎无限的计时任务。并且相对来说,时延是可控的,类似的还有一种方式是时间轮

最后

到这,戛然而止,其实还没进入异步主题,但第一话,关于时间的话题已经搞定了,定时器的并发执行已经搞定了。

评论区

加载更多

登录后评论