我有一个 AWS kinesis 数据流连续接收事件,这些事件被发送到 kinesis 数据分析,以使用 apache flink 获取滚动窗口上的指标。
是否可以将翻滚窗口中 x% 的随机数据转储到 s3 存储桶?如果是,请分享代码片段。
回答1
你的翻滚窗口输出,如果它没有被聚合,我假设会是某种 Seq[Element]
。我能想到的最简单的方法是使用 flatMap
运算符在 S3 中对您想要 store 的元素进行采样,并将输出连接到写入 S3 的 FileSink
。
根据输出格式的不同,代码看起来会有很大的不同。最简单的例子是:
val windowOutStream: Seq[Element] = ...
val sampledOutStream: Seq[String] = windowOutStream.flatMap(window => {
// iterate over all the elements in the window
// filter the ones you want to store to S3
// encode the element as a string and add the window start/end times so you can later identify to which window they belonged to
})
// writeAsText will write each element in a new line in the same file
sampledOutStream.writeAsText("s3://<bucket>/<endpoint>");
您可能希望使用比字符串更优化的输出格式,并添加滚动策略以便自动创建新文件。
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#file-sink展示了如何初始化使用其他输出编码并且更灵活的行或批量编码文件接收器,但想法是相似的。