Rust高并发编程总结

2021-10-01 13:54:29 浏览数 (1)

Serverless的概念火了,业界已经不再讨论要不要用Serverless的问题了,而是高喊Serverless First的口号力求快速拥抱Serverless,无服务器并不是Serverless的本质,不需要关心服务器的情况就能高效工作,才是Serverless胜出的核心要义。互联网时代流量的大起大落,很多科技巨头在面对流量的冲击时也都败下阵来,针对前几个月B站的崩溃事件,笔者还曾写过《B站的前端崩了,后端的你别慌》来分析来龙去脉,而Serverless凭借快速伸缩的自动弹性特点,可以从容应对类似的冲击,这也让这种新技术出尽的风头。

在Serverless的喧嚣背后,Rust看似牢牢占据了C位,但其实在高并发这个话题下要总结的模式与套路其实很多,尤其是像Tokio专业的编程框架,对于程序员编写高性能程序的帮助很大。因此本文把之前介绍过的Tokio相关知识点进行一下补充和总结。

Future到底是个什么概念

简单来讲Future不是一个值,而是一种值类型,一种在未来才能得到的值类型。Future对象必须实现Rust标准库中的std::future:: future接口。Future的输出Output是Future完成后才能生成的值。在Rust中Future通过管理器调用Future::poll来推动Future的运算。Future本质上是一个状态机,而且可以嵌套使用,我们来看一下面这个例子,在main函数中,我们实例化MainFuture并调用.await,而MainFuture除了在几个状态之间迁移以外,还会调用一个Delay的Future,从而实现Future的嵌套。

MainFuture以State0状态做为初始状态。当调度器调用poll方法时,MainFuture会尝试尽可能地提升其状态。如果future完成,则返回Poll::Ready,如果MainFuture没有完成,则是由于它等待的DelayFuture没有达到Ready状态,那么此时返回Pending。调度器收到Pending结果,会将这个MainFuture重新放回待调度的队列当中,稍后会再度调用Poll方法来推进Future的执行。具体如下

代码语言:javascript复制
use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};

use std::time::{Duration, Instant};



struct Delay {

    when: Instant,

}



impl Future for Delay {

    type Output = &'static str;



    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)

        -> Poll<&'static str>

    {

        if Instant::now() >= self.when {

            println!("Hello world");

            Poll::Ready("done")

        } else {

           

            cx.waker().wake_by_ref();

            Poll::Pending

        }

    }

}

enum MainFuture {

    

    State0,

    State1(Delay),

    Terminated,

}



impl Future for MainFuture {

    type Output = ();



    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)

        -> Poll<()>

    {

        use MainFuture::*;

      

        loop {

            match *self {

                State0 => {

                    let when = Instant::now()  

                        Duration::from_millis(1);

                    let future = Delay { when };

                    println!("init status");

                    *self = State1(future);

                }

                State1(ref mut my_future) => {

                    match Pin::new(my_future).poll(cx) {

                        Poll::Ready(out) => {

                            assert_eq!(out, "done");

                            println!("delay finished this future is ready");

                            *self = Terminated;

                            return Poll::Ready(());

                        }

                        Poll::Pending => {

                            println!("not ready");

                            return Poll::Pending;

                        }

                    }

                }

                Terminated => {

                    panic!("future polled after completion")

                }

            }

        }

    }

}

#[tokio::main]

async fn main() {

    let when = Instant::now()   Duration::from_millis(10);

   

    let mainFuture=MainFuture::State0;

    mainFuture.await;

   

}

当然这个Future的实现存在一个明显的问题,通过运行结果也可以知道调试器明显在需要等待的情况下还执行了很多次的Poll操作,理想状态下需要当Future有进展时再执行Poll操作。不断轮徇的Poll其实就退化成了低效的轮询。

解决之道在于poll函数中的Context参数,这个Context就是Future的waker(),通过调用waker可以向执行器发出信号,表明这个任务应该进行Poll操作了。当Future的状态推进时,调用wake来通知执行器,才是正解这就需要把Delay部分的代码改一下:

代码语言:javascript复制
let waker = cx.waker().clone();

            let when = self.when;



            // Spawn a timer thread.

            thread::spawn(move || {

                let now = Instant::now();



                if now < when {

                    thread::sleep(when - now);

                }



                waker.wake();

            });

无论是哪种高并发框架,本质上讲都是基于这种Task/Poll机制的调度器, poll做的本质工作就是监测链条上前续Task的执行状态。

let waker = cx.waker().clone();

let when = self.when;

// Spawn a timer thread.

thread::spawn(move || {

let now = Instant::now();

if now < when {

thread::sleep(when - now);

}

waker.wake();

});

用好Poll的机制,就能避免上面出现事件循环定期遍历整个事件队列的调度算法,Poll的精髓就是把状态为ready的事件通知给对应的处理程序,而基于poll设计的如tokio框架进行应用开发时,程序员根本不必关心整个消息传递,只需要用and_then、spawn等方法建立任务链条并让系统工作起来就可以了。

数据祯的实现

帧是数据传输中的最小单位,帧粒度以下的字节数据对于应用来说没有任何意义,同时不完整的帧也应该在帧的处理层进行过滤,read_frame方法在返回之前等待接收到整个帧。对TcpStream::read()的单个调用可能返回任意数量的数据。它可以包含整个框架,部分框架,或多个框架。如果接收到部分帧,数据将被缓冲,并从套接字读取更多数据。如果接收到多个帧,则返回第一个帧,其余的数据将被缓冲,直到下一次调用read_frame。要实现这一点,Connection需要一个读缓冲区字段。数据从套接字读入读缓冲区。当一个帧被解析时,相应的数据将从缓冲区中删除。我们将使用BytesMut作为缓冲区类型。

代码语言:javascript复制
use bytes::BytesMut;use tokio::net::TcpStream;

pub struct Connection {

    stream: TcpStream,

    buffer: BytesMut,

}

impl Connection {

    pub fn new(stream: TcpStream) -> Connection {

        Connection {

            stream,

            // Allocate the buffer with 4kb of capacity.

            buffer: BytesMut::with_capacity(4096),

        }

    }

}

read_frame函数尝试解析帧。如果有足够的数据来解析帧,则将帧返回给read_frame()的调用者。否则,将尝试从套接字中读取更多数据到缓冲区中。读取更多数据后,再次调用parse_frame()。这一次,如果接收到足够的数据,解析可能会成功。当从流中读取数据时,返回值为0表示不再从对等端接收数据。如果读缓冲区中仍然有数据,这表明已经接收到部分帧,连接正在突然终止。这是一个错误条件,并返回Err。

代码语言:javascript复制
use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)

    -> Result<Option<Frame>>

{

    loop {

        if let Some(frame) = self.parse_frame()? {

            return Ok(Some(frame));

        }



        // Ensure the buffer has capacity

        if self.buffer.len() == self.cursor {

            // Grow the buffer

            self.buffer.resize(self.cursor * 2, 0);

        }



        // Read into the buffer, tracking the number

        // of bytes read

        let n = self.stream.read(

            &mut self.buffer[self.cursor..]).await?;



        if 0 == n {

            if self.cursor == 0 {

                return Ok(None);

            } else {

                return Err("connection reset by peer".into());

            }

        } else {

            // Update our cursor

            self.cursor  = n;

        }

    }

}

一定要小心的Select

另外还有一个值得注意的点是select,在使用一个以上的通道时,任何一个通道都可以先完成。选择select!关键字将在所有的通道上等待,并将提到最先返回通道上的值。注意select!当等到第一个返回之后,其它未完成的任务将被取消。具体如下:

代码语言:javascript复制
use tokio::sync::oneshot;



async fn some_operation() -> String {

    

    "hello beyondma".to_string()

}



#[tokio::main]

async fn main() {

    let (mut tx1, rx1) = oneshot::channel();

    let (tx2, rx2) = oneshot::channel();



      tokio::spawn(async {

        let _ = tx1.send("hello beyondma");

    });





    tokio::spawn(async {

        let _ = tx2.send("hi beyondma");

    });



    tokio::select! {

        val = rx1 => {

            println!("rx1 completed first with {:?}", val);

        }

        val = rx2 => {

            println!("rx2 completed first with {:?}", val);

        }

    }

}

以上这段代码的执行结果要不是

hello beyondma

要么是

hi beyondma

不可能出现两个都被输出的情况。

为了解释select的机制,我们自行设计一个MySelect的future,在对MySelect进行poll操作时,将轮询第一个分支。如果已经准备好,则使用该值并完成MySelect。在MySelect.await接收到一个Ready后,整个future被丢弃。具体如下:

代码语言:javascript复制
use tokio::sync::oneshot;

use std::future::Future;

use std::pin::Pin;

use std::task::{Context, Poll};



struct MySelect {

    rx1: oneshot::Receiver<&'static str>,

    rx2: oneshot::Receiver<&'static str>,

}



impl Future for MySelect {

    type Output = ();



    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {

        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {

            println!("rx1 completed first with {:?}", val);

            return Poll::Ready(());

        }



        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {

            println!("rx2 completed first with {:?}", val);

            return Poll::Ready(());

        }



        Poll::Pending

    }

}



#[tokio::main]

async fn main() {

    let (tx1, rx1) = oneshot::channel();

    let (tx2, rx2) = oneshot::channel();



    // use tx1 and tx2

     tokio::spawn(async {

        let _ = tx1.send("hello beyondma");

    });



    tokio::spawn(async {

        let _ = tx2.send("hi beyondma");

    });

    MySelect {

        rx1,

        rx2,

    }.await;

}

Rust高并发总结

Rust是近些年来随着Serverless一起新兴起的语言,表面上看他像是C,既没有JVM虚拟机也没有GC垃圾回收器,但仔细一瞧他还不是C,Rust特别不信任程序员,力图让Rust编译器把程序中的错误杀死在在生成可执行文件之前的Build阶段。由于没有GC所以Rust当中独创了一套变量的生命周期及借调用机制。开发者必须时刻小心变量的生命周期是否存在问题。

而且Rust难的像火星语言,多路通道在使用之前要clone,带锁的哈希表用之前要先unwrap,种种用法和Java、Go完全不同,但是也正在由于这样严格的使用限制,我们刚刚所提到的Go语言中Gorotine会出现的问题,在Rust中都不会出现,因为Go的那些用法,通通不符合Rust变量生命周期的检查,想编译通过都是不可能完成的任务。

所以Rust很像逍遥派,想入门非常难,但只要能出师,写的程序能通过编译,那你百分百是一位高手,所以这是一门下限很高,上限同样也很高的极致语言。

目前Rust的高并发编程框架最具代表性的就是Tokio,本文开头Future的例子就是基于Tokio框架编写的,这里也不加赘述了。

根据官方的说法每个Rust的Tokio任务只有64字节大小,这比直接通过folk线程去网络请求,效率会提升几个数量级,在高并发框架的帮助下,开发者完全可以做到极限压榨硬件的性能。

0 人点赞