亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

Storm系列(二):使用Csharp創建你的第一個Storm拓撲(wordcount)

2019-11-17 02:19:23
字體:
來源:轉載
供稿:網友

Storm系列(二):使用Csharp創建你的第一個Storm拓撲(Wordcount)

WordCount在大數據領域就像學習一門語言時的hello world,得益于Storm的開源以及Storm.Net.Adapter,現在我們也可以像java或Python一樣,使用Csharp創建原生支持的Storm Topologies。下面我將通過介紹wordcount來展示如何使用Csharp開發Storm拓撲。

上篇博客已經介紹了如何部署Storm開發環境,本文所講述demo已包含在Storm.Net.Adapter中,如果你覺得對你有幫助,歡迎Star和Fork,讓更多人看到來幫助完善這個項目。

首先,我們創建一個控制臺應用程序(使用控制臺是方便調用) StormSimple;使用Nuget添加添加Storm.Net.Adapter(該類庫的namespace為Storm)。

wordcount project

STEP1:通過繼承ISpout創建一個Spout:Generator,實現ISpout的四個方法:

void Open(Config stormConf, TopologyContext context);void NextTuple();void Ack(long seqId);void Fail(long seqId);

在實現這4個方法之前,我們還需要創建一些變量和方法來初始化這個類:

PRivate Context ctx;public Generator(Context ctx){    Context.Logger.Info("Generator constructor called");    this.ctx = ctx;    // Declare Output schema    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();    outputSchema.Add("default", new List<Type>() { typeof(string) });    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));}

  

我使用了一個私有變量ctx來保存實例化時傳入的Context對象,Context有一個靜態的Logger,用于日志的發送,我們無需實例化即可使用它。根據日志級別不同,包含 Trace Debug Info Warn Error 五個級別,另外我們在實例化方法里還需要定義輸入和輸出的參數的數量和類型,本例子中輸入為null,輸出為一個字符串。另外我們還創建一個方法來直接返回實例化后的類:

/// <summary>///  Implements of delegate "newPlugin", which is used to create a instance of this spout/bolt/// </summary>/// <param name="ctx">Context instance</param>/// <returns></returns>public static Generator Get(Context ctx){    return new Generator(ctx);}

其中Open在該類第一次任務調用前執行,主要用于預處理和一些配置信息的傳入,大多數情況下,我們并不需要做什么;NextTuple方法用于生成Tuple,會不斷被調用,因此如果沒什么任務要向下發送,可以使用Thread.Sleep(50);來減少CPU的消耗(具體休息時間與Topology設置有關,只要不超過超時時間就沒有問題)。

本例子中NextTuple主要用于從一個包含英語句子的數組中隨機取出一條句子,并把它發送到下一個環節,為了能夠保證所有的任務都被成功執行一遍,我們將發送的消息緩存起來,并且限制正在執行中的任務數量為20。

private const int MAX_PENDING_TUPLE_NUM = 20;private long lastSeqId = 0;private Dictionary<long, string> cachedTuples = new Dictionary<long, string>();private Random rand = new Random();string[] sentences = new string[] {                                  "the cow jumped over the moon",                                  "an apple a day keeps the doctor away",                                  "four score and seven years ago",                                  "snow white and the seven dwarfs",                                  "i am at two with nature"};/// <summary>/// This method is used to emit one or more tuples. If there is nothing to emit, this method should return without emitting anything. /// It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. /// When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds), so as not to waste too much CPU./// </summary>public void NextTuple(){    Context.Logger.Info("NextTuple enter");    string sentence;    if (cachedTuples.Count <= MAX_PENDING_TUPLE_NUM)    {        lastSeqId++;        sentence = sentences[rand.Next(0, sentences.Length - 1)];        Context.Logger.Info("Generator Emit: {0}, seqId: {1}", sentence, lastSeqId);        this.ctx.Emit("default", new List<object>() { sentence }, lastSeqId);        cachedTuples[lastSeqId] = sentence;    }    else    {        // if have nothing to emit, then sleep for a little while to release CPU        Thread.Sleep(50);    }    Context.Logger.Info("cached tuple num: {0}", cachedTuples.Count);    Context.Logger.Info("Generator NextTx exit");}

this.ctx.Emit即用來把Topology發送給下一個Bolt。

Ack()和Fail()方法分別在整個Topology執行成功和Topology失敗時被調用。本例中Ack主要是移除緩存,Fail主要是用于取出緩存數據并重新發送Tuple。

/// <summary>/// Ack() will be called only when ack mechanism is enabled in spec file./// If ack is not supported in non-transactional topology, the Ack() can be left as empty function. /// </summary>/// <param name="seqId">Sequence Id of the tuple which is acked.</param>public void Ack(long seqId){    Context.Logger.Info("Ack, seqId: {0}", seqId);    bool result = cachedTuples.Remove(seqId);    if (!result)    {        Context.Logger.Warn("Ack(), remove cached tuple for seqId {0} fail!", seqId);    }}/// <summary>/// Fail() will be called only when ack mechanism is enabled in spec file. /// If ack is not supported in non-transactional topology, the Fail() can be left as empty function./// </summary>/// <param name="seqId">Sequence Id of the tuple which is failed.</param>public void Fail(long seqId){    Context.Logger.Info("Fail, seqId: {0}", seqId);    if (cachedTuples.ContainsKey(seqId))    {        string sentence = cachedTuples[seqId];        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);        this.ctx.Emit("default", new List<object>() { sentence }, seqId);    }    else    {        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);    }}

至此,一個Spout就算完成了,下面我們繼續分析Bolt。

STEP2:通過繼承IBasicBolt創建Bolt:Splitter、Counter。

Splitter是一個通過空格來拆分英語句子為一個個獨立的單詞,Counter則用來統計各個單詞出現的次數。我們只詳細分析Splitter,Counter類僅貼出全部源碼。

和Generator相同,我們首先也要構造一個實例化方法方便使用者傳參和調用:

private Context ctx;private int msgTimeoutSecs;public Splitter(Context ctx){    Context.Logger.Info("Splitter constructor called");    this.ctx = ctx;    // Declare Input and Output schemas    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();    inputSchema.Add("default", new List<Type>() { typeof(string) });    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();    outputSchema.Add("default", new List<Type>() { typeof(string), typeof(char) });    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));    // Demo how to get stormConf info    if (Context.Config.StormConf.ContainsKey("topology.message.timeout.secs"))    {        msgTimeoutSecs = Convert.ToInt32(Context.Config.StormConf["topology.message.timeout.secs"]);    }    Context.Logger.Info("msgTimeoutSecs: {0}", msgTimeoutSecs);}/// <summary>///  Implements of delegate "newPlugin", which is used to create a instance of this spout/bolt/// </summary>/// <param name="ctx">Context instance</param>/// <returns></returns>public static Splitter Get(Context ctx){    return new Splitter(ctx);}

在這個實例化方法中,我們增加了一個沒有使用的變量msgTimeoutSecs用來展示如何獲取Topology的配置。

由于繼承了IBasicBolt,我們需要實現以下兩個方法:

void Prepare(Config stormConf, TopologyContext context);void Execute(StormTuple tuple);

這和IBolt是一致的,IBasicBolt和IBolt的區別僅僅在于后者需要自己處理何時向Storm發送Ack或Fail,IBasicBolt則不需要關心這些,如果你的Execute沒有拋出異常的話,總會在最后向Storm發送Ack,否則則發送Fail。Prepare則是用于執行前的預處理,此例子里同樣什么都不需要做。

/// <summary>/// The Execute() function will be called, when a new tuple is available./// </summary>/// <param name="tuple"></param>public void Execute(StormTuple tuple){    Context.Logger.Info("Execute enter");    string sentence = tuple.GetString(0);    foreach (string word in sentence.Split(' '))    {        Context.Logger.Info("Splitter Emit: {0}", word);        this.ctx.Emit("default", new List<StormTuple> { tuple }, new List<object> { word, word[0] });    }    Context.Logger.Info("Splitter Execute exit");}public void Prepare(Config stormConf, TopologyContext context){    return;}

Counter和上述的代碼類似:

using Storm;using System;using System.Collections.Generic;namespace StormSample{    /// <summary>    /// The bolt "counter" uses a dictionary to record the occurrence number of each word.    /// </summary>    public class Counter : IBasicBolt    {        private Context ctx;        private Dictionary<string, int> counts = new Dictionary<string, int>();
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产一区玩具在线观看| 国产精品日韩久久久久| 国产成人久久久精品一区| 国产剧情久久久久久| 色婷婷**av毛片一区| 国产精品夜色7777狼人| 欧美国产亚洲视频| 日韩综合视频在线观看| 青草青草久热精品视频在线观看| 日韩在线欧美在线| 欧美极品少妇xxxxⅹ喷水| 91精品视频免费观看| 毛片精品免费在线观看| 日韩电影免费在线观看| 亚洲iv一区二区三区| 懂色av中文一区二区三区天美| 亚洲码在线观看| 亚洲国产精彩中文乱码av| 一本大道亚洲视频| 中文字幕亚洲二区| 亚洲无亚洲人成网站77777| 中文字幕精品久久| 亚洲国产美女精品久久久久∴| 欧美精品久久久久a| 欧美日韩视频在线| 黄色成人在线播放| 福利微拍一区二区| 精品中文字幕在线观看| 亚洲国产精品成人一区二区| 日韩av在线一区| 成人精品福利视频| 国产精品色午夜在线观看| 日韩电影在线观看永久视频免费网站| 欧美老肥婆性猛交视频| 国产精品女人久久久久久| 国产精品视频精品视频| 日韩欧美在线视频免费观看| 亚洲天堂男人的天堂| 久久久精品一区二区| 欧美多人乱p欧美4p久久| 欧美一级高清免费| 日韩欧美亚洲范冰冰与中字| 欧美二区在线播放| 亚洲乱码一区av黑人高潮| 国产日韩欧美成人| 成人av在线亚洲| 中文字幕日韩在线视频| 狠狠操狠狠色综合网| 国产精品扒开腿爽爽爽视频| 国产精品久久久久久久电影| 久久青草精品视频免费观看| 国产亚洲在线播放| 亚洲香蕉在线观看| 尤物99国产成人精品视频| 日韩一级黄色av| 欧美午夜片欧美片在线观看| 一区二区三区动漫| 欧美日韩国产精品一区二区三区四区| 久久久亚洲成人| 欧美成人免费va影院高清| 亚洲精品动漫久久久久| 精品国内产的精品视频在线观看| 成人日韩av在线| 日韩综合视频在线观看| 国产女人18毛片水18精品| 亚洲人成在线观看网站高清| 97在线视频免费看| 亚洲人成绝费网站色www| 久热精品视频在线观看一区| 国产精品久久久久77777| 欧美日韩在线观看视频| 国产精品久久久久久搜索| 中文字幕日韩在线播放| 国产精品视频yy9099| 中文字幕久久久| 久久国产精品久久久久久久久久| 国产精品久久久av久久久| 亚洲天堂开心观看| 亚洲电影成人av99爱色| 亚洲国产成人在线播放| 成人在线国产精品| 欧美成人免费在线观看| 国产一区视频在线播放| 亚洲国产欧美在线成人app| 在线看片第一页欧美| 米奇精品一区二区三区在线观看| 日韩av电影国产| 日韩有码视频在线| 91久久精品美女高潮| 国产欧美一区二区三区视频| 亚洲国产女人aaa毛片在线| 国产精品黄页免费高清在线观看| 欧美在线观看一区二区三区| 奇米成人av国产一区二区三区| 亚洲成色777777女色窝| 亚洲一区二区三区乱码aⅴ蜜桃女| 一区二区三欧美| 尤物yw午夜国产精品视频| 日韩精品在线观看一区| 国产成人黄色av| 日韩精品视频在线播放| 成人黄色免费片| 欧美福利小视频| 91国内在线视频| 国产成人福利视频| 日韩在线观看网址| 国产精品一香蕉国产线看观看| 97香蕉久久超级碰碰高清版| 正在播放欧美视频| 国产综合在线观看视频| 欧美精品日韩www.p站| 中文字幕av一区二区| 久久久国产一区| 日本a级片电影一区二区| 精品亚洲一区二区三区四区五区| 欧美日韩成人在线观看| 欧美综合在线观看| 一区二区三区四区在线观看视频| 欧美黄色片免费观看| 日韩成人在线免费观看| 国产精品三级美女白浆呻吟| 国产精品十八以下禁看| 欧美亚洲另类视频| 国产精品入口日韩视频大尺度| 久热精品视频在线观看| 国产精品毛片a∨一区二区三区|国| 久久99视频精品| 日韩视频在线免费| 国产精品日韩av| 国产伦精品免费视频| 国模叶桐国产精品一区| 日韩中文字幕在线看| 538国产精品一区二区在线| 欧美激情亚洲激情| 亚洲精品国精品久久99热| 欧美国产日韩二区| 免费91在线视频| 欧美黄色片免费观看| 国产亚洲精品久久久久久777| 久久欧美在线电影| 欧美性受xxxx白人性爽| 亚洲国产中文字幕在线观看| www亚洲欧美| 亚洲男人天堂2023| 亚洲最大的av网站| 欧美日韩不卡合集视频| 欧美黄色片免费观看| 欧美乱人伦中文字幕在线| 黑人巨大精品欧美一区二区| 亚洲aa在线观看| 亚洲夜晚福利在线观看| 亚洲色图色老头| 国产suv精品一区二区三区88区| 日韩精品中文字幕在线播放| 色午夜这里只有精品| 久久综合久久八八| 国产精品视频男人的天堂| 欧美成人久久久| 日韩av在线一区| 九九精品视频在线| 国产日韩在线亚洲字幕中文| 国产精品18久久久久久首页狼| 欧美多人乱p欧美4p久久| 三级精品视频久久久久|