AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • Início
  • system&network
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • Início
  • system&network
    • Recentes
    • Highest score
    • tags
  • Ubuntu
    • Recentes
    • Highest score
    • tags
  • Unix
    • Recentes
    • tags
  • DBA
    • Recentes
    • tags
  • Computer
    • Recentes
    • tags
  • Coding
    • Recentes
    • tags
Início / coding / Perguntas / 79336073
Accepted
Aaras
Aaras
Asked: 2025-01-07 20:53:47 +0800 CST2025-01-07 20:53:47 +0800 CST 2025-01-07 20:53:47 +0800 CST

Como ler leb128 de TcpStream em Rust

  • 772

Resumindo, preciso ler uma string de uma conexão TCP enviada por um cliente C#. O cliente usa o BinaryWriter, que prefixa a string real com um comprimento no formato leb128. Estou usando tokio::net::TcpStreamem Rust e procurei por uma caixa para me ajudar a recuperar esse prefixo de comprimento do fluxo, mas não consegui encontrar nada adequado. A maioria das soluções requer que a fonte da qual você está lendo implemente o io::Readtrait, mas tokio::net::TcpStreamnão o implementa.

Consegui fazê-lo funcionar com esse código feio, mas desconfiei dele desde o começo. Recentemente, descobri que às vezes ele leva a algum tipo de condição de corrida. Não tenho certeza, mas acho que ele é bloqueado no let file_name_len = leb128::read::unsigned(&mut socket)?;, o que de alguma forma faz com que meu ouvinte TCP pare de aceitar novas conexões, o que é ainda mais estranho.

let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;

let file_name_len = leb128::read::unsigned(&mut socket)?;

let mut socket = tokio::net::TcpStream::from_std(socket)?;

Alguém sabe o caminho certo para fazer isso?

sockets
  • 1 1 respostas
  • 24 Views

