亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

FunDA(7)- Reactive Streams to fs2 Pull Streams

2019-11-11 04:50:09
字體:
來源:轉載
供稿:網友

    Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數據,實現了push-model功能。但實際上Iteratee也會根據自身情況,通過提供callback函數通知Enumerator可以開始推送數據,這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     PRintln(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473enumNumbers |>> showElements                      //> EL(1)                                                  //| EL(2)                                                  //| EL(3)                                                  //| EL(4)                                                  //| EL(5)                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數據發送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)                                                  //| El(2)                                                  //| El(3)                                                  //| El(4)                                                  //| El(5)                                                  //| El(6)                                                  //| El(7)                                                  //| El(8)                                                  //| EOF                                                  //| it  : scala.concurrent.Future[Int] = Success(99)這個run函數是這樣定義的:

/**   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary   * Extracts the computed result of the Iteratee, pushing an Input.EOF first   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.   * In case of error, an exception may be thrown synchronously or may   * be used to complete the returned Promise; this indeterminate behavior   * is inherited from fold().   *   *  @return a [[scala.concurrent.Future]] of the eventually computed result   */  def run: Future[A] = fold({    case Step.Done(a, _) => Future.successful(a)    case Step.Cont(k) => k(Input.EOF).fold({      case Step.Done(a1, _) => Future.successful(a1)      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")      case Step.Error(msg, e) => sys.error(msg)    })(dec)    case Step.Error(msg, e) => sys.error(msg)  })(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數據流,只能取得整個運算結果。也就是說如果我們希望把一個Enumerator產生的數據引導到fs2 Stream的話,只能在所有數據都讀入內存后才能實現了。這樣就違背了使用Reactive-Streams的意愿。那我們應該怎么辦?一個可行的方法是使用一個存儲數據結構,用兩個線程,一個線程里Iteratee把當前數據存入數據結構,另一個線程里fs2把數據取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結構來作為Iteratee與fs2之間的管道:Iteratee從一頭把數據壓進去(enqueue),fs2從另一頭把數據取出來(dequeue)。

我們先設計enqueue部分,這部分是在Iteratee里進行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態,也就是等待接受數據狀態。當收到數據時運行q.enqueue1把數據塞入q,然后不斷循環運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數據enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>    //run Enumerator-Iteratee and enqueue data in thread 1    //dequeue data and en-stream in thread 2 (current thread)  }因為Stream.eval運算結果是Stream[Task,Int],所以我們可以得出這個flatMap內的函數款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現數據enqueue部分:這部分是通過Iteratee的運算過程產生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數據的進展。我們先看看fs2提供的兩個函數款式:

/** Repeatedly calls `dequeue1` forever. */  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/**   * Halts the input stream at the first `None`.   *   * @example {{{   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList   * res0: List[Int] = List(1, 2)   * }}}   */  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =    _ repeatPull { _.receive {      case (hd, tl) =>        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })        if (out.size == hd.size) Pull.output(out) as tl        else if (out.isEmpty) Pull.done        else Pull.output(out) >> Pull.done    }}

剛好,dequeue產生Stream[F,A]。而unNoneTerminate可以根據Stream(None)來終止運算?,F在我們可以把這個Reactive-Streams到fs2-pull-streams轉換過程這樣來定義:

implicit val strat = Strategy.fromFixedDaemonPool(4)                                                  //> strat  : fs2.Strategy = Strategyval fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現在這個stream應該已經變成fs2.Stream[Task,Int]了。我們可以用前面的log函數來試運行一下:

def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]    fs2Stream.through(log("")).run.unsafeRun          //> > 1                                                  //| > 2                                                  //| > 3                                                  //| > 4                                                  //| > 5我們成功的把Iteratee的Reactive-Stream轉化成fs2的Pull-Model-Stream。

下面是這次討論的源代碼:

import play.api.libs.iteratee._import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.collection.mutable._import fs2._object iteratees {def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     println(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}implicit val strat = Strategy.fromFixedDaemonPool(4)val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}    fs2Stream.through(log("")).run.unsafeRun }


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
日韩中文在线中文网在线观看| 亚洲男女性事视频| 国内精品久久久久伊人av| 中文字幕一区二区三区电影| 川上优av一区二区线观看| 成人免费视频在线观看超级碰| 亚洲人成在线免费观看| 国产精品欧美在线| 91中文字幕在线观看| 欧美午夜宅男影院在线观看| 精品亚洲国产成av人片传媒| 亚洲片在线资源| 亚洲电影免费观看高清完整版| 欧美一区二区三区四区在线| 久久韩剧网电视剧| xxx一区二区| 亚洲国产另类 国产精品国产免费| 亚洲风情亚aⅴ在线发布| 日韩精品在线私人| 成人精品在线视频| 国产精品高潮呻吟视频| 亚洲精品中文字幕av| 国产日韩欧美视频| 国产精品一区电影| 中文字幕亚洲欧美在线| 全色精品综合影院| 国产一区二区视频在线观看| 国产欧美日韩免费看aⅴ视频| 欧美在线视频一区| 日韩欧美国产免费播放| 欧美激情xxxx性bbbb| 欧美精品免费在线| 亚洲国产精品99久久| 97婷婷大伊香蕉精品视频| 亚洲男女自偷自拍图片另类| 亚洲图片欧美日产| 久久五月情影视| 久久亚洲精品成人| 一区二区欧美激情| 97久久精品国产| 在线亚洲午夜片av大片| 成人xxxx视频| 欧美制服第一页| 欧美午夜精品久久久久久人妖| 另类专区欧美制服同性| 色综合视频一区中文字幕| 美女啪啪无遮挡免费久久网站| 欧美激情中文字幕在线| 国产一区二区视频在线观看| 欧美肥老太性生活视频| 日韩美女免费线视频| 懂色av影视一区二区三区| 色一情一乱一区二区| 性视频1819p久久| 亚洲美女在线观看| 久久久久久av| 欧美精品videos| 91精品啪在线观看麻豆免费| 美日韩精品视频免费看| 国产一区二区三区日韩欧美| 日韩av电影国产| 97精品免费视频| 久久香蕉国产线看观看av| 日本不卡免费高清视频| 57pao成人国产永久免费| www.亚洲天堂| 精品福利在线看| 亚洲精品狠狠操| 久久亚洲影音av资源网| 欧美人与性动交a欧美精品| 国产精品视频久| 久久综合伊人77777尤物| 国内外成人免费激情在线视频网站| 国产精品免费一区豆花| 国产精品久久久久999| 国产精品视频资源| 久久中文字幕在线视频| 色综合91久久精品中文字幕| 午夜精品蜜臀一区二区三区免费| 尤物yw午夜国产精品视频| 国产精品扒开腿做爽爽爽视频| 亚洲欧美在线x视频| 久久全球大尺度高清视频| 亚洲精品白浆高清久久久久久| 日本亚洲欧美成人| 国模gogo一区二区大胆私拍| 欧美成人激情在线| 亚洲欧美中文日韩在线| 欧美成人剧情片在线观看| 成人午夜激情免费视频| 亚洲综合在线中文字幕| 亚洲色图15p| www.xxxx欧美| 久久久久亚洲精品| 成人xvideos免费视频| 欧美成人精品一区| 久久伊人91精品综合网站| 在线性视频日韩欧美| 欧美自拍视频在线观看| 国产午夜精品视频| 欧美另类极品videosbest最新版本| 亚洲国产日韩欧美在线动漫| 久久久久久亚洲精品中文字幕| 欧美激情在线视频二区| 色香阁99久久精品久久久| 欧美中文在线观看国产| 国产精品视频yy9099| 欧美午夜精品久久久久久浪潮| 97视频人免费观看| 国产亚洲激情视频在线| 日韩中文字幕免费看| 91社影院在线观看| 国产精品中文字幕在线观看| 国产精品永久免费视频| 欧美天天综合色影久久精品| 91av在线播放视频| 国色天香2019中文字幕在线观看| 欧美在线国产精品| 国产精品h片在线播放| 欧美超级乱淫片喷水| 亚洲免费中文字幕| 亚洲人成网站免费播放| 欧美一区三区三区高中清蜜桃| 在线视频欧美性高潮| 成人国产在线视频| 国产精品极品尤物在线观看| 久青草国产97香蕉在线视频| 成人精品一区二区三区电影免费| 精品精品国产国产自在线| 亚洲天堂久久av| 日韩欧美国产骚| 国产成人一区二区三区| 国内免费精品永久在线视频| 国产乱肥老妇国产一区二| 久久99久久久久久久噜噜| 亚洲国产欧美一区二区三区同亚洲| 久久电影一区二区| 国产一区二区三区视频在线观看| 日韩成人xxxx| 欧美亚洲另类激情另类| 欧美另类极品videosbest最新版本| 91在线高清免费观看| 国产在线拍揄自揄视频不卡99| 日韩av手机在线看| 国产亚洲精品美女久久久| 国产亚洲成精品久久| 久久成人av网站| 亚洲老板91色精品久久| 亚洲自拍另类欧美丝袜| 人妖精品videosex性欧美| 色先锋资源久久综合5566| 亚洲第一天堂无码专区| 欧美怡红院视频一区二区三区| 97香蕉久久超级碰碰高清版| 欧美乱大交xxxxx| 欧美另类老肥妇| 麻豆国产精品va在线观看不卡| 国产亚洲欧洲高清| 欧美激情2020午夜免费观看| 91po在线观看91精品国产性色| 亚洲欧美日韩中文在线| 欧美激情按摩在线| 日韩国产高清污视频在线观看| 一区二区欧美亚洲|