AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 77672103
Accepted
matsuisa
matsuisa
Asked: 2023-12-17 02:47:13 +0800 CST2023-12-17 02:47:13 +0800 CST 2023-12-17 02:47:13 +0800 CST

Rust 中的共享内存

  • 772

环境:

macOS Sonoma Ver.14.0 (M1 mac) Rust Ver.1.65.0

我想要做什么:我想在多线程之间共享一个带有 [u8;128] 元素数组的 vec。我在分享时要执行的要求如下。

  1. 整个 vec 必须可读
  2. 能够重写特定 [u8; 的元素 [128] 输入向量
  3. 能够插入 [u8; 类型的数据 128]进入向量

下面是我写的代码,不过这段代码读起来可以,但是有一个问题就是写的没有体现出来。如果我运行此代码,然后在执行该代码的计算机上运行以下命令一次

    nc -v 本地主机 50051

    [[0u8; 128],[1u8; 128],[2u8; 128]]

将被输出。到目前为止这是正确的,但第二次运行的数据输出与第一次运行相同。我的意图是第二个元素将输出具有 3 个填充的数据,如下所示,因为我正在第一次运行中更新数据。

    [[0u8; 128],[3u8; 128],[2u8; 128]]

我猜测我对 Arc 的使用是错误的,它实际上是传递 SharedData 的克隆,而不是对 SharedData 的引用,但我不知道如何识别这一点。如何修复代码以使其按我的预期工作?

主要.rs:

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use tokio_task_pool::Pool;

struct SharedData {
    data: Arc<RwLock<Vec<[u8; 128]>>>
}

impl SharedData {
    fn new(data: RwLock<Vec<[u8; 128]>>) -> Self {
        Self {
            data: Arc::new(data)
        }
    }

    fn update(&self, index: usize, update_data: [u8; 128]) {
        let read_guard_for_array = self.data.read().unwrap();
        let write_lock = RwLock::new((*read_guard_for_array)[index]);
        let mut write_guard_for_item = write_lock.write().unwrap();
        *write_guard_for_item = update_data;
    }
}

fn socket_to_async_tcplistener(s: socket2::Socket) -> std::io::Result<tokio::net::TcpListener> {
    std::net::TcpListener::from(s).try_into()
}

async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
    let read_guard = db_arc.data.read().unwrap();
    println!("In process() read: {:?}", *read_guard);
    db_arc.update(1, [3u8; 128]);
}

async fn serve(_: usize, tcplistener_arc: Arc<tokio::net::TcpListener>, db_arc: Arc<SharedData>) {
    let task_pool_capacity = 10;

    let task_pool = Pool::bounded(task_pool_capacity)
        .with_spawn_timeout(Duration::from_secs(300))
        .with_run_timeout(Duration::from_secs(300));
    
    loop {
        let (stream, _) = tcplistener_arc.as_ref().accept().await.unwrap();
        let db_arc_clone = db_arc.clone();

        task_pool.spawn(async move {
            process(stream, db_arc_clone).await;
        }).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let addr: std::net::SocketAddr = "0.0.0.0:50051".parse().unwrap();
    let soc2 = socket2::Socket::new(
        match addr {
            SocketAddr::V4(_) => socket2::Domain::IPV4,
            SocketAddr::V6(_) => socket2::Domain::IPV6,
        },
        socket2::Type::STREAM,
        Some(socket2::Protocol::TCP)
    ).unwrap();
    
    soc2.set_reuse_address(true).unwrap();
    soc2.set_reuse_port(true).unwrap();
    soc2.set_nonblocking(true).unwrap();
    soc2.bind(&addr.into()).unwrap();
    soc2.listen(8192).unwrap();

    let tcp_listener = Arc::new(socket_to_async_tcplistener(soc2).unwrap());

    let mut vec = vec![
        [0u8; 128],
        [1u8; 128],
        [2u8; 128],
    ];

    let share_db = Arc::new(SharedData::new(RwLock::new(vec)));
    let mut handlers = Vec::new();
    for i in 0..num_cpus::get() - 1 {
        let cloned_listener = Arc::clone(&tcp_listener);
        let db_arc = share_db.clone();

        let h = std::thread::spawn(move || {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(serve(i, cloned_listener, db_arc));
        });
        handlers.push(h);
    }

    for h in handlers {
        h.join().unwrap();
    }
}

货物.toml:

[package]
name = "tokio-test"
version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1.34.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["net"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_yaml = "0.9.27"
serde_derive = "1.0.193"
mio = {version="0.8.9", features=["net", "os-poll", "os-ext"]}
num_cpus = "1.16.0"
socket2 = { version="0.5.5", features = ["all"]}
array-macro = "2.1.8"
tokio-task-pool = "0.1.5"
argparse = "0.2.2"
multithreading
  • 2 2 个回答
  • 54 Views

2 个回答

  • Voted
  1. Jmb
    2023-12-17T03:29:33+08:002023-12-17T03:29:33+08:00
    fn update(&self, index: usize, update_data: [u8; 128]) {
        let read_guard_for_array = self.data.read().unwrap();
        let write_lock = RwLock::new((*read_guard_for_array)[index]);
    

    这会创建数据的副本并将其包装在无用的RwLock(无用的,因为该副本始终保存在单个线程中)。

        let mut write_guard_for_item = write_lock.write().unwrap();
        *write_guard_for_item = update_data;
    }
    

    这会修改副本,然后在函数结束时立即将其丢弃。

    相反,您需要锁定RwLock已有的:

    fn update(&self, index: usize, update_data: [u8; 128]) {
        let mut write_guard = self.data.write().unwrap();
        write_guard[index] = update_data;
    }
    

    请注意,无法仅获得特定项目的写锁和整个数组的读锁:读锁和写锁必须与相同的数据相关。这意味着您还需要在更新之前释放读锁:

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        let read_guard = db_arc.data.read().unwrap();
        println!("In process() read: {:?}", *read_guard);
        drop (read_guard);
        db_arc.update(1, [3u8; 128]);
    }
    
    • 1
  2. Best Answer
    Yoric
    2023-12-17T03:35:08+08:002023-12-17T03:35:08+08:00

    我没有看过完整的代码,但有一些错误。

    fn update()

        fn update(&self, index: usize, update_data: [u8; 128]) {
            let read_guard_for_array = self.data.read().unwrap();
            let write_lock = RwLock::new((*read_guard_for_array)[index]);
            let mut write_guard_for_item = write_lock.write().unwrap();
            *write_guard_for_item = update_data;
        }
    

    这不是你使用 a 的方式RwLock:

    • 如果你想修改数据,不要使用self.data.read(),而是使用self.data.write();
    • 我不确定你第二次打算用它做什么RwLock,但它没有用。

    相反,做类似的事情

        fn update(&self, index: usize, update_data: [u8; 128]) {
            let write_guard_for_array = self.data.write().unwrap();
            write_guard_for_array[index] = update_data;
        }
    

    fn process()

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        let read_guard = db_arc.data.read().unwrap();
        println!("In process() read: {:?}", *read_guard);
        db_arc.update(1, [3u8; 128]);
    }
    

    一般来说,您可能不应该db_arc.data直接访问。但除此之外,一旦你修复了 function update(),这就会陷入僵局:

    1. 你获得db_arc.data.read(). 根据 a 的定义,这意味着在释放读锁之前RwLock没有人可以修改 的内容。db_arc.data
    2. 读锁仅在作用域结束时释放。
    3. 在范围结束之前,您调用update(),它将尝试获取data.write()。但在读锁被释放之前它无法获取它。

    您可能想要以下内容:

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        {
          let read_guard = db_arc.data.read().unwrap();
          println!("In process() read: {:?}", *read_guard);
        } // End of scope, `read_guard` is released.
        db_arc.update(1, [3u8; 128]);
    }
    

    东京 + 话题

    你正在混合线程和 tokio。理论上是可行的,但是有风险。两种选择都是有效的,但我建议选择其中之一。通常,如果您有大量 I/O(例如网络请求或磁盘访问),请选择 tokio;如果您有大量 CPU 使用率,请选择线程。

    • 1

