我有一个数据管道,我在其中发出返回Vec<T>
.每个T
都有我将用来发出另一个 API 请求的信息。
首先,我流式传输请求并在包含这些请求的通道上发送Vec<T>
,然后我的目标是获取该 Vector,对其进行迭代,并生成新任务以发出下一步请求。
然而,我在这个设计中迷失了方向,目前我的代码可以运行,但没有提供任何有意义的东西。
我尝试tokio::spawn
在迭代器调用之前移动第二个调用(但while let(Some(r) = post_rx.recv().await
iter() 不是 asnyc,所以我收到编译错误。
本质上,我试图找出一种可以通过通道消耗响应并产生进一步任务的方法。
下面的代码(将运行,但卡在第二个通道中):
use futures::stream::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Post {
pub user_id: u16,
pub id: u16,
pub title: String,
pub body: String,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct User {
pub id: u16,
pub name: String,
pub username: String,
pub email: String,
pub address: Address,
pub phone: String,
pub website: String,
pub company: Company,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Address {
pub street: String,
pub suite: String,
pub city: String,
pub zipcode: String,
pub geo: Geo,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Geo {
pub lat: String,
pub lng: String,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Company {
pub name: String,
pub catch_phrase: String,
pub bs: String,
}
#[tokio::main]
async fn main() {
let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
let client = reqwest::Client::new();
// create iterator that will stream async responses
tokio::spawn(async move {
let _ = futures::stream::iter((1..).step_by(1)).then(|i| {
let client = client.clone();
let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
client.get(url).send()
})
.and_then(|resp| {
resp.json::<Vec<Post>>()
})
.try_for_each_concurrent(2, |r| async {
let post_tx_cloned = post_tx.clone();
let _ = post_tx_cloned.send(r).await;
Ok(())
}).await;
});
// consume responses from our channel to do future things with results...
let new_client = reqwest::Client::new();
while let Some(r) = post_rx.recv().await {
println!("received {} posts", r.len());
// iterate over vec of posts to spawn other concurrent requests
let posts = r.clone();
let _ = posts.iter().map(|p| {
let p = p.clone();
let cloned_client = new_client.clone();
let user_tx_cloned = user_tx.clone();
// should I move this task spawn before the iter().map() call?
tokio::spawn(async move {
let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
let resp = cloned_client
.get(url)
.send()
.await
.unwrap()
.json::<User>()
.await;
println!("{:?}", resp);
user_tx_cloned.send(resp.unwrap()).await.unwrap();
});
});
// quit if the response comes back empty
if r.len() == 0 {
break;
}
}
// read from the user channel
while let Some(user) = user_rx.recv().await {
println!("{:?}", user);
}
}
编辑:我更新了 main 以生成一个 for 循环(而不是iter().map()
),它现在消耗第二个通道中的用户。我相信问题是因为我没有使用.map()
上面的先前代码中的 。在这种情况下,使用 for 循环似乎更简单。代码现在运行,但从未完成。
编辑后的代码:
#[tokio::main]
async fn main() {
let (post_tx, mut post_rx) = mpsc::channel::<Vec<Post>>(5);
let (user_tx, mut user_rx) = mpsc::channel::<User>(5);
let client = reqwest::Client::new();
// create iterator that will stream async responses
tokio::spawn(async move {
let _ = futures::stream::iter((1..).step_by(1)).then(|i| {
let client = client.clone();
let url = format!("https://jsonplaceholder.typicode.com/posts?userId={i}");
client.get(url).send()
})
.and_then(|resp| {
resp.json::<Vec<Post>>()
})
.try_for_each_concurrent(2, |r| async {
let post_tx_cloned = post_tx.clone();
let _ = post_tx_cloned.send(r).await;
Ok(())
}).await;
});
// consume responses from our channel to do future things with results...
while let Some(r) = post_rx.recv().await {
println!("received {} posts", r.len());
// iterate over vec of posts to spawn other concurrent requests
let posts = r.clone();
// for i in posts.iter() {
// println!("post: {:?}", i);
// }
let new_client = reqwest::Client::new();
let cloned_client = new_client.clone();
let user_tx_cloned = user_tx.clone();
tokio::spawn(async move {
for p in posts {
let url = format!("https://jsonplaceholder.typicode.com/users/{id}", id = p.user_id);
let resp = cloned_client
.get(url)
.send()
.await
.unwrap()
.json::<User>()
.await;
user_tx_cloned.send(resp.unwrap()).await.unwrap();
}
});
// quit if the response comes back empty
if r.len() == 0 {
break;
}
}
// read from the user channel
while let Some(user) = user_rx.recv().await {
println!("found user {}", user.id);
}
}