首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KSQL -使用GEO_DISTANCE计算来自2条消息的距离

KSQL -使用GEO_DISTANCE计算来自2条消息的距离
EN

Stack Overflow用户
提问于 2018-09-11 00:33:39
回答 1查看 550关注 0票数 1

我有一个kafka主题,主题中的每一条消息都有lat/lon和事件时间戳。创建了一个引用主题的流,并希望使用geo_distance计算两点之间的距离。示例

代码语言:javascript
复制
GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726 

我想在上面的溪流上创建一个新的流,用距离来丰富它。

代码语言:javascript
复制
GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

是否有可能使用KSQL实现预期的结果?或者如何在处理新消息时引用以前的消息?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-14 03:24:34

首先,这些读数来自某种设备吗?如果是这样的话,您是否有他们的唯一ID (UUID)?我会把它放到你的溪流里,所以它会喜欢UUID, GpsDateTime, lat, lon

您将需要创建一个相当基本的Kafka流应用程序。在这个应用程序中,您将把最近从流中读取的内容存储到StoreBuilder中。然后,当从Kafka收到新消息时,您将检索这个最新值,进行计算,然后将新的lat长值存储到StoreBuilder中。

当然,我不清楚你是否只想拥有一个lat,长值,所有后续的值都是从一读开始计算的。或者,如果您希望有一个滚动计算,您总是比较之间的距离,最后和当前的读数。

无论如何,您可以在以下网站上看到这段代码:https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

此示例是一个单词计数示例,但可以为您的用例快速转换。

静态最后一个类WordCountTransformerSupplier (第78行)将成为您的LatLongDistanceComputation。

您将使用适当的类型创建StoreBuilder (第154行)(无论您以何种方式存储lat/lon )。

第165行是从流入的值流中读取项目的实际位置。

当然,您还需要编辑inputTopic和outputTopic (第66-67行)以及其他一些内容。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52267154

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档