相关问题

  • JMeter 属性并发写入

  • 如何在生成的 tauri 异步运行时线程中使用托管 Tauri 状态变量?

  • 主线程中额外的 println 导致 Rust 执行不同的结果

  • 从缓存中刷新低争用原子的最佳方式?

  • Rust:遍历文件夹并打开每个文件

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    使用 <font color="#xxx"> 突出显示 html 中的代码

    • 2 个回答
  • Marko Smith

    为什么在传递 {} 时重载解析更喜欢 std::nullptr_t 而不是类?

    • 1 个回答
  • Marko Smith

    您可以使用花括号初始化列表作为(默认)模板参数吗?

    • 2 个回答
  • Marko Smith

    为什么列表推导式在内部创建一个函数?

    • 1 个回答
  • Marko Smith

    我正在尝试仅使用海龟随机和数学模块来制作吃豆人游戏

    • 1 个回答
  • Marko Smith

    java.lang.NoSuchMethodError: 'void org.openqa.selenium.remote.http.ClientConfig.<init>(java.net.URI, java.time.Duration, java.time.Duratio

    • 3 个回答
  • Marko Smith

    为什么 'char -> int' 是提升,而 'char -> Short' 是转换(但不是提升)?

    • 4 个回答
  • Marko Smith

    为什么库中不调用全局变量的构造函数?

    • 1 个回答
  • Marko Smith

    std::common_reference_with 在元组上的行为不一致。哪个是对的?

    • 1 个回答
  • Marko Smith

    C++17 中 std::byte 只能按位运算?

    • 1 个回答
  • Martin Hope
    fbrereto 为什么在传递 {} 时重载解析更喜欢 std::nullptr_t 而不是类? 2023-12-21 00:31:04 +0800 CST
  • Martin Hope
    比尔盖子 您可以使用花括号初始化列表作为(默认)模板参数吗? 2023-12-17 10:02:06 +0800 CST
  • Martin Hope
    Amir reza Riahi 为什么列表推导式在内部创建一个函数? 2023-11-16 20:53:19 +0800 CST
  • Martin Hope
    Michael A fmt 格式 %H:%M:%S 不带小数 2023-11-11 01:13:05 +0800 CST
  • Martin Hope
    God I Hate Python C++20 的 std::views::filter 未正确过滤视图 2023-08-27 18:40:35 +0800 CST
  • Martin Hope
    LiDa Cute 为什么 'char -> int' 是提升,而 'char -> Short' 是转换(但不是提升)? 2023-08-24 20:46:59 +0800 CST
  • Martin Hope
    jabaa 为什么库中不调用全局变量的构造函数? 2023-08-18 07:15:20 +0800 CST
  • Martin Hope
    Panagiotis Syskakis std::common_reference_with 在元组上的行为不一致。哪个是对的? 2023-08-17 21:24:06 +0800 CST
  • Martin Hope
    Alex Guteniev 为什么编译器在这里错过矢量化? 2023-08-17 18:58:07 +0800 CST
  • Martin Hope
    wimalopaan C++17 中 std::byte 只能按位运算? 2023-08-17 17:13:58 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve