iceberg数据存储格式
Apache Iceberg作为一款新兴的数据湖解决方案在实现上高度抽象,在存储上能够对接当前主流的HDFS,S3文件系统并且支持多种文件存储格式,例如Parquet、ORC、AVRO。相较于Hudi、Delta与Spark的强耦合,Iceberg可以与多种计算引擎对接,目前社区已经支持Spark读写Iceberg、Impala/Hive查询Iceberg。本文基于Apache Iceberg 0.10.0,介绍Iceberg文件的组织方式以及不同文件的存储格式。
Iceberg Table Format
从图中可以看到iceberg将数据进行分层管理,主要分为元数据管理层和数据存储层。元数据管理层又可以细分为三层:
- VersionMetadata
- Snapshot
- Manifest
??VersionMetadata存储当前版本的元数据信息(所有snapshot信息);Snapshot表示当前操作的一个快照,每次commit都会生成一个快照,一个快照中包含多个Manifest,每个Manifest中记录了当前操作生成数据所对应的文件地址,也就是data files的地址。基于snapshot的管理方式,iceberg能够进行time travel(历史版本读取以及增量读取),并且提供了serializable isolation。
??数据存储层支持不同的文件格式,目前支持Parquet、ORC、AVRO。
下面以HadoopTableOperation commit生成的数据为例介绍各层的数据格式。iceberg生成的数据目录结构如下所示:
├── data
│ ├── id=1
│ │ ├── 00000-0-04ae60eb-657d-45cb-bb99-d1cb7fe0ad5a-00001.parquet
│ │ └── 00000-4-487b841b-13b4-4ae8-9238-f70674d5102e-00001.parquet
│ ├── id=2
│ │ ├── 00001-1-e85b018b-e43a-44d7-9904-09c80a9b9c24-00001.parquet
│ │ └── 00001-5-0e2be766-c921-4269-8e1e-c3cff4b98a5a-00001.parquet
│ ├── id=3
│ │ ├── 00002-2-097171c5-d810-4de9-aa07-58f3f8a3f52e-00001.parquet
│ │ └── 00002-6-9d738169-1dbe-4cc5-9a87-f79457a9ec0b-00001.parquet
│ └── id=4
│ ├── 00003-3-b0c91d66-9e4e-4b7a-bcd5-db3dc1b847f2-00001.parquet
│ └── 00003-7-68c45a24-21a2-41e8-90f1-ef4be42f3002-00001.parquet
└── metadata
├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro
├── 1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro
├── f475511f-877e-4da5-90aa-efa5928a7759-m0.avro
├── snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro
├── snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro
├── v1.metadata.json
├── v2.metadata.json
├── v3.metadata.json
└── version-hint.text
其中metadata目录存放元数据管理层的数据:
- version-hint.text:存储version.metadata.json的版本号,即下文的number
- version[number].metadata.json
- snap-[snapshotID]-[attemptID]-[commitUUID].avro(snapshot文件)
- [commitUUID]-m-[manifestCount].avro(manifest文件)
data目录组织形式类似于hive,都是以分区进行目录组织(上图中id为分区列),最终数据可以使用不同文件格式进行存储:
- [sparkPartitionID]-[sparkTaskID]-[UUID]-[fileCount].[parquet | avro | orc]
VersionMetadata
//
{
// 当前文件格式版本信息
// 目前为version 1
// 支持row-level delete等功能的version 2还在开发中
"format-version" : 1,
"table-uuid" : "a9114f94-911e-4acf-94cc-6d000b321812",
// hadoopTable location
"location" : "hdfs://10.242.199.202:9000/hive/empty_order_item",
// 最新snapshot的创建时间
"last-updated-ms" : 1608810968725,
"last-column-id" : 6,
// iceberg schema
"schema" : {
"type" : "struct",
"fields" : [ {
"id" : 1,
"name" : "id",
"required" : false, // 类似probuf中的required
"type" : "long"
}, {
"id" : 2,
"name" : "order_id",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "product_id",
"required" : false,
"type" : "long"
}, {
"id" : 4,
"name" : "product_price",
"required" : false,
"type" : "decimal(7, 2)"
}, {
"id" : 5,
"name" : "product_quantity",
"required" : false,
"type" : "int"
}, {
"id" : 6,
"name" : "product_name",
"required" : false,
"type" : "string"
} ]
},
"partition-spec" : [ {
"name" : "id",
"transform" : "identity", // transform类型
"source-id" : 1,
"field-id" : 1000
} ],
"default-spec-id" : 0,
// 分区信息
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "id",
// transform类型:目前支持identity,year,bucket等
"transform" : "identity",
// 对应schema.fields中相应field的ID
"source-id" : 1,
"field-id" : 1000
} ]
} ],
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
// hive创建该表存储的一些hive property信息
"properties" : {
"totalSize" : "0",
"rawDataSize" : "0",
"numRows" : "0",
"COLUMN_STATS_ACCURATE" : "{\"BASIC_STATS\":\"true\"}",
"numFiles" : "0"
},
// 当前snapshot id
"current-snapshot-id" : 2080639593951710914,
// snapshot信息
"snapshots" : [ {
"snapshot-id" : 5178718682852547007,
// 创建snapshot时间
"timestamp-ms" : 1608809818168,
"summary" : {
// spark写入方式,目前支持overwrite以及append
"operation" : "overwrite",
"spark.app.id" : "local-1608809790982",
"replace-partitions" : "true",
// 本次snapshot添加的文件数量
"added-data-files" : "4",
// 本次snapshot添加的record数量
"added-records" : "4",
// 本次snapshot添加的文件大小
"added-files-size" : "7217",
// 本次snapshot修改的分区数量
"changed-partition-count" : "4",
// 本次snapshot中record总数 = lastSnapshotTotalRecord - currentSnapshotDeleteRecord + currentSnapshotAddRecord
"total-records" : "4",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-5178718682852547007-1-f475511f-877e-4da5-90aa-efa5928a7759.avro"
}, {
"snapshot-id" : 2080639593951710914,
// 上次snapshotID
"parent-snapshot-id" : 5178718682852547007,
"timestamp-ms" : 1608810968725,
"summary" : {
"operation" : "overwrite",
"spark.app.id" : "local-1608809790982",
"replace-partitions" : "true",
"added-data-files" : "4",
"deleted-data-files" : "4",
"added-records" : "4",
"deleted-records" : "4",
"added-files-size" : "7217",
"removed-files-size" : "7217",
"changed-partition-count" : "4",
"total-records" : "4",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
// snapshot文件路径
"manifest-list" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro"
} ],
// snapshot记录
"snapshot-log" : [ {
"timestamp-ms" : 1608809818168,
"snapshot-id" : 5178718682852547007
}, {
"timestamp-ms" : 1608810968725,
"snapshot-id" : 2080639593951710914
} ],
// metada记录
"metadata-log" : [ {
"timestamp-ms" : 1608809758229,
"metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v1.metadata.json"
}, {
"timestamp-ms" : 1608809818168,
"metadata-file" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/v2.metadata.json"
} ]
}
上例展示的是v3.metadata.json中的数据,该文件保存了iceberg table schema、partition、snapshot信息,partition中的transform信息使得iceberg能够根据字段进行hidden partition,而无需像hive一样显示的指定分区字段。由于VersionMetadata中记录了每次snapshot的id以及create_time,我们可以通过时间或snapshotId查询相应snapshot的数据,实现Time Travel。
Snapshot
// Snapshot: 2080639593951710914
// Location: hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/snap-2080639593951710914-1-1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae.avro
// manifest entry
{
"manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m1.avro",
"manifest_length" : 5291,
"partition_spec_id" : 0,
// 该manifest entry所属的snapshot
"added_snapshot_id" : {
"long" : 2080639593951710914
},
// 该manifest中添加的文件数量
"added_data_files_count" : {
"int" : 4
},
// 创建该manifest时已经存在且
// 没有被这次创建操作删除的文件数量
"existing_data_files_count" : {
"int" : 0
},
// 创建manifest删除的文件
"deleted_data_files_count" : {
"int" : 0
},
// 该manifest中partition字段的范围
"partitions" : {
"array" : [ {
"contains_null" : false,
"lower_bound" : {
"bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
},
"upper_bound" : {
"bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}
} ]
},
"added_rows_count" : {
"long" : 4
},
"existing_rows_count" : {
"long" : 0
},
"deleted_rows_count" : {
"long" : 0
}
}
// manifest entry
{
"manifest_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/metadata/1f8279fb-5b2d-464c-af12-d9d6fbe9b5ae-m0.avro",
"manifest_length" : 5289,
"partition_spec_id" : 0,
"added_snapshot_id" : {
"long" : 2080639593951710914
},
"added_data_files_count" : {
"int" : 0
},
"existing_data_files_count" : {
"int" : 0
},
"deleted_data_files_count" : {
"int" : 4
},
"partitions" : {
"array" : [ {
"contains_null" : false,
"lower_bound" : {
"bytes" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
},
"upper_bound" : {
"bytes" : "\u0004\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}
} ]
},
"added_rows_count" : {
"long" : 0
},
"existing_rows_count" : {
"long" : 0
},
"deleted_rows_count" : {
"long" : 4
}
}
一个snapshot中可以包含多个manifest entry,一个manifest entry表示一个manifest,其中重点需要关注的是每个manifest中的partitions字段,在根据filter进行过滤时可以首先通过该字段表示的分区范围对manifest进行过滤,避免无效的查询。
Manifest
// DataFileEntry
{
// 表示对应数据文件status
// 0: EXISTING, 1: ADDED,2: DELETED
"status" : 1,
"snapshot_id" : {
"long" : 2080639593951710914
},
"data_file" : {
"file_path" : "hdfs://10.242.199.202:9000/hive/empty_order_item/data/id=1/00000-4-487b841b-13b4-4ae8-9238-f70674d5102e-00001.parquet",
"file_format" : "PARQUET",
// 对应的分区值
"partition" : {
"id" : {
"long" : 1
}
},
// 文件中record数量
"record_count" : 1,
// 文件大小
"file_size_in_bytes" : 1823,
"block_size_in_bytes" : 67108864,
// 不同column存储大小
"column_sizes" : {
"array" : [ {
"key" : 1,
"value" : 52
}, {
"key" : 2,
"value" : 52
}, {
"key" : 3,
"value" : 52
}, {
"key" : 4,
"value" : 53
}, {
"key" : 5,
"value" : 51
}, {
"key" : 6,
"value" : 61
} ]
},
// 不同列对应的value数量
"value_counts" : {
"array" : [ {
"key" : 1,
"value" : 1
}, {
"key" : 2,
"value" : 1
}, {
"key" : 3,
"value" : 1
}, {
"key" : 4,
"value" : 1
}, {
"key" : 5,
"value" : 1
}, {
"key" : 6,
"value" : 1
} ]
},
// 列值为null的数量
"null_value_counts" : {
"array" : [ {
"key" : 1,
"value" : 0
}, {
"key" : 2,
"value" : 0
}, {
"key" : 3,
"value" : 0
}, {
"key" : 4,
"value" : 0
}, {
"key" : 5,
"value" : 0
}, {
"key" : 6,
"value" : 0
} ]
},
// 不同列的范围
"lower_bounds" : {
"array" : [ {
"key" : 1,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 2,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 3,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 4,
"value" : "\u0013?"
}, {
"key" : 5,
"value" : "\u0002\u0000\u0000\u0000"
}, {
"key" : 6,
"value" : "table lamp"
} ]
},
"upper_bounds" : {
"array" : [ {
"key" : 1,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 2,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 3,
"value" : "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
}, {
"key" : 4,
"value" : "\u0013?"
}, {
"key" : 5,
"value" : "\u0002\u0000\u0000\u0000"
}, {
"key" : 6,
"value" : "table lamp"
} ]
},
"key_metadata" : null,
// parquet block offset/ orc stripe offset
"split_offsets" : {
"array" : [ 4 ]
}
}
}
{
...
}
Manifest管理多个data文件,一条DataFileEntry对应一个data文件,DataFileEntry中记录了所属partition,value bounds等信息,value_counts和null_value_counts可以用于过滤null列,例:column a所对应的value_count为3,且对应的null_value_count也为3,此时如果select a,则可以根据value_count-null_value_count=0判断a全为null直接返回而无需再进行parquet文件的查询;除此之外,可以根据value bounds进行过滤,加速查询。
总结
本文主要介绍了Iceberg不同文件的存储格式,讲解了不同字段中的作用,正是这些元数据管理保证了iceberg能够进行高效快速的查询,后续会根据这些文件进一步分析iceberg写入和查询过程。
原文链接:https://blog.csdn.net/u012794915/article/details/111831471