Python 华为云OSS建桶与文件上传下载删除及检索示例


华为云OSS建桶与文件上传下载删除及检索示例

实践环境

运行环境:
Python 3.5.4
CentOS Linux release 7.4.1708 (Core)/Win10

需要安装以下类库:
https://github.com/huaweicloud/huaweicloud-sdk-python-obs/blob/master/release/huaweicloud-obs-sdk-python_3.22.2.zip

示例

import traceback

try:
    from obs import ObsClient
    from obs import StorageClass
    from obs import CreateBucketHeader
    from obs import DeleteObjectsRequest, Object

    obs_client = None
    obs_client = ObsClient(
      access_key_id='15K7IJ2GKROGEEEFGG51',
      secret_access_key='HVern8SOfk7SAzCEqDoVXySXZgcP7rpf0iJqiyKd',
      server='https://obs.cn-south-1.myhuaweicloud.com'
    )

    bucket_name = 'mybucket' # 必须全局唯一 -- 如果该名称被其它账户使用了,那么不能再用该名称创建通过桶

    location = 'cn-south-1'

    # 检查桶是否存在
    resp = obs_client.headBucket(bucket_name)
    if resp.status < 300:
        print('Bucket %s 已存在' % bucket_name)
    elif resp.status == 404:
        print('Bucket %s 不存在' % bucket_name)

        # 创建桶
        resp = obs_client.createBucket(bucketName=bucket_name, header=CreateBucketHeader(aclControl='private', storageClass=StorageClass.WARM), location=location)
        if resp.status < 300:
            print('requestId:', resp.requestId)
        else:
            print('errorCode:', resp.errorCode)
            print('errorMessage:', resp.errorMessage)

    #  上传本地文件
    resp = obs_client.putFile(bucket_name, 'f2b/artifacts/artifacts-0bb40b.tar.gz', 'd:\\artifacts-17a86f.tar.gz')
    if resp.status < 300:
        print('requestId:', resp.requestId)
        print('etag:', resp.body.etag)
        print('versionId:', resp.body.versionId)
        print('storageClass:', resp.body.storageClass)
    else:
        print('errorCode:', resp.errorCode)
        print('errorMessage:', resp.errorMessage)

    # 下载文件 # downloadPath: 下载的文件本地存储路径,如果不存在,会自动创建。注意:如果要将文件存在到当前路径下,不能仅写文件,会报错,需要写成 ./文件名称,形如 ./artifacts.tar.gz'
    # resp = obs_client.getObject(bucket_name, 'f2b/artifacts/artifacts-0bb40b.tar.gz', downloadPath='./artifacts.tar.gz')
    resp = obs_client.getObject(bucket_name, 'f2b/artifacts/artifacts-0bb40b.tar.gz', downloadPath='d:\\testdir\\artifacts.tar.gz')
    if resp.status < 300:
        print('requestId:', resp.requestId)
        print('url:', resp.body.url)
    else:
        print('errorCode:', resp.errorCode)
        print('errorMessage:', resp.errorMessage)

    #列举桶内对象
    # resp = obs_client.listObjects(bucket_name, prefix = None, max_keys=2)  # 如果不需要过滤,则prefix可以设置为None、空字符串
    # 如果查询到的文件对象数量超过max_keys则 resp.body.next_marker为下轮查询结果列表中某个对象的key,否则为None
    resp = obs_client.listObjects(bucket_name, prefix='f2b/artifacts', max_keys=5)
    while resp.status < 300 and resp.body.contents:
        print('requestId:', resp.requestId)
        print('name:', resp.body.name)
        print('prefix:', resp.body.prefix)
        print('max_keys:', resp.body.max_keys)
        print('is_truncated:', resp.body.is_truncated)
        index = 1
        print(resp.body.next_marker)
        for content in resp.body.contents:
            print('object [' + str(index) + ']')
            print('key:', content.key)
            print('lastModified:', content.lastModified)
            print('etag:', content.etag)
            print('size:', content.size)
            print('storageClass:', content.storageClass)
            print('owner_id:', content.owner.owner_id)
            print('owner_name:', content.owner.owner_name)
            index += 1

        if resp.body.next_marker:
            resp = obs_client.listObjects(bucket_name, prefix='f2b/artifacts', marker=resp.body.next_marker, max_keys=5)
        else:
            resp.body.contents = []
    else:
        if resp.status > 300:
            print('errorCode:', resp.errorCode)
            print('errorMessage:', resp.errorMessage)


    # 删除对象
    object1 = Object(key='f2b/artifacts/artifacts-0bb40b.tar.gz', versionId=None)
    resp = obs_client.deleteObjects(bucket_name, DeleteObjectsRequest(quiet=False, objects=[object1]))

    if resp.status < 300:
        print('requestId:', resp.requestId)
        if resp.body.deleted:
            index = 1
            for delete in resp.body.deleted:
                print('delete[' + str(index) + ']')
                print('key:', delete.key, ',deleteMarker:', delete.deleteMarker, ',deleteMarkerVersionId:', delete.deleteMarkerVersionId)
                print('versionId:', delete.versionId)
                index += 1
        if resp.body.error:
            index = 1
            for err in resp.body.error:
                print('err[' + str(index) + ']')
                print('key:', err.key, ',code:', err.code, ',message:', err.message)
                print('versionId:', err.versionId)
                index += 1
    else:
        print('errorCode:', resp.errorCode)
        print('errorMessage:', resp.errorMessage)
except Exception:
    print('%s' % traceback.format_exc())
finally:
    if obs_client:
        obs_client.close()

参考连接

https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0803.html
https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0801.html
https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0904.html
https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0911.html
https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0805.html
https://support.huaweicloud.com/sdk-python-devg-obs/obs_22_0919.html
https://support.huaweicloud.com/api-obs/obs_04_0022.html