< 返回版块

c5soft 发表于 2019-11-12 18:14

Tags:futures,async,await

无需Tokio/Async-std,自带Runtime。代码库的参考文档比较完善,只是缺少应用指南。下面是一段测试代码:

use futures::executor::block_on;
use futures::future::{join_all, pending, select_all};
use futures::prelude::*;

async fn a(a: i32, b: i32) -> i32 {
    a + b
}

async fn b(a: i32, b: i32) -> i32 {
    a - b
}

async fn c(a: i32, b: i32) -> i32 {
    pending().await
}

fn main() {
    let r = block_on(async {
        println!("{}", a(1, 2).await);
        println!("{}", b(1, 2).await);

        async fn foo(i: u32) -> u32 {
            i * 10
        }

        let futures = vec![foo(1), foo(2), foo(3)];
        let all_finished = join_all(futures).await;
        println!("{:?}", all_finished);

        // let first_finished = select_all(futures).await;
        // println!("{:?}", first_finished);

        "block_on result"
    });
    println!("{}", r)
}

关注其中的join_all与select_all。翻了半天资料没搞清楚select_all如何用,掉进trait Unpin这个坑里了,哪位高人能够指点一下?


Ext Link: https://crates.io/crates/futures

评论区

写评论
Dengjianping 2020-03-30 19:44

代码改成如下即可。

let futures = vec![Box::pin(foo(11)), Box::pin(foo(2)), Box::pin(foo(3))];
// let all_finished = join_all(futures).await;
// println!("{:?}", all_finished);

let first_finished = select_all(futures).await.0;
println!("{:?}", first_finished);

这是函数select_all的签名。

pub fn select_all<I>(iter: I) -> SelectAll<<I as IntoIterator>::Item> 
where
    I: IntoIterator,
    <I as IntoIterator>::Item: Future,
    <I as IntoIterator>::Item: Unpin, 

迭代器里的Future,在poll之前,需要pin这个Future,才能安全地移动这个Future,pin了才能保证Unpin类型的安全。为什么这里说要移动Future,实现里把这些futures放到了SelectAll这个结构里,所以要能安全地移动。 具体可以看函数实现和Unpin的官方文档。

zhuxiujia 2020-03-29 16:53

tokio 好像也依赖了 Futures0.3的,而且好多web框架用了 tokio而不是Futures哎,发布发布有啥区别呢

作者 c5soft 2019-11-12 22:57

select_all 函数不会用,可以用select!宏:

use futures::executor::block_on;
use futures::future::{join_all, pending};
use futures::prelude::*;
use futures::{join, select};
use futures_timer::Delay;
use std::time::Duration;

async fn sleep(delay: Duration) {
    Delay::new(delay).await
}

async fn a(a: i32, b: i32) -> i32 {
    sleep(Duration::from_millis(500)).await;
    a + b
}

async fn b(a: i32, b: i32) -> i32 {
    sleep(Duration::from_millis(100)).await;
    a - b
}

async fn c(_a: i32, _b: i32) -> i32 {
    pending().await
}

fn main() {
    let r = block_on(async {
        println!("1+2={}", a(1, 2).await);
        println!("1-2={}", b(1, 2).await);

        async fn foo(i: u32) -> u32 {
            i * 10
        }

        let futures = vec![foo(1), foo(2), foo(3)];
        let all_finished = join_all(futures).await;
        println!("join_all result: {:?}", all_finished);

        let all_finished = join! {foo(1),foo(2),foo(3)};
        println!("join! result: {:?}", all_finished);

        // use futures::future::select_all;
        // let first_finished = select_all(futures).await;
        // println!("{:?}", first_finished);

        let first_finished = select! {
            r_a=a(1,2).fuse()=>("a(1,2)",r_a),
            r_b=b(1,2).fuse()=>("b(1,2)",r_b),
            r_c=c(1,2).fuse()=>("c(1,2)",r_c),
        };
        println!("first finished: {:?}", first_finished);

        "block_on result"
    });
    println!("{}", r)
}

cargo.toml:

[dependencies]
futures ={version="0.3.1"}
futures-timer = "2.0.0"

Ryan-Git 2019-11-12 19:48

又一个 runtime。。。

1 共 4 条评论, 1 页