DataHub Summary
When sending data to DataHub, it is recommended to use a Producer. The advantage is that you do not have to specify a shardId, so the business code does not need to change when DataHub adds or removes shards.
Also note that DataHub shardIds only ever increase; old shards are not reused and can only be deactivated.
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.18.0-public</version>
</dependency>
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>datahub-client-library</artifactId>
    <version>1.1.12-public</version>
</dependency>
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.NoPermissionException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import com.aliyun.datahub.exception.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DatahubWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DatahubWriter.class);

    private static void sleep(long milliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(milliSeconds);
        } catch (InterruptedException e) {
            // TODO: handle the interruption as appropriate for your application
        }
    }

    private static List<RecordEntry> genRecords(RecordSchema schema) {
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int cnt = 0; cnt < 10; ++cnt) {
            RecordEntry entry = new RecordEntry();
            entry.addAttribute("key1", "value1");
            entry.addAttribute("key2", "value2");

            TupleRecordData data = new TupleRecordData(schema);
            data.setField("field1", "testValue");
            data.setField("field2", 1);
            entry.setRecordData(data);

            recordEntries.add(entry);
        }
        return recordEntries;
    }

    private static void sendRecords(Producer producer, List<RecordEntry> recordEntries) {
        int maxRetry = 3;
        while (true) {
            try {
                // Let the producer pick a shard automatically
                producer.send(recordEntries, maxRetry);

                // Or write to a specific shard, e.g. shard "0"
                // producer.send(recordEntries, "0", maxRetry);

                LOG.info("send records: {}", recordEntries.size());
                break;
            } catch (MalformedRecordException e) {
                // Invalid record format; ignore or rethrow depending on your business scenario
                LOG.error("write fail", e);
                throw e;
            } catch (InvalidParameterException | AuthorizationFailureException | NoPermissionException e) {
                // Invalid request parameters, bad signature, or missing permission
                LOG.error("write fail", e);
                throw e;
            } catch (ShardNotFoundException e) {
                // Shard does not exist; if you are not writing to a specific shard, this can simply be retried
                LOG.error("write fail", e);
                sleep(1000);
            } catch (ResourceNotFoundException e) {
                // Project, topic, or shard does not exist
                LOG.error("write fail", e);
                throw e;
            } catch (DatahubClientException e) {
                // Base exception, including network problems; retrying is an option
                LOG.error("write fail", e);
                sleep(1000);
            }
        }
    }

    public static void main(String[] args) {
        // The endpoint uses Region China East 1 (Hangzhou) as an example; use your actual Region's endpoint
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "";    // fill in your own credentials
        String accessKey = "";
        String projectName = ""; // fill in your own project and topic
        String topicName = "";

        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("field1", FieldType.STRING));
        schema.addField(new Field("field2", FieldType.BIGINT));

        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // Control the loop according to your scenario
        boolean stop = false;
        try {
            while (!stop) {
                List<RecordEntry> recordEntries = genRecords(schema);
                sendRecords(producer, recordEntries);
            }
        } finally {
            // Make sure resources are released
            producer.close();
        }
    }
}
The RecordSchema in the example above can also be obtained dynamically through the DatahubClient:
RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
How to initialize the DatahubClient:
// https://help.aliyun.com/document_detail/158841.html
// The endpoint uses Region China East 1 (Hangzhou) as an example; use your actual Region's endpoint
String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
String accessId = "";
String accessKey = "";
// Create a DatahubClient instance
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
        .setDatahubConfig(
                new DatahubConfig(endpoint,
                        // Whether to enable binary transfer; supported by server version 2.12 and later.
                        // If this fails in an Apsara Stack (private cloud) environment, try setting it to false.
                        new AliyunAccount(accessId, accessKey), true))
        // HttpConfig is optional; defaults are used when it is not set
        .setHttpConfig(new HttpConfig()
                .setCompressType(HttpConfig.CompressType.LZ4) // LZ4 network compression is recommended for reads and writes
                .setConnTimeout(10000))
        .build();
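Putting the two pieces together, the sketch below is not official sample code; it shows one way to build the client, fetch the schema dynamically, and then construct the Producer used in the earlier example. The import paths are assumed from the 2.x client SDK and should be checked against your SDK version.

// Minimal sketch: build a DatahubClient, fetch the topic schema dynamically,
// then construct the Producer as in the DatahubWriter example above.
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;

public class ProducerBootstrap {

    public static void main(String[] args) {
        String endpoint = "http://dh-cn-hangzhou.aliyuncs.com";
        String accessId = "";    // fill in your own credentials
        String accessKey = "";
        String projectName = ""; // fill in your own project and topic
        String topicName = "";

        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(new DatahubConfig(endpoint,
                        new AliyunAccount(accessId, accessKey), true))
                .build();

        // Fetch the schema from the topic instead of declaring the fields in code
        RecordSchema schema = datahubClient.getTopic(projectName, topicName).getRecordSchema();

        ProducerConfig config = new ProducerConfig(endpoint, accessId, accessKey);
        Producer producer = new Producer(projectName, topicName, config);

        // From here on, schema can be passed to genRecords(schema) and the records sent
        // with producer.send(recordEntries, maxRetry) exactly as in DatahubWriter;
        // remember to call producer.close() when done.
    }
}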
An error that may occur:
[{"errorcode":"LimitExceeded","index":1,"message":"The limit of throughput rate is exceeded."},{"errorcode":"LimitExceeded","index":2,"message":"The limit of throughput rate is exceeded."}],"requestId":"202203101111111111111"}
The cause of the error: one of the related limits was exceeded.
Metric descriptions and how to view them:
The Web Console provides a Metric feature; on the Metric page users can view near-real-time, topic-level traffic information. The metrics currently available are:
QPS: read/write requests per second
RPS: read/write records per second
Throughput: read/write throughput per second (in KB)
Latency: latency per read/write request (in microseconds)
https://help.aliyun.com/document_detail/158786.html
Description of the related limits (exceeding them triggers the error):
https://help.aliyun.com/document_detail/47441.html
The following three limits are all based on data size, just measured along different dimensions:
Single String length: applies to a single field
HTTP BodySize: applies to a single write request
Throughput limit: the combined size of all requests at a given point in time. If it is exceeded, the error is:
The limit of throughput rate is exceeded.
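One way to deal with this error is to back off before retrying instead of resending immediately. The sketch below is only an illustration: it is assumed to live in the DatahubWriter class from the earlier example (so it can reuse its LOG and sleep helpers), and the backoff values are arbitrary, not DataHub-recommended settings. In real code, keep the more specific catch blocks from sendRecords in front of this one.

// Sketch: retry with exponential backoff when the throughput limit is hit.
private static void sendWithBackoff(Producer producer, List<RecordEntry> recordEntries) {
    int maxRetry = 3;
    long backoffMillis = 500;           // initial backoff, illustrative value
    final long maxBackoffMillis = 8000; // cap so retries do not sleep forever
    while (true) {
        try {
            producer.send(recordEntries, maxRetry);
            return;
        } catch (DatahubClientException e) {
            // The base exception also covers throttling such as
            // "The limit of throughput rate is exceeded."; wait, then retry.
            // Note: non-retryable subclasses (e.g. MalformedRecordException) should be
            // caught and rethrown before this block, as in sendRecords above.
            LOG.error("write fail, backing off {} ms", backoffMillis, e);
            sleep(backoffMillis);
            backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
        }
    }
}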
How RPS is counted for batch submissions:
List<RecordEntry> recordEntries
producer.send(recordEntries, maxRetry);
When calling the batch interface, does each entry in the List count as a separate Record?
From the perspective of API calls there is only one call, but the RPS can still be large: if the List contains 10,000 entries, the RPS is 10,000.
The batch API in the SDK sends the pending data to DataHub as a single request; after receiving it, DataHub processes the records one by one.
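If a single List would be too large, either in record count or in request body size, it can be split into smaller batches before sending so that each request stays within the limits above. A minimal sketch, again assumed to live alongside the DatahubWriter example; the batch size of 1000 is purely illustrative:

// Sketch: send a large list in fixed-size batches so each request stays smaller.
// BATCH_SIZE is an illustrative value; choose it based on record size and your quotas.
private static final int BATCH_SIZE = 1000;

private static void sendInBatches(Producer producer, List<RecordEntry> recordEntries) {
    int maxRetry = 3;
    for (int start = 0; start < recordEntries.size(); start += BATCH_SIZE) {
        int end = Math.min(start + BATCH_SIZE, recordEntries.size());
        List<RecordEntry> batch = recordEntries.subList(start, end);
        // One send() call = one request (QPS + 1) carrying (end - start) records (RPS + end - start)
        producer.send(batch, maxRetry);
    }
}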