这里 Framing.delimiter 的第3个参数 allowTruncation 需要设置为 true ,否则文件在不以 \n 结尾的情况下将抛出以下异常:akka.stream.scaladsl.Framing$FramingException: Stream finished but there was a truncated final frame in the buffer 。
implicitval system = ActorSystem(Behaviors.ignore, "topK") val res = Paths.get(Thread.currentThread().getContextClassLoader.getResource("movies.csv").toURI)
val topKF = FileIO .fromPath(Paths.get(res.toUri)) .via(CsvParsing.lineScanner()) .drop(1) // Drop CSV Header .mapConcat { case name :: AsDouble(rating) :: _ => Movie(name.utf8String, rating) :: Nil case _ => Nil } .runWith(newTopKSink(10))
val logic = newGraphStageLogic(shape) withInHandler { var buf = List[Movie]() var bufSize = 0
definsertMovie(list: List[Movie], movie: Movie): List[Movie] = { list match { caseNil => movie :: Nil case list => var buf = List[Movie]() var use = false for (item <- list.reverse) { if (!use && item.rating < movie.rating) { buf ::= movie use = true } buf ::= item } if (!use) { buf ::= movie } buf } } overridedefpreStart(): Unit = pull(in)
overridedefonPush(): Unit = { val movie = grab(in) buf = if (bufSize < TOP_K) { bufSize += 1 insertMovie(buf, movie) } else { if (buf.head.rating < movie.rating) insertMovie(buf.slice(1, TOP_K), movie) else buf } pull(in) }
overridedefonUpstreamFinish(): Unit = { p.trySuccess(buf) }
overridedefonUpstreamFailure(ex: Throwable): Unit = { p.tryFailure(ex) failStage(ex) }
overridedefpostStop(): Unit = { if (!p.isCompleted) p.failure(newAbruptStageTerminationException(this)) }