我有一个函数,它返回一个fs2测量值流。
import cats.effect._
import fs2._
def apply(sds: SerialPort, interval: Int)(implicit cs: ContextShift[IO]): Stream[IO, SdsMeasurement] =
for {
blocker <- Stream.resource(Blocker[IO])
stream <- io.readInputStream(IO(sds.getInputStream), 1, blocker)
.through(SdsStateMachine.collectMeasurements())
} yield stream通常,它是一个无限流,除非我向它传递一个测试标志,在这种情况下,它应该输出一个值并停止。
val infiniteSource: Stream[IO, SdsMeasurement] = ...
val source = if (isTest) infiniteSource.take(1) else infiniteSource
source.compile.drain无限流工作正常。它给了我无限大的所有度量。测试流确实只给了我第一个度量,没有更多。我的问题是,在最后一次测量之后,Stream不会返回。它会永远阻塞。我做错了什么?
注意:我想我抽象了必要的代码,但要了解更多上下文,请查看我的项目:https://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof/Main.scala
发布于 2020-05-10 05:13:59
您在这里展示的代码看起来很好,我不认为问题出在该代码中。如果它阻塞,则可能是某个底层API阻塞,例如,它可能是InputStream的close方法。在这种情况下,我通常会在每个可能阻塞的函数调用之前和之后添加日志语句。
https://stackoverflow.com/questions/59309321
复制相似问题