AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 77672103
Accepted
matsuisa
matsuisa
Asked: 2023-12-17 02:47:13 +0800 CST2023-12-17 02:47:13 +0800 CST 2023-12-17 02:47:13 +0800 CST

Memória compartilhada em Rust

  • 772

Ambiente:

macOS Sonoma Ver.14.0 (M1 mac) Rust Ver.1.65.0

O que eu quero fazer: quero compartilhar um vec com uma matriz de [u8;128] elementos entre multithreads. Os requisitos que desejo cumprir ao compartilhar são os seguintes.

  1. todo o vec deve ser legível
  2. ser capaz de reescrever elementos de um [u8; 128] digite o vec
  3. ser capaz de inserir dados do tipo [u8; 128] em vec

Abaixo está o código que escrevi, mas esse código pode fazer até leitura, mas há um problema que a escrita não é refletida. Se eu executar este código e depois executar o seguinte comando uma vez no computador onde ele foi executado

    nc -v localhost50051

    [[0u8; 128],[1u8; 128],[2u8; 128]]

será produzido. Isso está correto até este ponto, mas a saída de dados na segunda execução é a mesma da primeira execução. Minha intenção é que o segundo elemento produza dados com 3 preenchimentos conforme mostrado abaixo, pois estou atualizando os dados na primeira execução.

    [[0u8; 128],[3u8; 128],[2u8; 128]]

Suponho que meu uso do Arc esteja errado e que na verdade seja um clone do SharedData sendo transmitido em vez de uma referência ao SharedData, mas não sei como posso identificar isso. Como posso corrigir o código para que funcione como pretendido?

principal.rs:

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use tokio_task_pool::Pool;

struct SharedData {
    data: Arc<RwLock<Vec<[u8; 128]>>>
}

impl SharedData {
    fn new(data: RwLock<Vec<[u8; 128]>>) -> Self {
        Self {
            data: Arc::new(data)
        }
    }

    fn update(&self, index: usize, update_data: [u8; 128]) {
        let read_guard_for_array = self.data.read().unwrap();
        let write_lock = RwLock::new((*read_guard_for_array)[index]);
        let mut write_guard_for_item = write_lock.write().unwrap();
        *write_guard_for_item = update_data;
    }
}

fn socket_to_async_tcplistener(s: socket2::Socket) -> std::io::Result<tokio::net::TcpListener> {
    std::net::TcpListener::from(s).try_into()
}

async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
    let read_guard = db_arc.data.read().unwrap();
    println!("In process() read: {:?}", *read_guard);
    db_arc.update(1, [3u8; 128]);
}

async fn serve(_: usize, tcplistener_arc: Arc<tokio::net::TcpListener>, db_arc: Arc<SharedData>) {
    let task_pool_capacity = 10;

    let task_pool = Pool::bounded(task_pool_capacity)
        .with_spawn_timeout(Duration::from_secs(300))
        .with_run_timeout(Duration::from_secs(300));
    
    loop {
        let (stream, _) = tcplistener_arc.as_ref().accept().await.unwrap();
        let db_arc_clone = db_arc.clone();

        task_pool.spawn(async move {
            process(stream, db_arc_clone).await;
        }).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let addr: std::net::SocketAddr = "0.0.0.0:50051".parse().unwrap();
    let soc2 = socket2::Socket::new(
        match addr {
            SocketAddr::V4(_) => socket2::Domain::IPV4,
            SocketAddr::V6(_) => socket2::Domain::IPV6,
        },
        socket2::Type::STREAM,
        Some(socket2::Protocol::TCP)
    ).unwrap();
    
    soc2.set_reuse_address(true).unwrap();
    soc2.set_reuse_port(true).unwrap();
    soc2.set_nonblocking(true).unwrap();
    soc2.bind(&addr.into()).unwrap();
    soc2.listen(8192).unwrap();

    let tcp_listener = Arc::new(socket_to_async_tcplistener(soc2).unwrap());

    let mut vec = vec![
        [0u8; 128],
        [1u8; 128],
        [2u8; 128],
    ];

    let share_db = Arc::new(SharedData::new(RwLock::new(vec)));
    let mut handlers = Vec::new();
    for i in 0..num_cpus::get() - 1 {
        let cloned_listener = Arc::clone(&tcp_listener);
        let db_arc = share_db.clone();

        let h = std::thread::spawn(move || {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(serve(i, cloned_listener, db_arc));
        });
        handlers.push(h);
    }

    for h in handlers {
        h.join().unwrap();
    }
}

Carga.toml:

[package]
name = "tokio-test"
version = "0.1.0"
edition = "2021"

[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1.34.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["net"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_yaml = "0.9.27"
serde_derive = "1.0.193"
mio = {version="0.8.9", features=["net", "os-poll", "os-ext"]}
num_cpus = "1.16.0"
socket2 = { version="0.5.5", features = ["all"]}
array-macro = "2.1.8"
tokio-task-pool = "0.1.5"
argparse = "0.2.2"
multithreading
  • 2 2 respostas
  • 54 Views

2 respostas

  • Voted
  1. Jmb
    2023-12-17T03:29:33+08:002023-12-17T03:29:33+08:00
    fn update(&self, index: usize, update_data: [u8; 128]) {
        let read_guard_for_array = self.data.read().unwrap();
        let write_lock = RwLock::new((*read_guard_for_array)[index]);
    

    Isso cria uma cópia dos dados e os envolve em algo inútil RwLock(inútil porque essa cópia é sempre mantida em um único thread).

        let mut write_guard_for_item = write_lock.write().unwrap();
        *write_guard_for_item = update_data;
    }
    

    Isso modifica a cópia, que é imediatamente descartada no final da função.

    Em vez disso, você precisa bloquear o RwLockque já possui:

    fn update(&self, index: usize, update_data: [u8; 128]) {
        let mut write_guard = self.data.write().unwrap();
        write_guard[index] = update_data;
    }
    

    Observe que não há como obter um bloqueio de gravação apenas para um item específico e bloqueios de leitura para todo o array: os bloqueios de leitura e gravação devem estar relacionados aos mesmos dados. Isso significa que você também precisa liberar o bloqueio de leitura antes de atualizar:

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        let read_guard = db_arc.data.read().unwrap();
        println!("In process() read: {:?}", *read_guard);
        drop (read_guard);
        db_arc.update(1, [3u8; 128]);
    }
    
    • 1
  2. Best Answer
    Yoric
    2023-12-17T03:35:08+08:002023-12-17T03:35:08+08:00

    Não olhei o código inteiro, mas há alguns erros.

    fn update()

        fn update(&self, index: usize, update_data: [u8; 128]) {
            let read_guard_for_array = self.data.read().unwrap();
            let write_lock = RwLock::new((*read_guard_for_array)[index]);
            let mut write_guard_for_item = write_lock.write().unwrap();
            *write_guard_for_item = update_data;
        }
    

    Não é assim que você usa RwLock:

    • se você quiser modificar os dados, em vez de usar self.data.read(), use self.data.write();
    • Não tenho certeza do que você pretende fazer com isso no segundo RwLock, mas é inútil.

    Em vez disso, faça algo como

        fn update(&self, index: usize, update_data: [u8; 128]) {
            let write_guard_for_array = self.data.write().unwrap();
            write_guard_for_array[index] = update_data;
        }
    

    fn process()

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        let read_guard = db_arc.data.read().unwrap();
        println!("In process() read: {:?}", *read_guard);
        db_arc.update(1, [3u8; 128]);
    }
    

    Geralmente, você provavelmente não deveria acessar db_arc.datadiretamente. Mas, além disso, depois de corrigir function update(), isso entrará em um impasse:

    1. Você adquire db_arc.data.read(). Pela definição de a RwLock, isso significa que ninguém pode modificar o conteúdo de db_arc.dataaté que o bloqueio de leitura seja liberado.
    2. O bloqueio de leitura é liberado somente no final do escopo.
    3. Antes do final do escopo, você chama update(), que tentará adquirir data.write(). Mas não pode adquiri-lo até que o bloqueio de leitura seja liberado.

    Você provavelmente quer algo como:

    async fn process(mut stream: tokio::net::TcpStream, db_arc: Arc<SharedData>) {
        {
          let read_guard = db_arc.data.read().unwrap();
          println!("In process() read: {:?}", *read_guard);
        } // End of scope, `read_guard` is released.
        db_arc.update(1, [3u8; 128]);
    }
    

    Tóquio + tópicos

    Você está misturando threads e tokio. É teoricamente possível, mas arriscado. Ambas as opções são válidas, mas sugiro escolher uma ou outra. Normalmente, escolha tokio se você tiver muitas E/S (por exemplo, solicitações de rede ou acesso ao disco) ou threads se tiver muito uso de CPU.

    • 1

relate perguntas

  • Propriedades JMeter gravando simultaneamente

  • Como usar uma variável de estado Tauri gerenciada dentro de um thread de tempo de execução assíncrono Tauri gerado?

  • Um println extra no thread principal faz com que o Rust execute resultados diferentes

  • ideal para liberar atômicos de baixa contenção dos caches?

  • Rust: Itere em uma pasta e abra cada arquivo

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    destaque o código em HTML usando <font color="#xxx">

    • 2 respostas
  • Marko Smith

    Por que a resolução de sobrecarga prefere std::nullptr_t a uma classe ao passar {}?

    • 1 respostas
  • Marko Smith

    Você pode usar uma lista de inicialização com chaves como argumento de modelo (padrão)?

    • 2 respostas
  • Marko Smith

    Por que as compreensões de lista criam uma função internamente?

    • 1 respostas
  • Marko Smith

    Estou tentando fazer o jogo pacman usando apenas o módulo Turtle Random e Math

    • 1 respostas
  • Marko Smith

    java.lang.NoSuchMethodError: 'void org.openqa.selenium.remote.http.ClientConfig.<init>(java.net.URI, java.time.Duration, java.time.Duratio

    • 3 respostas
  • Marko Smith

    Por que 'char -> int' é promoção, mas 'char -> short' é conversão (mas não promoção)?

    • 4 respostas
  • Marko Smith

    Por que o construtor de uma variável global não é chamado em uma biblioteca?

    • 1 respostas
  • Marko Smith

    Comportamento inconsistente de std::common_reference_with em tuplas. Qual é correto?

    • 1 respostas
  • Marko Smith

    Somente operações bit a bit para std::byte em C++ 17?

    • 1 respostas
  • Martin Hope
    fbrereto Por que a resolução de sobrecarga prefere std::nullptr_t a uma classe ao passar {}? 2023-12-21 00:31:04 +0800 CST
  • Martin Hope
    比尔盖子 Você pode usar uma lista de inicialização com chaves como argumento de modelo (padrão)? 2023-12-17 10:02:06 +0800 CST
  • Martin Hope
    Amir reza Riahi Por que as compreensões de lista criam uma função internamente? 2023-11-16 20:53:19 +0800 CST
  • Martin Hope
    Michael A formato fmt %H:%M:%S sem decimais 2023-11-11 01:13:05 +0800 CST
  • Martin Hope
    God I Hate Python std::views::filter do C++20 não filtrando a visualização corretamente 2023-08-27 18:40:35 +0800 CST
  • Martin Hope
    LiDa Cute Por que 'char -> int' é promoção, mas 'char -> short' é conversão (mas não promoção)? 2023-08-24 20:46:59 +0800 CST
  • Martin Hope
    jabaa Por que o construtor de uma variável global não é chamado em uma biblioteca? 2023-08-18 07:15:20 +0800 CST
  • Martin Hope
    Panagiotis Syskakis Comportamento inconsistente de std::common_reference_with em tuplas. Qual é correto? 2023-08-17 21:24:06 +0800 CST
  • Martin Hope
    Alex Guteniev Por que os compiladores perdem a vetorização aqui? 2023-08-17 18:58:07 +0800 CST
  • Martin Hope
    wimalopaan Somente operações bit a bit para std::byte em C++ 17? 2023-08-17 17:13:58 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve