亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

FunDA(7)- Reactive Streams to fs2 Pull Streams

2019-11-11 05:25:19
字體:
來源:轉載
供稿:網友

    Reactive-Stream不只是簡單的push-model-stream, 它還帶有“拖式”(pull-model)性質。這是因為在Iteratee模式里雖然理論上由Enumerator負責主動推送數據,實現了push-model功能。但實際上Iteratee也會根據自身情況,通過提供callback函數通知Enumerator可以開始推送數據,這從某種程度上也算是一種pull-model。換句話講Reactive-Streams是通過push-pull-model來實現上下游Enumerator和Iteratee之間互動的。我們先看個簡單的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     PRintln(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473enumNumbers |>> showElements                      //> EL(1)                                                  //| EL(2)                                                  //| EL(3)                                                  //| EL(4)                                                  //| EL(5)                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))我們看到:enumNumbers |>> showElements立刻啟動了運算。但并沒有實際完成數據發送,因為showElements并沒有收到Input.EOF。首先,我們必須用Iteratee.run來完成運算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)                                                  //| El(2)                                                  //| El(3)                                                  //| El(4)                                                  //| El(5)                                                  //| El(6)                                                  //| El(7)                                                  //| El(8)                                                  //| EOF                                                  //| it  : scala.concurrent.Future[Int] = Success(99)這個run函數是這樣定義的:

/**   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary   * Extracts the computed result of the Iteratee, pushing an Input.EOF first   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.   * In case of error, an exception may be thrown synchronously or may   * be used to complete the returned Promise; this indeterminate behavior   * is inherited from fold().   *   *  @return a [[scala.concurrent.Future]] of the eventually computed result   */  def run: Future[A] = fold({    case Step.Done(a, _) => Future.successful(a)    case Step.Cont(k) => k(Input.EOF).fold({      case Step.Done(a1, _) => Future.successful(a1)      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")      case Step.Error(msg, e) => sys.error(msg)    })(dec)    case Step.Error(msg, e) => sys.error(msg)  })(dec)再一個問題是:enumNumbers |>> showElements是個封閉的運算,我們無法逐部分截取數據流,只能取得整個運算結果。也就是說如果我們希望把一個Enumerator產生的數據引導到fs2 Stream的話,只能在所有數據都讀入內存后才能實現了。這樣就違背了使用Reactive-Streams的意愿。那我們應該怎么辦?一個可行的方法是使用一個存儲數據結構,用兩個線程,一個線程里Iteratee把當前數據存入數據結構,另一個線程里fs2把數據取出來。fs2.async.mutable包提供了個Queue類型,我們可以用這個Queue結構來作為Iteratee與fs2之間的管道:Iteratee從一頭把數據壓進去(enqueue),fs2從另一頭把數據取出來(dequeue)。

我們先設計enqueue部分,這部分是在Iteratee里進行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下這個Iteratee:我們直接把enqueueTofs2放入Cont狀態,也就是等待接受數據狀態。當收到數據時運行q.enqueue1把數據塞入q,然后不斷循環運行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是個同步運算,在未成功完成數據enqueue1的情況下會一直占用線程。所以,q另一端的dequeue部分必須是在另一個線程里運行,否則會造成整個程序的死鎖。fs2的Queue類型款式是:Queue[F,A],所以我們必須用Stream.eval來對這個Queue進行函數式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>    //run Enumerator-Iteratee and enqueue data in thread 1    //dequeue data and en-stream in thread 2 (current thread)  }因為Stream.eval運算結果是Stream[Task,Int],所以我們可以得出這個flatMap內的函數款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我們先考慮如何實現數據enqueue部分:這部分是通過Iteratee的運算過程產生的。我們提到過這部分必須在另一個線程里運行,所以可以用Task來選定另一線程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()現在這個Task就在后面另一個線程里自己去運算了。但它的運行進展則會依賴于另一個線程中dequeue數據的進展。我們先看看fs2提供的兩個函數款式:

/** Repeatedly calls `dequeue1` forever. */  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat/**   * Halts the input stream at the first `None`.   *   * @example {{{   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList   * res0: List[Int] = List(1, 2)   * }}}   */  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =    _ repeatPull { _.receive {      case (hd, tl) =>        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })        if (out.size == hd.size) Pull.output(out) as tl        else if (out.isEmpty) Pull.done        else Pull.output(out) >> Pull.done    }}

剛好,dequeue產生Stream[F,A]。而unNoneTerminate可以根據Stream(None)來終止運算?,F在我們可以把這個Reactive-Streams到fs2-pull-streams轉換過程這樣來定義:

implicit val strat = Strategy.fromFixedDaemonPool(4)                                                  //> strat  : fs2.Strategy = Strategyval fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)現在這個stream應該已經變成fs2.Stream[Task,Int]了。我們可以用前面的log函數來試運行一下:

def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]    fs2Stream.through(log("")).run.unsafeRun          //> > 1                                                  //| > 2                                                  //| > 3                                                  //| > 4                                                  //| > 5我們成功的把Iteratee的Reactive-Stream轉化成fs2的Pull-Model-Stream。

下面是這次討論的源代碼:

import play.api.libs.iteratee._import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalimport scala.collection.mutable._import fs2._object iteratees {def showElements: Iteratee[Int,Unit] = Cont {  case Input.El(e) =>     println(s"EL($e)")     showElements  case Input.Empty => showElements  case Input.EOF =>     println("EOF")     Done((),Input.EOF)}val enumNumbers = Enumerator(1,2,3,4,5)enumNumbers |>> showElementsIteratee.flatten(enumNumbers |>> showElements).rundef enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {   case Input.EOF =>       q.enqueue1(None).unsafeRun       Done((),Input.EOF)   case Input.Empty => enqueueTofs2(q)   case Input.El(e) =>       q.enqueue1(Some(e)).unsafeRun       enqueueTofs2(q)}implicit val strat = Strategy.fromFixedDaemonPool(4)val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture  pipe.unNoneTerminate(q.dequeue)}def log[A](prompt: String): Pipe[Task,A,A] =    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}    fs2Stream.through(log("")).run.unsafeRun }


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
在线中文字幕日韩| 久久夜精品va视频免费观看| 91精品国产高清久久久久久久久| 亚洲精品suv精品一区二区| 日韩经典第一页| 国内外成人免费激情在线视频网站| 中文字幕久久久| 国产精品欧美风情| 91成人天堂久久成人| 中文综合在线观看| 日韩电影在线观看中文字幕| 欧美日韩免费网站| 亚洲美女视频网站| 日韩电影在线观看永久视频免费网站| 成人福利网站在线观看| 日韩国产精品一区| 91精品国产一区| 亚洲精品在线观看www| 欧美日韩免费一区| 亚洲资源在线看| 欧美日韩在线视频一区二区| 久久深夜福利免费观看| 欧美日韩精品在线| 日本aⅴ大伊香蕉精品视频| 韩国国内大量揄拍精品视频| 国产日韩av高清| 久久影视电视剧凤归四时歌| 欧美日韩综合视频网址| 久久中文精品视频| 午夜精品久久久久久99热软件| 中文字幕精品久久| 色综合伊人色综合网| 免费不卡在线观看av| 久久免费精品日本久久中文字幕| 久久久av电影| 欧美福利视频在线| 亚洲精品福利在线观看| 国产精品精品一区二区三区午夜版| 亚洲福利视频网站| 国产精品免费久久久久影院| 亚洲天堂男人的天堂| 一区二区在线视频| 亚洲在线视频福利| 91精品国产高清久久久久久久久| 国产精品久久久久不卡| 色偷偷av亚洲男人的天堂| 亚洲毛片在线看| 亚洲成人久久电影| 精品国产一区av| 欧美精品在线观看| 国产精品久久久久久久久久| 成人网址在线观看| 91性高湖久久久久久久久_久久99| 成人疯狂猛交xxx| 亚洲乱码国产乱码精品精天堂| 亚洲免费一在线| 538国产精品一区二区在线| 久久久黄色av| 国产在线a不卡| 亚洲精品国产拍免费91在线| 亚洲男人天堂2023| 日本不卡视频在线播放| 久久久精品国产一区二区| 久久偷看各类女兵18女厕嘘嘘| 91av在线精品| 亚洲精品影视在线观看| 亚洲欧美一区二区激情| 伊人伊人伊人久久| 欧美在线xxx| 成人福利在线视频| 亚洲精品国精品久久99热| 97免费视频在线播放| 国产亚洲精品久久久优势| 亚洲欧洲日韩国产| 欧美精品一本久久男人的天堂| 欧美制服第一页| 98午夜经典影视| 国产精品一二三视频| 欧美在线激情视频| 日韩中文字幕在线免费观看| 欧美亚洲视频在线看网址| 精品在线小视频| 精品欧美国产一区二区三区| 亚洲免费一级电影| 欧美日韩一区二区精品| 国产视频在线一区二区| 91在线免费观看网站| 国产亚洲欧美另类中文| 最新的欧美黄色| 日韩精品极品在线观看| 日韩成人中文字幕在线观看| 亚洲成年人影院在线| 久久免费视频这里只有精品| 久久久精品国产一区二区| 亚洲精品国产精品国自产观看浪潮| 日韩视频免费看| 国产日韩中文字幕在线| 日韩美女激情视频| 日韩高清免费观看| 久久人人爽国产| 国产免费成人av| 最近2019年好看中文字幕视频| 色偷偷偷综合中文字幕;dd| 国产裸体写真av一区二区| 国产成+人+综合+亚洲欧美丁香花| 色婷婷综合久久久久中文字幕1| 国产精品视频一区国模私拍| 热久久这里只有精品| 日韩69视频在线观看| 中文字幕亚洲一区二区三区五十路| 成人激情视频免费在线| 久久久精品在线| 精品欧美国产一区二区三区| 色视频www在线播放国产成人| 亚洲免费电影在线观看| 日韩h在线观看| 国产精品福利无圣光在线一区| 国产一区二区久久精品| 亚洲三级 欧美三级| 欧美精品久久久久久久久| 国产成人黄色av| 久久网福利资源网站| 日韩在线视频二区| 欧美wwwwww| 性亚洲最疯狂xxxx高清| 97在线看免费观看视频在线观看| 中文字幕欧美视频在线| 亚洲国产精彩中文乱码av在线播放| 亚洲欧洲偷拍精品| 精品国内自产拍在线观看| 国产精品久久久久99| 亚洲精品国产美女| 国产成人短视频| 亚洲国产精品悠悠久久琪琪| 国自在线精品视频| 国产精品国内视频| 欧美中在线观看| 久久久久一本一区二区青青蜜月| 亚洲综合中文字幕68页| 538国产精品一区二区在线| 国产91色在线|| 欧美综合在线观看| 精品电影在线观看| 亚洲精品99久久久久| 亚洲成人网在线观看| 日产精品99久久久久久| 精品视频一区在线视频| 日韩精品在线观看一区二区| 国产一区二区激情| 国产成人精品久久二区二区91| 日韩精品中文字幕在线观看| 国产在线视频不卡| 亚洲国产精品人人爽夜夜爽| 亚洲一区999| 亚洲精品国偷自产在线99热| 成人免费网站在线| 亚洲精品99久久久久| 亚洲第一福利网站| 高清亚洲成在人网站天堂| 国产91色在线|免| 国产精品美腿一区在线看| 中文字幕国产亚洲| 欧美在线中文字幕| 国产精品揄拍一区二区|