亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

Spring boot集成Kafka+Storm的示例代碼

2024-07-13 10:15:45
字體:
來源:轉載
供稿:網友

前言

由于業務需求需要把Strom與kafka整合到spring boot項目里,實現其他服務輸出日志至kafka訂閱話題,storm實時處理該話題完成數據監控及其他數據統計,但是網上教程較少,今天想寫的就是如何整合storm+kafka 到spring boot,順帶說一說我遇到的坑。

使用工具及環境配置

? 1. java 版本jdk-1.8

? 2. 編譯工具使用IDEA-2017

? 3. maven作為項目管理

? 4.spring boot-1.5.8.RELEASE

需求體現

1.為什么需要整合到spring boot

為了使用spring boot 統一管理各種微服務,及同時避免多個分散配置

2.具體思路及整合原因

? 使用spring boot統一管理kafka、storm、redis等所需要的bean,通過其他服務日志收集至Kafka,KafKa實時發送日志至storm,在strom bolt時進行相應的處理操作

遇到的問題

? 1.使用spring boot并沒有相關整合storm

? 2.以spring boot啟動方式不知道如何觸發提交Topolgy

? 3.提交Topology時遇到numbis not client localhost 問題

? 4.Storm bolt中無法通過注解獲得實例化bean進行相應的操作

解決思路

在整合之前我們需要知道相應的spring boot 的啟動方式及配置(如果你在閱讀本文時,默認你已經對storm,kafka及spring boot有相關了解及使用)

spring boot 對storm進行整合的例子在網上很少,但是因為有相應的需求,因此我們還是需要整合.

首先導入所需要jar包:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <exclusions> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>spring-boot-actuator</artifactId>  <groupId>org.springframework.boot</groupId> </exclusion> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusions> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> <exclusions> <exclusion>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>jackson-core-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>curator-client</artifactId>  <groupId>org.apache.curator</groupId> </exclusion> <exclusion>  <artifactId>jettison</artifactId>  <groupId>org.codehaus.jettison</groupId> </exclusion> <exclusion>  <artifactId>jackson-mapper-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>jackson-jaxrs</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>snappy-java</artifactId>  <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion>  <artifactId>jackson-xc</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-mapreduce-client-core</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.4</version> <exclusions> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>hadoop-common</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-annotations</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>hadoop-yarn-common</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> <exclusions> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>curator-client</artifactId>  <groupId>org.apache.curator</groupId> </exclusion> <exclusion>  <artifactId>jackson-mapper-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>jackson-core-asl</artifactId>  <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>snappy-java</artifactId>  <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion>  <artifactId>zookeeper</artifactId>  <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>hadoop-auth</artifactId>  <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion>  <artifactId>commons-lang</artifactId>  <groupId>commons-lang</groupId> </exclusion> <exclusion>  <artifactId>slf4j-log4j12</artifactId>  <groupId>org.slf4j</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>2.7.3</version> <exclusions> <exclusion>  <artifactId>commons-logging</artifactId>  <groupId>commons-logging</groupId> </exclusion> <exclusion>  <artifactId>netty</artifactId>  <groupId>io.netty</groupId> </exclusion> <exclusion>  <artifactId>guava</artifactId>  <groupId>com.google.guava</groupId> </exclusion> <exclusion>  <artifactId>log4j</artifactId>  <groupId>log4j</groupId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <!--storm--> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <scope>${provided.scope}</scope> <exclusions> <exclusion>  <groupId>org.apache.logging.log4j</groupId>  <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion>  <artifactId>servlet-api</artifactId>  <groupId>javax.servlet</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>1.1.1</version> <exclusions> <exclusion>  <artifactId>kafka-clients</artifactId>  <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency>

其中去除jar包是因為需要相與項目構建依賴有多重依賴問題,storm版本為1.1.0  spring boot相關依賴為

```java

<!-- spring boot -->  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter</artifactId>   <exclusions>    <exclusion>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-logging</artifactId>    </exclusion>   </exclusions>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-web</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-aop</artifactId>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-test</artifactId>   <scope>test</scope>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-log4j2</artifactId>  </dependency>  <dependency>   <groupId>org.mybatis.spring.boot</groupId>   <artifactId>mybatis-spring-boot-starter</artifactId>   <version>${mybatis-spring.version}</version>  </dependency>  <dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-configuration-processor</artifactId>   <optional>true</optional>  </dependency>

ps:maven的jar包僅因為項目使用需求,不是最精簡,僅供大家參考.

項目結構:

config-存儲不同環境配置文件

Springboot,集成,Kafka,Storm,Spring,boot集成Kafka

存儲構建spring boot 相關實現類 其他如構建名

啟動spring boot的時候我們會發現

其實開始整合前,對storm了解的較少,屬于剛開始沒有接觸過,后面參考發現整合到spring boot里面啟動spring boot之后并沒有相應的方式去觸發提交Topolgy的函數,所以也造成了以為啟動spring boot之后就完事了結果等了半個小時什么事情都沒發生才發現沒有實現觸發提交函數.

為了解決這個問題我的想法是: 啟動spring boot->創建kafka監聽Topic然后啟動Topolgy完成啟動,可是這樣的問題kafka監聽這個主題會重復觸發Topolgy,這明顯不是我們想要的.看了一會后發現spring 有相關啟動完成之后執行某個時間方法,這個對我來說簡直是救星啊.所以現在觸發Topolgy的思路變為:

啟動spring boot ->執行觸發方法->完成相應的觸發條件

構建方法為:

/** * @author Leezer * @date 2017/12/28 * spring加載完后自動自動提交Topology **/@Configuration@Componentpublic class AutoLoad implements ApplicationListener<ContextRefreshedEvent> { private static String BROKERZKSTR; private static String TOPIC; private static String HOST; private static String PORT; public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,     @Value("${zookeeper.host}") String host,     @Value("${zookeeper.port}") String port,     @Value("${kafka.default-topic}") String topic ){  BROKERZKSTR = brokerZkstr;  HOST= host;  TOPIC= topic;  PORT= port; } @Override public void onApplicationEvent(ContextRefreshedEvent event) {  try {   //實例化topologyBuilder類。   TopologyBuilder topologyBuilder = new TopologyBuilder();   //設置噴發節點并分配并發數,該并發數將會控制該對象在集群中的線程數。   BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);   // 配置Kafka訂閱的Topic,以及zookeeper中數據節點目錄和名字   SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");   spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());   spoutConfig.zkServers = Collections.singletonList(HOST);   spoutConfig.zkPort = Integer.parseInt(PORT);   //從Kafka最新輸出日志讀取   spoutConfig.startOffsetTime = OffsetRequest.LatestTime();   KafkaSpout receiver = new KafkaSpout(spoutConfig);   topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);   topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");   Config config = new Config();   config.setDebug(false);   /*設置該topology在storm集群中要搶占的資源slot數,一個slot對應這supervisor節點上的以個worker進程,如果你分配的spot數超過了你的物理節點所擁有的worker數目的話,有可能提交不成功,加入你的集群上面已經有了一些topology而現在還剩下2個worker資源,如果你在代碼里分配4個給你的topology的話,那么這個topology可以提交但是提交以后你會發現并沒有運行。 而當你kill掉一些topology后釋放了一些slot后你的這個topology就會恢復正常運行。   */   config.setNumWorkers(1);   LocalCluster cluster = new LocalCluster();   cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());  } catch (Exception e) {   e.printStackTrace();  } }}

? 注:

啟動項目時因為使用的是內嵌tomcat進行啟動,可能會報如下錯誤

[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during startjava.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]] at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144] at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144] at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23] at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144] at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

這是因為有相應導入的jar包引入了servlet-api版本低于內嵌版本,我們需要做的就是打開maven依賴把其去除

<exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId></exclusion>

然后重新啟動就可以了.

啟動過程中還有可能報:

 

復制代碼 代碼如下:

org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90

 

這個問題我思考了很久,發現網上的解釋都是因為storm配置問題導致不對,可是我的storm是部署在服務器上的.并沒有相關的配置,按理也應該去服務器上讀取相關配置,可是結果并不是這樣的。最后嘗試了幾個做法發現都不對,這里才發現,在構建集群的時候storm提供了相應的本地集群

LocalCluster cluster = new LocalCluster();

進行本地測試,如果在本地測試就使用其進行部署測試,如果部署到服務器上需要把:

cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());//修正為:StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());

進行任務提交;

以上解決了上面所述的問題1-3

問題4:是在bolt中使用相關bean實例,我發現我把其使用@Component加入spring中也無法獲取到實例:我的猜想是在我們構建提交Topolgy的時候,它會在:

 

復制代碼 代碼如下:

topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");

 

執行bolt相關:

@Override public void prepare(Map stormConf, TopologyContext context,      OutputCollector collector) {  this.collector = collector;  StormLauncher stormLauncher = StormLauncher.getStormLauncher();  dataRepositorys =(AlarmDataRepositorys)   stormLauncher.getBean("alarmdataRepositorys"); }

而不會實例化bolt,導致線程不一而spring 獲取不到.(這里我也不是太明白,如果有大佬知道可以分享一波)

而我們使用spring boot的意義就在于這些獲取這些繁雜的對象,這個問題困擾了我很久.最終想到,我們可以通過上下文getbean獲取實例不知道能不能行,然后我就開始了定義:

例如我需要在bolt中使用一個服務:

/** * @author Leezer * @date 2017/12/27 * 存儲操作失敗時間 **/@Service("alarmdataRepositorys")public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys { private static final String ERRO = "erro"; /**  * @param type 類型  * @param key key值  * @return 錯誤次數  **/ @Override public String getErrNumFromRedis(String type,String key) {  if(type==null || key == null){   return null;  }else {   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();   return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));  } } /**  * @param type 錯誤類型  * @param key key值  * @param value 存儲值  **/ @Override public void setErrNumToRedis(String type, String key,String value) {  try {   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();   valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);  }catch (Exception e){   logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key為%s存入redis失敗",key));  } }

這里我指定了該bean的名稱,則在bolt執行prepare時:使用getbean方法獲取了相關bean就能完成相應的操作.

然后kafka訂閱主題發送至我bolt進行相關的處理.而這里getbean的方法是在啟動bootmain函數定義:

@SpringBootApplication@EnableTransactionManagement@ComponentScan({"service","storm"})@EnableMongoRepositories(basePackages = {"storm"})@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})@ImportResource(locations = { "classpath:/configs/spring-hadoop.xml", "classpath:/configs/spring-hbase.xml"})public class StormLauncher extends SpringBootServletInitializer { //設置 安全線程launcher實例 private volatile static StormLauncher stormLauncher; //設置上下文 private ApplicationContext context; public static void main(String[] args) {  SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);  // application.web(false).run(args);該方式是spring boot不以web形式啟動 application.run(args); StormLauncher s = new StormLauncher(); s.setApplicationContext(application.context()); setStormLauncher(s); } private static void setStormLauncher(StormLauncher stormLauncher) { StormLauncher.stormLauncher = stormLauncher; } public static StormLauncher getStormLauncher() { return stormLauncher; } @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(StormLauncher.class); } /** * 獲取上下文 * * @return the application context */ public ApplicationContext getApplicationContext() { return context; } /** * 設置上下文. * * @param appContext 上下文 */ private void setApplicationContext(ApplicationContext appContext) { this.context = appContext; } /** * 通過自定義name獲取 實例 Bean. * * @param name the name * @return the bean */ public Object getBean(String name) { return context.getBean(name); } /** * 通過class獲取Bean. * * @param <T> the type parameter * @param clazz the clazz * @return the bean */ public <T> T getBean(Class<T> clazz) { return context.getBean(clazz); } /** * 通過name,以及Clazz返回指定的Bean * * @param <T> the type parameter * @param name the name * @param clazz the clazz * @return the bean */ public <T> T getBean(String name, Class<T> clazz) { return context.getBean(name, clazz); }

到此集成storm 和kafka至spring boot已經結束了,相關kafka及其他配置我會放入github上面

對了這里還有一個kafkaclient的坑:

Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.

項目會報kafka client 問題,這是因為storm-kafka中,kafka使用的是0.8版本,而NetworkSend是0.9以上的版本,這里集成需要與你集成的kafka相關版本一致.

雖然集成比較簡單,但是參考都比較少,加之剛開始接觸storm所以思考比較多,也在這記錄一下.

項目地址 - github

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美日韩加勒比精品一区| 亚洲第一精品夜夜躁人人爽| 日韩成人av在线| 久久综合88中文色鬼| 国产欧美一区二区三区久久人妖| 久久久国产在线视频| 亚洲国产精品久久精品怡红院| 亚洲欧洲一区二区三区在线观看| 最近2019年手机中文字幕| 亚洲第一福利在线观看| 亚洲国产高潮在线观看| 日韩精品极品视频免费观看| 色先锋资源久久综合5566| 欧美激情中文字幕在线| 国产视频精品久久久| 亚洲综合在线小说| 日本欧美中文字幕| 亚洲欧美成人网| 亚洲一区二区久久久久久| 国产精品一久久香蕉国产线看观看| 久久99热精品这里久久精品| 成人国产精品久久久久久亚洲| 国产精品美女呻吟| 欧美在线视频导航| 午夜精品久久久久久久男人的天堂| 精品日本高清在线播放| 日韩专区在线观看| 久久国产天堂福利天堂| 久久国产视频网站| 欧美高跟鞋交xxxxhd| 亚洲国产日韩欧美综合久久| 日本高清久久天堂| 庆余年2免费日韩剧观看大牛| 亚洲女人初尝黑人巨大| 国产欧美精品久久久| 亚洲欧美日韩精品久久| 亚洲成人av资源网| 日韩美女视频免费在线观看| 欧美激情一二区| 亚洲欧美成人网| 一本色道久久综合狠狠躁篇怎么玩| 动漫精品一区二区| 久久久免费高清电视剧观看| 韩国精品美女www爽爽爽视频| 色综合影院在线| 国产精品精品视频一区二区三区| 欧美午夜激情小视频| 久久香蕉频线观| 欧美做爰性生交视频| 国产精品网站视频| 日本三级韩国三级久久| 欧美性猛交xxxx乱大交3| 中文字幕一区日韩电影| 国产一区二区丝袜| 岛国av一区二区| 欧美一级黄色网| 57pao成人永久免费视频| 日韩精品一区二区视频| 91在线国产电影| www欧美日韩| 91精品国产综合久久久久久久久| 日韩av在线精品| 欧美体内谢she精2性欧美| 日韩欧美在线免费观看| 日韩欧美精品网站| 成人免费激情视频| 亚洲欧美日韩在线高清直播| 色悠久久久久综合先锋影音下载| 国内精品中文字幕| 国产美女扒开尿口久久久| 在线精品视频视频中文字幕| 久久久人成影片一区二区三区| 国产精品视频区| 国产欧美婷婷中文| 色综合久综合久久综合久鬼88| 欧美wwwxxxx| 狠狠做深爱婷婷久久综合一区| 久久精品视频亚洲| 992tv成人免费视频| 亚洲男人天堂九九视频| 欧美精品情趣视频| 久久频这里精品99香蕉| 欧美日韩亚洲精品一区二区三区| 欧美做受高潮电影o| 国产国语刺激对白av不卡| 亚洲少妇中文在线| 成人高清视频观看www| 欧美精品在线播放| 久久成年人免费电影| 亚洲成av人乱码色午夜| 久久免费少妇高潮久久精品99| 日韩欧美在线观看视频| 久久成人18免费网站| 精品亚洲一区二区三区四区五区| 亚洲自拍欧美色图| 日韩成人性视频| 国模精品一区二区三区色天香| 欧美精品在线观看91| 亚洲欧美日韩久久久久久| 992tv在线成人免费观看| 成人精品在线视频| 国产精品免费久久久久影院| 欧美美女18p| 欧美www视频在线观看| 国产精品啪视频| 日韩中文在线视频| 亚洲电影成人av99爱色| 成人美女免费网站视频| 国语自产偷拍精品视频偷| 国产精品久久久久久久久免费看| 国产91精品不卡视频| 日韩国产欧美精品一区二区三区| 日韩精品丝袜在线| 91在线视频一区| 亚洲丝袜在线视频| 欧美激情在线视频二区| 亚洲摸下面视频| 亚洲综合在线播放| 成人激情在线播放| 欧美国产精品va在线观看| 亚洲午夜精品久久久久久性色| 久久久久久久久久久网站| 欧美巨猛xxxx猛交黑人97人| 欧美大荫蒂xxx| 亚洲人成伊人成综合网久久久| 久久精品视频亚洲| 最近2019年好看中文字幕视频| 久久精品视频在线播放| 日韩www在线| 国产欧美日韩亚洲精品| 国产精品一区二区三区毛片淫片| 精品福利在线视频| 97久久久久久| 在线日韩日本国产亚洲| 欧美丝袜美女中出在线| 久久成人综合视频| 日本欧美在线视频| 亚洲男人天堂手机在线| 91久久精品视频| 欧美巨大黑人极品精男| 亚洲国产中文字幕久久网| 午夜精品视频在线| 精品久久久久久久大神国产| 亚洲天天在线日亚洲洲精| 日本精品视频在线播放| 欧美在线视频观看免费网站| 久久理论片午夜琪琪电影网| 欧美精品在线免费| 成人免费观看a| 中文字幕一区电影| 国产亚洲综合久久| 欧美精品一区二区三区国产精品| 欧美在线观看一区二区三区| 亚洲国产日韩欧美在线动漫| 亚洲精品一区二区三区婷婷月| 日韩av在线免费| 成人福利在线视频| 国产精品扒开腿做爽爽爽视频| 91精品视频在线免费观看| 久久久久久久999| 精品爽片免费看久久| 国产成人小视频在线观看| 欧洲永久精品大片ww免费漫画| 亚洲激情国产精品|