--- 摄于 2017 年 9 月 藏川线前段
异步,简单理解就是单核环境下可以同时处理多个不同的任务。这些任务由于各种各样的原因,需要阻塞等待执行,异步将阻塞线程的时间利用起来去执行其他任务,从而使多个任务能够在单核环境下并发处理,可以类比于内核的时间片调度。
这个问题很简单,因为资源是有限的,如果资源是无限的,所有任务都可以有自己独立的线程去执行,这样的代码结构清晰,操作简单,也易于理解和维护。但现实是,无论是什么服务器,资源都是有瓶颈的,如何榨干有限的资源达到最大利用率,这就是异步概念出现的原因之一。
不过要理解的是:异步只对任务的吞吐量(Throughput)友好,对延时(Latency)并没有优化,甚至是劣化了。为什么?当单核处理的任务数急剧增多的时候,无论怎么优化,每个任务的执行必然是要时间的,当两个任务同时达到完成状态,单核只能选择先做其中一个,然后再处理下一个,这时候,时延就产生了,这是无法解决的物理瓶颈。
大部分资料都喜欢从网络传输的异步化说起,我偏偏反其道而行,就从时间定时讲起吧。
我们现实里的闹钟,设定时间后,就不需要去关心它是怎么实现的,等到了时间,自然会有铃声想起,提示到了预订的目标时间。这就是所谓的异步,塞一个任务进去,过一段时间之后,有人(物)提示你,这个任务完成了,可以接着设置下一个任务了。
那么有没有人想过这玩意是怎么实现的,或者换个问法,如果是你,你打算怎么实现它。我们从最基础的地方说起:
在计算机世界里,时间是通过 晶振 的震动频率来计量时间的,在普通开发者的眼里,能获取时间的唯一手段是调用 os 提供的 syscall 去获取,这个太底层了,我们往上面走一点,通过 libc::gettimeofday
的封装,可以更简单地得到现在是什么时间。将任务时间与当前时间的差值,通过 libc::nanosleep
可以让计算机在指定时间之后唤醒线程,从而达到定点唤醒的目的。现在我们有了最基础的两个函数,可以开始实现自己的闹钟了。
fn set_clock(sec: u64, nsecs: u64, func: Fn()) {
unsafe {
while secs > 0 || nsecs > 0 {
let mut ts = libc::timespec {
tv_sec: cmp::min(libc::time_t::MAX as u64, secs) as libc::time_t,
tv_nsec: nsecs,
};
secs -= ts.tv_sec as u64;
let ts_ptr = &mut ts as *mut _;
/// If we're awoken with a signal then the return value will be -1 and
// nanosleep will fill in `ts` with the remaining time.
if libc::nanosleep(ts_ptr, ts_ptr) == -1 {
assert_eq!(os::errno(), libc::EINTR);
secs += ts.tv_sec as u64;
nsecs = ts.tv_nsec;
} else {
nsecs = 0;
}
}
}
func()
}
fn main() {
let now = unsafe {libc::gettimeofday(...)};
let target = ...;
let interval = taget - now;
set_clock(interval.as_secs(), interval.subsec_nanos(), do_something);
}
以上代码无法编译,但大致意思是可以理解的,这是一次性任务,当计时完成之后,执行一个函数,然后整个程序就结束了,那如果多个计时任务需要执行呢?最简单的办法就是每一个计时任务创建一个线程并执行 set_clock
函数。这样可以完成任务,但消耗实在太大,每个线程有自己独立的栈空间,默认是 2M,当计时任务过多的时候,线程的内存消耗会非常恐怖。
每个线程对一个任务计时的消耗实在是太恐怖了,那我们能不能用一个线程去计时所有任务呢,这样的消耗可以显著下降。这是可以的,下面做一个简单的堆计时的思路例子:
struct Timer {
heap:MiniHeap<Instant>,
task:BtreeMap<Instant,Box<dyn Fn() + Send + 'static>>,
task_recv_queue: Queue<(Duration, Box<dyn Fn() + Send + 'static>)>,
mini_duration_limit: Duration,
now: Instant,
}
impl Timer {
fn run(&mut self) {
let now = Instant::now();
loop {
for (time, f) in self.task_recv_queue.pop() {
let target = time - self.now;
self.heap.push(target);
self.task.insert(target, f);
}
let time = heap.peek().map(|min| min - self.now).unwrap_or(self.mini_duration_limit);
thread::park_timeout(time);
// may unpark by other thread, so can't use `self.now += time` here
self.now = Instant::now();
self.heap.split_off(self.now);
tasks = self.task.split_off(self.now);
// can run these fn on thread
for (_, task_fn) in tasks {
task_fn()
}
}
}
}
fn main() {
let task_queue = Queue::new();
let timer = Timer::new(Queue);
let id = thread::spawn(move {
timer.run()
});
task_queue.push((time, do_something));
task_queue.push((time, do_something));
// wakeup timer thread
id.unpark();
task_queue.push((time, do_something));
id.unpark();
/// do other thing...
}
像上面的做法,是一个经典的堆计时法,所有计时任务先排序,取最近(小)的计时间隔进行计时,其他任务的执行时间一定比这个更大,在被唤醒之后,处理当前时间,并将到达执行时间的任务执行完成,然后读取新的任务,循环往复。
这样的做法,可以并发执行近乎无限的计时任务。并且相对来说,时延是可控的,类似的还有一种方式是时间轮。
到这,戛然而止,其实还没进入异步主题,但第一话,关于时间的话题已经搞定了,定时器的并发执行已经搞定了。
请登录后评论
评论区
加载更多