Presto 简略介绍


Presto 0.271 Documentation

? presto 原有团队重立了 presto 项目并改名为 prestosql,后来又改名为 Trino,这个版本不支持 Java 8,以及 Java 11 的低级版本,但拥有更活跃的社区和更多的贡献者。

概览

? presto 不是一个关系型数据库,不能代替MySQL,PostgreSQL 或者 Oracle,它不是用来处理 OLTP。

? presto 是一个能使用分布式查询高效地查询大规模数据的工具。它被设计为一个替代使用 MapReduce jobs 通道查询 HDFS的工具,如 Hive 或者 Pig。但是它不限于访问 HDFS。它可以并已经扩展操作不同的数据源,包括传统的关系型数据库和其他数据源比如 Cassandra。

? Presto 旨在处理数据仓库和分析:数据分析、聚合大量数据并生成报告,这些工作通常被归为在线分析处理(OLAP)

概念

? 需要理解下面这些名词和概念。作为用户端,应该需要熟悉注入 stage 和 split 的概念来充分使用 presto 高效地查询。作为 presto 管理员 或者贡献者,需要理解 presto stage 的概念如何映射到 task 并且 task 包含一组驱动以处理数据。

? 本节,提供一些 presto 引用的核心概念的可靠定义,本节从最一般到最具体进行排序。

server type

? 服务器类型:coordinator、worker

coordinator

? presto coordinator 是一个负责解析语句,计划查询,管理 worker 节点的服务器。作为 presto 的大脑,它同时也是与客户端通信以提交语句来执行的节点。每个 presto 安装必须有一个 coordinator 和 一个或多个 worker。对于开发和测试的目的,可以配置单个 presto 实例承担这两个角色。

? coordinator 保持跟踪每一个worker 的活动,并且协调查询的执行。coordinator 创建一个查询的逻辑模型,这个模型涉及的一系列 stage 被翻译成一系列连接的 task 运行在集群中的 worker 上。

? coordinator 使用 REST API 与 worker 通信。

worker

? Presto worker 是一个负责执行 task 并处理数据的服务器。worker 节点从连接器获取数据,并相互交换数据。coordinator 负责从worker获取结果并返回给最终结果为客户端。

? 当 presto worker 处理启动,它将自己介绍给 coordinator 的发现服务器,这使得 presto coordinator 可以使用 worker 做 task 执行。

? worker 使用 REST API 与 coordinator 和其他 worker 通信。

Data Source

? 通过这篇文档,你将阅读到诸如 connector, catalog, schema, and table 的名词。这些基本概念覆盖了指定数据源 presto 模型,在下面的章节描述。

Connector

? 连接器适配 presto 到数据源如 Hive 和 关系型数据库。你可以将连接器和数据库的驱动是一样的。这是一种 prseto 的 SPI 的实现,允许 presto 使用标准 API 与数据源交互。

? presto 包含几个内置的连接器:JMX 连接器,System 连接器提供访问系统内置表,Hive 连接器和 TPCH 连接器用来服务于 TPC-H benchmark 数据。很多第三方开发者已经贡献了连接器,prseto 可以访问各种数据源。

? 每个目录都与特定的连接器相关联。每个 catalog 配置文件包含一个必填属性 connector.name 被 catalog 管理器用来为一个给定的 catalog 创建连接器。可以有多个 catalog 使用同一个 connector 访问相似数据库的两个不同的实例。例如,你有两个 Hive 集群,你可以在一个 presto 集群配置两个 catalog 都是用 Hive connector,允许你从两个Hive 集群查询,甚至在同一个 SQL 中。

Catalog

? presto catalog 包含 schema 并通过 connector 引用数据源。例如,可以配置 JMX catalog 经由 JMX connector 提供访问 JMX 信息。当你在 presto 运行一个 SQL 语句时,你针对一个或多个 catalog 运行它。

? 当在 presto 定位一张表时,全限定表名需要配置在 catalog 中。例如一个全限定表名 hive.test_data.test 会引用 在 hive catalog 下的 test_data schema 的 test 表。

? 在存储在配置文件目录下的配置文件中定义 catalog 。

Schema

? schema 是组织表的一种方式。catalog 与 schema 一起定义表的集合,以被查询。当在 presto 访问 Hive 或者关系型数据库时,比如 MySQL,schema 转译成为在目标数据库的相同概念。其他类型的 connector 可能选择以对下游数据源有意义的方式组织表到 schema 。

Table

Query Execution Model

? presto 执行 SQL 语句并将这些语句转换为在 coordinator 和 worker 的分布式集群上执行的语句。

Statement

? presto 执行 兼容ANSI 的SQL 语句。当 presto 文档提及 statement 时,它指的是由子句,表达式,谓词组成的定义在 ANSI SQL 标准的语句。

? 一些读者可能会好奇为什么这节列举了语句和查询不同概念。这是必要的,因为在 presto, statement 只是指 SQL 语句的文本表示。 执行语句时,Presto 会创建一个查询以及一个查询计划,该计划随后分布在一系列 Presto worker 中。

Query

? 当 presto 解析一条语句,它将其转换为一条查询并创建分布式查询计划,然后实现为一系列内连接的 stage 运行在 presto workder中。当你在 presto 获取关于一条查询的信息时,你接收到一个每个件的快照,这些组件涉及生成响应语句的结果集。

? 语句和查询的区别很简单。一个语句可以认为是 SQL 文本,传递给 Presto,然而查询指向配置和组件具体执行那条语句。查询包含 stages, tasks, splits, connectors, 和其他 components,和数据源在?生成结果。

Stage

? 当 presto 执行查询时,它将执行拆分为一个层级结构的 stage 来执行它。例如,如果presto 需要聚合在 Hive 的十亿行数据,它创建一个根 stage 来聚合其他几个 stage 的输出,这些 stage 被设计为实现分布式查询计划的不同部分。

? 包含查询的 stage 的层级结构类似于一棵树。每个查询有一个根 stage 负责聚合其他 stage 的输出。stage 是 coordinator 用来模型化一个分布式查询计划,但是 stage 本身不运行在 presto worker。

Task

? stage 对分布式查询计划的一个特定部分进行建模,但 stage 本身不运行在 presto worker。为了理解 stage 如何执行,你需要理解 stage 被实现为一系列分布在 worker 网络上的 task。

? task 是 Presto 架构中的“工作马”,因为分布式查询计划被解构为一系列 stage,然后将这些 stage 转换为 task,然后执行或处理拆分。 Presto task 有输入和输出,就像一个阶段可以由一系列任务并行执行一样,一个任务与一系列驱动程序并行执行。

Split

? task 操作 split,大数据集的部分。分布式查询计划中最低级的的 stage 从 connector 的分片中获取数据,分布式查询计划中中间的 stage 从其他 stage 中获取数据。

? 当 presto 调度查询时,coordinator 会查询 connector 得到所有表可用的分片的列表。coordinator 跟踪哪个机器正在运行,哪个 task 和 什么 split 正在被处理。

Driver

? task 包括一个或多个并行的 driver。driver 操作数据并结合操作符生产输出,然后由 task 聚合传输到其他的 stage 的 task 。driver 时一系列操作符实例,或者你可以认为driver 是一个内存中操作符的物理集合。这是在presto 结构中最低级的并行。driver由一个输入和一个输出。

Operator

? operator 消费,转换并生产数据。例如,表扫描从 connector 获取数据并生产可以被其他算子消费的数据,和过滤算子通过应用谓词到输入数据来消费数据并生产子集

Exchange

? exchange 在 presto 节点间传输数据给查询的不同 stage。task 生产数据到输出缓冲区,并使用 exchange 客户端从其他 task 消费数据。

安装

部署

安装

? 下载 Presto 服务器tar包并解压。里面包括一个最高级目录,称为安装目录。Presto 需要一个数据目录来存储日志等。建议创建一个数据目录在安装目录之外,这样可以在更新的时候方便地保留。

配置

? 在安装目录中创建 etc 目录。里面包含以下配置文件

  • Node Properties:环境配置指定每个节点
  • JVM Config:操作Java 虚拟机的命令行选项
  • Config Properties:Presto 服务器的配置
  • Catalog Properties: 数据源适配器的配置。
Node Properties

? etc/node.properties 包含每个节点的配置。一个 节点 是一个在机器上安装了 Presto 的实例。该文件通常由在 Presto 第一次安装时由部署系统创建的。下面时一个最小化的配置。

