AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 79336073
Accepted
Aaras
Aaras
Asked: 2025-01-07 20:53:47 +0800 CST2025-01-07 20:53:47 +0800 CST 2025-01-07 20:53:47 +0800 CST

如何在 Rust 中从 TcpStream 读取 leb128

  • 772

简而言之,我需要从 C# 客户端发送的 TCP 连接中读取一个字符串。客户端使用BinaryWriter,它以 leb128 格式的长度作为实际字符串的前缀。我tokio::net::TcpStream在 Rust 中使用,并搜索了一个 crate 来帮助我从流中检索该长度前缀,但我找不到合适的东西。大多数解决方案都需要您正在读取的源来实现该io::Read特征,但tokio::net::TcpStream没有实现它。

我设法让它用这个丑陋的代码工作,但我从一开始就对它持怀疑态度。我最近发现它有时会导致某种竞争条件。我并不完全确定,但我认为它在 上被阻止了let file_name_len = leb128::read::unsigned(&mut socket)?;,这不知何故导致我的 TCP 侦听器停止接受新连接,这更奇怪。

let mut socket = socket.into_std()?;
socket.set_nonblocking(false)?;

let file_name_len = leb128::read::unsigned(&mut socket)?;

let mut socket = tokio::net::TcpStream::from_std(socket)?;

有人知道正确的方法吗?

sockets
  • 1 1 个回答
  • 24 Views

1 个回答

  • Voted
  1. Best Answer
    Matthieu M.
    2025-01-08T00:22:05+08:002025-01-08T00:22:05+08:00

    我不完全确定,但我认为它被阻止了let file_name_len = leb128::read::unsigned(&mut socket)?;,这以某种方式导致我的 TCP 侦听器停止接受新连接,这更加奇怪。

    上面的代码是阻塞的:

    1. 您将套接字设置为阻塞(set_nonblocking(false))。
    2. 然后阻挡leb128::read::unsigned(&mut socket)?;。

    这将阻塞整个 tokio 线程。

    如果TCP 侦听器在单独的任务中运行并且您正在使用(默认)多线程 tokio 运行时,它不应该阻止 TCP 侦听器......当然,除非您有多个这样的 LEB 任务阻止每个 tokio 线程。


    不幸的是,没有用于异步读取的标准 API,并且leb128crate 没有提供任何 tokio 集成,因此需要做一些工作。

    但是,不会太多,因为&[u8]实现了Read,并且之后Read切片将被更新为指向未读取的字节。

    由于您使用的是 TCP,我假设您已经为接收的字节设置了某种缓冲区 - 将它们传递给解码器 - 因此您应该只使用该缓冲区。

    //  Read until sufficient bytes are obtained to determine length.
    let length = loop {
        const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
    
        let mut slice = buffer.readable();
    
        let result = leb128::read::unsigned(&mut slice);
    
        match result {
            Ok(length) => {
                let consumed = buffer.readable().len() - slice.len();
    
                buffer.advance(consumed);
    
                break length;
            },
            Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF =>
                continue,
            Err(e) => return Err(e.into()),
        }
    
        socket.readable().await?;
    
        let length = socket.try_read(buffer.writable())?;
    
        todo!("Handle length, beware 0 means EOF");
    };
    
    //  Do something with buffered bytes, perhaps waiting for more
    //  (now that you know how many you need).
    todo!();
    

    不过,我觉得上面的代码结构不太好。混合异步 I/O 和解码意味着您无法单独测试解码,这很麻烦,我真的建议尽可能选择Sans IO设计。

    相反,我鼓励您编写一个Framer或Decoder来处理部分(或全部)解码逻辑,并将 I/O 与帧/解码完全分开。

    这个想法相对简单:将字节推入其中,获取帧字节或解码的消息。

    由于我没有解码器,我将改用成帧器,其作用是隔离流中的单个帧(编码消息)。

    一旦你有了框架,它实际上就相对简单了:

    socket.readable().await?;
    
    let mut buffer = [0; 16 * 1024];
    let length = socket.try_read(&mut buffer)?;
    
    if length == 0 {
        todo!("handle EOF");
    }
    
    framer.push(&buffer[..length]);
    
    //  May want to limit the number of frames handled at once, to avoid blocking
    //  other clients.
    
    while let Some(message) = framer.pop()? {
        todo!("handle message");
    }
    

    而且非常重要的是,可以很容易地测试框架是否可以处理各种消息。

    实际的成帧器代码相对简单:

    //  Disclaimer: I have not even _compiled_ this code, don't expect it to handle
    //              all the edge cases.
    
    #[derive(Clone, Debug, Default)]
    pub struct Framer {
        state: FramerState,
        consumed: usize,
        buffer: Vec<u8>,
    }
    
    impl Framer {
        /// Constructs a framer with a specific capacity.
        pub fn with_capacity(capacity: usize) -> Self {
            let state = FramerState::default();
            let consumed = 0;
            let buffer = Vec::with_capacity(capacity);
    
            Self { state, consumed, buffer }
        }
    
        /// Pushes bytes into the framer.
        pub fn push(&mut self, bytes: &[u8]) {
            //  Trick here: draining in push is easier, and avoids O(N²) pop.
            if self.consumed > 0 {
                self.buffer.drain(..self.consumed);
            }
    
            self.buffer.extend_from_slice(bytes);
        }
    
        /// Pops a frame, if possible.
        ///
        /// Call repeatedly to pop all buffered frames, when no complete frame is
        /// buffered any longer, returns `Ok(None)`.
        ///
        /// Returns an error if the underlying stream is faulty, for example has
        /// an overflowing length.
        pub fn pop(&mut self) -> Result<Option<&[u8]>, Error> {
            match self.state {
                FramerState::WaitingForLength => {
                    let length = self.pop_length()?;
    
                    let Some(length) = length else { return Ok(None) };
    
                    let Some(length) = NonZeroU64::new(length) else {
                        return Ok(Some(&[]));
                    };
    
                    //  FIXME: may want a maximum length here, regardless of
                    //         overflow, as otherwise a client accidentally
                    //         sending 2^63-1 LEB encoded will lead the server
                    //         to wait forever.
    
                    self.state = FramerState::WaitingForMessage(length);
    
                    self.pop_message(length)
                }
                FramerState::WaitingForMessage(length) => {
                    self.pop_message(length)
                }
            }
        }
    
        //  Pops the length.
        //
        //  # Panics
        //
        //  In Debug, if called when state is not WaitingForLength.
        fn pop_length(&mut self) -> Result<Option<u64>, Error> {
            const UNEXPECTED_EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
    
            debug_assert_eq!(FramerState::WaitingForLength, self.state);
    
            let mut available = &self.buffer[self.consumed..];
    
            match leb128::read::unsigned(&mut available) {
                Ok(length) => {
                    let consumed = self.buffer.len() - self.consumed - available.len();
                    self.consumed += consumed;
    
                    Ok(Some(length))
                },
                Err(leb128::read::Error::IoError(e)) if e.kind() == UNEXPECTED_EOF => Ok(None),
                Err(e) => Err(e.into()),
            }
        }
    
        //  Pops the actual frame, according to the length.
        //
        //  # Panics
        //
        //  In Debug, if called when state is not WaitingForMessage(length).
        fn pop_message(&mut self, length: NonZeroU64) -> Result<Option<&[u8]>, Error> {
            debug_assert_eq!(FramerState::WaitingForMessage(length), self.state);
    
            let length = length.get().try_into().map_err(|_| Error::Overflow)?;
    
            let Some((frame, _)) = self.buffer[self.consumed..].split_at_checked(length) else {
                return Ok(None);
            };
    
            self.state = FramerState::WaitingForLength;
            self.consumed += frame.len();
    
            Ok(Some(frame))
        }
    }
    
    #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
    enum FramerState {
        #[default]
        WaitingForLength,
        WaitingForBytes(NonZeroU64),
    }
    
    • 1

