亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

FunDA(7)- Reactive Streams to fs2 Pull Streams

2019-11-11 05:23:29
字體:
來源:轉載
供稿:網友

    Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數據,實現了push-model功能。但實際上Iteratee也會根據自身情況,通過提供callback函數通知Enumerator可以開始推送數據,這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     PRintln(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473enumNumbers |>> showElements                      //> EL(1)                                                  //| EL(2)                                                  //| EL(3)                                                  //| EL(4)                                                  //| EL(5)                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數據發送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)                                                  //| El(2)                                                  //| El(3)                                                  //| El(4)                                                  //| El(5)                                                  //| El(6)                                                  //| El(7)                                                  //| El(8)                                                  //| EOF                                                  //| it  : scala.concurrent.Future[Int] = Success(99)這個run函數是這樣定義的:

/**   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary   * Extracts the computed result of the Iteratee, pushing an Input.EOF first   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.   * In case of error, an exception may be thrown synchronously or may   * be used to complete the returned Promise; this indeterminate behavior   * is inherited from fold().   *   *  @return a [[scala.concurrent.Future]] of the eventually computed result   */  def run: Future[A] = fold({    case Step.Done(a, _) => Future.successful(a)    case Step.Cont(k) => k(Input.EOF).fold({      case Step.Done(a1, _) => Future.successful(a1)      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")      case Step.Error(msg, e) => sys.error(msg)    })(dec)    case Step.Error(msg, e) => sys.error(msg)  })(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數據流,只能取得整個運算結果。也就是說如果我們希望把一個Enumerator產生的數據引導到fs2 Stream的話,只能在所有數據都讀入內存后才能實現了。這樣就違背了使用Reactive-Streams的意愿。那我們應該怎么辦?一個可行的方法是使用一個存儲數據結構,用兩個線程,一個線程里Iteratee把當前數據存入數據結構,另一個線程里fs2把數據取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結構來作為Iteratee與fs2之間的管道:Iteratee從一頭把數據壓進去(enqueue),fs2從另一頭把數據取出來(dequeue)。

我們先設計enqueue部分,這部分是在Iteratee里進行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態,也就是等待接受數據狀態。當收到數據時運行q.enqueue1把數據塞入q,然后不斷循環運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數據enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>    //run Enumerator-Iteratee and enqueue data in thread 1    //dequeue data and en-stream in thread 2 (current thread)  }因為Stream.eval運算結果是Stream[Task,Int],所以我們可以得出這個flatMap內的函數款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現數據enqueue部分:這部分是通過Iteratee的運算過程產生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數據的進展。我們先看看fs2提供的兩個函數款式:

/** Repeatedly calls `dequeue1` forever. */  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/**   * Halts the input stream at the first `None`.   *   * @example {{{   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList   * res0: List[Int] = List(1, 2)   * }}}   */  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =    _ repeatPull { _.receive {      case (hd, tl) =>        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })        if (out.size == hd.size) Pull.output(out) as tl        else if (out.isEmpty) Pull.done        else Pull.output(out) >> Pull.done    }}

剛好,dequeue產生Stream[F,A]。而unNoneTerminate可以根據Stream(None)來終止運算。現在我們可以把這個Reactive-Streams到fs2-pull-streams轉換過程這樣來定義:

implicit val strat = Strategy.fromFixedDaemonPool(4)                                                  //> strat  : fs2.Strategy = Strategyval fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現在這個stream應該已經變成fs2.Stream[Task,Int]了。我們可以用前面的log函數來試運行一下:

def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]    fs2Stream.through(log("")).run.unsafeRun          //> > 1                                                  //| > 2                                                  //| > 3                                                  //| > 4                                                  //| > 5我們成功的把Iteratee的Reactive-Stream轉化成fs2的Pull-Model-Stream。

下面是這次討論的源代碼:

import play.api.libs.iteratee._import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.collection.mutable._import fs2._object iteratees {def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     println(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}implicit val strat = Strategy.fromFixedDaemonPool(4)val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}    fs2Stream.through(log("")).run.unsafeRun }


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲精品免费一区二区三区| 国产精品一区二区久久久| 日韩电影免费在线观看| 久久亚洲精品网站| 欧美日韩在线观看视频| 欧美性猛交丰臀xxxxx网站| 98视频在线噜噜噜国产| 亚洲无限乱码一二三四麻| 最近2019中文字幕在线高清| 欧亚精品中文字幕| 成人春色激情网| 97久久超碰福利国产精品…| 欧美日韩精品二区| 成人精品网站在线观看| 日韩精品视频在线观看网址| 欧美视频专区一二在线观看| 国产精品ⅴa在线观看h| 日韩在线观看免费高清| xx视频.9999.com| 日韩中文字幕国产| www.xxxx精品| 日韩高清av一区二区三区| 国产精品精品久久久| 日韩欧美国产黄色| 久久久亚洲国产天美传媒修理工| 免费不卡欧美自拍视频| 欧美肥老妇视频| 菠萝蜜影院一区二区免费| 国产精品美女久久久久久免费| 蜜臀久久99精品久久久无需会员| 在线看日韩av| 欧美噜噜久久久xxx| 日韩乱码在线视频| 国产精品成人免费电影| 国产精品成人av性教育| 欧美激情欧美激情在线五月| 日韩福利伦理影院免费| 国产精品igao视频| 国产视频精品va久久久久久| 久久中文精品视频| 国产精品嫩草影院一区二区| 亚洲电影成人av99爱色| 九九久久久久99精品| 777国产偷窥盗摄精品视频| 久久天天躁狠狠躁夜夜躁2014| 精品国产成人av| 亚洲乱码国产乱码精品精天堂| 亚洲国产精品人人爽夜夜爽| 国产精品激情av在线播放| 欧美三级欧美成人高清www| 国产精品入口福利| 欧美成人剧情片在线观看| 日韩精品在线电影| 国内揄拍国内精品| 欧美理论在线观看| 亚洲色图偷窥自拍| 欧美视频二区36p| www.日本久久久久com.| 中日韩美女免费视频网址在线观看| 日韩在线中文字| 欧美成人激情在线| 日韩男女性生活视频| 国产精品视频区| 日韩中文第一页| 这里只有精品视频| 久久精视频免费在线久久完整在线看| 欧美国产日韩精品| 国产精品入口免费视频一| 97色在线观看| 欧美精品在线观看91| 欧美激情亚洲一区| 亚洲成av人片在线观看香蕉| 成人网中文字幕| 欧美电影免费观看| 中文字幕精品www乱入免费视频| 2020久久国产精品| 欧美成人精品三级在线观看| 国产欧美精品一区二区三区介绍| 青青久久aⅴ北条麻妃| 成人久久久久久久| 亚洲一区二区在线播放| 精品视频在线播放| 欧美成人免费网| 久久久久久久国产精品| 成人一区二区电影| 丰满岳妇乱一区二区三区| 久久精品一区中文字幕| 成人a级免费视频| 欧美色videos| 91大神福利视频在线| 国产日韩欧美电影在线观看| 一本一本久久a久久精品综合小说| 奇米影视亚洲狠狠色| 欧美日韩在线观看视频| 丝袜美腿亚洲一区二区| 国产网站欧美日韩免费精品在线观看| 亚洲欧美变态国产另类| 国产欧美日韩免费| 欧美激情国内偷拍| 亚洲国产精彩中文乱码av| 精品国产鲁一鲁一区二区张丽| 久久伊人精品视频| 国语自产精品视频在线看抢先版图片| 国产精品视频中文字幕91| 国内精品久久久久久| 亚洲最新av在线网站| 亚洲最大成人在线| 啊v视频在线一区二区三区| 国产精品日韩在线播放| 亚洲一区二区黄| 在线精品视频视频中文字幕| 亚洲人成伊人成综合网久久久| 国产一区二区三区毛片| 97久久超碰福利国产精品…| 九九九久久国产免费| 亚洲色在线视频| 久久影院模特热| 国产丝袜一区二区| 亚洲色图狂野欧美| 久国内精品在线| 亚洲第一综合天堂另类专| 国内精品久久久久| 在线亚洲欧美视频| 色噜噜久久综合伊人一本| 国产精品成人一区二区三区吃奶| 国产啪精品视频| 亚洲第一视频在线观看| 一道本无吗dⅴd在线播放一区| 亚洲日韩欧美视频一区| 免费av一区二区| 国产精品久久久久久久久久久不卡| 久久精视频免费在线久久完整在线看| 九色精品美女在线| 国产精品久久久久久一区二区| 国产精品一区专区欧美日韩| 国产亚洲一级高清| 日韩网站免费观看高清| 一区二区三区国产视频| 亚洲一区二区三区在线视频| 欧美激情免费看| 亚洲免费小视频| 色悠悠久久88| 欧美理论电影在线播放| 亚洲精品久久久久中文字幕二区| 亚洲欧美日韩网| 最近2019好看的中文字幕免费| 久久九九精品99国产精品| 国产一区二区美女视频| 久热精品视频在线观看一区| 国产精品6699| 日韩的一区二区| 欧美色xxxx| 亚洲第一天堂无码专区| 国产日本欧美一区二区三区在线| 国产亚洲精品久久久久动| 韩剧1988在线观看免费完整版| 欧美成人免费观看| 成人亲热视频网站| 久久久久久国产免费| 精品久久久91| 国产精品专区第二| 一道本无吗dⅴd在线播放一区| 日本高清+成人网在线观看| 日本成人激情视频|