java+spark-sql查询excel

2019-01-21 02:41:14来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

 Spark官网下载Spark

Spark下载,版本随意,下载后解压放入bigdata下(目录可以更改)

下载Windows下Hadoop所需文件winutils.exe

  同学们自己网上找找吧,这里就不上传了,其实该文件可有可无,报错也不影响Spark运行,强迫症可以下载,本人就有强迫症~~,文件下载后放入bigdata\hadoop\bin目录下。
不用创建环境变量,再Java最开始处定义系统变量即可,如下:

System.setProperty("hadoop.home.dir", HADOOP_HOME);

创建Java Maven项目java-spark-sql-excel

  建立相关目录层次如下:

  父级目录(项目所在目录)
    - java-spark-sql-excel
    - bigdata
      - spark
      - hadoop
        - bin
          - winutils.exe

编码

初始化SparkSession

static{
    System.setProperty("hadoop.home.dir", HADOOP_HOME);
    spark = SparkSession.builder()
            .appName("test")
            .master("local[*]") 
            .config("spark.sql.warehouse.dir",SPARK_HOME)
            .config("spark.sql.parquet.binaryAsString", "true")
            .getOrCreate();
     }

读取excel

public static void readExcel(String filePath,String tableName) throws IOException{
        DecimalFormat format = new DecimalFormat(); 
        format.applyPattern("#");
        //创建文件(可以接收上传的文件,springmvc使用CommonsMultipartFile,jersey可以使用org.glassfish.jersey.media.multipart.FormDataParam(参照本人文件上传博客))
        File file = new File(filePath);
        //创建文件流
        InputStream inputStream = new FileInputStream(file);
        //创建流的缓冲区
        BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
        //定义Excel workbook引用
        Workbook  workbook =null;
        //.xlsx格式的文件使用XSSFWorkbook子类,xls格式的文件使用HSSFWorkbook
        if(file.getName().contains("xlsx")) workbook = new XSSFWorkbook(bufferedInputStream);
        if(file.getName().contains("xls")&&!file.getName().contains("xlsx"))  workbook = new HSSFWorkbook(bufferedInputStream);
        System.out.println(file.getName());
        //获取Sheets迭代器
        Iterator<Sheet> dataTypeSheets= workbook.sheetIterator();
        while(dataTypeSheets.hasNext()){
            //每一个sheet都是一个表,为每个sheet
            ArrayList<String> schemaList = new ArrayList<String>();
             // dataList数据集
            ArrayList<org.apache.spark.sql.Row> dataList = new ArrayList<org.apache.spark.sql.Row>();
            //字段
            List<StructField> fields = new ArrayList<>();
            //获取当前sheet
            Sheet   dataTypeSheet = dataTypeSheets.next();
            //获取第一行作为字段
            Iterator<Row> iterator = dataTypeSheet.iterator();
            //没有下一个sheet跳过
            if(!iterator.hasNext()) continue;
            //获取第一行用于建立表结构
            Iterator<Cell> firstRowCellIterator = iterator.next().iterator();
             while(firstRowCellIterator.hasNext()){
                 //获取第一行每一列作为字段
                 Cell currentCell = firstRowCellIterator.next();
                 //字符串
                 if(currentCell.getCellTypeEnum() == CellType.STRING) schemaList.add(currentCell.getStringCellValue().trim());
                 //数值
                 if(currentCell.getCellTypeEnum() == CellType.NUMERIC)  schemaList.add((currentCell.getNumericCellValue()+"").trim());
             }
             //创建StructField(spark中的字段对象,需要提供字段名,字段类型,第三个参数true表示列可以为空)并填充List<StructField>
             for (String fieldName : schemaList) {
               StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
               fields.add(field);
             }
             //根据List<StructField>创建spark表结构org.apache.spark.sql.types.StructType
            StructType schema = DataTypes.createStructType(fields);
            //字段数len
            int len = schemaList.size();
            //获取当前sheet数据行数
            int rowEnd = dataTypeSheet.getLastRowNum(); 
            //遍历当前sheet所有行
            for (int rowNum = 1; rowNum <= rowEnd; rowNum++) {  
               //一行数据做成一个List
               ArrayList<String> rowDataList = new ArrayList<String>();
               //获取一行数据
               Row r = dataTypeSheet.getRow(rowNum); 
               if(r!=null){
                   //根据字段数遍历当前行的单元格
                   for (int cn = 0; cn < len; cn++) {  
                      Cell c = r.getCell(cn, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);  
                      if (c == null)  rowDataList.add("0");//空值简单补零
                      if (c != null&&c.getCellTypeEnum() == CellType.STRING)  rowDataList.add(c.getStringCellValue().trim());//字符串
                      if (c != null&&c.getCellTypeEnum() == CellType.NUMERIC){
                         double value = c.getNumericCellValue(); 
                         if (p.matcher(value+"").matches())  rowDataList.add(format.format(value));//不保留小数点
                         if (!p.matcher(value+"").matches()) rowDataList.add(value+"");//保留小数点
                      }
                      }  
                   }  
                //dataList数据集添加一行
                dataList.add(RowFactory.create(rowDataList.toArray()));
               }
            //根据数据和表结构创建临时表
            spark.createDataFrame(dataList, schema).createOrReplaceTempView(tableName+dataTypeSheet.getSheetName());
            }            
    }

在项目目录下创建测试文件

第一个Sheet:

第二个Sheet:

第三个Sheet:

 测试

public static void main(String[] args) throws Exception {
        //需要查询的excel路径
        String xlsxPath = "test2.xlsx";
        String xlsPath  = "test.xls";
        //定义表名
        String tableName1="test_table1";        
        String tableName2="test_table2";        
        //读取excel表名为tableNameN+Sheet的名称
        readExcel(xlsxPath,tableName2);
        spark.sql("select * from "+tableName2+"Sheet1").show();
        
        readExcel(xlsPath,tableName1);
        spark.sql("select * from "+tableName1+"Sheet1").show();
        spark.sql("select * from "+tableName1+"Sheet2").show();
        spark.sql("select * from "+tableName1+"Sheet3").show();
    }

运行结果

 相关依赖

<dependencies>
     <dependency>
        <groupId>org.spark-project.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.2.1.spark2</version>
     </dependency>
     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
     </dependency>   
     <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
     </dependency>    
     <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
     </dependency>
     <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi</artifactId>
        <version>3.17</version>
     </dependency>
      <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.17</version>
     </dependency> 
   </dependencies>

本人GitHub


原文链接:https://www.cnblogs.com/shenyuchong/p/10291604.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:SSM框架基础配置文件

下一篇:关于高并发下kafka producer send异步发送耗时问题的分析