我有一个数据管道,我在其中发出返回Vec<T>
.每个T
都有我将用来发出另一个 API 请求的信息。
首先,我流式传输请求并在包含这些请求的通道上发送Vec<T>
,然后我的目标是获取该 Vector,对其进行迭代,并生成新任务以发出下一步请求。
然而,我在这个设计中迷失了方向,目前我的代码可以运行,但没有提供任何有意义的东西。
我尝试tokio::spawn
在迭代器调用之前移动第二个调用(但while let(Some(r) = post_rx.recv().await
iter() 不是 asnyc,所以我收到编译错误。
本质上,我试图找出一种可以通过通道消耗响应并产生进一步任务的方法。
下面的代码(将运行,但卡在第二个通道中):
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Post {
pub user_id: u16,
pub id: u16,
pub title: String,
pub body: String,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct User {
pub id: u16,
pub name: String,
pub username: String,
pub email: String,
pub address: Address,
pub phone: String,
pub website: String,
pub company: Company,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Address {
pub street: String,
pub suite: String,
pub city: String,
pub zipcode: String,
pub geo: Geo,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Geo {
pub lat: String,
pub lng: String,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Company {
pub name: String,
pub catch_phrase: String,
pub bs: String,
}
#[tokio::main]
async fn main() {
let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
let client = reqwest::Client::new();
// create iterator that will stream async responses
tokio::spawn(async move {
let _ = futures::stream::iter((1..).step_by(1)).then(|i| {
let client = client.clone();
let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
client.get(url).send()
})
.and_then(|resp| {
resp.json::<Vec<Post>>()
})
.try_for_each_concurrent(2, |r| async {
let post_tx_cloned = post_tx.clone();
let _ = post_tx_cloned.send(r).await;
Ok(())
}).await;
});
// consume responses from our channel to do future things with results...
let new_client = reqwest::Client::new();
while let Some(r) = post_rx.recv().await {
println!("received {} posts", r.len());
// iterate over vec of posts to spawn other concurrent requests
let posts = r.clone();
let _ = posts.iter().map(|p| {
let p = p.clone();
let cloned_client = new_client.clone();
let user_tx_cloned = user_tx.clone();
// should I move this task spawn before the iter().map() call?
tokio::spawn(async move {
let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
let resp = cloned_client
.get(url)
.send()
.await
.unwrap()
.json::<User>()
.await;
println!("{:?}", resp);
user_tx_cloned.send(resp.unwrap()).await.unwrap();
});
});
// quit if the response comes back empty
if r.len() == 0 {
break;
}
}
// read from the user channel
while let Some(user) = user_rx.recv().await {
println!("{:?}", user);
}
}
编辑:我更新了 main 以生成一个 for 循环(而不是iter().map()
),它现在消耗第二个通道中的用户。我相信问题是因为我没有使用.map()
上面的先前代码中的 。在这种情况下,使用 for 循环似乎更简单。代码现在运行,但从未完成。
编辑后的代码:
#[tokio::main]
async fn main() {
let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
let client = reqwest::Client::new();
// create iterator that will stream async responses
tokio::spawn(async move {
let _ = futures::stream::iter((1..).step_by(1)).then(|i| {
let client = client.clone();
let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
client.get(url).send()
})
.and_then(|resp| {
resp.json::<Vec<Post>>()
})
.try_for_each_concurrent(2, |r| async {
let post_tx_cloned = post_tx.clone();
let _ = post_tx_cloned.send(r).await;
Ok(())
}).await;
});
// consume responses from our channel to do future things with results...
while let Some(r) = post_rx.recv().await {
println!("received {} posts", r.len());
// iterate over vec of posts to spawn other concurrent requests
let posts = r.clone();
// for i in posts.iter() {
// println!("post: {:?}", i);
// }
let new_client = reqwest::Client::new();
let cloned_client = new_client.clone();
let user_tx_cloned = user_tx.clone();
tokio::spawn(async move {
for p in posts {
let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
let resp = cloned_client
.get(url)
.send()
.await
.unwrap()
.json::<User>()
.await;
user_tx_cloned.send(resp.unwrap()).await.unwrap();
}
});
// quit if the response comes back empty
if r.len() == 0 {
break;
}
}
// read from the user channel
while let Some(user) = user_rx.recv().await {
println!("found user {}", user.id);
}
}
由于您的程序受网络 IO 限制,我建议忘记线程并仅使用并发。我所说的“网络 IO 限制”只是指您的程序大部分时间都在等待 HTTP 请求完成。除此之外(除非你的 json 响应很大)这里没有发生太多计算。对于这样的用例,多线程几乎没有什么好处,甚至可能是有害的,因为线程同步的开销非常昂贵(您必须进行分析才能找到答案)。
这意味着删除通道并
tokio::spawn
完全删除,并完全依赖于流抽象。这使得你的程序基本上是单线程的,所有 JSON 解析和打印都发生在一个线程上。仅使用流,不使用线程,我们可以按如下方式编写您的程序。
在第一阶段,您查询所有可用用户。这会产生一个响应流
Vec<Post>
,因为filter_map
会丢弃函数返回的任何 None。请注意,如果此流不高效(即函数永远不会返回 Some),则流管道的其余部分将永远阻塞,这是您必须在实际代码中处理的事情:接下来我们实现您的退出条件,当用户端点首先返回一个空数组时,我们终止流:
然后我们同时创建另一个请求。请注意,这会轮询每个创建的请求(直到传入的限制
flat_map_ordered
),但不会引入任何并行性。换句话说,假设您从上面的第一个请求中获取了 10 个元素,然后flat_map_unordered
立即调用其函数 10 次,保存生成的 10 个请求,并且每次flat_map_unordered
轮询时,它都会轮询每个请求并为任何请求生成一个值,该值是完全的。所有这一切都发生在一个线程中,不涉及并行性:最后我们消耗了主体中的流
main
:您的下一步是在现实场景中分析该程序,并确定是否存在任何性能瓶颈。如果找到任何内容,您可以并行化该部分代码。