node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/var/presto/data
  • node.environment: 环境名称。在同一个集群的所有 Presto 节点需要有相同的名称。
  • node.id: 唯一标识。对于每个节必须是唯一的。此标识符应在 Presto 重新启动或升级期间保持一致。如果在一台机器上运行多个 Presto 安装(即同一台机器上的多个节点),每个安装必须有一个唯一标识符。
  • node.data-dir: 数据目录的位置(文件系统路径)。Presto 将在此处存储日志和其他数据。
JVM Config

jvm.config

coordinator:

coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
query.max-memory=50GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://node1:8080

workers:

coordinator=false
http-server.http.port=8080
query.max-memory=50GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery.uri=http://node1:8080

如果 coordinator 和 workers 在同一个机器上做测试:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://node1:8080
  • coordinator: 允许 presto 实例作为 coordinator 接收客户端的查询并管理查询的执行。
  • node-scheduler.include-coordinator: 允许在 coordinator 上调度任务。对于大集群,在 coordinator 上处理任务会影响查询性能,因为机器资源对调度,管理,监控查询执行的关键任务不可用。
  • http-server.http.port: 指定 HTTP 服务的端口。presto 使用 http 进行内部和外部通信。
  • query.max-memory: 一个查询可以使用的分布式内存最大值。
  • query.max-memory-per-node: 一台机器上一个查询使用的用户内存最大值。
  • query.max-total-memory-per-node: 一台机器上一个查询使用的用户和系统内存最大值。系统内存是在由 reader,writer,网络缓冲等,在执行过程中使用的内存。
  • discovery-server.enabled: presto 使用发现服务找到集群中的所有节点。每个 presto 实例会在启动时向发现服务注册。为了简化部署,避免运行额外的服务,presto coordinator 可以运行一个发现服务的内置版本。这个服务会与 presto 共享http 服务器,因此会使用相同端口。
  • discovery.uri: 发现服务器的 URI。因为我们在 presto coordinator 启动了发现服务的内嵌版本,这应该为 presto coordinator 的 URI。替换example.net:8080来适配 presto coordinator 的 host 和 port。URI 不能以斜杠结尾。

Properties Reference 参考通用配置

Config Properties

config.properties

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=5GB
query.max-memory-per-node=1GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://node1:8080
Log Levels

log.properties

com.facebook.presto=INFO
Node Properties
node.environment=production
node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
node.data-dir=/yg/install/presto-server-0.271/data
Catalog Properties

etc/catalog/

? presto 通过 挂载到 catalog 的 connector 访问数据。connector 在 catalog 提供所有 schema 和 table 。例如,Hive connector 映射每个数据库到 schema

命令行客户端

? presto cli 提供一个基于终端的交互式 shell 来运行查询。这个客户端时自执行的 jar 文件,意思是它像一个普通 UNIX 可执行。

? 下载 presto-cli-0.271-executable.jar ,重命名为 presto,让它拥有可执行权限,然后运行

./presto --server localhost:8080 --catalog hive --schema default

运行 --help 查看可用选项

示例

  1. 安装好 MySQL 并关闭防火墙,打开用户远程登录权限

  2. presto-server/etc/catalog 下编写配置文件 mysql.properties

connector.name=mysql
connection-url=jdbc:mysql://192.168.40.2:3306?
connection-user=root
connection-password=123456
  1. 启动 presto server(coordinator、worker)
bin/launcher run	//前台运行,打印日志
bin/launcher start	//后台运行
bin/launcher stop	//停止
  1. 启动 presto-cli-0.271-executable.jar 下载之后改名为 presto,并添加执行权限
./presto --server localhost:8080 --catalog mysql --schema default
  1. 执行 SQL 语句
show schemas from mysql;
select count(1),job from toys.bank_additional_full3 group by job;
  1. 在 UI 界面查看集群状态和 SQL 执行情况。
http://node1:8080

溢出到磁盘

内存管理和溢出

? 默认情况下,presto 会杀掉那些请求的内存超出配置限制的查询。()query_max_memory or query_max_memory_per_node)。这个机制保证了内存分配给查询的公平性和避免由内存分配造成的死锁。这在集群中有很多小查询时很高效,但是会杀掉超出限制的大查询。

? 为了克服这种低效率,这里引入 可撤销内存 的概念。查询可以请求不计入限制的内存,但是可以倍内存管理器随时撤销。当内存被撤销时,查询运行器将中间数据从内存溢出到磁盘,一遍稍后处理它。

