[阿里云]Datahub测试使用记录


由于需要测试阿里云Datahub功能,因此测了一下Datahub的一些功能

DATAHUB: 简介: 阿里云的流式数据(streaming)处理平台 对流式数据的发布(publish)订阅(subscribe)和分发功能   主要功能: 采集实时数据,如移动设备,传感器,网站服务等 使用脚本或流计算引擎来处理写入datahub的数据 最后生成实时图表/报警信息等   术语: project:项目,包含多个topic topic:可以表示一种类型的流,订阅和发布单位 shard:topic的并发通道 record:用户数据与datahub端交互的基本单位 recordtype:topic的数据类型,支持tuple和blob DataConnect:把datahub中的流式数据同步到其他云产品中的功能,现在支持odps/oss/es/mysql   操作过程 首先在新建project,注意管理员账号 注意授权信息 参考<授权信息管理>   然后进入后,创建topic schema是指column,可以选择多种数据类型   新建DataConnect,来设置下游数据   设置maxcompute连接   使用python来插入数据到topic
import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType

access_id = 
access_key = 
endpoint = 'https://dh-cn-shanghai.aliyuncs.com'
dh = DataHub(access_id, access_key, endpoint)


##写入
project_name=
topic_name = 
try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("topic type normal")
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=['1', '2yc1', '30.01', '4True', '5455869335000000','6','1455869335000000'])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    for i in range (1,10000):
        record2 = TupleRecord(schema=record_schema)
        record2.set_value(0, str(i))
        record2.set_value(1, str(i)+'yc3')
        record2.set_value(2,  str(i+1.1))
        record2.set_value(3, str(i))
        record2.set_value(4, '1455869335000011')
        record2.set_value(5, '20180913_1115')
        record2.set_value(6, int(time.time())*1000000)
        record2.attributes = {'key': 'value'}
        record2.partition_key = 'EVENT_TIME'
        records0.append(record2)

    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
    # failed_record_count如果大于0最好对failed record再进行重试
    print('结束')
    print
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)

进行验证数据导入

maxcompute默认是五分钟或者50M触发一次同步,如果需要实时的就要rds登场了 这样就测试完成了.后期进行压测,待续..