我有一个 tokio https://docs.rs/tokio/1.18.2/tokio/net/struct.TcpStream.html。我想通过这个流传递一些类型 T
。此类型 T
实现 https://docs.rs/serde/1.0.137/serde/trait.Serialize.html 和 https://docs.rs/serde/1.0.137/serde/trait.Deserialize.html。如何获得 https://docs.rs/futures/latest/futures/sink/trait.Sink.html 和 https://docs.rs/futures/latest/futures/stream/trait.Stream.html?
我找到了板条箱 https://docs.rs/tokio-util/0.7.2/tokio_util/ 和 https://docs.rs/tokio-serde/0.8.0/tokio_serde/,但我不知道如何使用它们来做我想做的事。
回答1
我不知道您的代码结构或您计划使用的编解码器,但我已经想出了如何将所有内容粘合到一个可行的示例中。
您的 Sink<T>
和 Stream<Item=T>
将由 tokio-serde
中的 https://docs.rs/tokio-serde/0.8.0/tokio_serde/struct.Framed.html 类型提供。这一层处理通过 serde
传递您的消息。此类型采用四个通用参数:Transport
、Item
(流项)、SinkItem
和 Codec
。 Codec
是您要使用的特定序列化器和反序列化器的包装器。您可以在https://docs.rs/tokio-serde/0.8.0/tokio_serde/formats/index.html查看提供的选项。 Item
和 SinkItem
将成为您必须实现 Serialize
和 Deserialize
的消息类型。 Transport
必须是 Sink<SinkItem>
和/或 Stream<Item=Item>
本身,以便框架实现任何有用的特征。这就是 tokio-util
的用武之地。它提供了 https://docs.rs/tokio-util/latest/tokio_util/codec/index.html#structs,允许您将实现 AsyncRead
/AsyncWrite
的事物分别转换为流和接收器。为了构造这些帧,您需要指定一个编解码器,它将帧与线路分隔开。在我的示例中为简单起见,我只使用了 https://docs.rs/tokio-util/latest/tokio_util/codec/length_delimited/struct.LengthDelimitedCodec.html,但也提供了其他选项。
不再赘述,这里有一个示例,说明如何将 tokio::net::TcpStream
拆分为 Sink<T>
和 Stream<Item=T>
。请注意,T
是流端的结果,因为如果消息格式错误,serde 层可能会失败。
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener,
TcpStream,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
#[derive(Serialize, Deserialize, Debug)]
struct MyMessage {
field: String,
}
type WrappedStream = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;
type WrappedSink = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;
// We use the unit type in place of the message types since we're
// only dealing with one half of the IO
type SerStream = Framed<WrappedStream, MyMessage, (), Json<MyMessage, ()>>;
type DeSink = Framed<WrappedSink, (), MyMessage, Json<(), MyMessage>>;
fn wrap_stream(stream: TcpStream) -> (SerStream, DeSink) {
let (read, write) = stream.into_split();
let stream = WrappedStream::new(read, LengthDelimitedCodec::new());
let sink = WrappedSink::new(write, LengthDelimitedCodec::new());
(
SerStream::new(stream, Json::default()),
DeSink::new(sink, Json::default()),
)
}
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:8080")
.await
.expect("Failed to bind server to addr");
tokio::task::spawn(async move {
let (stream, _) = listener
.accept()
.await
.expect("Failed to accept incoming connection");
let (mut stream, mut sink) = wrap_stream(stream);
println!(
"Server received: {:?}",
stream
.next()
.await
.expect("No data in stream")
.expect("Failed to parse ping")
);
sink.send(MyMessage {
field: "pong".to_owned(),
})
.await
.expect("Failed to send pong");
});
let stream = TcpStream::connect("127.0.0.1:8080")
.await
.expect("Failed to connect to server");
let (mut stream, mut sink) = wrap_stream(stream);
sink.send(MyMessage {
field: "ping".to_owned(),
})
.await
.expect("Failed to send ping to server");
println!(
"Client received: {:?}",
stream
.next()
.await
.expect("No data in stream")
.expect("Failed to parse pong")
);
}
运行此示例会产生:
Server received: MyMessage { field: "ping" }
Client received: MyMessage { field: "pong" }
请注意,您不需要拆分流。您可以改为从 TcpStream
构造一个 tokio_util::codec::Framed
,并使用 tokio_serde::formats::SymmetricalJson<MyMessage>
构造一个 tokio_serde::Framed
,然后该 Framed
将相应地实现 Sink
和 Stream
。此外,此示例中的许多功能都是受功能限制的,因此请务必根据文档启用适当的功能。