相关问题

  • 我不明白 TCP 套接字

  • 我可以从 socket2::Socket 转换为 Tokio::net::TcpStream 吗?

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    重新格式化数字,在固定位置插入分隔符

    • 6 个回答
  • Marko Smith

    为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会?

    • 2 个回答
  • Marko Smith

    VScode 自动卸载扩展的问题(Material 主题)

    • 2 个回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Martin Hope
    Fantastic Mr Fox msvc std::vector 实现中仅不接受可复制类型 2025-04-23 06:40:49 +0800 CST
  • Martin Hope
    Howard Hinnant 使用 chrono 查找下一个工作日 2025-04-21 08:30:25 +0800 CST
  • Martin Hope
    Fedor 构造函数的成员初始化程序可以包含另一个成员的初始化吗? 2025-04-15 01:01:44 +0800 CST
  • Martin Hope
    Petr Filipský 为什么 C++20 概念会导致循环约束错误,而老式的 SFINAE 不会? 2025-03-23 21:39:40 +0800 CST
  • Martin Hope
    Catskul C++20 是否进行了更改,允许从已知绑定数组“type(&)[N]”转换为未知绑定数组“type(&)[]”? 2025-03-04 06:57:53 +0800 CST
  • Martin Hope
    Stefan Pochmann 为什么 {2,3,10} 和 {x,3,10} (x=2) 的顺序不同? 2025-01-13 23:24:07 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve