亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

FunDA(7)- Reactive Streams to fs2 Pull Streams

2019-11-11 06:51:04
字體:
來源:轉載
供稿:網友

    Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數據,實現了push-model功能。但實際上Iteratee也會根據自身情況,通過提供callback函數通知Enumerator可以開始推送數據,這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     PRintln(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473enumNumbers |>> showElements                      //> EL(1)                                                  //| EL(2)                                                  //| EL(3)                                                  //| EL(4)                                                  //| EL(5)                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數據發送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)                                                  //| El(2)                                                  //| El(3)                                                  //| El(4)                                                  //| El(5)                                                  //| El(6)                                                  //| El(7)                                                  //| El(8)                                                  //| EOF                                                  //| it  : scala.concurrent.Future[Int] = Success(99)這個run函數是這樣定義的:

/**   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary   * Extracts the computed result of the Iteratee, pushing an Input.EOF first   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.   * In case of error, an exception may be thrown synchronously or may   * be used to complete the returned Promise; this indeterminate behavior   * is inherited from fold().   *   *  @return a [[scala.concurrent.Future]] of the eventually computed result   */  def run: Future[A] = fold({    case Step.Done(a, _) => Future.successful(a)    case Step.Cont(k) => k(Input.EOF).fold({      case Step.Done(a1, _) => Future.successful(a1)      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")      case Step.Error(msg, e) => sys.error(msg)    })(dec)    case Step.Error(msg, e) => sys.error(msg)  })(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數據流,只能取得整個運算結果。也就是說如果我們希望把一個Enumerator產生的數據引導到fs2 Stream的話,只能在所有數據都讀入內存后才能實現了。這樣就違背了使用Reactive-Streams的意愿。那我們應該怎么辦?一個可行的方法是使用一個存儲數據結構,用兩個線程,一個線程里Iteratee把當前數據存入數據結構,另一個線程里fs2把數據取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結構來作為Iteratee與fs2之間的管道:Iteratee從一頭把數據壓進去(enqueue),fs2從另一頭把數據取出來(dequeue)。

我們先設計enqueue部分,這部分是在Iteratee里進行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態,也就是等待接受數據狀態。當收到數據時運行q.enqueue1把數據塞入q,然后不斷循環運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數據enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>    //run Enumerator-Iteratee and enqueue data in thread 1    //dequeue data and en-stream in thread 2 (current thread)  }因為Stream.eval運算結果是Stream[Task,Int],所以我們可以得出這個flatMap內的函數款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現數據enqueue部分:這部分是通過Iteratee的運算過程產生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數據的進展。我們先看看fs2提供的兩個函數款式:

/** Repeatedly calls `dequeue1` forever. */  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/**   * Halts the input stream at the first `None`.   *   * @example {{{   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList   * res0: List[Int] = List(1, 2)   * }}}   */  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =    _ repeatPull { _.receive {      case (hd, tl) =>        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })        if (out.size == hd.size) Pull.output(out) as tl        else if (out.isEmpty) Pull.done        else Pull.output(out) >> Pull.done    }}

剛好,dequeue產生Stream[F,A]。而unNoneTerminate可以根據Stream(None)來終止運算?,F在我們可以把這個Reactive-Streams到fs2-pull-streams轉換過程這樣來定義:

implicit val strat = Strategy.fromFixedDaemonPool(4)                                                  //> strat  : fs2.Strategy = Strategyval fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現在這個stream應該已經變成fs2.Stream[Task,Int]了。我們可以用前面的log函數來試運行一下:

def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]    fs2Stream.through(log("")).run.unsafeRun          //> > 1                                                  //| > 2                                                  //| > 3                                                  //| > 4                                                  //| > 5我們成功的把Iteratee的Reactive-Stream轉化成fs2的Pull-Model-Stream。

下面是這次討論的源代碼:

import play.api.libs.iteratee._import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.collection.mutable._import fs2._object iteratees {def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     println(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}implicit val strat = Strategy.fromFixedDaemonPool(4)val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}    fs2Stream.through(log("")).run.unsafeRun }


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美不卡视频一区发布| 日韩中文字幕国产精品| 亚洲精品国产精品乱码不99按摩| 一区二区三区天堂av| 亚洲女在线观看| 亚洲色图综合网| 97视频在线观看免费高清完整版在线观看| 一区二区欧美日韩视频| 国产精品久久国产精品99gif| 欧美日韩aaaa| 欧美国产日韩一区| 奇米一区二区三区四区久久| 2019中文字幕全在线观看| 欧美精品aaa| 日本午夜精品理论片a级appf发布| 自拍偷拍亚洲精品| 一区二区三区视频免费在线观看| 国产盗摄xxxx视频xxx69| 国产精品入口免费视频一| 欧美日韩高清在线观看| 日韩在线观看高清| 亚洲视频在线观看视频| 日韩毛片在线观看| 精品亚洲国产成av人片传媒| 国产精品亚洲第一区| 亚洲一区二区三区毛片| 亚洲美女精品成人在线视频| 久久九九有精品国产23| 国产成人小视频在线观看| 欧美黑人一区二区三区| 2019中文字幕在线免费观看| 亚洲激情国产精品| 色噜噜久久综合伊人一本| 性欧美激情精品| 国产精品日韩一区| 亚洲电影免费观看高清完整版| 国产极品jizzhd欧美| 欧美日产国产成人免费图片| 日韩精品免费在线播放| 亚洲精品wwww| 日本一区二区在线免费播放| 亚洲天堂av在线免费| 疯狂做受xxxx高潮欧美日本| 97婷婷涩涩精品一区| 欧美日韩一区二区免费视频| 欧洲亚洲女同hd| 亚洲综合成人婷婷小说| 精品欧美国产一区二区三区| 午夜精品久久久久久久久久久久久| 欧美亚洲国产日本| 最近中文字幕mv在线一区二区三区四区| 国产精品高清在线观看| 亚洲色图校园春色| 亚洲护士老师的毛茸茸最新章节| 国产精品视频久久久久| 懂色av一区二区三区| 性日韩欧美在线视频| 日本伊人精品一区二区三区介绍| 久久精品男人天堂| 国产精品午夜国产小视频| 欧美精品一二区| 国产视频在线观看一区二区| 欧美日韩精品在线视频| 欧美人在线观看| 日韩亚洲欧美中文高清在线| www.午夜精品| 亚洲毛片在线免费观看| 亚洲网址你懂得| 欧美大尺度在线观看| 国产成人亚洲综合91精品| 色综合视频网站| 九九热最新视频//这里只有精品| 日韩在线欧美在线| 精品女同一区二区三区在线播放| 久久久av免费| 国产欧美在线观看| 亚洲精品一区二区网址| 亚洲人成网在线播放| 亚洲国产精品va在线| 久久久精品久久久| 91av网站在线播放| 欧美激情一区二区三区久久久| 亚洲精品美女久久| 国产精品高潮粉嫩av| 91精品在线国产| 国产精品久久久久av| 欧美激情精品久久久久久| 中文字幕亚洲欧美在线| zzijzzij亚洲日本成熟少妇| 日本sm极度另类视频| 麻豆乱码国产一区二区三区| 亚洲最大在线视频| 亚洲综合自拍一区| 国产精品视频一| 精品一区二区电影| 久久免费视频这里只有精品| 日韩欧美国产激情| 日韩av电影在线免费播放| 国产精品视频色| 亚洲国产精品成人av| 97视频人免费观看| 色哟哟网站入口亚洲精品| 亚洲码在线观看| 一区二区三区回区在观看免费视频| 国产欧美日韩精品在线观看| 国产在线视频不卡| 成人性生交大片免费看视频直播| 在线播放日韩精品| 国产成人精品午夜| 欧美视频在线观看免费网址| 最新亚洲国产精品| 91精品国产综合久久香蕉的用户体验| 精品亚洲国产成av人片传媒| 成人激情在线观看| 久久久久久久一区二区| 久久久亚洲影院| 欧美性xxxxhd| 亚洲天堂av女优| 亚洲最大激情中文字幕| 久久久精品国产一区二区| 亚洲精品视频免费在线观看| 青青在线视频一区二区三区| 亚洲国产精品久久久久久| 精品欧美国产一区二区三区| 亚洲人成亚洲人成在线观看| 三级精品视频久久久久| 国产精品r级在线| 欧美国产日韩一区二区在线观看| 久久国产精品久久久久久久久久| 97色在线视频观看| 98精品国产自产在线观看| 欧美一区二区三区精品电影| 日本精品视频在线播放| 欧美极品在线播放| 亚洲成人精品视频| 欧美成人在线影院| 中文字幕亚洲国产| 在线午夜精品自拍| 午夜精品一区二区三区在线| 国产在线精品一区免费香蕉| 久久精品男人天堂| 538国产精品视频一区二区| 国产69精品久久久久9| 国产精品69久久| 国产成人精品免高潮在线观看| 成人淫片在线看| 日韩电影免费在线观看| www.日韩免费| 久久手机免费视频| 亚洲人成电影在线观看天堂色| 亚洲激情第一页| 日韩在线观看免费高清| 国产久一一精品| 国产91精品不卡视频| 国产精品中文字幕在线观看| 久久精品这里热有精品| 精品亚洲一区二区三区四区五区| 久久久亚洲天堂| 亚洲尤物视频网| 免费91在线视频| 国产精品成人aaaaa网站| 久久人人爽国产| 久久亚洲精品一区| 精品久久久精品|