亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

基于Spark的公安大數據實時運維技術實踐

2019-11-10 18:32:28
字體:
來源:轉載
供稿:網友

文章來源:https://www.iteblog.com/archives/1956.html

公安行業存在數以萬計的前后端設備,前端設備包括相機、檢測器及感應器,后端設備包括各級中心機房中的服務器、應用服務器、網絡設備及機房動力系統,數量巨大、種類繁多的設備給公安內部運維管理帶來了巨大挑戰。傳統通過ICMP/SNMP、Trap/Syslog等工具對設備進行診斷分析的方式已不能滿足實際要求,由于公安內部運維管理的特殊性,現行通過ELK等架構的方式同樣也滿足不了需要。為尋求合理的方案,我們將目光轉向開源架構,構建了一套適用于公安行業的實時運維管理平臺。

實時運維平臺整體架構

數據采集層:Logstash+Flume,負責在不同場景下收集、過濾各類前后端硬件設備輸出的Snmp Trap、Syslog日志信息以及應用服務器自身產生的系統和業務日志;數據傳輸層:采用高吞吐的分布式消息隊列Kafka集群,保證匯聚的日志、消息的可靠傳輸;數據處理層:由Spark實時Pull Kafka數據,通過Spark Streaming以及RDD操作進行數據流的處理以及邏輯分析;數據存儲層:實時數據存入MySQL中便于實時的業務應用和展示;全量數據存入ES以及HBase中便于后續的檢索分析;業務服務層:基于存儲層,后續的整體業務應用涵蓋了APM、網絡監控、拓撲、告警、工單、CMDB等。

整體系統涉及的主要開源框架情況如下:

另外,整體環境基于JDK 8以及Scala 2.10.4。公安系統設備種類繁多,接下來將以交換機Syslog日志為例,詳細介紹日志處理分析的整體流程。

圖1  公安實時運維平臺整體架構圖1 公安實時運維平臺整體架構

Flume+Logstash日志收集

Flume是Cloudera貢獻的一個分布式、可靠及高可用的海量日志采集系統,支持定制各類Source(數據源)用于數據收集,同時提供對數據的簡單處理以及通過緩存寫入Sink(數據接收端)的能力。

Flume中,Source、Channel及Sink的配置如下:

# 為該Flume Agent的source、channel以及sink命名a1.sources = r1a1.sinks = k1a1.channels = c1# 配置Syslog源a1.sources.r1.type = syslogtcpa1.sources.r1.port = 5140a1.sources.r1.host = localhost# Kafka Sink的相關配置a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = syslog-kafkaa1.sinks.k1.brokerList = gtcluster-slave01:9092a1.sinks.k1.requiredAcks = 1a1.sinks.k1.batchSize = 20a1.sinks.k1.channel = c1# Channel基于內存作為緩存a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 將Source以及Sink綁定至Channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

該配置通過syslog source配置localhost tcp 5140端口來接收網絡設備發送的Syslog信息,event緩存在內存中,再通過KafkaSink將日志發送到kafka集群中名為“syslog-kafka”的topic中。

Logstash來自Elastic公司,專為收集、分析和傳輸各類日志、事件以及非結構化的數據所設計。它有三個主要功能:事件輸入(Input)、事件過濾器(Filter)以及事件輸出(Output),在后綴為.conf的配置文件中設置,本例中Syslog配置如下:

# syslog.confinput { Syslog { port => "514" }}filter {}output { kafka { bootstrap_servers => "192.168.8.65:9092" topic_id => "syslog-kafka" comPRession_type => "snappy" codec => plain { format => "%{host} %{@timestamp} %{message}" } }}

Input(輸入)插件用于指定各種數據源,本例中的Logstash通過udp 514端口接收Syslog信息;

Filter(過濾器)插件雖然在本例中不需要配置,但它的功能非常強大,可以進行復雜的邏輯處理,包括正則表達式處理、編解碼、k/v切分以及各種數值、時間等數據處理,具體可根據實際場景設置;

Output(輸出)插件用于將處理后的事件數據發送到指定目的地,指定了Kafka的位置、topic以及壓縮類型。在最后的Codec編碼插件中,指定來源主機的ip地址(host)、Logstash處理的時間戳(@timestamp)作為前綴并整合原始的事件消息(message),方便在事件傳輸過程中判斷Syslog信息來源。單條原始Syslog信息流樣例如下:

147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

Logstash Output插件處理后的信息流變成為:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

其中紅色字段就是codec編碼插件植入的host以及timestamp信息。處理后的Syslog信息會發送至Kafka集群中進行消息的緩存。

Kafka日志緩沖

Kafka是一個高吞吐的分布式消息隊列,也是一個訂閱/發布系統。Kafka集群中每個節點都有一個被稱為broker的實例,負責緩存數據。Kafka有兩類客戶端,Producer(消息生產者的)和Consumer(消息消費者)。Kafka中不同業務系統的消息可通過topic進行區分,每個消息都會被分區,用以分擔消息讀寫負載,每個分區又可以有多個副本來防止數據丟失。消費者在具體消費某個topic消息時,指定起始偏移量。Kafka通過Zero-Copy、Exactly Once等技術語義保證了消息傳輸的實時、高效、可靠以及容錯性。

Kafka集群中某個broker的配置文件server.properties的部分配置如下:

########## Server Basics ############ 為每一個broker設置獨立的數字作為idbroker.id=1###### Socket Server Settings ####### socket監聽端口port=9092########### Zookeeper ############### Zookeeper的連接配置zookeeper.connect=gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=3000

其中需指定集群里不同broker的id,此臺broker的id為1,默認監聽9092端口,然后配置Zookeeper(后續簡稱zk)集群,再啟動broker即可。

Kafka集群名為syslog-kafka的topic:

bin/kafka-topics.sh--create--zookeeper gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181--replication-factor 3 --partitions 3--topic syslog-kafka

Kafka集群的topic以及partition等信息也可以通過登錄zk來觀察。然后再通過下列命令查看Kafka接收到的所有交換機日志信息:

bin/kafka-console-consumer.sh--zookeeper gtcluster-slave02:2181--from-beginning--topic Syslog-kafka

部分日志樣例如下:

10.1.1.10 2016-10-18T05:23:04.015Z <163>5585: Oct 18 13:22:45: %LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down19.1.1.113 2016-10-18T05:24:04.425Z <149>10857: Oct 18 13:25:23.019 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to down19.1.1.113 2016-10-18T05:24:08.312Z <149>10860: Oct 18 13:25:27.935 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to up

Spark日志處理邏輯

Spark是一個為大規模數據處理而生的快速、通用的引擎,在速度、效率及通用性上表現極為優異。

在Spark主程序中,通過Scala的正則表達式解析Kafka Source中名為“syslog-kafka” 的topic中的所有Syslog信息,再將解析后的有效字段封裝為結果對象,最后通過MyBatis近實時地寫入MySQL中,供前端應用進行實時地可視化展示。另外,全量數據存儲進入HBase及ES中,為后續海量日志的檢索分析及其它更高級的應用提供支持。主程序示例代碼如下:

object SwSyslogProcessor { def main(args: Array[String]): Unit = { // 初始化SparkContext,批處理間隔5秒 val sparkConf: SparkConf = new SparkConf().setAppName("SwSyslogProcessorApp ") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ssc = new StreamingContext(sparkConf, Seconds(5)) // 定義topic val topic = Set("syslog-kafka") // 定義kafka的broker list地址 val brokers = "192.168.8.65:9092,192.168.8.66:9092,192.168.8.67:9092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringDecoder") // 通過topic以及brokers,創建從kafka獲取的數據流 val swSyslogDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topic) val totalcounts = ssc.sparkContext.accumulator(0L, "Total count") val lines = swSyslogDstream.map(x => x._2) // 將一行一行數據映射成SwSyslog對象 lines.filter(x => !x.isEmpty && x.contains("%LIN")&& x.contains("Ethernet") .map( x => { SwSyslogService.encapsulateSwSyslog(x) // 封裝并返回SwSyslog }).foreachRDD((s: RDD[SwSyslog], time: Time) => { // 遍歷DStream中的RDD if (!s.isEmpty()) { // 遍歷RDD中的分區記錄 s.foreachPartition { records => { if (!records.isEmpty) records.toSet.foreach { r: SwSyslog => // 統計當前處理的記錄總數 totalcounts.add(1L) // 保存SwSyslog信息到MySQL SwSyslogService.saveSwSyslog(r) } } } } } ) //啟動程序 ssc.start() // 阻塞等待 ssc.awaitTermination() }

整體的處理分析主要分為4步:

初始化SparkContext并指定application的參數;創建基于Kafka topic “syslog-kafka” 的DirectStream;將獲取的每一行數據映射為Syslog對象,調用Service進行對象封裝并返回;遍歷RDD,記錄不為空時保存或者更新Syslog信息到MySQL中。

Syslog POJO的部分基本屬性如下:

@Table(name = "sw_syslog")public class SwSyslog { /** * 日志ID */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** * 設備IP */ @Column(name = "dev_ip") private String devIp; /** * 服務器時間 */ @Column(name = "server_time") private String serverTime; /** * 信息序號 */ @Column(name = "syslog_num") private Long syslogNum; ……}

SwSyslog實體中的基本屬性對應Syslog中的接口信息,注解中的name對應MySQL中的表sw_syslog 以及各個字段,MyBatis完成成員屬性和數據庫結構的ORM(對象關系映射)。

程序中的SwSyslogService有兩個主要功能:

public static SwSyslog encapsulateSwSyslog(String syslogInfo) { SwSyslog swsyslog = new SwSyslog(); swsyslog.setDevIp(SwSyslogExtractorUtil.extractDevIp(syslogInfo)); swsyslog.setServerTime(SwSyslogExtractorUtil.extractServerTime(syslogInfo)); swsyslog.setSyslogNum(SwSyslogExtractorUtil.extractSyslogNum(syslogInfo)); swsyslog.setDevTime(SwSyslogExtractorUtil.extractDevTime(syslogInfo)); swsyslog.setSyslogType(SwSyslogExtractorUtil.extractSyslogType(syslogInfo)); swsyslog.setInfoType(SwSyslogExtractorUtil.extractInfoType(syslogInfo)); swsyslog.setDevInterface(SwSyslogExtractorUtil .extractDevInterface(syslogInfo)); swsyslog.setInterfaceState(SwSyslogExtractorUtil .extractInterfaceState(syslogInfo)); return swsyslog;}public static void saveSwSyslog(SwSyslog swSyslog) { LOGGER.debug("開始保存或更新SwSyslog", swSyslog); // 根據ip查詢所有Syslog List<swsyslog> list = swSyslogMapper.queryAllByIp(swSyslog.getDevIp()); // 如果list非空,即查到對應IP的SwSyslog if (list != null && !list.isEmpty()) { for (SwSyslog sys : list) { // 若IP的接口相同,則更新信息 if (sys.getDevInterface().equals(swSyslog.getDevInterface())) { LOGGER.debug("有相同IP相同接口的記錄,開始更新SwSyslog"); sys.setServerTime(swSyslog.getServerTime()); sys.setSyslogNum(swSyslog.getSyslogNum()); sys.setDevTime(swSyslog.getDevTime()); sys.setSyslogType(swSyslog.getSyslogType()); sys.setInfoType(swSyslog.getInfoType()); sys.setInterfaceState(swSyslog.getInterfaceState()); sys.setUpdated(new Date()); swSyslogMapper.update(sys); } else { // 若接口不同,則直接保存 LOGGER.debug("相同IP無對應接口,保存SwSyslog"); swSyslog.setCreated(new Date()); swSyslog.setUpdated(swSyslog.getCreated()); swSyslogMapper.insert(swSyslog); } } } else { // 沒有對應的IP記錄,直接保存信息 LOGGER.debug("沒有相同IP記錄,直接保存SwSyslog"); swSyslog.setCreated(new Date()); swSyslog.setUpdated(swSyslog.getCreated()); swSyslogMapper.insert(swSyslog); }}</swsyslog>

encapsulateSwSyslog()將Spark處理后的每一行Syslog通過Scala的正則表達式解析為不同的字段,然后封裝并返回Syslog對象;遍歷RDD分區生成的每一個Syslog對象中都有ip以及接口信息,saveSwSyslog()會據此判斷該插入還是更新Syslog信息至數據庫。另外,封裝好的Syslog對象通過ORM工具MyBatis與MySQL進行互操作。

獲取到的每一行Syslog信息如之前所述:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

這段信息需解析為設備ip、服務器時間、信息序號、設備時間、Syslog類型、屬性、設備接口、接口狀態等字段。Scala正則解析邏輯如下:

/** * 抽取服務器時間 * 樣例:2016-10-09T10:04:54.517Z * @param line * @return */ def extractServerTime(line: String): String = { val regex1 = "20//d{2}-//d{2}-//d{2}".r val regex2 = "//d{2}://d{2}://d{2}.?(//d{3})?".r val matchedDate = regex1.findFirstIn(line) val matchedTime = regex2.findFirstIn(line) val result1 = matchedDate match { case Some(date) => date case None => " " } val result2 = matchedTime match { case Some(time) => time case None => " " } result1 + " " + result2}/** * 抽取設備時間 * 樣例:Sep 29 09:33:06 * Oct 9 18:04:09.733 * @param line * @return */ def extractDevTime(line: String): String = { val regex = "[a-zA-Z]{3}//s+//d+//s//d{2}://d{2}://d{2}((.//d{3})|())".r val matchedDevTime = regex.findFirstIn(line) val result = matchedDevTime match { case Some(devTime) => devTime case None => " " } result }

通過正則過濾、Syslog封裝以及MyBatis持久層映射,Syslog接口狀態信息最終解析如下:

最后,諸如APM、網絡監控或者告警等業務應用便可以基于MySQL做可視化展示。

總結

本文首先對公安運維管理現狀做了簡要介紹,然后介紹公安實時運維平臺的整體架構,再以交換機Syslog信息為例,詳細介紹如何使用Flume+Logstash+Kafka+Spark Streaming進行實時日志處理分析,對處理過程中大量的技術細節進行了描述并通過代碼詳細地介紹整體處理步驟。本文中的示例實時地將數據寫入MySQL存在一定的性能瓶頸,后期會對包含本例的相關代碼重構,數據將會實時寫入HBase來提高性能。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
精品中文字幕在线观看| 亚洲精品久久久久中文字幕二区| 狠狠久久五月精品中文字幕| 色悠久久久久综合先锋影音下载| 在线观看中文字幕亚洲| 菠萝蜜影院一区二区免费| 欧美日韩一区二区三区在线免费观看| 日韩成人久久久| 91欧美精品成人综合在线观看| 中文字幕精品—区二区| 在线看欧美日韩| 亚洲丁香婷深爱综合| 欧美精品激情blacked18| 国内精品模特av私拍在线观看| 91精品免费看| 96精品视频在线| 国产日韩视频在线观看| 欧美综合一区第一页| 久久久久免费视频| 亚洲男人天堂2024| 国产精品海角社区在线观看| 欧美精品在线观看| 九色精品免费永久在线| 国产精品美女999| 精品久久久久久久久中文字幕| 91久久国产精品91久久性色| 欧美在线不卡区| 国产精品白丝av嫩草影院| 亚洲女人天堂av| 欧美激情综合色| 91国内免费在线视频| 亚洲在线视频福利| 亚洲视频欧洲视频| 亚洲国产精品久久精品怡红院| 成人有码在线视频| 国产在线视频91| 日韩一区二区在线视频| 精品国产一区二区三区久久狼5月| 欧美激情一二三| 国产精品国产亚洲伊人久久| 最近2019年手机中文字幕| 久久精品人人爽| 欧美日本啪啪无遮挡网站| 欧美在线观看日本一区| 成人免费观看49www在线观看| 国产在线观看不卡| 亚洲天堂av图片| 久久久久久高潮国产精品视| 精品国产一区二区三区久久久狼| 久热精品视频在线观看一区| 欧美激情成人在线视频| 欧美人在线观看| 亚洲第一二三四五区| 亚洲国产精彩中文乱码av在线播放| 午夜精品久久久99热福利| 疯狂做受xxxx高潮欧美日本| 久久久久成人网| 成人免费福利在线| 富二代精品短视频| 正在播放欧美视频| 日韩久久免费视频| 欧美黑人一区二区三区| 久久久国产精彩视频美女艺术照福利| 久久久www成人免费精品张筱雨| 亚洲国产精品一区二区久| 国产精品 欧美在线| 精品久久久av| 国色天香2019中文字幕在线观看| 日本国产精品视频| 欧美在线亚洲在线| 亚洲精品网站在线播放gif| 亚洲欧洲日本专区| 久久激情视频免费观看| 在线播放亚洲激情| 91精品国产色综合久久不卡98| 亚洲国产高清自拍| 亚洲第一区中文字幕| 久久久免费高清电视剧观看| 亚洲免费精彩视频| 精品视频一区在线视频| 国产日韩欧美在线观看| 91麻豆国产语对白在线观看| 国产中文日韩欧美| 国产精品久久77777| 日韩av快播网址| 日韩电影在线观看免费| 亚洲欧美国产精品va在线观看| 中文字幕少妇一区二区三区| 日韩有码在线播放| 国产色婷婷国产综合在线理论片a| 国产成人福利夜色影视| 亚洲欧美制服另类日韩| 久久免费少妇高潮久久精品99| 91精品免费视频| 国产精品羞羞答答| 国内精品久久影院| 欧美日韩午夜激情| 91九色国产在线| 97在线精品视频| 欧美性猛交xxxx富婆| 久久精品成人一区二区三区| 久久6免费高清热精品| 国产精品吹潮在线观看| 91免费福利视频| 亚洲国产精品推荐| 4438全国亚洲精品在线观看视频| 国产一区二区三区在线免费观看| 亚洲天堂av在线免费观看| 欧美巨猛xxxx猛交黑人97人| 91免费精品国偷自产在线| 亚洲男人天堂视频| 中文字幕亚洲无线码在线一区| 国产精品久久久久影院日本| 中文字幕亚洲图片| 中文字幕日韩视频| 亚洲国产一区二区三区在线观看| 亚洲欧美国产精品va在线观看| 不卡在线观看电视剧完整版| 国产精品美女在线观看| 欧美精品一区三区| 超在线视频97| 国产一区二区久久精品| 精品久久久久久久久久国产| 国产精品爽爽爽爽爽爽在线观看| 高清欧美性猛交xxxx黑人猛交| 国产精品视频一区二区三区四| 亚洲欧美一区二区三区在线| 久久99视频免费| 欧美性在线观看| 亚洲国产成人精品久久久国产成人一区| 日韩在线视频网| 亚洲欧美国产日韩天堂区| 亚洲电影天堂av| 亚洲精品mp4| 欧美成人精品不卡视频在线观看| 91精品国产综合久久男男| 久久青草福利网站| 97精品视频在线播放| 亚洲精品资源美女情侣酒店| 欧美床上激情在线观看| 欧美大片免费观看| 亚洲欧洲在线视频| 精品无码久久久久久国产| 欧洲成人免费视频| 国产精品久久久久久久久久久久| 亚洲一区二区三区视频| 久久精品中文字幕免费mv| 国产精品日韩专区| 国内精品久久影院| 久久琪琪电影院| 欧美日韩成人免费| 26uuu日韩精品一区二区| 91精品国产综合久久香蕉| 亚洲欧洲一区二区三区久久| 精品成人在线视频| 久久亚洲精品网站| 在线观看中文字幕亚洲| 国产成人在线一区二区| 久久精品国产69国产精品亚洲| 成人情趣片在线观看免费| 中文字幕日韩专区| 91精品国产色综合久久不卡98口| 美女少妇精品视频| 欧美日韩亚洲系列|