Pushing Data from Spark to Elasticsearch
1. Project dependencies
<properties>
    <spark_version>2.3.1</spark_version>
    <!-- elasticsearch -->
    <elasticsearch.version>5.5.2</elasticsearch.version>
    <fastjson.version>1.2.28</fastjson.version>
    <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
    <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${elasticsearch-spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
2. Reading data from HDFS with Spark and pushing it to Elasticsearch
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class PushWordCombination {

    private static PinyinTool tool = new PinyinTool();

    public static void pushDataByLen(SparkContext sc, SparkSession sparkSession, String goodsCategory, Integer len) {
        Dataset<Row> goodsDF1 = sparkSession.read().format("json").json(String.format("/data/app/%s/combination%d.json", goodsCategory, len));
        if (goodsDF1.count() == 0) {
            return;
        }

        // Register a UDF that converts a name to lower-case pinyin
        sparkSession.udf().register("pinYin", (String s) -> tool.toPinYin(s, "", PinyinTool.Type.LOWERCASE), DataTypes.StringType);

        Encoder<RDDKeyByCounts> nameKeyEncoder = Encoders.bean(RDDKeyByCounts.class);
        Dataset<RDDKeyByCounts> dataset = goodsDF1.selectExpr("name as name", "counts as counts",
                String.format("%d as goodsCategory", 0),
                String.format("%d as nameLen", len), "pinYin(name) as pinYin").as(nameKeyEncoder);

        // Write to the goods-category index, using the name field as the document id
        JavaEsSpark.saveToEs(dataset.javaRDD(), "goods-category/category", ImmutableMap.of("es.mapping.id", "name"));
    }

    public static void main(String[] args) {
        // Configure the Spark application and the elasticsearch-hadoop connector
        SparkConf conf = new SparkConf().setAppName("my-app")
                .set("es.nodes", ESProperties.IP)
                .set("es.port", ESProperties.PORT)
                .set("pushdown", ESProperties.PUSH_DOWN)
                .set("es.index.auto.create", ESProperties.INDEX_AUTO_CREATE)
                // In this mode the connector disables node discovery and connects
                // only through the declared ES nodes for all operations, reads and writes alike
                .set("es.nodes.wan.only", "true")
                .set("es.net.http.auth.user", ESProperties.SECURITY_USER)
                .set("es.net.http.auth.pass", ESProperties.SECURITY_PWD);

        SparkContext sc = new SparkContext(conf);

        SparkSession sparkSession = new SparkSession(sc);

        // Push the word combinations of length 2 through 4
        for (int j = 2; j <= 4; j++) {
            pushDataByLen(sc, sparkSession, "all", j);
        }
        sparkSession.stop();
    }
}
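The job above references two helper classes that the post does not show: the RDDKeyByCounts bean passed to Encoders.bean, and the ESProperties constants used in main(). The input JSON files evidently carry at least name and counts fields, since those are the columns selectExpr reads. The sketches below are hypothetical reconstructions, not the author's originals; the bean's field names must match the selected columns, while the numeric types (Long for counts, Integer for the rest) and all ESProperties values are assumptions to replace with your own.

// Hypothetical sketch of the RDDKeyByCounts bean used with Encoders.bean.
import java.io.Serializable;

public class RDDKeyByCounts implements Serializable {
    private String name;
    private Long counts;            // assumed Long: JSON numbers parse as LongType
    private Integer goodsCategory;  // assumed Integer: produced by an int literal in selectExpr
    private Integer nameLen;
    private String pinYin;

    public RDDKeyByCounts() {}      // Encoders.bean requires a no-arg constructor

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Long getCounts() { return counts; }
    public void setCounts(Long counts) { this.counts = counts; }
    public Integer getGoodsCategory() { return goodsCategory; }
    public void setGoodsCategory(Integer goodsCategory) { this.goodsCategory = goodsCategory; }
    public Integer getNameLen() { return nameLen; }
    public void setNameLen(Integer nameLen) { this.nameLen = nameLen; }
    public String getPinYin() { return pinYin; }
    public void setPinYin(String pinYin) { this.pinYin = pinYin; }
}

// Hypothetical sketch of the ESProperties constants referenced in main();
// every value here is a placeholder for your own cluster settings.
public class ESProperties {
    public static final String IP = "127.0.0.1";            // comma-separated ES node addresses
    public static final String PORT = "9200";               // ES HTTP port
    public static final String PUSH_DOWN = "true";          // push filters down as ES query DSL
    public static final String INDEX_AUTO_CREATE = "true";  // let the connector create the index
    public static final String SECURITY_USER = "elastic";   // HTTP basic-auth user
    public static final String SECURITY_PWD = "changeme";   // HTTP basic-auth password
}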
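PinyinTool, the helper behind the pinYin UDF, is also the author's own class and is not shown. A minimal sketch, assuming it is built on the pinyin4j library: only the method signature and the Type.LOWERCASE enum are taken from the call sites above; everything else is an assumption. It converts each Chinese character to its first pinyin reading and keeps non-Chinese characters as-is, catching conversion errors internally so the UDF lambda stays free of checked exceptions.

import net.sourceforge.pinyin4j.PinyinHelper;
import net.sourceforge.pinyin4j.format.HanyuPinyinCaseType;
import net.sourceforge.pinyin4j.format.HanyuPinyinOutputFormat;
import net.sourceforge.pinyin4j.format.HanyuPinyinToneType;

public class PinyinTool implements java.io.Serializable {
    public enum Type { LOWERCASE, UPPERCASE }

    public String toPinYin(String text, String separator, Type type) {
        HanyuPinyinOutputFormat fmt = new HanyuPinyinOutputFormat();
        fmt.setCaseType(type == Type.UPPERCASE ? HanyuPinyinCaseType.UPPERCASE : HanyuPinyinCaseType.LOWERCASE);
        fmt.setToneType(HanyuPinyinToneType.WITHOUT_TONE);  // drop tone marks
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            try {
                String[] pinyins = PinyinHelper.toHanyuPinyinStringArray(c, fmt);
                // Characters with no pinyin reading are kept unchanged
                sb.append(pinyins != null && pinyins.length > 0 ? pinyins[0] : String.valueOf(c));
            } catch (Exception e) {
                sb.append(c);  // fall back to the raw character on conversion errors
            }
            if (i < text.length() - 1) {
                sb.append(separator);
            }
        }
        return sb.toString();
    }
}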