Spark 读取csv文件操作,option参数解释
import com.bean.Yyds1 import org.apache.spark.sql.SparkSession object TestReadCSV { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("CSV Reader") .master("local") .getOrCreate() /** * 参数可以字符串,也可以是具体的类型,比如boolean * delimiter 分隔符,默认为逗号, * nullValue 指定一个字符串代表 null 值 * quote 引号字符,默认为双引号" * header 第一行不作为数据内容,作为标题 * inferSchema 自动推测字段类型 * ignoreLeadingWhiteSpace 裁剪前面的空格 * ignoreTrailingWhiteSpace 裁剪后面的空格 * nullValue 空值设置,如果不想用任何符号作为空值,可以赋值null即可 * multiline 运行多列,超过62 columns时使用 * encoding 指定編码,如:gbk / utf-8 Unicode GB2312 * ** */ import spark.implicits._ val result = spark.read.format("csv") .option("delimiter", "\\t") .option("encoding","GB2312") .option("enforceSchema",false) .option("header", "true") // .option("header", false) .option("quote", "'") .option("nullValue", "\\N") .option("ignoreLeadingWhiteSpace", false) .option("ignoreTrailingWhiteSpace", false) .option("nullValue", null) .option("multiline", "true") .load("G:\\python\\yyds\\yyds_1120_tab.csv").as[Yyds1] //yyds_1120_tab.csv aa1.csv yyds_20211120 yyds_1120_tab2_utf-8 result.map(row => { row.ji_check_cnt.toInt }).foreachPartition(a => {a.foreach(println _)}) } }
pom依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0modelVersion> <groupId>org.examplegroupId> <artifactId>TmLimitPredictartifactId> <version>1.0-SNAPSHOTversion> <properties> <log4j.version>1.2.17log4j.version> <slf4j.version>1.7.22slf4j.version> <kafka.version>2.8.0kafka.version> <spark.version>2.2.0spark.version> <scala.version>2.11.8scala.version> <jblas.version>1.2.1jblas.version> <hadoop.version>2.7.3hadoop.version> properties> <dependencies> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-commonartifactId> <version>${hadoop.version}version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-mapreduce-client-coreartifactId> <version>${hadoop.version}version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-clientartifactId> <version>${hadoop.version}version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-hdfsartifactId> <version>${hadoop.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-core_2.11artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-sql_2.11artifactId> <version>${spark.version}version> <exclusions> <exclusion> <groupId>com.google.guavagroupId> <artifactId>guavaartifactId> exclusion> exclusions> dependency> <dependency> <groupId>com.google.guavagroupId> <artifactId>guavaartifactId> <version>15.0version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-hive_2.11artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-streaming_2.11artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.scala-langgroupId> <artifactId>scala-libraryartifactId> <version>${scala.version}version> dependency> <dependency> <groupId>org.scalanlpgroupId> <artifactId>jblasartifactId> <version>${jblas.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-mllib_2.11artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>${kafka.version}version> dependency> <dependency> <groupId>org.apache.sparkgroupId> <artifactId>spark-streaming-kafka-0-10_2.11artifactId> <version>${spark.version}version> dependency> <dependency> <groupId>com.sf.kafkagroupId> <artifactId>sf-kafka-api-coreartifactId> <version>2.4.1version> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <version>1.16.18version> <scope>providedscope> dependency> dependencies> <build> <sourceDirectory>src/main/javasourceDirectory> <testSourceDirectory>src/test/scalatestSourceDirectory> <resources> <resource> <directory>src/main/resourcesdirectory> <includes> <include>**/*.propertiesinclude> <include>**/*.xmlinclude> includes> resource> resources> <plugins> <plugin> <groupId>net.alchim31.mavengroupId> <artifactId>scala-maven-pluginartifactId> <version>3.2.2version> <executions> <execution> <id>scala-compile-firstid> <phase>process-resourcesphase> <goals> <goal>add-sourcegoal> <goal>compilegoal> goals> execution> executions> plugin> <plugin> <groupId>org.apache.maven.pluginsgroupId> <artifactId>maven-compiler-pluginartifactId> <version>3.1version> <configuration> <source>1.8source> <target>1.8target> <encoding>UTF-8encoding> configuration> <executions> <execution> <phase>compilephase> <goals> <goal>compilegoal> goals> execution> executions> plugin> <plugin> <artifactId>maven-dependency-pluginartifactId> <executions> <execution> <phase>process-sourcesphase> <goals> <goal>copy-dependenciesgoal> goals> <configuration> <excludeScope>providedexcludeScope> <outputDirectory>${project.build.directory}/liboutputDirectory> configuration> execution> executions> plugin> <plugin> <groupId>org.apache.maven.pluginsgroupId> <artifactId>maven-jar-pluginartifactId> <version>2.4version> <configuration> <excludes> <exclude>*.txtexclude> excludes> <archive> <addMavenDescriptor>falseaddMavenDescriptor> <manifest> <addClasspath>falseaddClasspath> <classpathPrefix>lib/classpathPrefix> <mainClass>ConnectKafkaTestmainClass> manifest> <manifestEntries> <Class-Path>lib/Class-Path> manifestEntries> archive> configuration> plugin> <plugin> <groupId>org.apache.maven.pluginsgroupId> <artifactId>maven-assembly-pluginartifactId> <version>2.4version> <configuration> <descriptors> <descriptor>src/main/assembly/assembly.xmldescriptor> descriptors> configuration> <executions> <execution> <id>make-assemblyid> <phase>packagephase> <goals> <goal>singlegoal> goals> execution> executions> plugin> plugins> build> project>
参数 | 解释 |
sep | 默认是, 指定单个字符分割字段和值 |
encoding | 默认是uft-8通过给定的编码类型进行解码 |
quote | 默认是“,其中分隔符可以是值的一部分,设置用于转义带引号的值的单个字符。如果您想关闭引号,则需要设置一个空字符串,而不是null。 |
escape | 默认(\)设置单个字符用于在引号里面转义引号 |
charToEscapeQuoteEscaping | 默认是转义字符(上面的escape)或者\0,当转义字符和引号(quote)字符不同的时候,默认是转义字符(escape),否则为\0 |
comment | 默认是空值,设置用于跳过行的单个字符,以该字符开头。默认情况下,它是禁用的 |
header | 默认是false,将第一行作为列名 |
enforceSchema |
默认是true, 如果将其设置为true,则指定或推断的模式将强制应用于数据源文件,而CSV文件中的标头将被忽略。 如果选项设置为false,则在header选项设置为true的情况下,将针对CSV文件中的所有标题验证模式。 模式中的字段名称和CSV标头中的列名称是根据它们的位置检查的,并考虑了*spark.sql.caseSensitive。 虽然默认值为true,但是建议禁用 enforceSchema选项,以避免产生错误的结果 |
inferSchema | inferSchema(默认为false`):从数据自动推断输入模式。 *需要对数据进行一次额外的传递 |
samplingRatio | 默认为1.0,定义用于模式推断的行的分数 |
ignoreLeadingWhiteSpace | 默认为false,一个标志,指示是否应跳过正在读取的值中的前导空格 |
ignoreTrailingWhiteSpace | 默认为false一个标志,指示是否应跳过正在读取的值的结尾空格 |
nullValue | 默认是空的字符串,设置null值的字符串表示形式。从2.0.1开始,这适用于所有支持的类型,包括字符串类型 |
emptyValue | 默认是空字符串,设置一个空值的字符串表示形式 |
nanValue | 默认是Nan,设置非数字的字符串表示形式 |
positiveInf | 默认是Inf |
negativeInf | 默认是-Inf 设置负无穷值的字符串表示形式 |
dateFormat |
默认是yyyy-MM-dd,设置指示日期格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型 |
timestampFormat |
默认是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,设置表示时间戳格式的字符串。 自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳记类型 |
maxColumns | 默认是20480定义多少列数目的硬性设置 |
maxCharsPerColumn | 默认是-1定义读取的任何给定值允许的最大字符数。默认情况下为-1,表示长度不受限制 |
mode |
默认(允许)允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。 请注意,Spark尝试在列修剪下仅解析CSV中必需的列。因此,损坏的记录可以根据所需的字段集而有所不同。 可以通过spark.sql.csv.parser.columnPruning.enabled(默认启用)来控制此行为。 |
mode下面的参数: | --------------------------------------------------- |
PERMISSIVE |
当它遇到损坏的记录时,将格式错误的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,并将其他字段设置为“ null”。 为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为columnNameOfCorruptRecord |
DROPMALFORMED | 忽略整个损坏的记录 |
FAILFAST | 遇到损坏的记录时引发异常 |
-----mode参数结束---- | ------------------------------------------------------- |
columnNameOfCorruptRecord | 默认值指定在spark.sql.columnNameOfCorruptRecord,允许重命名由PERMISSIVE模式创建的格式错误的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord |
multiLine | 默认是false,解析一条记录,该记录可能超过62个columns |