? 实际上,当集群空闲,并且所有内存可用,内存紧张的查询可以使用集群中所有的内存。另一方面,当集群没有太多空闲内存时,一些查询会被强制使用磁盘存储中间数据。被溢出到磁盘的查询的执行时间可能要长几个数量级,相比完全在内存中运行的查询。

? 请注意,启用溢出到磁盘不保证所有内存紧张的查询执行。有一些内存紧张的操作不支持溢出。查询运行器有可能无法将中间数据到划分成足够小的能够适合内存的 chunk ,进而从磁盘加载数据时导致OOM错误。

函数和操作符

比较操作符

操作符 说明
<
>
<=
>=
=
<>
!= 非标准,但是支持

Range Operator: BETWEEN

SELECT 3 BETWEEN 2 AND 6;
SELECT 3 >= 2 AND 3 <= 6;

SELECT 3 NOT BETWEEN 2 AND 6;
SELECT 3 < 2 OR 3 > 6;

SELECT NULL BETWEEN 2 AND 4; -- null	任何涉及 NULL 的比较都会返回 NULL
SELECT 2 BETWEEN NULL AND 6; -- null

SELECT 'Paul' BETWEEN 'John' AND 'Ringo'; -- true

IS NULL and IS NOT NULL

select NULL IS NULL; -- true
SELECT 3.0 IS NULL; -- false

IS DISTINCT FROM and IS NOT DISTINCT FROM

因为涉及到 NULL 的比较会返回 NULL, 因此这两个操作符将 NULL 视为已知值, 总是会返回 True False

SELECT NULL IS DISTINCT FROM NULL; -- false
SELECT NULL IS NOT DISTINCT FROM NULL; -- true

GREATEST and LEAST

非 SQL 标准, 但是是常见的扩展。若其中一个参数为 NULL 则返回 NULL, 支持下面的格式:

  • DOUBLE

  • BIGINT

  • VARCHAR

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • DATE

greatest(value1, value2, ..., valueN) 返回提供的最大的值

Quantified Comparison Predicates: ALL, ANY and SOME

ALL , ANY , SOME 可以和比较操作符一起用

expression operator quantifier ( subquery )
SELECT 'hello' = ANY (VALUES 'hello', 'world'); -- true

SELECT 21 < ALL (VALUES 19, 20, 21); -- false

SELECT 42 >= SOME (SELECT 41 UNION ALL SELECT 42 UNION ALL SELECT 43); -- true
表达式 意义
A = ALL (...) 等于所有值
A <> ALL (...) 不匹配所有值
A < ALL (...) 比最小的小
A = ANY (...) 等于任意一个值,等价于 A IN (...)
A <> ANY (...) 不匹配一个或多个值
A < ANY (...) A 比最大的值小

ANY SOME 具有相同的意思, 可以互相使用。

LIKE

正则匹配

expression LIKE pattern [ ESCAPE 'escape_character' ]

[ ESCAPE 'escape_character' ] 代表转义后面的字符

SELECT * FROM (VALUES ('a_c'), ('_cd'), ('cde')) AS t (name)
WHERE name LIKE '%#_%' ESCAPE '#'
--returns 'a_c' and  '_cd'

条件表达式

CASE

标准SQL

IF

if(condition, true_value)

COALESCE

coalesce(value1, value2[, ...]) 返回第一个非 null 的值

NULLIF

nullif(value1, value2)

TRY

try(expression)

评估一个表达式,遇到指定的错误类型时返回 NULL

TRY 处理的错误:

  • Division by zero
  • Invalid cast argument or invalid function argument
  • Numeric value out of range
例子

与 coalesce 结合使用:

SELECT COALESCE(TRY(total_cost / packages), 0) AS per_package FROM shipping;

转换函数

  1. cast(value AS type) → type#

  2. try_cast(value AS type) → type# 与cast 类似,但是失败时返回 null 。

  3. parse_presto_data_size(string)#

SELECT parse_presto_data_size('1B'); -- 1
SELECT parse_presto_data_size('1kB'); -- 1024
SELECT parse_presto_data_size('1MB'); -- 1048576
SELECT parse_presto_data_size('2.3MB'); -- 2411724
  1. typeof(expr) → varchar#
SELECT typeof(123); -- integer
SELECT typeof('cat'); -- varchar(3)
SELECT typeof(cos(2) + 1.5); -- double

数学函数和操作符

未完...