rust - 如何将 tokio TcpStream 变成 Serializable/Deserializable values 的 Sink/Stream?

回答1

我不知道您的代码结构或您计划使用的编解码器,但我已经想出了如何将所有内容粘合到一个可行的示例中。

您的 Sink<T>Stream<Item=T> 将由 tokio-serde 中的 https://docs.rs/tokio-serde/0.8.0/tokio_serde/struct.Framed.html 类型提供。这一层处理通过 serde 传递您的消息。此类型采用四个通用参数:TransportItem(流项)、SinkItemCodecCodec 是您要使用的特定序列化器和反序列化器的包装器。您可以在https://docs.rs/tokio-serde/0.8.0/tokio_serde/formats/index.html查看提供的选项。 ItemSinkItem 将成为您必须实现 SerializeDeserialize 的消息类型。 Transport 必须是 Sink<SinkItem> 和/或 Stream<Item=Item> 本身,以便框架实现任何有用的特征。这就是 tokio-util 的用武之地。它提供了 https://docs.rs/tokio-util/latest/tokio_util/codec/index.html#structs,允许您将实现 AsyncRead/AsyncWrite 的事物分别转换为流和接收器。为了构造这些帧,您需要指定一个编解码器,它将帧与线路分隔开。在我的示例中为简单起见,我只使用了 https://docs.rs/tokio-util/latest/tokio_util/codec/length_delimited/struct.LengthDelimitedCodec.html,但也提供了其他选项。

不再赘述,这里有一个示例,说明如何将 tokio::net::TcpStream 拆分为 Sink<T>Stream<Item=T>。请注意,T 是流端的结果,因为如果消息格式错误,serde 层可能会失败。

use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpListener,
    TcpStream,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

#[derive(Serialize, Deserialize, Debug)]
struct MyMessage {
    field: String,
}

type WrappedStream = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;
type WrappedSink = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;

// We use the unit type in place of the message types since we're
// only dealing with one half of the IO
type SerStream = Framed<WrappedStream, MyMessage, (), Json<MyMessage, ()>>;
type DeSink = Framed<WrappedSink, (), MyMessage, Json<(), MyMessage>>;

fn wrap_stream(stream: TcpStream) -> (SerStream, DeSink) {
    let (read, write) = stream.into_split();
    let stream = WrappedStream::new(read, LengthDelimitedCodec::new());
    let sink = WrappedSink::new(write, LengthDelimitedCodec::new());
    (
        SerStream::new(stream, Json::default()),
        DeSink::new(sink, Json::default()),
    )
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080")
        .await
        .expect("Failed to bind server to addr");

    tokio::task::spawn(async move {
        let (stream, _) = listener
            .accept()
            .await
            .expect("Failed to accept incoming connection");
        
        let (mut stream, mut sink) = wrap_stream(stream);

        println!(
            "Server received: {:?}",
            stream
                .next()
                .await
                .expect("No data in stream")
                .expect("Failed to parse ping")
        );

        sink.send(MyMessage {
            field: "pong".to_owned(),
        })
            .await
            .expect("Failed to send pong");
    });

    let stream = TcpStream::connect("127.0.0.1:8080")
        .await
        .expect("Failed to connect to server");

    let (mut stream, mut sink) = wrap_stream(stream);

    sink.send(MyMessage {
        field: "ping".to_owned(),
    })
        .await
        .expect("Failed to send ping to server");
        
    println!(
        "Client received: {:?}",
        stream
            .next()
            .await
            .expect("No data in stream")
            .expect("Failed to parse pong")
    );
}

运行此示例会产生:

Server received: MyMessage { field: "ping" }
Client received: MyMessage { field: "pong" }

请注意,您不需要拆分流。您可以改为从 TcpStream 构造一个 tokio_util::codec::Framed,并使用 tokio_serde::formats::SymmetricalJson<MyMessage> 构造一个 tokio_serde::Framed,然后该 Framed 将相应地实现 SinkStream。此外,此示例中的许多功能都是受功能限制的,因此请务必根据文档启用适当的功能。

相似文章

rust - 如何在并发上共享 tokio::net::TcpStream?

我需要在同一个TcpStream上发送和接收正常数据,同时定期发送心跳数据。在当前的实现中,Arc<Mutex>用于我的目的,但它编译时出错。如何修复这些错误,或者是否有其他方法可以实现相同的目标?u...

rust - 从函数返回期货流

我正在尝试编写一个函数,该函数从服务器读取响应并返回一个期货流,在等待时应该返回一个实现tokioAsyncRead特征的类型:pubasyncfnconnect_peers(url:&str)->i...

随机推荐

最新文章