1 respostas

  • Voted
  1. Best Answer
    Matthieu M.
    2025-01-08T00:22:05+08:002025-01-08T00:22:05+08:00

    Não tenho certeza, mas acho que ele é bloqueado no let file_name_len = leb128::read::unsigned(&mut socket)?;, o que de alguma forma faz com que meu ouvinte TCP pare de aceitar novas conexões, o que é ainda mais estranho.

    O código acima está bloqueando :

    1. Você define o soquete para bloqueio ( set_nonblocking(false)).
    2. Então bloqueie em leb128::read::unsigned(&mut socket)?;.

    Isso bloqueará todo o tópico tokio.

    Ele não deve bloquear o ouvinte TCP se o ouvinte TCP for executado em uma tarefa separada e você estiver usando o tempo de execução multithread (padrão) do Tokio... a menos que, é claro, você tenha várias tarefas LEB bloqueando cada thread do Tokio.


    Infelizmente, não há uma API padrão para leituras assíncronas, e o leb128pacote não fornece nenhuma integração com o Tokio, então vai dar um pouco de trabalho.

    Não muito, porém, porque &[u8]implementa Read, e depois Reado slice terá sido atualizado para apontar para os bytes não lidos.

    Como você está usando TCP, presumo que já tenha algum tipo de buffer para os bytes recebidos — para passá-los ao decodificador — então você deve usar apenas esse buffer .

    //  Read until sufficient bytes are obtained to determine length.
    let length = loop {
        const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
    
        let mut slice = buffer.readable();
    
        let result = leb128::read::unsigned(&mut slice);
    
        match result {
            Ok(length) => {
                let consumed = buffer.readable().len() - slice.len();
    
                buffer.advance(consumed);
    
                break length;
            },
            Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF =>
                continue,
            Err(e) => return Err(e.into()),
        }
    
        socket.readable().await?;
    
        let length = socket.try_read(buffer.writable())?;
    
        todo!("Handle length, beware 0 means EOF");
    };
    
    //  Do something with buffered bytes, perhaps waiting for more
    //  (now that you know how many you need).
    todo!();
    

    Não acho a estrutura de código acima muito... legal, no entanto. Misturar E/S assíncrona e decodificação significa que você não pode testar a decodificação sozinha, doloroso, eu realmente aconselho preferir o design Sans IO quando possível.

    Em vez disso, eu o encorajaria a escrever um Framerou Decoderque cuidará de parte (ou toda) da lógica de decodificação e apenas separará claramente a E/S do enquadramento/decodificação.

    A ideia é relativamente simples: inserir bytes nele e obter bytes enquadrados ou mensagens decodificadas.

    Como não tenho seu decodificador, usarei um framer, cuja função é isolar um único quadro (mensagem codificada) no fluxo.

    Depois que você tiver um emoldurador, na verdade é relativamente simples:

    socket.readable().await?;
    
    let mut buffer = [0; 16 * 1024];
    let length = socket.try_read(&mut buffer)?;
    
    if length == 0 {
        todo!("handle EOF");
    }
    
    framer.push(&buffer[..length]);
    
    //  May want to limit the number of frames handled at once, to avoid blocking
    //  other clients.
    
    while let Some(message) = framer.pop()? {
        todo!("handle message");
    }
    

    E o mais importante: é muito fácil testar se o criador da mensagem consegue lidar com todos os tipos de mensagens.

    O código do framer real é relativamente simples:

    //  Disclaimer: I have not even _compiled_ this code, don't expect it to handle
    //              all the edge cases.
    
    #[derive(Clone, Debug, Default)]
    pub struct Framer {
        state: FramerState,
        consumed: usize,
        buffer: Vec<u8>,
    }
    
    impl Framer {
        /// Constructs a framer with a specific capacity.
        pub fn with_capacity(capacity: usize) -> Self {
            let state = FramerState::default();
            let consumed = 0;
            let buffer = Vec::with_capacity(capacity);
    
            Self { state, consumed, buffer }
        }
    
        /// Pushes bytes into the framer.
        pub fn push(&mut self, bytes: &[u8]) {
            //  Trick here: draining in push is easier, and avoids O(N²) pop.
            if self.consumed > 0 {
                self.buffer.drain(..self.consumed);
            }
    
            self.buffer.extend_from_slice(bytes);
        }
    
        /// Pops a frame, if possible.
        ///
        /// Call repeatedly to pop all buffered frames, when no complete frame is
        /// buffered any longer, returns `Ok(None)`.
        ///
        /// Returns an error if the underlying stream is faulty, for example has
        /// an overflowing length.
        pub fn pop(&mut self) -> Result<Option<&[u8]>, Error> {
            match self.state {
                FramerState::WaitingForLength => {
                    let length = self.pop_length()?;
    
                    let Some(length) = length else { return Ok(None) };
    
                    let Some(length) = NonZeroU64::new(length) else {
                        return Ok(Some(&[]));
                    };
    
                    //  FIXME: may want a maximum length here, regardless of
                    //         overflow, as otherwise a client accidentally
                    //         sending 2^63-1 LEB encoded will lead the server
                    //         to wait forever.
    
                    self.state = FramerState::WaitingForMessage(length);
    
                    self.pop_message(length)
                }
                FramerState::WaitingForMessage(length) => {
                    self.pop_message(length)
                }
            }
        }
    
        //  Pops the length.
        //
        //  # Panics
        //
        //  In Debug, if called when state is not WaitingForLength.
        fn pop_length(&mut self) -> Result<Option<u64>, Error> {
            const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
    
            debug_assert_eq!(FramerState::WaitingForLength, self.state);
    
            let mut available = &self.buffer[self.consumed..];
    
            match leb128::read::unsigned(&mut available) {
                Ok(length) => {
                    let consumed = self.buffer.len() - self.consumed - available.len();
                    self.consumed += consumed;
    
                    Ok(Some(length))
                },
                Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF => Ok(None),
                Err(e) => Err(e.into()),
            }
        }
    
        //  Pops the actual frame, according to the length.
        //
        //  # Panics
        //
        //  In Debug, if called when state is not WaitingForMessage(length).
        fn pop_message(&mut self, length: NonZeroU64) -> Result<Option<&[u8]>, Error> {
            debug_assert_eq!(FramerState::WaitingForMessage(length), self.state);
    
            let length = length.get().try_into().map_err(|_| Error::Overflow)?;
    
            let Some((frame, _)) = self.buffer[self.consumed..].split_at_checked(length) else {
                return Ok(None);
            };
    
            self.state = FramerState::WaitingForLength;
            self.consumed += frame.len();
    
            Ok(Some(frame))
        }
    }
    
    #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
    enum FramerState {
        #[default]
        WaitingForLength,
        WaitingForBytes(NonZeroU64),
    }
    
    • 1

relate perguntas

  • Eu não entendo os soquetes TCP

  • Posso converter de socket2::Socket para Tokio::net::TcpStream?

Sidebar

Stats

  • Perguntas 205573
  • respostas 270741
  • best respostas 135370
  • utilizador 68524
  • Highest score
  • respostas
  • Marko Smith

    Reformatar números, inserindo separadores em posições fixas

    • 6 respostas
  • Marko Smith

    Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não?

    • 2 respostas
  • Marko Smith

    Problema com extensão desinstalada automaticamente do VScode (tema Material)

    • 2 respostas
  • Marko Smith

    Vue 3: Erro na criação "Identificador esperado, mas encontrado 'import'" [duplicado]

    • 1 respostas
  • Marko Smith

    Qual é o propósito de `enum class` com um tipo subjacente especificado, mas sem enumeradores?

    • 1 respostas
  • Marko Smith

    Como faço para corrigir um erro MODULE_NOT_FOUND para um módulo que não importei manualmente?

    • 6 respostas
  • Marko Smith

    `(expression, lvalue) = rvalue` é uma atribuição válida em C ou C++? Por que alguns compiladores aceitam/rejeitam isso?

    • 3 respostas
  • Marko Smith

    Um programa vazio que não faz nada em C++ precisa de um heap de 204 KB, mas não em C

    • 1 respostas
  • Marko Smith

    PowerBI atualmente quebrado com BigQuery: problema de driver Simba com atualização do Windows

    • 2 respostas
  • Marko Smith

    AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos

    • 1 respostas
  • Martin Hope
    Fantastic Mr Fox Somente o tipo copiável não é aceito na implementação std::vector do MSVC 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant Encontre o próximo dia da semana usando o cronógrafo 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor O inicializador de membro do construtor pode incluir a inicialização de outro membro? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský Por que os conceitos do C++20 causam erros de restrição cíclica, enquanto o SFINAE antigo não? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul O C++20 mudou para permitir a conversão de `type(&)[N]` de matriz de limites conhecidos para `type(&)[]` de matriz de limites desconhecidos? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann Como/por que {2,3,10} e {x,3,10} com x=2 são ordenados de forma diferente? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller O ponto e vírgula agora é opcional em condicionais bash com [[ .. ]] na versão 5.2? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench Por que um traço duplo (--) faz com que esta cláusula MariaDB seja avaliada como verdadeira? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng Por que `dict(id=1, **{'id': 2})` às vezes gera `KeyError: 'id'` em vez de um TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob: MobileAds.initialize() - "java.lang.Integer não pode ser convertido em java.lang.String" para alguns dispositivos 2024-03-20 03:12:31 +0800 CST

Hot tag

python javascript c++ c# java typescript sql reactjs html

Explore

  • Início
  • Perguntas
    • Recentes
    • Highest score
  • tag
  • help

Footer

AskOverflow.Dev

About Us

  • About Us
  • Contact Us

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve