Spark 推送数据至 elasticsearch

2018-08-21 05:33:22来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

1.工程依赖

 1 <properties>
 2     <spark_version>2.3.1</spark_version>
 3     <!-- elasticsearch-->
 4     <elasticsearch.version>5.5.2</elasticsearch.version>
 5     <fastjson.version>1.2.28</fastjson.version>
 6     <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
 7     <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
 8 </properties>
 9 <dependencies>
10     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
11     <dependency>
12         <groupId>org.apache.spark</groupId>
13         <artifactId>spark-core_2.11</artifactId>
14         <version>${spark_version}</version>
15     </dependency>
16     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
17     <dependency>
18         <groupId>org.apache.spark</groupId>
19         <artifactId>spark-sql_2.11</artifactId>
20         <version>${spark_version}</version>
21     </dependency>
22     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
23     <dependency>
24         <groupId>org.apache.spark</groupId>
25         <artifactId>spark-yarn_2.11</artifactId>
26         <version>${spark_version}</version>
27     </dependency>
28     <dependency>
29         <groupId>org.elasticsearch</groupId>
30         <artifactId>elasticsearch-spark-20_2.11</artifactId>
31         <version>${elasticsearch-spark.version}</version>
32     </dependency>
33     <dependency>
34         <groupId>mysql</groupId>
35         <artifactId>mysql-connector-java</artifactId>
36         <version>5.1.46</version>
37     </dependency>
38 </dependencies>

2.spark读取hadoop hdfs数据,并推送至elasticsearch

 1 public class PushWordCombination {
 2 
 3     private static PinyinTool tool = new PinyinTool();
 4 
 5     public static void pushDataByLen(SparkContext sc, SparkSession sparkSession, String goodsCategory, Integer len) {
 6         Dataset<Row> goodsDF1 = sparkSession.read().format("json").json(String.format("/data/app/%s/combination%d.json", goodsCategory, len));
 7         if (goodsDF1.count() == 0) {
 8             return;
 9         }
10 
11         sparkSession.udf().register("pinYin", (String s) -> tool.toPinYin(s, "", PinyinTool.Type.LOWERCASE), DataTypes.StringType);
12 
13         Encoder<RDDKeyByCounts> nameKeyEncoder = Encoders.bean(RDDKeyByCounts.class);
14         Dataset<RDDKeyByCounts> dataset = goodsDF1.selectExpr("name as name", "counts as counts", String.format("%d as goodsCategory", 0),
15                 String.format("%d as nameLen", len), "pinYin(name) as pinYin").as(nameKeyEncoder);
16 
17         JavaEsSpark.saveToEs(dataset.javaRDD(),"goods-category/category", ImmutableMap.of("es.mapping.id", "name"));
18     }
19 
20     public static void main(String[] args) {
21         //自定义比较器
22         SparkConf conf = new SparkConf().setAppName("my-app").
23                 set("es.nodes", ESProperties.IP).
24                 set("es.port",ESProperties.PORT).
25                 set("pushdown",ESProperties.PUSH_DOWN).
26                 set("es.index.auto.create",ESProperties.INDEX_AUTO_CREATE).
27                 set("es.nodes.wan.only","true").//在这种模式下,连接器禁用发现,并且只在所有操作中通过声明的ESE节点连接,包括读和写
28                 set("es.net.http.auth.user",ESProperties.SECURITY_USER).
29                 set("es.net.http.auth.pass",ESProperties.SECURITY_PWD);
30 
31         SparkContext sc = new SparkContext(conf);
32 
33         SparkSession sparkSession = new SparkSession(sc);
34 
35         for (int j = 2; j <= 4; j++) {
36             pushDataByLen(sc, sparkSession, "all", j);
37         }
38         sparkSession.stop();
39     }
40 }

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:从Java到JVM到OS线程睡眠

下一篇:Kibana安全特性之权限控制