AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题

问题[asynchronous](coding)

Martin Hope
Emil Sahlén
Asked: 2025-03-06 16:22:51 +0800 CST

将 Future 存储在该 Future 的源旁边

  • 7

我有一个类型,其语义上的行为类似于流,但实际上并未实现该futures_util::Stream特征。相反,它有一个方法get_next,可重复返回Future下一个项目。

我正在尝试实现一个适配器,以便将其用作流,但遇到了一些麻烦。我创建了以下两个示例:

use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};

struct FutureSource {/* omitted */}

impl FutureSource {
    pub fn get_next(&mut self) -> FutureString {
        FutureString {
            source: self,
            important_state: todo!(),
        }
    }
}

struct FutureString<'a> {
    source: &'a mut FutureSource,
    important_state: String,
}

impl<'a> Future for FutureString<'a> {
    type Output = String;

    fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        unimplemented!()
    }
}

struct StreamAdapter<'a> {
    future_source: &'a mut FutureSource,
    future: Option<FutureString<'a>>,
}

impl<'a> Stream for StreamAdapter<'a> {
    type Item = <FutureString<'a> as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.future.is_none() {
            self.future.insert(self.future_source.get_next());
        }
        let fut = self.future.as_mut().unwrap();

        match Pin::new(fut).poll(cx) {
            Poll::Ready(value) => {
                self.future = None;
                Poll::Ready(Some(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

这不能编译并出现以下错误:

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:41:32
   |
41 |             self.future.insert(self.future_source.get_next());
   |             ----        ------ ^^^^ second mutable borrow occurs here
   |             |           |
   |             |           first borrow later used by call
   |             first mutable borrow occurs here

error: lifetime may not live long enough
  --> src/main.rs:41:32
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
37 |         mut self: Pin<&mut Self>,
   |                       - let's call the lifetime of this reference `'1`
...
41 |             self.future.insert(self.future_source.get_next());
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:43:19
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
42 |         }
43 |         let fut = self.future.as_mut().unwrap();
   |                   ^^^^ second mutable borrow occurs here

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:47:17
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
...
47 |                 self.future = None;
   |                 ^^^^ second mutable borrow occurs here

我对错误的理解:

  • cannot borrow `self` as mutable more than once at a time
    self中的可变借用self.future_source.get_next()具有生命周期'a,这必然意味着借用的生命周期self必须至少为'a。此生命周期超出了整个函数调用的范围,这意味着我们self根本无法再次借用。
  • lifetime may not live long enough:
    这个对我来说有点令人困惑,但我们还是来看看:future 借用了 ,self这'a意味着 的引用的生命周期self必须至少为'a。虽然这当然是可能的,但编译器无法强制执行。我可以指定 的引用self具有生命周期,'a但这与特征 fn 不一致。

有没有办法在安全的 Rust 中做到这一点?还是我需要使用不安全的?或者无论我怎么做,我试图做的事情都是不合理的?

asynchronous
  • 1 个回答
  • 34 Views
Martin Hope
NoBullsh1t
Asked: 2025-02-12 05:30:55 +0800 CST

如何使用异步函数创建特征抽象?

  • 5

我正在构建一个应用程序。它有一个核心构件,我称之为“大脑”,它了解所有子系统(例如 DB)。

我不想直接使用任何特定的 DB 实现,而是使用特征对象(或任何可以实现相同目的的东西)。

所以看起来就像

struct Brain {
  db: Box<dyn DbTrait>,
}

但是,我也希望所有代码都是高效的,因此我使用 tokio,并且大多数内容都是异步的

trait DbTrait {
  fn do_something(&self) -> impl Future<Output = Whatever>;
}

这会产生冲突,因为 Rust 无法从返回impl类型的特征中创建特征对象,如下面的 MWE 所强调的那样:

trait ReturnsFutures {
    fn futuristic(&self) -> impl Future<Output = ()>;
}

struct FutureReturner {}

impl ReturnsFutures for FutureReturner {
    async fn futuristic(&self) {
        println!("just fake async");
    }
}

struct OtherFutureReturner {}

impl ReturnsFutures for OtherFutureReturner {
    async fn futuristic(&self) {
            println!("also fakin' it");
    }
}

struct FuturisticService {
    future_returner: Box<dyn ReturnsFutures>,
}

给出错误

the trait `ReturnsFutures` cannot be made into an object
consider moving `futuristic` to another trait
the following types implement the trait, consider defining an enum where each variant holds one of these types, implementing `ReturnsFutures` for this new enum and using it instead:
  FutureReturner
  OtherFutureReturnerrustcClick for full compiler diagnostic

目前,我能想到一种解决方法,即改为异步,特征可以返回一个通道来等待结果。虽然我觉得更复杂,但这可能是相当可行的。对于这类问题,还有其他更可行/惯用的解决方案吗?

asynchronous
  • 1 个回答
  • 33 Views
Martin Hope
user1233894
Asked: 2024-12-15 09:33:33 +0800 CST

尝试从全局函数传递值

  • 6

我正在尝试从另一个视图调用的函数传递 Double 值,并且该函数在 AlamoFire 请求完成之前返回一个值

struct GetPrice {

    static func getThePrice(book: String) async -> Double? {
        
        var theNetPrice: Double = 0.0
        var iListPrice = ""
        let url = "https://example.com"
        
        let headers: HTTPHeaders = [
            "accept": "application/xml",
            "Authorization": "Basic \xxx",
           
            "Content-Type": "application/x-www-form-urlencoded"
        ]

        let body: [String: Any] = ["xxxxx"]

        AF.request(url, method: .post, parameters: body, encoding: URLEncoding.default, headers: headers)
            .responseData { response in
            var statusCode = response.response?.statusCode
                //print("response.result ", response.result)
            switch response.result {
            case .success:
                let data = response.data
                let s = String(data: data!, encoding: .utf8)
                let xml = XMLHash.parse(String(s!))
                
                    iListPrice = xml["xxx"].element!.text
                    theNetPrice = Double(iListPrice)
                    print(theNetPrice) // this shows correct value
                    //return theNetPrice -- this would give error 'Cannot convert value of type 'Double' to closure result type 'Void''
            case .failure(let error):
                statusCode = error._code
                print("status code is: \(String(describing: statusCode))")
                
                print(error)
            }
        }
        
        return theNetPrice //returns 0.0
    }
}
'''
asynchronous
  • 1 个回答
  • 33 Views
Martin Hope
Alexander
Asked: 2024-11-16 01:24:40 +0800 CST

使用 HttpClient 的 F# 异步表达式

  • 5

此示例取自 dotnet/fsharp 文档,由于以下原因,它已过时 - “此构造已弃用。WebRequest、HttpWebRequest、ServicePoint 和 WebClient 已过时。请改用 HttpClient。” https://learn.microsoft.com/en-us/dotnet/fsharp/language-reference/async-expressions

这个例子使用 HttpClient 看起来怎么样?

需要说明的是,我是 Web 开发的新手,但我确实了解异步表达式是什么,只是在将示例转换为使用 HttpClient 时遇到了麻烦。

open System.Net
open Microsoft.FSharp.Control.WebExtensions

let urlList = [ "Microsoft.com", "http://www.microsoft.com/"
                "MSDN", "http://msdn.microsoft.com/"
                "Bing", "http://www.bing.com"
            ]

let fetchAsync(name, url:string) =
    async {
        try
            let uri = new System.Uri(url)
            let webClient = new WebClient()
            let! html = webClient.AsyncDownloadString(uri)
            printfn "Read %d characters for %s" html.Length name
        with
            | ex -> printfn "%s" (ex.Message);
    }

let runAll() =
    urlList
    |> Seq.map fetchAsync
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore

runAll()
asynchronous
  • 1 个回答
  • 40 Views
Martin Hope
narumi
Asked: 2024-11-05 02:51:24 +0800 CST

为什么 Future 实现中的 `while let` 会触发 Waker,而 `if let` 却不会?

  • 9

我一直在 Rust 中研究 Future 实现,但遇到了一些我无法完全理解的行为。具体来说,我在方法std::sync::mpsc::Receiver内部使用poll,并尝试在while let和之间做出选择if let以接收消息。

这是我的轮询方法的简化版本:

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.get_mut();

    // Option 1: Using `while let`
    while let Ok(new_attributes) = this.receiver.try_recv() {
        // Process the message
    }

    // Option 2: Using `if let`
    if let Ok(new_attributes) = this.receiver.try_recv() {
        // Process the message
    }

    Poll::Pending
}

我观察到的情况

当我使用 while let 时,一切似乎都按预期工作:当新消息到达时,Future 会重新轮询。但是,当我使用 if let 时,Future 似乎挂起了,并且永远不会再次唤醒,即使有新消息可用。

我的理解

我知道 Waker 应该用于通知执行者应该重新轮询 Future。鉴于我的轮询函数始终返回 Poll::Pending,我预计 Future 将继续被重复轮询,除非进程本身停止。但是,我不明白为什么 while let 确保 Waker 被正确触发,而 if let 似乎没有这样做。

其他背景信息

未来是这样产生的:

ctx.task_executor()
    .spawn_critical_blocking("transaction execution service", Box::pin(fut_struct));

spawn_critical_blocking 函数的工作原理如下:

pub fn spawn_critical_blocking<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    self.spawn_critical_as(name, fut, TaskKind::Blocking)
}

在内部:

fn spawn_critical_as<F>(
    &self,
    name: &'static str,
    fut: F,
    task_kind: TaskKind,
) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    // Wrapping the future and handling task errors
    let task = std::panic::AssertUnwindSafe(fut)
        .catch_unwind()
        .map_err(...);

    let task = async move {
        // Handling task shutdown and execution
        let task = pin!(task);
        let _ = select(on_shutdown, task).await;
    };

    self.spawn_on_rt(task, task_kind)
}

然后该spawn_on_rt方法使用spawn_blocking。

我的问题

  • 为什么使用 while let 可以确保 Waker 被触发,但使用 if let
  • 做

不是?

  • 导致这种行为差异的根本机制是什么?

如有任何见解或解释我将不胜感激!

asynchronous
  • 1 个回答
  • 68 Views
Martin Hope
yushizhao
Asked: 2024-11-02 14:23:56 +0800 CST

为什么被屏蔽的tokio worker还能同时工作?

  • 5

当我跑步时

use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            println!("got = {}", i);
        } 
    });

    for i in 0..5 {

        // busy calculation
        std::thread::sleep(std::time::Duration::from_millis(10));

        match tx.try_send(i) {
            Ok(_) => {
                println!("sent = {}", i);
            },
            Err(err) => {
                println!("{}", err);
            }
        };

    };
 
}

我得到了

sent = 0
got = 0
sent = 1
got = 1
sent = 2
got = 2
sent = 3
got = 3
sent = 4

据我了解,唯一的工作者正在处理 for 循环,因为它从不产生结果。唯一的工作者应该没有机会处理接收。因此,通道在第一次发送后应该是满的。结果我错了。我遗漏了什么?

asynchronous
  • 1 个回答
  • 35 Views
Martin Hope
Fee
Asked: 2024-10-24 00:27:06 +0800 CST

每次创建未来时,`pub fn long(&self) -> impl Future<Output = ()> + 'static` 是否会泄露数据?

  • 6

基本上,基于这个答案:

pub fn long() -> impl Future<Output = ()> + 'static {
  let s = String::from("what is the answer?");
  async move {
     // `s` gets moved into this block
     println!("{s}");
  }
}

#[tokio::main]
async fn main() {
    let future = long(); // Create the future
    future.await; // Await the future, it will complete, and `s` will be dropped
}

我的问题是,这个'static界限是否会在每次创建未来时泄漏数据?或者'static这里的界限是否意味着特征实现必须是静态的?(例如,函数定义在内存中)。到目前为止,我有:

  1. 问 ChatGPT:它说awaiting 放弃了未来
  2. 书中这段话没有回答。
  3. 阅读《rustonomicon》中关于终身强制的规定
  4. 阅读 类似的 问题

我主要想概念化“静态”的需要和效果;代码已经可以运行了。

背景

我有一个阅读器可以读取某些部分,而对于另一个阅读器,它可能需要先加载其他数据才能继续。我想到最好的办法是:

type Image = Vec<u8>;
#[async_trait]
trait Reader {
    async fn read_into_buffer(&self, offset: u32, buf: &mut [u8]) {
       sleep(2);
       buf.fill(42);
    }
}

struct Decoder<R: Reader> {
  reader: Arc<R>,
  images: HashMap<u32, Arc<Image>>,
}

impl<R> Decoder<R> {
  fn decode_chunk(&self, overview: u32, chunk: usize) -> Result<(),impl Future<Output = Vec<u8>> {
    if let Some(im) = self.images.get(overview) {
      // cheap clones because they are Arcs
      let image = im.clone()
      let r = self.reader.clone()
      async move {
        // don't mention `self` in here
        let buf = vec![0; 42];
        let offset = image[chunk];
        r.read_into_buffer(offset, buf);
        buf
      }
    } else {
        Err(())
    }
  }

  async fn load_overview(&mut self, offset: u32) {
    let buf = vec![0;42];
    self.reader.read_into_buffer(offset, buf).await;
    self.images.insert(offset, buf);
  }
}

可以这样使用:

decoder.load_overview(42).await;
let chunk_1 = decoder.decode_chunk(42, 13).unwrap(); // no await'ing
let chunk_2_err = decoder.decode_chunk(13).unwrap_err(); // chunk 13 isn't loaded
decoder.load_overview(13).await;
let chunk_2 = decoder.decode_chunk(13, 13).unwrap();
let res = (chunk_1.await, chunk_2.await)
asynchronous
  • 1 个回答
  • 64 Views
Martin Hope
snOm3ad
Asked: 2024-10-18 01:16:11 +0800 CST

通过特征 A *或* B 满足特征 C

  • 5

你可以很容易地要求一个新接口满足 A 和 B:

pub trait Writer: AsyncWriter + BlockingWriter {}

但是,我应该能够Writer通过实现其中任何一个来满足接口。为此必须使用枚举很麻烦,因为我必须携带我不打算使用的选项的类型信息:

pub enum Writer<A: AsyncWriter, B: BlockingWriter> {
    Async(A),
    Blocking(B),
}

如果我想构造一个仅使用异步版本的编写器,我仍然必须传递有关阻塞编写器的类型信息。你可以聪明地使用以下命令从枚举中删除类型信息dyn:

pub enum Writer<'async, 'blocking> {
    Async(&'async dyn AsyncWriter),
    Blocking(&'blocking dyn BlockingWriter),
}

问题是这只适用于对象安全特征,所以它不是完全万无一失的!同样的情况也适用于使用Box<dyn T>,它在嵌入式环境中也不起作用,因为有时你没有堆。

有没有更好的方法来思考这个问题?使用单独的类型是唯一的方法吗?

asynchronous
  • 1 个回答
  • 61 Views
Martin Hope
ILIKETOLEARN
Asked: 2024-10-05 05:01:45 +0800 CST

在异步 SQLAlchemy 中访问提交后的查询结果会产生错误

  • 5

因此,我一直在测试一些东西,并注意到每当我访问某些查询结果的更新值时,它都会抛出一个错误。看看下面的代码

async def createUser() -> JSONResponse:
    async with Session() as session:
        userRawQuery = await session.execute(
            select(User).filter(User.id == 'some-value')
        )
        user = userRawQuery.scalar()
        # Change Username - For Testing Purposes
        user.username = 'John'
        await session.commit()
        return JSONResponse(
            content = {"msg": "Updated Creator Request"},
            status_code = 200
        )

这段代码运行没有问题。但是当我这样做时

return JSONResponse(
    content = {
        "user": {
            "id": user.id,
            "username": user.username,
            "age": user.age
        }
    },
    status_code = 200
)

它会产生错误。出于某种原因,访问提交后的数据会产生错误。很奇怪。知道为什么会发生这种情况吗?SQLAlchemy 的同步方法不会出现此错误,但异步方法会出现。

  • 尝试使用替代方法,如“query()”方法,但它不支持 SQLAlchemy 的异步版本

  • 尝试了一下代码,以确定我刚才介绍的操作是否真的会导致错误

asynchronous
  • 1 个回答
  • 18 Views
Martin Hope
john
Asked: 2024-08-25 06:24:31 +0800 CST

AlpineJS异步数据加载

  • 5

我第一次研究 AlpineJS,并尝试将一些数据加载到x-data属性中。

我有类似的东西

<body x-data="data">
...

在我的 HTML 中。data我在其中存储了稍后使用的所有数据和函数(这是一个非常简单的 SPA)

document.addEventListener('alpine:init', () => {
  Alpine.data('data', () => ({
    foo: 1,
    bar: 2,
    ...
  }));
}

在所有变量同步之前,一切都运行正常,但现在我需要将从数据库异步获取的一些数据分配给一个变量。

我尝试了一些简单的事情

document.addEventListener('alpine:init', () => {
  Alpine.data('data', async () => ({
    foo: 1,
    bar: await getData(),
    ...
  }));
}

不幸的是,这不起作用。我得到了Uncaught ReferenceError: VARIABLE_NAME is not defined里面每个变量的data。

我该如何解决这个问题?有没有标准的方法来处理这种情况?

asynchronous
  • 2 个回答
  • 21 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve