Spark 读取csv文件操作,option参数解释


import com.bean.Yyds1
import org.apache.spark.sql.SparkSession

object TestReadCSV {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()
    /** *   参数可以字符串,也可以是具体的类型,比如boolean
     * delimiter 分隔符,默认为逗号,
     * nullValue 指定一个字符串代表 null 值
     * quote 引号字符,默认为双引号"
     * header 第一行不作为数据内容,作为标题
     * inferSchema 自动推测字段类型
     * ignoreLeadingWhiteSpace 裁剪前面的空格
     * ignoreTrailingWhiteSpace 裁剪后面的空格
     * nullValue 空值设置,如果不想用任何符号作为空值,可以赋值null即可
     * multiline  运行多列,超过62 columns时使用
     * encoding   指定編码,如:gbk  / utf-8  Unicode  GB2312
     * ** */

      import spark.implicits._
    val result = spark.read.format("csv")
      .option("delimiter", "\\t")
      .option("encoding","GB2312")
      .option("enforceSchema",false)
      .option("header", "true")
//      .option("header", false)
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("ignoreLeadingWhiteSpace", false)
      .option("ignoreTrailingWhiteSpace", false)
      .option("nullValue", null)
      .option("multiline", "true")
      .load("G:\\python\\yyds\\yyds_1120_tab.csv").as[Yyds1] //yyds_1120_tab.csv  aa1.csv   yyds_20211120  yyds_1120_tab2_utf-8


    result.map(row => {
      row.ji_check_cnt.toInt
    }).foreachPartition(a => {a.foreach(println _)})

  }
}

pom依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0modelVersion>
    <groupId>org.examplegroupId>
    <artifactId>TmLimitPredictartifactId>
    <version>1.0-SNAPSHOTversion>

    <properties>
        <log4j.version>1.2.17log4j.version>
        <slf4j.version>1.7.22slf4j.version>
        
        <kafka.version>2.8.0kafka.version>
        <spark.version>2.2.0spark.version>
        <scala.version>2.11.8scala.version>
        <jblas.version>1.2.1jblas.version>
        <hadoop.version>2.7.3hadoop.version>
    properties>


    <dependencies>
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        

        <dependency>
            <groupId>org.apache.hadoopgroupId>
            <artifactId>hadoop-commonartifactId>
            <version>${hadoop.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.hadoopgroupId>
            <artifactId>hadoop-mapreduce-client-coreartifactId>
            <version>${hadoop.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.hadoopgroupId>
            <artifactId>hadoop-clientartifactId>
            <version>${hadoop.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.hadoopgroupId>
            <artifactId>hadoop-hdfsartifactId>
            <version>${hadoop.version}version>
        dependency>

        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-core_2.11artifactId>
            <version>${spark.version}version>
            
        dependency>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-sql_2.11artifactId>
            <version>${spark.version}version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guavagroupId>
                    <artifactId>guavaartifactId>
                exclusion>
            exclusions>
            
        dependency>
        <dependency>
            <groupId>com.google.guavagroupId>
            <artifactId>guavaartifactId>
            <version>15.0version>
        dependency>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-hive_2.11artifactId>
            <version>${spark.version}version>
            
        dependency>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-streaming_2.11artifactId>
            <version>${spark.version}version>
            
        dependency>
        
        <dependency>
            <groupId>org.scala-langgroupId>
            <artifactId>scala-libraryartifactId>
            <version>${scala.version}version>
            
        dependency>

        
        <dependency>
            <groupId>org.scalanlpgroupId>
            <artifactId>jblasartifactId>
            <version>${jblas.version}version>
            
        dependency>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-mllib_2.11artifactId>
            <version>${spark.version}version>
            
        dependency>


        
        <dependency>
            <groupId>org.apache.kafkagroupId>
            <artifactId>kafka-clientsartifactId>
            <version>${kafka.version}version>
            
        dependency>
        <dependency>
            <groupId>org.apache.sparkgroupId>
            <artifactId>spark-streaming-kafka-0-10_2.11artifactId>
            <version>${spark.version}version>
            
        dependency>
        <dependency>
            <groupId>com.sf.kafkagroupId>
            <artifactId>sf-kafka-api-coreartifactId>
            <version>2.4.1version>
            
        dependency>

        
        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
            <version>1.16.18version>
            <scope>providedscope>
        dependency>
    dependencies>


    <build>
        
        <sourceDirectory>src/main/javasourceDirectory>
        <testSourceDirectory>src/test/scalatestSourceDirectory>

        <resources>
            <resource>
                <directory>src/main/resourcesdirectory>
                <includes>
                    <include>**/*.propertiesinclude>
                    <include>**/*.xmlinclude>
                includes>
                
                
            resource>
            
            
        resources>

        <plugins>
            
            <plugin>
                <groupId>net.alchim31.mavengroupId>
                <artifactId>scala-maven-pluginartifactId>
                <version>3.2.2version>
                
                
                
                <executions>
                    <execution>
                        <id>scala-compile-firstid>
                        <phase>process-resourcesphase>
                        <goals>
                            <goal>add-sourcegoal>
                            <goal>compilegoal>
                        goals>
                    execution>
                executions>
            plugin>

            
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-compiler-pluginartifactId>
                <version>3.1version>
                <configuration>
                    <source>1.8source>
                    <target>1.8target>
                    <encoding>UTF-8encoding>
                configuration>
                <executions>
                    <execution>
                        <phase>compilephase>
                        <goals>
                            <goal>compilegoal>
                        goals>
                    execution>
                executions>
            plugin>

            
            <plugin>
                <artifactId>maven-dependency-pluginartifactId>
                <executions>
                    <execution>
                        <phase>process-sourcesphase>

                        <goals>
                            <goal>copy-dependenciesgoal>
                        goals>

                        <configuration>
                            <excludeScope>providedexcludeScope>
                            <outputDirectory>${project.build.directory}/liboutputDirectory>
                        configuration>

                    execution>
                executions>
            plugin>
            
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-jar-pluginartifactId>
                <version>2.4version>
                
                <configuration>
                    
                    <excludes>
                        
                        
                        <exclude>*.txtexclude>
                    excludes>
                    
                    <archive>
                        
                        <addMavenDescriptor>falseaddMavenDescriptor>
                        
                        <manifest>
                            
                            
                            <addClasspath>falseaddClasspath>
                            
                            <classpathPrefix>lib/classpathPrefix>
                            
                            
                            <mainClass>ConnectKafkaTestmainClass>
                        manifest>
                        
                        <manifestEntries>
                            
                            <Class-Path>lib/Class-Path>
                        manifestEntries>
                    archive>
                    
                    
                configuration>
            plugin>

            
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-assembly-pluginartifactId>
                <version>2.4version>
                
                <configuration>
                    
                    <descriptors>
                        <descriptor>src/main/assembly/assembly.xmldescriptor>
                    descriptors>
                configuration>
                <executions>
                    <execution>
                        <id>make-assemblyid>
                        <phase>packagephase>
                        <goals>
                            <goal>singlegoal>
                        goals>
                    execution>
                executions>
            plugin>
        plugins>
    build>


project>
参数 解释
sep 默认是, 指定单个字符分割字段和值
encoding 默认是uft-8通过给定的编码类型进行解码
quote 默认是“,其中分隔符可以是值的一部分,设置用于转义带引号的值的单个字符。如果您想关闭引号,则需要设置一个空字符串,而不是null。
escape 默认(\)设置单个字符用于在引号里面转义引号
charToEscapeQuoteEscaping 默认是转义字符(上面的escape)或者\0,当转义字符和引号(quote)字符不同的时候,默认是转义字符(escape),否则为\0
comment 默认是空值,设置用于跳过行的单个字符,以该字符开头。默认情况下,它是禁用的
header 默认是false,将第一行作为列名
enforceSchema

默认是true, 如果将其设置为true,则指定或推断的模式将强制应用于数据源文件,而CSV文件中的标头将被忽略。

如果选项设置为false,则在header选项设置为true的情况下,将针对CSV文件中的所有标题验证模式。

模式中的字段名称和CSV标头中的列名称是根据它们的位置检查的,并考虑了*spark.sql.caseSensitive。

虽然默认值为true,但是建议禁用 enforceSchema选项,以避免产生错误的结果

inferSchema inferSchema(默认为false`):从数据自动推断输入模式。 *需要对数据进行一次额外的传递
samplingRatio 默认为1.0,定义用于模式推断的行的分数
ignoreLeadingWhiteSpace 默认为false,一个标志,指示是否应跳过正在读取的值中的前导空格
ignoreTrailingWhiteSpace 默认为false一个标志,指示是否应跳过正在读取的值的结尾空格
nullValue 默认是空的字符串,设置null值的字符串表示形式。从2.0.1开始,这适用于所有支持的类型,包括字符串类型
emptyValue 默认是空字符串,设置一个空值的字符串表示形式
nanValue 默认是Nan,设置非数字的字符串表示形式
positiveInf 默认是Inf
negativeInf 默认是-Inf 设置负无穷值的字符串表示形式
dateFormat

默认是yyyy-MM-dd,设置指示日期格式的字符串。

自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型

timestampFormat

默认是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,设置表示时间戳格式的字符串。

自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳记类型

maxColumns 默认是20480定义多少列数目的硬性设置
maxCharsPerColumn 默认是-1定义读取的任何给定值允许的最大字符数。默认情况下为-1,表示长度不受限制
mode

默认(允许)允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。

请注意,Spark尝试在列修剪下仅解析CSV中必需的列。因此,损坏的记录可以根据所需的字段集而有所不同。

可以通过spark.sql.csv.parser.columnPruning.enabled(默认启用)来控制此行为。

   
mode下面的参数: ---------------------------------------------------
PERMISSIVE

当它遇到损坏的记录时,将格式错误的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,并将其他字段设置为“ null”。

为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为columnNameOfCorruptRecord

DROPMALFORMED 忽略整个损坏的记录
FAILFAST 遇到损坏的记录时引发异常
-----mode参数结束---- -------------------------------------------------------
   
columnNameOfCorruptRecord 默认值指定在spark.sql.columnNameOfCorruptRecord,允许重命名由PERMISSIVE模式创建的格式错误的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord
multiLine 默认是false,解析一条记录,该记录可能超过62个columns