diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 2c02d0b..a9d38b4 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,5 +1,23 @@
 TOS SDK for Python 版本记录
 ===========================
+Version 2.7.0
+-------------
+- Added: APIs for configuring bucket server-side encryption
+- Added: type2 event-notification configuration APIs
+- Added: bucket lifecycle rules support filtering by object size range and an exact date for non-current versions
+- Added: bucket lifecycle rules support the same-action prefix-overlap parameter
+- Added: client init parameters disable_encoding_meta/except100_continue_threshold
+- Added: async fetch supports configuring a callback
+- Added: mirror-back supports writing origin response headers into custom metadata
+- Added: get_object/head_object support the CRR replication status (replication_status)
+- Added: fetch_meta parameter for the object listing APIs
+- Changed: sync/async fetch APIs now accept content_md5
+- Fixed: content-disposition encoding issue
+
+Version 2.6.11
+--------------
+- Fixed: get_fetch_task API
+
 Version 2.6.10
 -------------
 - 增加:get_fetch_task接口
diff --git a/tests/common.py b/tests/common.py
index d7ee301..29d5c40 100644
--- a/tests/common.py
+++ b/tests/common.py
@@ -74,6 +74,8 @@ def __init__(self, *args, **kwargs):
         self.sseKey = "Y2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2M="
         self.sseKeyMd5 = "ACdH+Fu9K3HlXdIUBu8GdA=="
         self.sseAlg = "AES256"
+        self.callback_url = os.getenv('CallbackUrl')
+        self.cloud_function = os.getenv('CloudFunction')
 
     def setUp(self):
         self.client = TosClientV2(self.ak, self.sk, self.endpoint, self.region, enable_crc=True, max_retry_count=2)
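Reviewer note: a minimal sketch of the two new 2.7.0 client switches announced above (they are wired into tos/clientv2.py further down in this diff). The env-var names are placeholders, not part of the SDK:

    import os
    import tos

    client = tos.TosClientV2(
        os.getenv('TOS_AK'), os.getenv('TOS_SK'),
        os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'),
        enable_crc=True,
        # send x-tos-meta-*/Content-Disposition verbatim instead of percent-encoding them
        disable_encoding_meta=False,
        # bodies above this many bytes (or of unknown length) send Expect: 100-continue
        except100_continue_threshold=65536,
    )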
diff --git a/tests/test_v2_bucket.py b/tests/test_v2_bucket.py
index 0596b6b..105c3d9 100644
--- a/tests/test_v2_bucket.py
+++ b/tests/test_v2_bucket.py
@@ -6,12 +6,14 @@
 import time as tim
 import unittest
 
+import crcmod
 from pytz import UTC
 
 import tos
 from tests.common import TosTestBase, clean_and_delete_bucket
 from tos import TosClientV2
 from tos.checkpoint import TaskExecutor
+from tos.consts import BUCKET_TYPE_HNS
 from tos.credential import EnvCredentialsProvider
 from tos.enum import ACLType, StorageClassType, RedirectType, StatusType, PermissionType, CannedType, GranteeType, \
     VersioningStatusType, ProtocolType, AzRedundancyType, StorageClassInheritDirectiveType, CertStatus
@@ -23,7 +25,9 @@
     Destination, RedirectAllRequestsTo, IndexDocument, ErrorDocument, RoutingRules, RoutingRule, \
     RoutingRuleCondition, RoutingRuleRedirect, CustomDomainRule, RealTimeLogConfiguration, AccessLogConfiguration, \
     CloudFunctionConfiguration, Filter, FilterKey, FilterRule, RocketMQConfiguration, RocketMQConf, Transform, \
-    ReplaceKeyPrefix
+    ReplaceKeyPrefix, FetchHeaderToMetaDataRule, BucketEncryptionRule, ApplyServerSideEncryptionByDefault, \
+    BucketLifecycleFilter, NotificationRule, NotificationFilter, NotificationFilterKey, NotificationFilterRule, \
+    NotificationDestination, DestinationVeFaaS, DestinationRocketMQ
 
 tos.set_logger()
 
@@ -33,6 +37,53 @@ class TestBucket(TosTestBase):
     def test_ua(self):
         assert 'v' in tos.clientv2.USER_AGENT
 
+    def test_hns_bucket(self):
+        bucket_name = self.bucket_name + "hcl"
+        self.bucket_delete.append(bucket_name)
+        rsp = self.client.create_bucket(bucket_name, bucket_type="hns")
+        print(rsp)
+        assert rsp.status_code == 200
+        rsp = self.client.head_bucket(bucket=bucket_name)
+        assert rsp.status_code == 200
+        assert rsp.bucket_type == BUCKET_TYPE_HNS
+        key = "hns/test/1.txt"
+        rsp = self.client.put_object(bucket=bucket_name, key=key, content="hello")
+        assert rsp.status_code == 200
+        rsp = self.client.get_file_status(bucket=bucket_name, key=key)
+        assert rsp.status_code == 200
+        append_key = "hns/test/2.txt"
+        rsp = self.client.put_object(bucket=bucket_name, key=append_key)
+        assert rsp.status_code == 200
+        rsp = self.client.append_object(bucket=bucket_name, key=append_key, offset=0, content="hello1")
+        offset = rsp.next_modify_offset
+        assert rsp.status_code == 200
+        assert offset == 6
+        rsp = self.client.append_object(bucket=bucket_name, key=append_key, offset=offset, content="hello2")
+        offset = rsp.next_modify_offset
+        assert rsp.status_code == 200
+        assert offset == 12
+        rsp = self.client.get_object(bucket=bucket_name, key=append_key)
+        data = rsp.read().decode('utf-8')
+        assert data == 'hello1hello2'
+
+    def test_get_file_status(self):
+        bucket_name = self.bucket_name + "basic"
+        self.bucket_delete.append(bucket_name)
+        out = self.client.create_bucket(bucket_name)
+        assert out.status_code == 200
+        key = "hns/test/1.txt"
+        content = "hello"
+        rsp = self.client.put_object(bucket=bucket_name, key=key, content=content)
+        assert rsp.status_code == 200
+        rsp = self.client.get_file_status(bucket=bucket_name, key=key)
+        assert rsp.status_code == 200
+        assert rsp.key == key
+        do_crc64 = crcmod.mkCrcFun(0x142F0E1EBA9EA3693, initCrc=0, xorOut=0xffffffffffffffff, rev=True)
+        c1 = do_crc64(content.encode())
+        assert rsp.crc64 == str(c1)
+
     def test_bucket(self):
         bucket_name = self.bucket_name + "basic"
         self.bucket_delete.append(bucket_name)
@@ -224,6 +275,7 @@ def test_get_location(self):
     def test_bucket_mirror(self):
         bucket_name = self.bucket_name + 'mirror'
         self.client.create_bucket(bucket=bucket_name)
+        self.bucket_delete.append(bucket_name)
         rules = []
         rules.append(Rule(
             id='1',
@@ -236,7 +288,8 @@ def test_bucket_mirror(self):
                 follow_redirect=True,
                 mirror_header=MirrorHeader(pass_all=True, pass_headers=['aaa', 'bbb'], remove=['xxx', 'xxx']),
                 transform=Transform(with_key_prefix='prefix', with_key_suffix='suffix',
-                                    replace_key_prefix=ReplaceKeyPrefix(key_prefix='prefix1', replace_with='replace'))
+                                    replace_key_prefix=ReplaceKeyPrefix(key_prefix='prefix1', replace_with='replace')),
+                fetch_header_to_meta_data_rules=[FetchHeaderToMetaDataRule(source_header='a', meta_data_suffix='b')]
             )
         ))
         put_out = self.client.put_bucket_mirror_back(bucket=bucket_name, rules=rules)
@@ -259,6 +312,9 @@ def test_bucket_mirror(self):
         self.assertEqual(get_out.rules[0].redirect.transform.with_key_suffix, 'suffix')
         self.assertEqual(get_out.rules[0].redirect.transform.replace_key_prefix.key_prefix, 'prefix1')
         self.assertEqual(get_out.rules[0].redirect.transform.replace_key_prefix.replace_with, 'replace')
+        self.assertEqual(get_out.rules[0].redirect.fetch_header_to_meta_data_rules[0].source_header, 'a')
+        self.assertEqual(get_out.rules[0].redirect.fetch_header_to_meta_data_rules[0].meta_data_suffix, 'b')
+
         rules = []
         rules.append(Rule(
             id='2',
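Reviewer note: the new mirror-back option above copies an origin response header into object metadata. A hedged sketch follows; the bucket name, origin URL, and header name are made up, and my reading that meta_data_suffix is appended under the x-tos-meta- prefix comes from the field name, not from anything this diff states:

    import os
    import tos
    from tos.enum import RedirectType
    from tos.models2 import (Condition, FetchHeaderToMetaDataRule, MirrorHeader,
                             PublicSource, Redirect, Rule, SourceEndpoint)

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    rules = [Rule(
        id='header-to-meta',
        condition=Condition(http_code=404),
        redirect=Redirect(
            redirect_type=RedirectType.Mirror,
            fetch_source_on_redirect=True,
            public_source=PublicSource(SourceEndpoint(primary=['http://origin.example.com/'])),
            mirror_header=MirrorHeader(pass_all=True),
            # copy the origin's X-Source-Tag response header into custom metadata
            fetch_header_to_meta_data_rules=[
                FetchHeaderToMetaDataRule(source_header='X-Source-Tag',
                                          meta_data_suffix='origin-tag')],
        ))]
    client.put_bucket_mirror_back(bucket='my-bucket', rules=rules)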
@@ -292,6 +348,44 @@ def test_bucket_mirror(self):
 
         delete_out = self.client.delete_bucket_mirror_back(bucket=bucket_name)
         self.assertIsNotNone(delete_out.request_id)
+
+        rules = []
+        rules.append(Rule(
+            id='1',
+            condition=Condition(http_code=404, http_method=['GET', 'HEAD']),
+            redirect=Redirect(
+                redirect_type=RedirectType.Mirror,
+                fetch_source_on_redirect=True,
+                public_source=PublicSource(SourceEndpoint(primary=['http://test.com/obj/tostest/'])),
+                pass_query=True,
+                follow_redirect=True,
+                mirror_header=MirrorHeader(pass_all=True, pass_headers=['aaa', 'bbb'], remove=['xxx', 'xxx']),
+                transform=Transform(with_key_prefix='prefix', with_key_suffix='suffix',
+                                    replace_key_prefix=ReplaceKeyPrefix(key_prefix='prefix1', replace_with='replace'))
+            )
+        ))
+        put_out = self.client.put_bucket_mirror_back(bucket=bucket_name, rules=rules)
+        self.assertIsNotNone(put_out.request_id)
+        get_out = self.client.get_bucket_mirror_back(bucket=bucket_name)
+        self.assertIsNotNone(get_out.request_id)
+        self.assertTrue(len(get_out.rules) == 1)
+        self.assertEqual(get_out.rules[0].id, '1')
+        self.assertEqual(get_out.rules[0].condition.http_code, 404)
+        self.assertEqual(get_out.rules[0].condition.http_method[0], 'GET')
+        self.assertEqual(get_out.rules[0].condition.http_method[1], 'HEAD')
+        self.assertEqual(get_out.rules[0].redirect.redirect_type, RedirectType.Mirror)
+        self.assertEqual(get_out.rules[0].redirect.follow_redirect, True)
+        self.assertEqual(get_out.rules[0].redirect.fetch_source_on_redirect, True)
+        self.assertEqual(get_out.rules[0].redirect.mirror_header.pass_all, True)
+        self.assertEqual(get_out.rules[0].redirect.mirror_header.pass_headers, ['aaa', 'bbb'])
+        self.assertEqual(get_out.rules[0].redirect.mirror_header.remove, ['xxx', 'xxx'])
+        self.assertEqual(get_out.rules[0].redirect.public_source.source_endpoint.primary,
+                         ['http://test.com/obj/tostest/'])
+        self.assertEqual(get_out.rules[0].redirect.public_source.fixed_endpoint, None)
+        self.assertEqual(get_out.rules[0].redirect.transform.with_key_prefix, 'prefix')
+        self.assertEqual(get_out.rules[0].redirect.transform.with_key_suffix, 'suffix')
+        self.assertEqual(get_out.rules[0].redirect.transform.replace_key_prefix.key_prefix, 'prefix1')
+        self.assertEqual(get_out.rules[0].redirect.transform.replace_key_prefix.replace_with, 'replace')
         self.client.delete_bucket(bucket=bucket_name)
 
     def test_bucket_policy(self):
@@ -338,13 +432,15 @@ def test_life_cycle(self):
             tags=[Tag(key='1', value="2"), Tag('test', 'test')],
         ))
         self.client.create_bucket(bucket_name)
-        self.client.put_bucket_lifecycle(bucket=bucket_name, rules=rules)
+        self.bucket_delete.append(bucket_name)
+        self.client.put_bucket_lifecycle(bucket=bucket_name, rules=rules, allow_same_action_overlap=True)
         out = self.client.get_bucket_lifecycle(bucket=bucket_name)
         self.assertEqual(len(out.rules[0].tags), 2)
         self.assertEqual(out.rules[0].tags[0].key, '1')
         self.assertEqual(out.rules[0].tags[0].value, '2')
         self.assertEqual(out.rules[0].tags[1].key, 'test')
         self.assertEqual(out.rules[0].tags[1].value, 'test')
+        self.assertEqual(out.allow_same_action_overlap, True)
         self.client.delete_bucket(bucket=bucket_name)
 
     def test_lifecycle_days(self):
@@ -413,6 +509,7 @@ def test_lifecycle_days(self):
         self.assertEqual(rule1.transitions[0].storage_class, StorageClassType.Storage_Class_Ia)
         self.assertEqual(rule1.non_current_version_transitions[0].non_current_days, 30)
         self.assertEqual(rule1.non_current_version_transitions[0].storage_class, StorageClassType.Storage_Class_Ia)
+        self.assertEqual(out.allow_same_action_overlap, None)
         # 校验 rule2的正确性
         rule2 = out.rules[1]
         self.assertEqual(rule2.id, '2')
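Reviewer note: a short sketch of the new lifecycle capabilities exercised above, closely following the test that appears in the next hunk; only the bucket name, rule id, and prefix are invented:

    import datetime
    import os
    import tos
    from tos.enum import StatusType
    from tos.models2 import (BucketLifeCycleExpiration, BucketLifeCycleRule,
                             BucketLifecycleFilter)

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    rules = [BucketLifeCycleRule(
        id='expire-small-logs',
        prefix='logs/',
        status=StatusType.Status_Enable,
        expiration=BucketLifeCycleExpiration(date=datetime.datetime(2025, 1, 1)),
        # only objects with 1 <= size < 1000 bytes match this rule
        filter=BucketLifecycleFilter(
            object_size_greater_than=1,
            object_size_less_than=1000,
            greater_than_include_equal=StatusType.Status_Enable,
            less_than_include_equal=StatusType.Status_Disable,
        ))]
    # allow_same_action_overlap=True opts in to rules whose prefixes overlap
    client.put_bucket_lifecycle(bucket='my-bucket', rules=rules,
                                allow_same_action_overlap=True)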
@@ -471,6 +568,53 @@ def test_lifecycle_date(self):
         self.assertEqual(rule1.non_current_version_transitions[0].non_current_days, 30)
         self.assertEqual(rule1.non_current_version_transitions[0].storage_class, StorageClassType.Storage_Class_Ia)
 
+    def test_lifecycle_filter(self):
+        bucket_name = self.bucket_name + 'lifecycle'
+        rules = []
+        rules.append(BucketLifeCycleRule(
+            id='1',
+            prefix='test',
+            status=StatusType.Status_Enable,
+            # set the rule's expiration attributes
+            expiration=BucketLifeCycleExpiration(
+                date=datetime.datetime(2022, 9, 30)
+            ),
+            no_current_version_expiration=BucketLifeCycleNoCurrentVersionExpiration(
+                non_current_date=datetime.datetime(2022, 11, 30)
+            ),
+            non_current_version_transitions=[BucketLifeCycleNonCurrentVersionTransition(
+                storage_class=StorageClassType.Storage_Class_Ia,
+                non_current_date=datetime.datetime(2022, 10, 30)
+            )],
+            filter=BucketLifecycleFilter(
+                object_size_greater_than=1,
+                object_size_less_than=1000,
+                greater_than_include_equal=StatusType.Status_Enable,
+                less_than_include_equal=StatusType.Status_Disable,
+            )
+        ))
+        self.client.create_bucket(bucket_name)
+        self.bucket_delete.append(bucket_name)
+        self.client.put_bucket_lifecycle(bucket=bucket_name, rules=rules)
+        out = self.client.get_bucket_lifecycle(bucket=bucket_name)
+        # verify rule1
+        rule1 = out.rules[0]
+        self.assertEqual(rule1.id, '1')
+        self.assertEqual(rule1.prefix, 'test')
+        self.assertEqual(rule1.status, StatusType.Status_Enable)
+        self.assertEqual(rule1.expiration.date, datetime.datetime(2022, 9, 30, tzinfo=UTC))
+        self.assertEqual(rule1.no_current_version_expiration.non_current_date,
+                         datetime.datetime(2022, 11, 30, tzinfo=UTC))
+        self.assertEqual(rule1.non_current_version_transitions[0].non_current_date,
+                         datetime.datetime(2022, 10, 30, tzinfo=UTC))
+        self.assertEqual(rule1.non_current_version_transitions[0].storage_class, StorageClassType.Storage_Class_Ia)
+        self.assertEqual(rule1.filter.object_size_greater_than, 1)
+        self.assertEqual(rule1.filter.object_size_less_than, 1000)
+        self.assertEqual(rule1.filter.greater_than_include_equal, StatusType.Status_Enable)
+        self.assertEqual(rule1.filter.less_than_include_equal, StatusType.Status_Disable)
+
     def test_put_bucket_acl(self):
         bucket_name = self.bucket_name + '-acl'
         self.client.create_bucket(bucket_name)
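Reviewer note: the next hunk adds tests for the new server-side-encryption and type2-notification APIs. A hedged encryption sketch first; the bucket and key id are placeholders, and I am assuming TLS is wanted for KMS-backed config because the test pairs the put with an https client:

    import os
    import tos
    from tos.models2 import ApplyServerSideEncryptionByDefault, BucketEncryptionRule

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             'https://' + os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    client.put_bucket_encryption('my-bucket', BucketEncryptionRule(
        apply_server_side_encryption_by_default=ApplyServerSideEncryptionByDefault(
            sse_algorithm='kms', kms_master_key_id='my-kms-key-id')))
    conf = client.get_bucket_encryption('my-bucket')
    print(conf.rule.apply_server_side_encryption_by_default.sse_algorithm)
    client.delete_bucket_encryption('my-bucket')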
@@ -757,6 +901,70 @@ def test_bucket_project_name(self):
         for bucket in list_out.buckets:
             self.assertEqual(bucket.project_name, project_name)
 
+    def test_bucket_encryption(self):
+        bucket_name = self.bucket_name + "-encryption"
+        endpoint = "https://{}".format(_get_clean_endpoint(self.endpoint))
+        https_client = TosClientV2(self.ak, self.sk, endpoint, self.region, enable_crc=True, max_retry_count=2)
+        self.client.create_bucket(bucket_name)
+        self.bucket_delete.append(bucket_name)
+
+        https_client.put_bucket_encryption(bucket_name, BucketEncryptionRule(
+            apply_server_side_encryption_by_default=ApplyServerSideEncryptionByDefault(
+                sse_algorithm="kms",
+                kms_master_key_id="123"
+            )
+        ))
+        get_out = self.client.get_bucket_encryption(bucket_name)
+        self.assertEqual(get_out.rule.apply_server_side_encryption_by_default.sse_algorithm, "kms")
+        self.assertEqual(get_out.rule.apply_server_side_encryption_by_default.kms_master_key_id, "123")
+        self.client.delete_bucket_encryption(bucket_name)
+        with self.assertRaises(TosServerError):
+            self.client.get_bucket_encryption(bucket_name)
+
+    def test_bucket_notification_type2(self):
+        bucket_name = self.bucket_name + "-notification-type2"
+        self.client.create_bucket(bucket_name)
+        self.bucket_delete.append(bucket_name)
+
+        rules = [
+            NotificationRule(
+                rule_id="test1",
+                events=["tos:ObjectCreated:Post", "tos:ObjectCreated:Origin"],
+                filter=NotificationFilter(
+                    tos_key=NotificationFilterKey(
+                        filter_rules=[
+                            NotificationFilterRule(name="prefix", value="test-")
+                        ]
+                    )
+                ),
+                destination=NotificationDestination(
+                    ve_faas=[DestinationVeFaaS(function_id=self.cloud_function)],
+                    rocket_mq=[
+                        DestinationRocketMQ(
+                            role="trn:iam::{}:role/{}".format(self.account_id, self.mq_role_name),
+                            instance_id=self.mq_instance_id,
+                            topic="SDK",
+                            access_key_id=self.mq_access_key_id
+                        )
+                    ]
+                )
+            )
+        ]
+        self.client.put_bucket_notification_type2(bucket_name, rules)
+        out = self.client.get_bucket_notification_type2(bucket_name)
+
+        self.assertTrue(out.version != '')
+        self.assertEqual(len(out.rules), 1)
+        self.assertEqual(out.rules[0].rule_id, rules[0].rule_id)
+        self.assertEqual(out.rules[0].events, rules[0].events)
+        self.assertEqual(out.rules[0].filter.tos_key.filter_rules[0].name, rules[0].filter.tos_key.filter_rules[0].name)
+        self.assertEqual(out.rules[0].filter.tos_key.filter_rules[0].value, rules[0].filter.tos_key.filter_rules[0].value)
+        self.assertEqual(out.rules[0].destination.ve_faas[0].function_id, rules[0].destination.ve_faas[0].function_id)
+        self.assertEqual(out.rules[0].destination.rocket_mq[0].role, rules[0].destination.rocket_mq[0].role)
+        self.assertEqual(out.rules[0].destination.rocket_mq[0].topic, rules[0].destination.rocket_mq[0].topic)
+        self.assertEqual(out.rules[0].destination.rocket_mq[0].access_key_id, rules[0].destination.rocket_mq[0].access_key_id)
+        self.assertEqual(out.rules[0].destination.rocket_mq[0].instance_id, rules[0].destination.rocket_mq[0].instance_id)
+
     def retry_assert(self, func):
         for i in range(5):
             if func():
@@ -778,5 +986,13 @@ def test_delete_all(self):
             task.run()
 
 
+def _get_clean_endpoint(endpoint):
+    if endpoint.startswith('http://'):
+        return endpoint[7:]
+    elif endpoint.startswith('https://'):
+        return endpoint[8:]
+    return endpoint
+
+
 if __name__ == "__main__":
     unittest.main()
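Reviewer note: a pared-down sketch of the type2 notification API tested above, using only the VeFaaS destination; the bucket, rule id, prefix, and function id are placeholders:

    import os
    import tos
    from tos.models2 import (DestinationVeFaaS, NotificationDestination,
                             NotificationFilter, NotificationFilterKey,
                             NotificationFilterRule, NotificationRule)

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    rules = [NotificationRule(
        rule_id='on-upload',
        events=['tos:ObjectCreated:Post'],
        filter=NotificationFilter(tos_key=NotificationFilterKey(
            filter_rules=[NotificationFilterRule(name='prefix', value='incoming/')])),
        destination=NotificationDestination(
            ve_faas=[DestinationVeFaaS(function_id='my-function-id')]),
    )]
    client.put_bucket_notification_type2('my-bucket', rules)
    print(client.get_bucket_notification_type2('my-bucket').rules[0].rule_id)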
diff --git a/tests/test_v2_object.py b/tests/test_v2_object.py
index e6dbde0..7c2e1f4 100644
--- a/tests/test_v2_object.py
+++ b/tests/test_v2_object.py
@@ -4,10 +4,14 @@
 import http
 import http.client as httplib
 import os
+import random
+import threading
 import time
 import unittest
+import urllib.parse
 from io import StringIO, BytesIO
 
+import crcmod
 import requests
 
 import tos
@@ -17,11 +21,13 @@
 from tos.credential import EnvCredentialsProvider
 from tos.enum import (ACLType, AzRedundancyType, DataTransferType,
                       GranteeType, MetadataDirectiveType, PermissionType,
-                      StorageClassType, VersioningStatusType, TierType, CopyEventType, HttpMethodType)
+                      StorageClassType, VersioningStatusType, TierType, CopyEventType, HttpMethodType,
+                      convert_storage_class_type)
 from tos.exceptions import TosClientError, TosServerError
 from tos.models2 import Deleted, Grant, Grantee, ListObjectsOutput, Owner, ObjectTobeDeleted, Tag, \
     PostSignatureCondition, UploadedPart, PolicySignatureCondition, RestoreJobParameters, GenericInput
-from tos.utils import RateLimiter
+from tos.safe_map import SafeMapFIFO
+from tos.utils import RateLimiter, meta_header_encode, meta_header_decode
 
 
 def get_socket_io():
@@ -40,6 +46,36 @@ def _get_host_schema(endpoint):
 
 
 class TestObject(TosTestBase):
+
+    def test_crc64(self):
+        bucket_name = self.bucket_name + "-test-crc"
+        self.bucket_delete.append(bucket_name)
+        key = self.random_key()
+        do_crc64 = crcmod.mkCrcFun(0x142F0E1EBA9EA3693, initCrc=0, xorOut=0xffffffffffffffff, rev=True)
+        data1 = "Hello TOS"
+        data2 = "Hello TOS 中文字符"
+        c1 = do_crc64(data1.encode())
+        c2 = do_crc64(data2.encode())
+        print("c1:{}, c2:{}".format(c1, c2))
+        content1 = StringIO(data1)
+        content2 = StringIO(data2)
+        self.client.create_bucket(bucket_name)
+        rsp = self.client.put_object(bucket=bucket_name, key=key, content=content1)
+        print('request_id_1: {}'.format(rsp.request_id))
+        print('crc64_1: {}'.format(rsp.hash_crc64_ecma))
+        assert rsp.hash_crc64_ecma == c1
+
+        rsp = self.client.put_object(bucket=bucket_name, key=key, content=content2)
+        print('request_id_2: {}'.format(rsp.request_id))
+        print('crc64_2: {}'.format(rsp.hash_crc64_ecma))
+        assert rsp.hash_crc64_ecma == c2
+        rsp = self.client.get_object(bucket=bucket_name, key=key)
+        value = rsp.read()
+        assert len(value) == len(data2.encode())
+        self.assertEqual(value.decode('utf-8'), data2)
+        self.client.delete_object(bucket=bucket_name, key=key)
+        self.client.delete_bucket(bucket=bucket_name)
+
     def test_object(self):
         bucket_name = self.bucket_name + '-test-object'
         self.bucket_delete.append(bucket_name)
@@ -110,14 +146,23 @@ def test_put_with_meta(self):
         content = content_io
 
     def test_with_string_io(self):
-        io = StringIO('a')
-        io.seek(0)
+        io = StringIO('哈哈')
+        io.seek(1)
         bucket_name = self.bucket_name + 'string-io'
         self.client.create_bucket(bucket_name)
         self.bucket_delete.append(bucket_name)
         self.client.put_object(bucket=bucket_name, key="2", content=io)
         out = self.client.get_object(bucket=bucket_name, key='2')
-        self.assertEqual(out.read(), b'a')
+        self.assertEqual(out.read(), '哈'.encode('utf8'))
+        io.seek(2)
+        self.client.put_object(bucket=bucket_name, key="2", content=io)
+        out = self.client.get_object(bucket=bucket_name, key='2')
+        self.assertEqual(out.read(), b'')
+        io = StringIO('')
+        io.seek(1)
+        self.client.put_object(bucket=bucket_name, key="2", content=io)
+        out = self.client.get_object(bucket=bucket_name, key='2')
+        self.assertEqual(out.read(), b'')
         self.client.put_object(bucket=bucket_name, key='4', content=b'')
 
     def test_put_with_options(self):
@@ -335,7 +380,7 @@ def test_with_stream(self):
         bucket_endpoint = '{}.{}'.format(bucket_name, endpoint)
         object_endpoint = bucket_endpoint + '/' + key2
         conn = http.client.HTTPConnection(bucket_endpoint)
-        conn.request('GET', '/'+key2)
+        conn.request('GET', '/' + key2)
         content = conn.getresponse()
 
         def generator():
@@ -620,22 +665,22 @@ def test_list_object_info(self):
         key = self.random_key('.js')
         content = random_bytes(1024)
 
-        self.client.create_bucket(bucket_name, az_redundancy=AzRedundancyType.Az_Redundancy_Multi_Az)
+        self.client.create_bucket(bucket_name)
         self.bucket_delete.append(bucket_name)
-        self.client.put_object(bucket_name, key=key, content=content)
-        list_object_out = self.client.list_objects(bucket_name)
+        raw = "!@#$%^&*()_+-=[]{}|;':\",./<>?中文测试编码%20%%%^&abcd /\\"
+        meta = {'name': ' %张/三%', 'age': '12', 'special': raw, raw: raw}
+        self.client.put_object(bucket_name, key=key, content=content, meta=meta)
+        list_object_out = self.client.list_objects(bucket_name, fetch_meta=True)
         self.assertEqual(list_object_out.name, bucket_name)
         self.assertFalse(list_object_out.is_truncated)
         self.assertEqual(list_object_out.max_keys, 1000)
         self.assertTrue(len(list_object_out.contents) > 0)
-        object = list_object_out.contents[0]
-        self.assertTrue(len(object.etag) > 0)
-        self.assertTrue(len(object.key) > 0)
-        self.assertTrue(object.last_modified is not None)
-        self.assertTrue(object.size == 1024)
-        self.assertTrue(len(object.owner.id) > 0)
-        self.assertTrue(object.storage_class, StorageClassType.Storage_Class_Standard)
+        obj = list_object_out.contents[0]
+        self.assertEqual(obj.storage_class, StorageClassType.Storage_Class_Standard)
+        self.assertEqual(obj.meta['name'], meta['name'])
+        self.assertEqual(obj.meta['special'], meta['special'])
+        self.assertEqual(obj.meta[raw], meta[raw])
 
     def test_list_object_full_func(self):
         bucket_name = self.bucket_name + '-test-list-object'
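Reviewer note: fetch_meta is the smallest of the new features but easy to misuse, since meta stays empty without the flag. A sketch, with placeholder bucket/key/metadata; list_objects_type2 and list_object_versions accept the same flag:

    import os
    import tos

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    client.put_object('my-bucket', 'a.txt', content=b'1', meta={'owner': 'alice'})
    listing = client.list_objects('my-bucket', fetch_meta=True)
    for obj in listing.contents:
        print(obj.key, obj.meta)  # meta is populated only when fetch_meta=True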
@@ -993,13 +1038,9 @@ def test_list_object_v2(self):
             for k in range(3):
                 path = '{}/{}/{}'.format(i, j, k)
                 self.client.put_object(bucket_name, path, content=b'1')
-        self.client.put_object(bucket_name, 'key')
-
-        # out_2 = self.client.list_objects_type2(bucket=bucket_name, prefix='0', start_after='0/1', max_keys=2)
-        # out_2_reverse = self.client.list_objects_type2(bucket=bucket_name, prefix='0', start_after='0/1', max_keys=2)
-        # out_3 = self.client.list_objects_type2(bucket=bucket_name, prefix='0', start_after='0/1', max_keys=2,
-        #                                        delimiter='/', continuation_token=out_2.next_continuation_token)
-        #
+        raw = "!@#$%^&*()_+-=[]{}|;':\",./<>?中文测试编码%20%%%^&abcd /\\"
+        meta = {'name': ' %张/三%', 'age': '12', 'special': raw, raw: raw}
+        self.client.put_object(bucket_name, 'key', meta=meta)
 
         out_base = self.client.list_objects_type2(bucket_name, delimiter='/', max_keys=1)
         self.assertEqual(out_base.name, bucket_name)
@@ -1076,6 +1117,13 @@ def test_list_object_v2(self):
         self.assertIsNotNone(continuation_token)
         self.assertEqual(count, 4)
 
+        list_object_out = self.client.list_objects_type2(bucket=bucket_name, prefix='key', max_keys=1, fetch_meta=True)
+        obj = list_object_out.contents[0]
+        self.assertEqual(obj.meta['age'], meta['age'])
+        self.assertEqual(obj.meta['name'], meta['name'])
+        self.assertEqual(obj.meta['special'], meta['special'])
+        self.assertEqual(obj.meta[raw], meta[raw])
+
     def test_fetch_object(self):
         bucket_name = self.bucket_name + '-fetch-object'
         bucket_fetch = self.bucket_name + '-fetch-test'
@@ -1088,16 +1136,27 @@ def test_fetch_object(self):
         key = self.random_key('.js')
         self.client.create_bucket(bucket=bucket_name)
         meta = {'姓名': '张三'}
-        fetch_out = self.client.fetch_object(bucket=bucket_name, key=key,
-                                             url="https://{}.{}".format(bucket_fetch,
-                                                                        self.endpoint) + '/' + object_name,
-                                             meta=meta, acl=ACLType.ACL_Public_Read)
-        out = self.client.get_object(bucket=bucket_name, key=key)
+        self.client.fetch_object(bucket=bucket_name, key=key,
+                                 url="https://{}.{}".format(bucket_fetch,
+                                                            self.endpoint) + '/' + object_name,
+                                 meta=meta, acl=ACLType.ACL_Public_Read)
         acl_out = self.client.get_object_acl(bucket_name, key)
         self.assertEqual(acl_out.grants[0].permission, PermissionType.Permission_Read)
         get_out = self.client.get_object(bucket=bucket_name, key=key)
         get_out.meta['姓名'] = meta['姓名']
+        self.client.delete_object(bucket=bucket_name, key=key)
+        with self.assertRaises(TosServerError):
+            self.client.fetch_object(bucket=bucket_name, key=key,
+                                     url="https://{}.{}".format(bucket_fetch,
+                                                                self.endpoint) + '/' + object_name,
+                                     content_md5="123")
+        with self.assertRaises(TosServerError):
+            self.client.fetch_object(bucket=bucket_name, key=key,
+                                     url="https://{}.{}".format(bucket_fetch,
+                                                                self.endpoint) + '/' + object_name,
+                                     hex_md5="123")
+
     def test_fetch_task_object(self):
         bucket_name = self.bucket_name + '-fetch-object'
         self.bucket_delete.append(bucket_name)
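Reviewer note: the test above relies on the server rejecting a fetch whose payload MD5 mismatches. A hedged sketch of the new content_md5 parameter; the URL and bucket are placeholders, and I am assuming a hex digest because the deprecated parameter it replaces was named hex_md5:

    import hashlib
    import os
    import tos
    from tos.exceptions import TosServerError

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    expected = hashlib.md5(b'the bytes you expect the origin to serve').hexdigest()
    try:
        client.fetch_object(bucket='my-bucket', key='obj.js',
                            url='https://origin.example.com/obj.js',
                            content_md5=expected)
    except TosServerError:
        # raised when the origin payload's MD5 does not match content_md5,
        # mirroring the "123" mismatch cases in the test above
        raise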
@@ -1118,9 +1177,13 @@
         self.client.create_bucket(bucket=bucket_name)
         raw = "!@#$%^&*()_+-=[]{}|;':\",./<>?中文测试编码%20%%%^&abcd /\\"
         meta = {'name': ' %张/三%', 'age': '12', 'special': raw, raw: raw}
+        callback_body = '{"bucket":${bucket},"object":${object}}'
         out = self.client.put_fetch_task(bucket=bucket_name, key=key, url=url, meta=meta,
                                          acl=ACLType.ACL_Public_Read_Write,
-                                         storage_class=StorageClassType.Storage_Class_Ia)
+                                         storage_class=StorageClassType.Storage_Class_Ia,
+                                         callback_url=self.callback_url,
+                                         callback_body=callback_body,
+                                         callback_body_type='application/json')
         self.assertIsNotNone(out.task_id)
         get_out = self.client.get_fetch_task(bucket=bucket_name, task_id=out.task_id)
         self.assertEqual(get_out.task.meta['name'], meta['name'])
@@ -1128,6 +1191,9 @@ def test_fetch_task_object(self):
         self.assertEqual(get_out.task.meta[raw], meta[raw])
         self.assertEqual(get_out.task.acl, ACLType.ACL_Public_Read_Write)
         self.assertEqual(get_out.task.storage_class, StorageClassType.Storage_Class_Ia)
+        self.assertEqual(get_out.task.callback_url, self.callback_url)
+        self.assertEqual(get_out.task.callback_body_type, 'application/json')
+        self.assertEqual(get_out.task.callback_body, callback_body)
 
     def test_post_object(self):
         bucket_name = self.bucket_name + '-post-object'
@@ -1541,6 +1607,70 @@ def test_request_date(self):
         with self.assertRaises(TosServerError) as cm:
             self.client.resumable_copy_object(bucket_name, 'test', bucket_name, key, generic_input=input)
 
+    def test_disable_encoding_meta(self):
+        client = TosClientV2(self.ak, self.sk, self.endpoint, self.region, disable_encoding_meta=True)
+        bucket_name = self.bucket_name + '-object-disable-encoding-meta'
+        client.create_bucket(bucket_name)
+        self.bucket_delete.append(bucket_name)
+
+        key = random_string(5)
+        special_key = "中文😋.pdf"
+        raw = "!@#$%^&*()_+-=[]{}|;':\",./<>?中文测试编码%20%%%^&abcd /\\"
+        meta = {'name': ' %张/三%', 'age': '12', 'special': raw, raw: raw}
+        encode_meta = meta_header_encode(meta)
+        content_disposition = "attachment; filename='{}'".format(urllib.parse.quote(special_key))
+        client.put_object(bucket_name, key, meta=encode_meta, content_disposition=content_disposition)
+        out = client.head_object(bucket_name, key)
+        out_meta = meta_header_decode(out.meta)
+        self.assertEqual(out.content_disposition, content_disposition)
+        self.assertEqual(out_meta['name'], meta['name'])
+        self.assertEqual(out_meta['age'], meta['age'])
+        self.assertEqual(out_meta['special'], meta['special'])
+        self.assertEqual(out_meta[raw], meta[raw])
+
+    def test_syncMap(self):
+        m1 = SafeMapFIFO(max_length=3, default_expiration_sec=5, sync_delete=False)
+        m1.put("bucket1", "hns")
+        m1.put("bucket2", "hns")
+        m1.put("bucket3", "hns")
+        m1.put("bucket4", "hns")
+        v1 = m1.get("bucket1")
+        assert v1 is None
+
+        time.sleep(5)
+        v2 = m1.get("bucket2")
+        assert v2 is None
+        v3 = m1.get("bucket3")
+        assert v3 is None
+        v4 = m1.get("bucket4")
+        assert v4 is None
+
+        m1 = SafeMapFIFO(max_length=100, default_expiration_sec=5, sync_delete=False)
+
+        def put_items():
+            for i in range(1000):
+                key = f"key_{i}"
+                val = str(i)
+                m1.put(key, val)
+                print("put {}:{}".format(key, val))
+
+        def get_items():
+            for _ in range(1000):
+                key = f"key_{random.randint(0, 1000)}"  # read back a random, possibly inserted key
+                value = m1.get(key)
+                print("get {}:{}".format(key, value))
+
+        put_thread1 = threading.Thread(target=put_items)
+        put_thread2 = threading.Thread(target=put_items)
+        put_thread3 = threading.Thread(target=put_items)
+        get_thread1 = threading.Thread(target=get_items)
+        get_thread2 = threading.Thread(target=get_items)
+        get_thread3 = threading.Thread(target=get_items)
+        put_thread1.start()
+        get_thread1.start()
+        put_thread2.start()
+        get_thread2.start()
+        put_thread3.start()
+        get_thread3.start()
+        for t in (put_thread1, put_thread2, put_thread3, get_thread1, get_thread2, get_thread3):
+            t.join()
+
     def wrapper_socket_io(self, init, crc, use_data_transfer_listener, ues_limiter, bucket_name):
         def progress(consumed_bytes, total_bytes, rw_once_bytes, data_type: DataTransferType):
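Reviewer note: a compact sketch of the async-fetch callback added above; the bucket, key, and origin URL are placeholders, and the ${bucket}/${object} tokens in the body are the same server-side template variables the test uses:

    import os
    import tos

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    callback_body = '{"bucket":${bucket},"object":${object}}'
    task = client.put_fetch_task(
        bucket='my-bucket', key='fetched.js',
        url='https://origin.example.com/source.js',
        callback_url=os.getenv('CallbackUrl'),  # endpoint notified when the task finishes
        callback_body=callback_body,
        callback_body_type='application/json')
    print(client.get_fetch_task(bucket='my-bucket', task_id=task.task_id).task.callback_url)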
diff --git a/tests/test_v2_with_version.py b/tests/test_v2_with_version.py
index 8862ecc..1f0d98a 100644
--- a/tests/test_v2_with_version.py
+++ b/tests/test_v2_with_version.py
@@ -82,11 +82,13 @@ def test_list_with_version(self):
         self.client.create_bucket(bucket_name)
         self.version_client.put_bucket_versioning(bucket_name, True)
         time.sleep(30)
+        raw = "!@#$%^&*()_+-=[]{}|;':\",./<>?中文测试编码%20%%%^&abcd /\\"
+        meta = {'name': ' %张/三%', 'age': '12', 'special': raw, raw: raw}
         self.client.put_object(bucket_name, 'test.txt', content=content,
-                               storage_class=tos.StorageClassType.Storage_Class_Ia)
+                               storage_class=tos.StorageClassType.Storage_Class_Ia, meta=meta)
         self.client.put_object(bucket_name, 'test.txt', content=content,
-                               storage_class=tos.StorageClassType.Storage_Class_Ia)
-        out = self.client.list_object_versions(bucket_name)
+                               storage_class=tos.StorageClassType.Storage_Class_Ia, meta=meta)
+        out = self.client.list_object_versions(bucket_name, fetch_meta=True)
         for version in out.versions:
             self.assertIsNotNone(version.version_id)
             self.assertIsNotNone(version.etag)
@@ -94,6 +96,10 @@ def test_list_with_version(self):
             self.assertIsNotNone(version.is_latest)
             self.assertEqual(version.key, 'test.txt')
             self.assertEqual(version.storage_class, tos.StorageClassType.Storage_Class_Ia)
+            self.assertEqual(version.meta['name'], meta['name'])
+            self.assertEqual(version.meta['age'], meta['age'])
+            self.assertEqual(version.meta['special'], meta['special'])
+            self.assertEqual(version.meta[raw], meta[raw])
 
         self.client.put_object(bucket_name, 'test2.txt', content='123')
         out_1 = self.client.list_object_versions(bucket_name, max_keys=1)
diff --git a/tos/__version__.py b/tos/__version__.py
index 0daec3d..4f84928 100644
--- a/tos/__version__.py
+++ b/tos/__version__.py
@@ -1 +1 @@
-__version__ = 'v2.6.10'
+__version__ = 'v2.7.0'
diff --git a/tos/checkpoint.py b/tos/checkpoint.py
index 0be77d7..b617097 100644
--- a/tos/checkpoint.py
+++ b/tos/checkpoint.py
@@ -297,6 +297,10 @@ def _last_task(self):
                 download_crc = cal_crc_from_upload_parts(parts)
                 check_crc("upload_file", download_crc, result.hash_crc64_ecma, result.request_id)
             return result
+        except TosServerError as e:
+            if e.status_code and (e.status_code == 404 or e.status_code == 203):
+                self._delete_checkpoint()
+            raise TaskCompleteMultipartError(e)
         except Exception as e:
             raise TaskCompleteMultipartError(e)
 
@@ -409,6 +413,10 @@ def _last_task(self, **kwargs):
             result = self.client.complete_multipart_upload(self.bucket, self.key, self.upload_id, parts=parts,
                                                            generic_input=self.generic_input)
             return result
+        except TosServerError as e:
+            if e.status_code and (e.status_code == 404 or e.status_code == 203):
+                self._delete_checkpoint()
+            raise TaskCompleteMultipartError(e)
         except Exception as e:
             raise TaskCompleteMultipartError(e)
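Reviewer note: the checkpoint change above matters for callers of upload_file. A sketch of the retry pattern it enables; the part_size/task_num/enable_checkpoint parameter names are my recollection of the SDK's upload_file signature and should be treated as assumptions, and the broad except is used because the exact import path of TaskCompleteMultipartError is not shown in this diff:

    import os
    import tos

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'),
                             enable_crc=True)

    def upload_with_retry(bucket, key, path):
        try:
            return client.upload_file(bucket, key, path, part_size=8 * 1024 * 1024,
                                      task_num=4, enable_checkpoint=True)
        except Exception:
            # After this patch, a 404/203 from complete_multipart_upload deletes the
            # stale checkpoint before TaskCompleteMultipartError propagates, so a
            # plain retry starts a fresh multipart upload instead of replaying part
            # records that point at a dead upload_id.
            return client.upload_file(bucket, key, path, part_size=8 * 1024 * 1024,
                                      task_num=4, enable_checkpoint=True)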
diff --git a/tos/clientv2.py b/tos/clientv2.py
index a1a79d2..50ceb3c 100644
--- a/tos/clientv2.py
+++ b/tos/clientv2.py
@@ -26,9 +26,10 @@
 from .auth import AnonymousAuth, CredentialProviderAuth
 from .checkpoint import (CheckPointStore, _BreakpointDownloader, _BreakpointUploader,
                          _BreakpointResumableCopyObject)
+from .safe_map import SafeMapFIFO
 from .client import _make_virtual_host_url, _make_virtual_host_uri, _get_virtual_host, _get_host, _get_scheme
 from .consts import (GMT_DATE_FORMAT, SLEEP_BASE_TIME, UNSIGNED_PAYLOAD,
-                     WHITE_LIST_FUNCTION, CALLBACK_FUNCTION)
+                     WHITE_LIST_FUNCTION, CALLBACK_FUNCTION, BUCKET_TYPE_FNS, BUCKET_TYPE_HNS)
 from .credential import StaticCredentialsProvider
 from .enum import (ACLType, AzRedundancyType, DataTransferType, HttpMethodType,
                    MetadataDirectiveType, StorageClassType, UploadEventType, VersioningStatusType, CopyEventType)
@@ -38,7 +39,8 @@
                         to_put_acl_request, to_delete_multi_objects_request, to_put_bucket_cors_request,
                         to_put_bucket_mirror_back, to_put_bucket_lifecycle, to_put_tagging, to_fetch_object,
                         to_put_replication, to_put_bucket_website, to_put_bucket_notification, to_put_custom_domain,
-                        to_put_bucket_real_time_log, to_restore_object)
+                        to_put_bucket_real_time_log, to_restore_object, to_bucket_encrypt, to_put_fetch_object,
+                        to_put_bucket_notification_type2)
 from .log import get_logger
 from .models2 import (AbortMultipartUpload, AppendObjectOutput,
                       CompleteMultipartUploadOutput, CopyObjectOutput,
@@ -69,7 +71,9 @@
                       PolicySignatureCondition, RestoreObjectOutput, RestoreJobParameters, RenameObjectOutput,
                       PutBucketRenameOutput, DeleteBucketRenameOutput, GetBucketRenameOutput, PutBucketTaggingOutput,
                       DeleteBucketTaggingOutput, GetBucketTaggingOutput, PutSymlinkOutput, GetSymlinkOutput,
-                      GenericInput, GetFetchTaskOutput)
+                      GenericInput, GetFetchTaskOutput, BucketEncryptionRule, GetBucketEncryptionOutput,
+                      DeleteBucketEncryptionOutput, PutBucketEncryptionOutput, PutBucketNotificationType2Output,
+                      GetBucketNotificationType2Output, FileStatusOutput, ModifyObjectOutput)
 from .thread_ctx import consume_body
 from .utils import (SizeAdapter, _make_copy_source, _make_range_string,
                     _make_upload_part_file_content,
@@ -79,7 +83,7 @@
                     to_unicode, init_path, DnsCacheService, check_enum_type, check_part_size, check_part_number,
                     check_client_encryption_algorithm, check_server_encryption_algorithm, try_make_file_dir,
                     _IterableAdapter, init_checkpoint_dir, resolve_ip_list,
-                    UploadEventHandler, ResumableCopyObject, DownloadEventHandler, LogInfo)
+                    UploadEventHandler, ResumableCopyObject, DownloadEventHandler, LogInfo, content_disposition_encode)
 
 _dns_cache = DnsCacheService()
 _orig_create_connection = connection.create_connection
@@ -90,7 +94,7 @@
 def _get_create_bucket_headers(ACL: ACLType, AzRedundancy: AzRedundancyType, GrantFullControl, GrantRead, GrantReadACP,
-                               GrantWrite, GrantWriteACP, StorageClass: StorageClassType, ProjectName):
+                               GrantWrite, GrantWriteACP, StorageClass: StorageClassType, ProjectName, BucketType):
     headers = {}
     if ACL:
         headers['x-tos-acl'] = ACL.value
@@ -110,6 +114,8 @@ def _get_create_bucket_headers(ACL: ACLType, AzRedundancy: AzRedundancyType, Gra
         headers['x-tos-az-redundancy'] = AzRedundancy.value
     if ProjectName:
         headers['x-tos-project-name'] = ProjectName
+    if BucketType:
+        headers['x-tos-bucket-type'] = BucketType
     return headers
 
 
@@ -119,12 +125,14 @@ def _get_copy_object_headers(ACL, CacheControl, ContentDisposition, ContentEncod
                              GrantRead, GrantReadACP, GrantWriteACP, Metadata, MetadataDirective, SSECustomerAlgorithm,
                              SSECustomerKey, SSECustomerKeyMD5, server_side_encryption, website_redirect_location,
                              storage_class: StorageClassType,
-                             SSECAlgorithm, SSECKey, SSECKeyMD5, TrafficLimit, ForbidOverwrite, IfMatch):
+                             SSECAlgorithm, SSECKey, SSECKeyMD5, TrafficLimit, ForbidOverwrite, IfMatch,
+                             DisableEncodingMeta):
     headers = {}
     if Metadata:
         for k in Metadata:
             headers['x-tos-meta-' + k] = Metadata[k]
-        headers = meta_header_encode(headers)
+        if not DisableEncodingMeta:
+            headers = meta_header_encode(headers)
     if isinstance(CopySource, str):
         headers['x-tos-copy-source'] = CopySource
     elif isinstance(CopySource, dict):
@@ -137,7 +145,8 @@ def _get_copy_object_headers(ACL, CacheControl, ContentDisposition, ContentEncod
     if CacheControl:
         headers['cache-control'] = CacheControl
     if ContentDisposition:
-        headers['content-disposition'] = urllib.parse.quote(ContentDisposition)
+        headers['content-disposition'] = ContentDisposition if DisableEncodingMeta else content_disposition_encode(
+            ContentDisposition)
     if ContentEncoding:
         headers['content-encoding'] = ContentEncoding
     if ContentLanguage:
@@ -191,7 +200,7 @@ def _get_copy_object_headers(ACL, CacheControl, ContentDisposition, ContentEncod
     return headers
 
 
-def _get_list_object_params(Delimiter, EncodingType, Marker, MaxKeys, Prefix, Reverse):
+def _get_list_object_params(Delimiter, EncodingType, Marker, MaxKeys, Prefix, Reverse, FetchMeta):
     params = {}
     if Delimiter:
         params['delimiter'] = Delimiter
@@ -206,10 +215,12 @@ def _get_list_object_params(Delimiter, EncodingType, Marker, MaxKeys, Prefix, Re
     if Reverse:
         params['reverse'] = Reverse
+    if FetchMeta:
+        params['fetch-meta'] = FetchMeta
     return params
 
 
-def _get_list_object_version_params(Delimiter, EncodingType, KeyMarker, MaxKeys, Prefix, VersionIdMarker):
+def _get_list_object_version_params(Delimiter, EncodingType, KeyMarker, MaxKeys, Prefix, VersionIdMarker, FetchMeta):
     params = {'versions': ''}
     if Delimiter:
         params['delimiter'] = Delimiter
@@ -223,10 +234,13 @@ def _get_list_object_version_params(Delimiter, EncodingType, KeyMarker, MaxKeys,
         params['key-marker'] = KeyMarker
     if VersionIdMarker:
         params['version-id-marker'] = VersionIdMarker
+    if FetchMeta:
+        params['fetch-meta'] = FetchMeta
     return params
 
 
-def _get_list_object_v2_params(Delimiter, Start_After, ContinueToken, Reverse, MaxKeys, EncodingType, Prefix):
+def _get_list_object_v2_params(Delimiter, Start_After, ContinueToken, Reverse, MaxKeys, EncodingType, Prefix,
+                               FetchMeta):
     params = {'list-type': '2', "fetch-owner": "true"}
     if Delimiter:
         params['delimiter'] = Delimiter
@@ -242,7 +256,8 @@ def _get_list_object_v2_params(Delimiter, Start_After, ContinueToken, Reverse, M
         params['reverse'] = Reverse
     if Prefix:
         params['prefix'] = Prefix
-
+    if FetchMeta:
+        params['fetch-meta'] = FetchMeta
     return params
 
 
@@ -328,16 +343,17 @@ def _get_upload_part_copy_headers(CopySource, CopySourceIfMatch, CopySourceIfMod
 
 def _get_put_object_headers(recognize_content_type, ACL, CacheControl, ContentDisposition, ContentEncoding,
-                            ContentLanguage,
-                            ContentLength, ContentMD5, ContentSha256, ContentType, Expires, GrantFullControl,
-                            GrantRead, GrantReadACP, GrantWriteACP, Key, Metadata, SSECustomerAlgorithm,
-                            SSECustomerKey, SSECustomerKeyMD5, ServerSideEncryption, StorageClass,
-                            WebsiteRedirectLocation, TrafficLimit, Callback, CallbackVar, ForbidOverwrite, IfMatch):
+                            ContentLanguage, ContentLength, ContentMD5, ContentSha256, ContentType, Expires,
+                            GrantFullControl, GrantRead, GrantReadACP, GrantWriteACP, Key, Metadata,
+                            SSECustomerAlgorithm, SSECustomerKey, SSECustomerKeyMD5, ServerSideEncryption, StorageClass,
+                            WebsiteRedirectLocation, TrafficLimit, Callback, CallbackVar, ForbidOverwrite, IfMatch,
+                            DisableEncodingMeta):
     headers = {}
     if Metadata:
         for k in Metadata:
             headers['x-tos-meta-' + k] = Metadata[k]
-        headers = meta_header_encode(headers)
+        if not DisableEncodingMeta:
+            headers = meta_header_encode(headers)
     if ContentLength:
         headers['Content-Length'] = str(ContentLength)
     if ACL:
@@ -355,7 +371,8 @@ def _get_put_object_headers(recognize_content_type, ACL, CacheControl, ContentDi
     if CacheControl:
         headers['cache-control'] = CacheControl
     if ContentDisposition:
-        headers['content-disposition'] = urllib.parse.quote(ContentDisposition)
+        headers['content-disposition'] = ContentDisposition if DisableEncodingMeta else content_disposition_encode(
+            ContentDisposition)
     if ContentEncoding:
         headers['content-encoding'] = ContentEncoding
     if ContentLanguage:
@@ -392,7 +409,6 @@ def _get_put_object_headers(recognize_content_type, ACL, CacheControl, ContentDi
         headers['x-tos-if-match'] = IfMatch
     return headers
 
-
 def _get_object_headers(IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince, Range, SSECustomerAlgorithm,
                         SSECustomerKey, SSECustomerKeyMD5, TrafficLimit):
     headers = {}
@@ -420,14 +436,16 @@ def _get_object_headers(IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince
 
 def _get_object_params(ResponseCacheControl, ResponseContentDisposition, ResponseContentEncoding,
                        ResponseContentLanguage, ResponseContentType, ResponseExpires, VersionId, Process,
-                       SaveAsBucket, SaveAsObject):
+                       SaveAsBucket, SaveAsObject, DisableEncodingMeta):
     params = {}
     if VersionId:
         params['versionId'] = VersionId
     if ResponseCacheControl:
         params['response-cache-control'] = ResponseCacheControl
     if ResponseContentDisposition:
-        params['response-content-disposition'] = urllib.parse.quote(ResponseContentDisposition)
+        params[
+            'response-content-disposition'] = ResponseContentDisposition if DisableEncodingMeta else urllib.parse.quote(
+            ResponseContentDisposition)
     if ResponseContentEncoding:
         params['response-content-encoding'] = ResponseContentEncoding
     if ResponseContentLanguage:
@@ -445,16 +463,70 @@ def _get_object_params(ResponseCacheControl, ResponseContentDisposition, Respons
     return params
 
 
+def _get_modify_object_headers_params(recognize_content_type, acl, cache_control, content_disposition,
+                                      content_encoding, content_language, content_length, content_type, expires,
+                                      grant_full_control, grant_read, grant_read_ACP, grant_write_ACP, key, metadata,
+                                      storage_class, website_redirect_location, traffic_limit, if_match,
+                                      disable_encoding_meta):
+    headers = {}
+    if metadata:
+        for k in metadata:
+            headers['x-tos-meta-' + k] = metadata[k]
+        if not disable_encoding_meta:
+            headers = meta_header_encode(headers)
+
+    if acl:
+        headers['x-tos-acl'] = acl.value
+    if grant_full_control:
+        headers['x-tos-grant-full-control'] = grant_full_control
+    if grant_read:
+        headers['x-tos-grant-read'] = grant_read
+    if grant_read_ACP:
+        headers['x-tos-grant-read-acp'] = grant_read_ACP
+    if grant_write_ACP:
+        headers['x-tos-grant-write-acp'] = grant_write_ACP
+    if cache_control:
+        headers['cache-control'] = cache_control
+
+    if content_disposition:
+        headers['content-disposition'] = content_disposition if disable_encoding_meta else content_disposition_encode(
+            content_disposition)
+
+    if content_encoding:
+        headers['content-encoding'] = content_encoding
+    if content_language:
+        headers["content-language"] = content_language
+    if content_type:
+        headers["content-type"] = content_type
+    elif recognize_content_type:
+        headers['content-type'] = get_content_type(key)
+    if expires:
+        headers["expires"] = expires.strftime(GMT_DATE_FORMAT)
+    if website_redirect_location:
+        headers['x-tos-website-redirect-location'] = website_redirect_location
+    if storage_class:
+        headers['x-tos-storage-class'] = storage_class.value
+    if if_match:
+        headers['x-tos-if-match'] = if_match
+    if traffic_limit:
+        headers['x-tos-traffic-limit'] = str(traffic_limit)
+    if content_length:
+        headers['Content-Length'] = str(content_length)
+    return headers
+
+
 def _get_append_object_headers_params(recognize_content_type, ACL, CacheControl, ContentDisposition,
-                                      ContentEncoding, ContentLanguage,
-                                      ContentLength, ContentType, Expires, GrantFullControl, GrantRead,
-                                      GrantReadACP, GrantWriteACP, Key, Metadata, StorageClass,
-                                      WebsiteRedirectLocation, TrafficLimit, IfMatch):
+                                      ContentEncoding, ContentLanguage, ContentLength, ContentType, Expires,
+                                      GrantFullControl, GrantRead, GrantReadACP, GrantWriteACP, Key, Metadata,
+                                      StorageClass, WebsiteRedirectLocation, TrafficLimit, IfMatch,
+                                      DisableEncodingMeta):
     headers = {}
     if Metadata:
         for k in Metadata:
             headers['x-tos-meta-' + k] = Metadata[k]
-        headers = meta_header_encode(headers)
+        if not DisableEncodingMeta:
+            headers = meta_header_encode(headers)
     if ACL:
         headers['x-tos-acl'] = ACL.value
     if GrantFullControl:
@@ -468,7 +540,8 @@ def _get_append_object_headers_params(recognize_content_type, ACL, CacheControl,
     if CacheControl:
         headers['cache-control'] = CacheControl
     if ContentDisposition:
-        headers['content-disposition'] = urllib.parse.quote(ContentDisposition)
+        headers['content-disposition'] = ContentDisposition if DisableEncodingMeta else content_disposition_encode(
+            ContentDisposition)
     if ContentEncoding:
         headers['content-encoding'] = ContentEncoding
     if ContentLanguage:
@@ -493,16 +566,17 @@ def _get_append_object_headers_params(recognize_content_type, ACL, CacheControl,
 
 
 def _get_create_multipart_upload_headers(recognize_content_type, ACL, CacheControl, ContentDisposition, ContentEncoding,
-                                         ContentLanguage, ContentType,
-                                         Expires, GrantFullControl, GrantRead, GrantReadACP,
-                                         GrantWriteACP, Key, Metadata, SSECustomerAlgorithm, SSECustomerKey,
-                                         SSECustomerKeyMD5, ServerSideEncryption, WebsiteRedirectLocation,
-                                         StorageClass: StorageClassType, ForbidOverwrite):
+                                         ContentLanguage, ContentType, Expires, GrantFullControl, GrantRead,
+                                         GrantReadACP, GrantWriteACP, Key, Metadata, SSECustomerAlgorithm,
+                                         SSECustomerKey, SSECustomerKeyMD5, ServerSideEncryption,
+                                         WebsiteRedirectLocation, StorageClass: StorageClassType, ForbidOverwrite,
+                                         DisableEncodingMeta):
     headers = {}
     if Metadata:
         for k in Metadata:
             headers['x-tos-meta-' + k] = Metadata[k]
-        headers = meta_header_encode(headers)
+        if not DisableEncodingMeta:
+            headers = meta_header_encode(headers)
     if ACL:
         headers['x-tos-acl'] = ACL.value
     if GrantFullControl:
@@ -516,7 +590,8 @@ def _get_create_multipart_upload_headers(recognize_content_type, ACL, CacheContr
     if CacheControl:
         headers['cache-control'] = CacheControl
     if ContentDisposition:
-        headers['content-disposition'] = urllib.parse.quote(ContentDisposition)
+        headers['content-disposition'] = ContentDisposition if DisableEncodingMeta else content_disposition_encode(
+            ContentDisposition)
     if ContentEncoding:
         headers['content-encoding'] = ContentEncoding
     if ContentLanguage:
@@ -545,17 +620,18 @@ def _get_create_multipart_upload_headers(recognize_content_type, ACL, CacheContr
 
 
 def _get_set_object_meta_headers(recognize_content_type, cache_control, content_disposition, content_encoding,
-                                 content_language,
-                                 content_type, expires, key, meta):
+                                 content_language, content_type, expires, key, meta, disable_encoding_meta):
     headers = {}
     if meta:
         for k in meta:
             headers['x-tos-meta-' + k] = meta[k]
-        headers = meta_header_encode(headers)
+        if not disable_encoding_meta:
+            headers = meta_header_encode(headers)
     if cache_control:
         headers['cache-control'] = cache_control
     if content_disposition:
-        headers['content-disposition'] = urllib.parse.quote(content_disposition)
+        headers['content-disposition'] = content_disposition if disable_encoding_meta else content_disposition_encode(
+            content_disposition)
    if content_encoding:
         headers['content-encoding'] = content_encoding
     if content_language:
@@ -594,7 +670,8 @@ def _get_put_acl_headers(ACL, GrantFullControl, GrantRead, GrantReadACP, GrantWr
     return headers
 
 
-def _get_put_symlink_headers(TargetKey, TargetBucket, ACL, StorageClass, Metadata, ForbidOverwrite):
+def _get_put_symlink_headers(TargetKey, TargetBucket, ACL, StorageClass, Metadata, ForbidOverwrite,
+                             DisableEncodingMeta):
     headers = {"x-tos-symlink-target": urllib.parse.quote(TargetKey, '/~')}
     if TargetBucket:
         headers["x-tos-symlink-bucket"] = TargetBucket
@@ -605,7 +682,8 @@ def _get_put_symlink_headers(TargetKey, TargetBucket, ACL, StorageClass, Metadata,
     if Metadata:
         for k in Metadata:
             headers['x-tos-meta-' + k] = Metadata[k]
-        headers = meta_header_encode(headers)
+        if not DisableEncodingMeta:
+            headers = meta_header_encode(headers)
     if ForbidOverwrite:
         headers['x-tos-forbid-overwrite'] = ForbidOverwrite
     return headers
@@ -635,12 +713,13 @@ def _get_upload_part_headers(content_length, content_md5, server_side_encryption
 
 def _get_fetch_headers(storage_class, acl, grant_full_control, grant_read, grant_read_acp,
                        grant_write_acp, meta, ssec_customer_algorithm,
-                       ssec_customer_key, sse_customer_key_md5):
+                       ssec_customer_key, sse_customer_key_md5, disable_encoding_meta):
     headers = {}
     if meta:
         for k in meta:
             headers['x-tos-meta-' + k] = meta[k]
-        headers = meta_header_encode(headers)
+        if not disable_encoding_meta:
+            headers = meta_header_encode(headers)
     if acl:
         headers['x-tos-acl'] = acl.value
     if grant_full_control:
@@ -803,7 +882,9 @@ def __init__(self, ak='', sk='', endpoint='', region='',
                  is_custom_domain: bool = False,
                  high_latency_log_threshold: int = 100,
                  socket_timeout=30,
-                 credentials_provider=None):
+                 credentials_provider=None,
+                 disable_encoding_meta: bool = None,
+                 except100_continue_threshold: int = 65536):
 
         """创建client
 
@@ -828,6 +909,8 @@ def __init__(self, ak='', sk='', endpoint='', region='',
         :param high_latency_log_threshold: 大于 0 时,代表开启高延迟日志,单位:KB,默认为 100,当单次请求传输总速率低于该值且总请求耗时大于 500 毫秒时打印 WARN 级别日志
         :param socket_timeout: 连接建立成功后,单个请求的 Socket 读写超时时间,单位:秒,默认 30 秒,参考: https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts
         :param credentials_provider: 通过 credentials_provider 实现永久访问密钥、临时访问密钥、ECS免密登陆、环境变量获取访问密钥等方式
+        :param disable_encoding_meta: whether to skip encoding user-defined metadata (x-tos-meta-*) and Content-Disposition; they are encoded by default, set to true to send them verbatim
+        :param except100_continue_threshold: when greater than 0, upload-related APIs send Expect: 100-continue for requests whose body exceeds this many bytes (bodies of unpredictable length always count as exceeding it), default 65536
        :return TosClientV2:
        """

@@ -870,6 +953,8 @@ def __init__(self, ak='', sk='', endpoint='', region='',
         self.is_custom_domain = is_custom_domain
         self.high_latency_log_threshold = high_latency_log_threshold if high_latency_log_threshold >= 0 else 0
         self.socket_timeout = socket_timeout if socket_timeout > 0 else self.request_timeout if self.request_timeout > 0 else 30
+        self.disable_encoding_meta = disable_encoding_meta
+        self.except100_continue_threshold = except100_continue_threshold
 
         # 通过 hook 机制实现in-request log
         self.session.hooks['response'].append(hook_request_log)
@@ -879,6 +964,8 @@ def __init__(self, ak='', sk='', endpoint='', region='',
         if self.dns_cache_time is not None and self.dns_cache_time > 0:
             self._start_async_refresh_cache = self._open_dns_cache()
 
+        self.bucket_type_cache = SafeMapFIFO(max_length=100, default_expiration_sec=60)
+
     def close(self):
         """关闭Client
 
@@ -986,6 +1073,7 @@ def create_bucket(self, bucket: str,
                       storage_class: StorageClassType = None,
                       az_redundancy: AzRedundancyType = None,
                       project_name: str = None,
+                      bucket_type: str = BUCKET_TYPE_FNS,
                       generic_input: GenericInput = None) -> CreateBucketOutput:
         """创建bucket
 
@@ -1016,13 +1104,53 @@ def create_bucket(self, bucket: str,
 
         headers = _get_create_bucket_headers(acl, az_redundancy, grant_full_control, grant_read, grant_read_acp,
                                              grant_write,
-                                             grant_write_acp, storage_class, project_name)
+                                             grant_write_acp, storage_class, project_name, bucket_type)
         resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Put.value, headers=headers,
                          generic_input=generic_input)
 
         return CreateBucketOutput(resp)
 
+    def get_file_status(self, bucket: str, key: str, project_name: str = None, generic_input: GenericInput = None):
"""查询文件状态 + + 此接口用于查询HNS桶的文件状态 + 如果桶不存在或者没有访问桶的权限,此接口会返回404 Not Found或403 Forbidden状态码的TosServerError。 + + :param bucket: 桶名 + :param key: 文件名 + :param project_name: 桶所属项目名 + :param generic_input: 通用请求参数,比如request_date设置签名UTC时间,代表本次请求Header中指定的 X-Tos-Date 头域 + :return: FileStatusOutput + """ + + _is_valid_bucket_name(bucket) + headers = {} + if project_name: + headers['x-tos-project-name'] = project_name + + bucket_type = self._get_bucket_type(bucket) + if bucket_type == BUCKET_TYPE_HNS: + # head + resp = self._req(bucket=bucket, key=key, method=HttpMethodType.Http_Method_Head.value, + headers=headers, + generic_input=generic_input) + return FileStatusOutput(key, bucket_type, resp) + headers = {} + query = {"stat": ""} + resp = self._req(bucket=bucket, key=key, method=HttpMethodType.Http_Method_Get.value, headers=headers, + params=query, generic_input=generic_input) + + return FileStatusOutput(key, bucket_type, resp) + + def _get_bucket_type(self, bucket: str = None): + bucket_type = self.bucket_type_cache.get(bucket) + if bucket_type is None: + rsp = self.head_bucket(bucket=bucket) + bucket_type = rsp.bucket_type + self.bucket_type_cache.put(key=bucket, value=bucket_type) + return bucket_type + def head_bucket(self, bucket: str, project_name: str = None, generic_input: GenericInput = None) -> HeadBucketOutput: """查询桶元数据 @@ -1164,7 +1292,7 @@ def copy_object(self, bucket: str, key: str, src_bucket: str, src_key: str, metadata_directive, copy_source_ssec_algorithm, copy_source_ssec_key, copy_source_ssec_key_md5, server_side_encryption, website_redirect_location, storage_class, ssec_algorithm, ssec_key, ssec_key_md5, traffic_limit, - forbid_overwrite, if_match) + forbid_overwrite, if_match, self.disable_encoding_meta) resp = self._req(bucket=bucket, key=key, method=HttpMethodType.Http_Method_Put.value, headers=headers, generic_input=generic_input) @@ -1273,7 +1401,7 @@ def head_object(self, bucket: str, key: str, headers=headers, generic_input=generic_input) - return HeadObjectOutput(resp) + return HeadObjectOutput(resp, self.disable_encoding_meta) def list_objects(self, bucket: str, prefix: str = None, @@ -1282,7 +1410,8 @@ def list_objects(self, bucket: str, max_keys: int = None, reverse: bool = None, encoding_type: str = None, - generic_input: GenericInput = None) -> ListObjectsOutput: + generic_input: GenericInput = None, + fetch_meta: bool = None) -> ListObjectsOutput: """列举对象 :param bucket: 桶名 @@ -1293,14 +1422,15 @@ def list_objects(self, bucket: str, :param prefix: 前缀 :param reverse: 反转列举 :param generic_input: 通用请求参数,比如request_date设置签名UTC时间,代表本次请求Header中指定的 X-Tos-Date 头域 + :param fetch_meta: 是否获取对象的自定义meta :return: ListObjectsOutput """ - params = _get_list_object_params(delimiter, encoding_type, marker, max_keys, prefix, reverse) + params = _get_list_object_params(delimiter, encoding_type, marker, max_keys, prefix, reverse, fetch_meta) resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Get.value, params=params, generic_input=generic_input) - return ListObjectsOutput(resp) + return ListObjectsOutput(resp, self.disable_encoding_meta) def list_object_versions(self, bucket: str, prefix: str = None, @@ -1309,7 +1439,8 @@ def list_object_versions(self, bucket: str, version_id_marker: str = None, max_keys: int = None, encoding_type: str = None, - generic_input: GenericInput = None) -> ListObjectVersionsOutput: + generic_input: GenericInput = None, + fetch_meta: bool = None) -> ListObjectVersionsOutput: """列举多版本对象 :param bucket: 桶名 @@ -1320,15 +1451,16 @@ def 
@@ -1164,7 +1292,7 @@ def copy_object(self, bucket: str, key: str, src_bucket: str, src_key: str,
                                            metadata_directive, copy_source_ssec_algorithm, copy_source_ssec_key,
                                            copy_source_ssec_key_md5, server_side_encryption, website_redirect_location,
                                            storage_class, ssec_algorithm, ssec_key, ssec_key_md5, traffic_limit,
-                                           forbid_overwrite, if_match)
+                                           forbid_overwrite, if_match, self.disable_encoding_meta)
 
         resp = self._req(bucket=bucket, key=key, method=HttpMethodType.Http_Method_Put.value, headers=headers,
                          generic_input=generic_input)
@@ -1273,7 +1401,7 @@ def head_object(self, bucket: str, key: str,
                          headers=headers,
                          generic_input=generic_input)
 
-        return HeadObjectOutput(resp)
+        return HeadObjectOutput(resp, self.disable_encoding_meta)
 
     def list_objects(self, bucket: str,
                      prefix: str = None,
@@ -1282,7 +1410,8 @@ def list_objects(self, bucket: str,
                      max_keys: int = None,
                      reverse: bool = None,
                      encoding_type: str = None,
-                     generic_input: GenericInput = None) -> ListObjectsOutput:
+                     generic_input: GenericInput = None,
+                     fetch_meta: bool = None) -> ListObjectsOutput:
         """列举对象
 
         :param bucket: 桶名
@@ -1293,14 +1422,15 @@ def list_objects(self, bucket: str,
         :param prefix: 前缀
         :param reverse: 反转列举
         :param generic_input: 通用请求参数,比如request_date设置签名UTC时间,代表本次请求Header中指定的 X-Tos-Date 头域
+        :param fetch_meta: whether to also return each object's custom metadata
         :return: ListObjectsOutput
         """
-        params = _get_list_object_params(delimiter, encoding_type, marker, max_keys, prefix, reverse)
+        params = _get_list_object_params(delimiter, encoding_type, marker, max_keys, prefix, reverse, fetch_meta)
         resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Get.value, params=params,
                          generic_input=generic_input)
 
-        return ListObjectsOutput(resp)
+        return ListObjectsOutput(resp, self.disable_encoding_meta)
 
     def list_object_versions(self, bucket: str,
                              prefix: str = None,
@@ -1309,7 +1439,8 @@ def list_object_versions(self, bucket: str,
                              version_id_marker: str = None,
                              max_keys: int = None,
                              encoding_type: str = None,
-                             generic_input: GenericInput = None) -> ListObjectVersionsOutput:
+                             generic_input: GenericInput = None,
+                             fetch_meta: bool = None) -> ListObjectVersionsOutput:
         """列举多版本对象
 
         :param bucket: 桶名
@@ -1320,15 +1451,16 @@ def list_object_versions(self, bucket: str,
         :param prefix: 前缀
         :param version_id_marker: 版本号分页标志
         :param generic_input: 通用请求参数,比如request_date设置签名UTC时间,代表本次请求Header中指定的 X-Tos-Date 头域
+        :param fetch_meta: whether to also return each object's custom metadata
         :return: ListObjectVersionsOutput
         """
         params = _get_list_object_version_params(delimiter, encoding_type, key_marker, max_keys, prefix,
-                                                 version_id_marker)
+                                                 version_id_marker, fetch_meta)
         resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Get.value, params=params,
                          generic_input=generic_input)
 
-        return ListObjectVersionsOutput(resp)
+        return ListObjectVersionsOutput(resp, self.disable_encoding_meta)
 
     def put_object_acl(self, bucket: str, key: str,
                        version: str = None,
@@ -1466,7 +1598,11 @@ def put_object(self, bucket: str, key: str,
                                            grant_full_control, grant_read, grant_read_acp, grant_writeAcp, key,
                                            meta, ssec_algorithm, ssec_key, ssec_key_md5, server_side_encryption,
                                            storage_class, website_redirect_location,
-                                           traffic_limit, callback, callback_var, forbid_overwrite, if_match)
+                                           traffic_limit, callback, callback_var, forbid_overwrite, if_match,
+                                           self.disable_encoding_meta)
+        if self.except100_continue_threshold > 0 and (
+                content_length is None or content_length > self.except100_continue_threshold):
+            headers['Expect'] = "100-continue"
 
         if content:
             content = init_content(content)
@@ -1578,6 +1714,73 @@ def put_object_from_file(self, bucket: str, key: str, file_path: str,
                                 data_transfer_listener, rate_limiter, traffic_limit, f, callback,
                                 callback_var, forbid_overwrite, if_match, generic_input)
 
+    def _modify_object(self, bucket: str, key: str, offset: int,
+                       content=None,
+                       content_length: int = None,
+                       cache_control: str = None,
+                       content_disposition: str = None,
+                       content_encoding: str = None,
+                       content_language: str = None,
+                       content_type: str = None,
+                       expires: datetime = None,
+                       acl: ACLType = None,
+                       grant_full_control: str = None,
+                       grant_read: str = None,
+                       grant_read_acp: str = None,
+                       grant_write_acp: str = None,
+                       meta: Dict = None,
+                       website_redirect_location: str = None,
+                       storage_class: StorageClassType = None,
+                       data_transfer_listener=None,
+                       rate_limiter=None,
+                       pre_hash_crc64_ecma: int = None,
+                       traffic_limit: int = None,
+                       if_match: str = None,
+                       generic_input: GenericInput = None):
+
+        check_enum_type(acl=acl, storage_class=storage_class)
+        _is_valid_object_name(key)
+
+        params = {"modify": ""}
+        if offset is not None:
+            params['offset'] = offset
+        headers = _get_modify_object_headers_params(self.recognize_content_type, acl, cache_control,
+                                                    content_disposition,
+                                                    content_encoding,
+                                                    content_language, content_length, content_type,
+                                                    expires, grant_full_control, grant_read, grant_read_acp,
+                                                    grant_write_acp, key, meta, storage_class,
+                                                    website_redirect_location, traffic_limit, if_match,
+                                                    self.disable_encoding_meta)
+
+        if self.except100_continue_threshold > 0 and (
+                content_length is None or content_length > self.except100_continue_threshold):
+            headers['Expect'] = "100-continue"
+        if content:
+            content = init_content(content)
+            patch_content(content)
+            if isinstance(content, _ReaderAdapter) and content.size == 0:
+                raise TosClientError('Your proposed append content is smaller than the minimum allowed size')
+
+            if data_transfer_listener:
+                content = utils.add_progress_listener_func(content, data_transfer_listener)
+
+            if rate_limiter:
+                content = utils.add_rate_limiter_func(content, rate_limiter)
+
+            if content and self.enable_crc and pre_hash_crc64_ecma is not None:
+                content = utils.add_crc_func(content, init_crc=pre_hash_crc64_ecma)
+
+        resp = self._req(bucket=bucket, key=key, method=HttpMethodType.Http_Method_Post.value, data=content,
+                         headers=headers, params=params, generic_input=generic_input)
+
+        result = ModifyObjectOutput(resp)
+
+        if self.enable_crc and result.hash_crc64_ecma is not None and pre_hash_crc64_ecma is not None:
+            utils.check_crc('append object', content.crc, result.hash_crc64_ecma, resp.request_id)
+
+        return result
+
     @high_latency_log
     def append_object(self, bucket: str, key: str, offset: int,
                       content=None,
@@ -1638,13 +1841,30 @@ def append_object(self, bucket: str, key: str, offset: int,
 
         params = {'append': '', 'offset': offset}
 
+        bucket_type = self._get_bucket_type(bucket)
+        if bucket_type == BUCKET_TYPE_HNS:
+            return self._modify_object(bucket=bucket, key=key, offset=offset, content=content,
+                                       content_length=content_length, cache_control=cache_control,
+                                       content_disposition=content_disposition, content_encoding=content_encoding,
+                                       content_language=content_language, content_type=content_type, expires=expires,
+                                       acl=acl, grant_full_control=grant_full_control, grant_read=grant_read,
+                                       grant_read_acp=grant_read_acp, grant_write_acp=grant_write_acp, meta=meta,
+                                       website_redirect_location=website_redirect_location, storage_class=storage_class,
+                                       data_transfer_listener=data_transfer_listener, rate_limiter=rate_limiter,
+                                       pre_hash_crc64_ecma=pre_hash_crc64_ecma, traffic_limit=traffic_limit,
+                                       if_match=if_match, generic_input=generic_input)
+
         headers = _get_append_object_headers_params(self.recognize_content_type, acl, cache_control,
                                                     content_disposition, content_encoding, content_language,
                                                     content_length, content_type, expires, grant_full_control,
                                                     grant_read, grant_read_acp, grant_write_acp, key, meta,
                                                     storage_class,
-                                                    website_redirect_location, traffic_limit, if_match)
+                                                    website_redirect_location, traffic_limit, if_match,
+                                                    self.disable_encoding_meta)
+        if self.except100_continue_threshold > 0 and (
+                content_length is None or content_length > self.except100_continue_threshold):
+            headers['Expect'] = "100-continue"
 
         if content:
             content = init_content(content)
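Reviewer note: with the branch above, append_object on an HNS bucket is transparently routed to the modify API. A caller-side sketch mirroring test_hns_bucket; the bucket and key are placeholders:

    import os
    import tos

    client = tos.TosClientV2(os.getenv('TOS_AK'), os.getenv('TOS_SK'),
                             os.getenv('TOS_ENDPOINT'), os.getenv('TOS_REGION'))

    # assumes 'my-hns-bucket' was created with create_bucket(..., bucket_type='hns')
    client.put_object(bucket='my-hns-bucket', key='log.txt')
    out = client.append_object(bucket='my-hns-bucket', key='log.txt',
                               offset=0, content='hello1')
    # continue writing from next_modify_offset instead of tracking offsets by hand
    out = client.append_object(bucket='my-hns-bucket', key='log.txt',
                               offset=out.next_modify_offset, content='hello2')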
@high_latency_log
     def _get_object_by_part(self, bucket: str, key: str, part, file, if_match=None, data_transfer_listener=None,
@@ -1958,7 +2178,7 @@ def create_multipart_upload(self, bucket: str, key: str,
                                             grant_read, grant_read_acp, grant_write_acp,
                                             key, meta, ssec_algorithm, ssec_key, ssec_key_md5,
                                             server_side_encryption, website_redirect_location, storage_class,
-                                            forbid_overwrite)
+                                            forbid_overwrite, self.disable_encoding_meta)

         params = {'uploads': ''}
         if encoding_type:
@@ -2280,6 +2500,8 @@ def resumable_copy_object(self, bucket: str, key: str, src_bucket: str, src_key:
                                               generic_input=generic_input)
             return ResumableCopyObjectOutput(copy_resp=copy_output, bucket=bucket, key=key)
         size = head_out.content_length
+        if not copy_source_if_match:
+            copy_source_if_match = head_out.etag

         # if size == 0:
         #     raise TosClientError('object size is 0, please use copy_object')
@@ -2381,7 +2603,7 @@ def resumable_copy_object(self, bucket: str, key: str, src_bucket: str, src_key:
                                          record=record, size=size,
                                          ssec_key=ssec_key, ssec_key_md5=ssec_key_md5, ssec_algorithm=ssec_algorithm,
-                                         copy_source_if_match=head_out.etag,
+                                         copy_source_if_match=copy_source_if_match,
                                          upload_event_listener=copy_event_listener,
                                          cancel_hook=cancel_hook,
                                          copy_source_ssec_algorithm=copy_source_ssec_algorithm,
@@ -2469,6 +2691,8 @@ def download_file(self, bucket: str, key: str, file_path: str,
         content_length = result.content_length
         if result.object_type == 'Symlink':
             content_length = int(result.header.get('x-tos-symlink-target-size'))
+        if not if_match:
+            if_match = result.etag

         if checkpoint_file:
             dir, file = os.path.split(checkpoint_file)
@@ -2585,6 +2809,9 @@ def upload_part(self, bucket: str, key: str, upload_id: str, part_number: int,
         headers = _get_upload_part_headers(content_length, content_md5, server_side_encryption, ssec_algorithm,
                                            ssec_key, ssec_key_md5, traffic_limit)
+        if self.except100_continue_threshold > 0 and (
+                content_length is None or content_length > self.except100_continue_threshold):
+            headers['Expect'] = "100-continue"
         if content:
             content = init_content(content)
             patch_content(content)
@@ -2880,7 +3107,8 @@ def list_objects_type2(self, bucket: str,
                            max_keys: int = 1000,
                            encoding_type: str = None,
                            list_only_once: bool = False,
-                           generic_input: GenericInput = None) -> ListObjectType2Output:
+                           generic_input: GenericInput = None,
+                           fetch_meta: bool = None) -> ListObjectType2Output:
         """ List all objects in a bucket.

         :param bucket: bucket name
@@ -2893,18 +3121,20 @@ def list_objects_type2(self, bucket: str,
         :param encoding_type: encoding type for returned keys
         :param list_only_once: whether to list only once
         :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :param fetch_meta: whether to also return each object's custom metadata
         :return: ListObjectType2Output
         """
         params = _get_list_object_v2_params(delimiter, start_after, continuation_token, reverse, max_keys,
-                                            encoding_type, prefix)
+                                            encoding_type, prefix, fetch_meta)
         if list_only_once:
             resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Get.value, params=params,
                              generic_input=generic_input)
-            return ListObjectType2Output(resp)
+            return ListObjectType2Output(resp, self.disable_encoding_meta)

         iterator = ListObjectsIterator(self._req, max_keys, bucket=bucket,
                                        method=HttpMethodType.Http_Method_Get.value,
-                                       params=params, func='list_objects_type2')
+                                       params=params, func='list_objects_type2',
+                                       disable_encoding_meta=self.disable_encoding_meta)
         result_arr = []
         for iterm in iterator:
             result_arr.append(iterm)
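A quick usage sketch (illustrative only, not part of the patch): with `fetch_meta=True`, each listed object carries its custom metadata; the bucket name is a placeholder.

    listed = client.list_objects_type2('your-bucket', prefix='docs/', fetch_meta=True)
    for obj in listed.contents:
        # obj.meta is a dict of x-tos-meta-* values; decoding is skipped when the
        # client was constructed with disable_encoding_meta=True
        print(obj.key, obj.meta)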
@@ -2934,7 +3164,7 @@ def put_symlink(self, bucket: str, key: str, symlink_target_key: str, symlink_ta
         """
         headers = _get_put_symlink_headers(symlink_target_key, symlink_target_bucket, acl, storage_class, meta,
-                                           forbid_overwrite)
+                                           forbid_overwrite, self.disable_encoding_meta)

         params = {'symlink': ''}

@@ -2993,17 +3223,22 @@ def get_bucket_location(self, bucket: str, generic_input: GenericInput = None)
         return GetBucketLocationOutput(resp)

     def put_bucket_lifecycle(self, bucket: str, rules: [],
-                             generic_input: GenericInput = None) -> PutBucketLifecycleOutput:
+                             generic_input: GenericInput = None,
+                             allow_same_action_overlap: bool = None) -> PutBucketLifecycleOutput:
         """ Set the bucket's lifecycle rules.

         :param bucket: bucket name
         :param rules: lifecycle rules
         :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :param allow_same_action_overlap: whether rules with overlapping prefixes are allowed
         :return: PutBucketLifecycleOutput
         """
         data = to_put_bucket_lifecycle(rules)
         data = json.dumps(data)
         headers = {"Content-MD5": to_str(base64.b64encode(hashlib.md5(to_bytes(data)).digest()))}
+        if allow_same_action_overlap:
+            headers["x-tos-allow-same-action-overlap"] = str(allow_same_action_overlap).lower()

         resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Put.value, data=data, headers=headers,
                          params={'lifecycle': ''}, generic_input=generic_input)
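A quick usage sketch (illustrative only, not part of the patch): a size-filtered rule pushed with the new overlap switch; it assumes the usual `BucketLifeCycleExpiration(days=...)` constructor, and bucket name, prefix, and thresholds are placeholders.

    from tos.enum import StatusType
    from tos.models2 import BucketLifeCycleRule, BucketLifeCycleExpiration, BucketLifecycleFilter

    rule = BucketLifeCycleRule(
        id='expire-large-tmp', prefix='tmp/', status=StatusType.Status_Enable,
        expiration=BucketLifeCycleExpiration(days=7),
        # only objects of at least 1 KiB fall under this rule
        filter=BucketLifecycleFilter(object_size_greater_than=1024,
                                     greater_than_include_equal=StatusType.Status_Enable))
    client.put_bucket_lifecycle('your-bucket', [rule], allow_same_action_overlap=True)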
@@ -3266,8 +3501,9 @@ def fetch_object(self, bucket: str, key: str, url: str,
                      ssec_key_md5: str = None,
                      meta: Dict = None,
                      ignore_same_key: bool = False,
-                     hex_md5: str = None,
-                     generic_input: GenericInput = None) -> FetchObjectOutput:
+                     hex_md5: str = None,  # deprecated
+                     generic_input: GenericInput = None,
+                     content_md5: str = None) -> FetchObjectOutput:
         """ Fetch an object from a source URL.

         :param bucket: bucket name
@@ -3290,15 +3526,17 @@ def fetch_object(self, bucket: str, key: str, url: str,
         :param ssec_key_md5: MD5 of the SSE-C key used to encrypt the target object, serving as an integrity check that the key was not corrupted in transit
         :param meta: object metadata
         :param ignore_same_key: whether to ignore an existing object with the same key
-        :param hex_md5: object MD5
+        :param hex_md5: deprecated and no longer used; use content_md5 instead
         :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :param content_md5: object MD5
         :return: FetchObjectOutput
         """
-        data = to_fetch_object(url, key, ignore_same_key, hex_md5)
+        data = to_fetch_object(url, key, ignore_same_key, hex_md5, content_md5)
         data = json.dumps(data)

         headers = _get_fetch_headers(storage_class, acl, grant_full_control, grant_read, grant_read_acp,
-                                     grant_write_acp, meta, ssec_algorithm, ssec_key, ssec_key_md5)
+                                     grant_write_acp, meta, ssec_algorithm, ssec_key, ssec_key_md5,
+                                     self.disable_encoding_meta)
         headers['Content-Length'] = str(len(data))
         resp = self._req(bucket=bucket, key=key, params={'fetch': ''}, headers=headers,
                          method=HttpMethodType.Http_Method_Post.value, data=data, generic_input=generic_input)
@@ -3318,7 +3556,12 @@ def put_fetch_task(self, bucket: str, key: str, url: str,
                        meta: Dict = None,
                        ignore_same_key: bool = False,
                        hex_md5: str = None,
-                       generic_input: GenericInput = None) -> PutFetchTaskOutput:
+                       generic_input: GenericInput = None,
+                       content_md5: str = None,
+                       callback_url: str = None,
+                       callback_host: str = None,
+                       callback_body: str = None,
+                       callback_body_type: str = None) -> PutFetchTaskOutput:
         """ Create an async fetch task.

         :param bucket: bucket name
@@ -3341,14 +3584,21 @@ def put_fetch_task(self, bucket: str, key: str, url: str,
         :param ssec_key_md5: MD5 of the SSE-C key used to encrypt the target object, serving as an integrity check that the key was not corrupted in transit
         :param meta: object metadata
         :param ignore_same_key: whether to ignore an existing object with the same key
-        :param hex_md5: object MD5
+        :param hex_md5: deprecated and no longer used; use content_md5 instead
         :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :param content_md5: object MD5
+        :param callback_url: callback address invoked when the fetch task finishes
+        :param callback_host: Host header to carry on the callback request
+        :param callback_body: callback request body
+        :param callback_body_type: content type of the callback body
         :return: PutFetchTaskOutput
         """
         headers = _get_fetch_headers(storage_class, acl, grant_full_control, grant_read, grant_read_acp,
-                                     grant_write_acp, meta, ssec_algorithm, ssec_key, ssec_key_md5)
-        data = to_fetch_object(url, key, ignore_same_key, hex_md5)
+                                     grant_write_acp, meta, ssec_algorithm, ssec_key, ssec_key_md5,
+                                     self.disable_encoding_meta)
+        data = to_put_fetch_object(url, key, ignore_same_key, hex_md5, content_md5, callback_url, callback_host,
+                                   callback_body, callback_body_type)
         data = json.dumps(data)

         resp = self._req(bucket=bucket, params={'fetchTask': ''}, headers=headers,
@@ -3519,6 +3769,37 @@ def get_bucket_notification(self, bucket, generic_input: GenericInput = None) ->
                          generic_input=generic_input)
         return GetBucketNotificationOutput(resp)

+    def put_bucket_notification_type2(self, bucket: str, rule: [] = None, version: str = None,
+                                      generic_input: GenericInput = None) -> PutBucketNotificationType2Output:
+        """ Set the bucket's event-notification rules.
+
+        :param bucket: bucket name
+        :param rule: list of notification rules
+        :param version: version of the notification-rule format
+        :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :return: PutBucketNotificationType2Output
+        """
+        data = to_put_bucket_notification_type2(rule, version)
+        data = json.dumps(data)
+        headers = {"Content-MD5": to_str(base64.b64encode(hashlib.md5(to_bytes(data)).digest()))}
+        resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Put.value, params={'notification_v2': ''},
+                         data=data,
+                         headers=headers,
+                         generic_input=generic_input)
+        return PutBucketNotificationType2Output(resp)
+
+    def get_bucket_notification_type2(self, bucket,
+                                      generic_input: GenericInput = None) -> GetBucketNotificationType2Output:
+        """ Get the bucket's event-notification rules.
+
+        :param bucket: bucket name
+        :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :return: GetBucketNotificationType2Output
+        """
+        resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Get.value, params={'notification_v2': ''},
+                         generic_input=generic_input)
+        return GetBucketNotificationType2Output(resp)
+
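A quick usage sketch (illustrative only, not part of the patch): wiring an object-created event to a VeFaaS function through the new type2 API. The event name shown is illustrative, and the function ID and bucket name are placeholders.

    from tos.models2 import (NotificationRule, NotificationFilter, NotificationFilterKey,
                             NotificationFilterRule, NotificationDestination, DestinationVeFaaS)

    rule = NotificationRule(
        rule_id='on-upload',
        events=['tos:ObjectCreated:Put'],
        filter=NotificationFilter(NotificationFilterKey(
            [NotificationFilterRule(name='prefix', value='img/')])),
        destination=NotificationDestination(ve_faas=[DestinationVeFaaS(function_id='your-function-id')]))
    client.put_bucket_notification_type2('your-bucket', [rule])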
def put_bucket_custom_domain(self, bucket: str, rule: CustomDomainRule,
                                  generic_input: GenericInput = None) -> PutBucketCustomDomainOutput:
         """ Set a custom domain.
@@ -3682,6 +3963,46 @@ def delete_bucket_rename(self, bucket: str, generic_input: GenericInput = None):
                          generic_input=generic_input)
         return DeleteBucketRenameOutput(resp)

+    def put_bucket_encryption(self, bucket: str, rule: BucketEncryptionRule,
+                              generic_input: GenericInput = None) -> PutBucketEncryptionOutput:
+        """ Set the bucket's server-side-encryption rule.
+
+        :param bucket: bucket name
+        :param rule: encryption rule
+        :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :return: PutBucketEncryptionOutput
+        """
+        data = to_bucket_encrypt(rule)
+        data = json.dumps(data)
+        headers = {"Content-MD5": to_str(base64.b64encode(hashlib.md5(to_bytes(data)).digest()))}
+        resp = self._req(bucket=bucket, data=data, params={'encryption': ''}, headers=headers,
+                         method=HttpMethodType.Http_Method_Put.value, generic_input=generic_input)
+        return PutBucketEncryptionOutput(resp)
+
+    def get_bucket_encryption(self, bucket, generic_input: GenericInput = None) -> GetBucketEncryptionOutput:
+        """ Get the bucket's server-side-encryption rule.
+
+        :param bucket: bucket name
+        :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :return: GetBucketEncryptionOutput
+        """
+        resp = self._req(bucket=bucket, params={'encryption': ''}, method=HttpMethodType.Http_Method_Get.value,
+                         generic_input=generic_input)
+        return GetBucketEncryptionOutput(resp)
+
+    def delete_bucket_encryption(self, bucket: str,
+                                 generic_input: GenericInput = None) -> DeleteBucketEncryptionOutput:
+        """ Delete the bucket's server-side-encryption rule.
+
+        :param bucket: bucket name
+        :param generic_input: generic request parameters, e.g. request_date sets the UTC signing time carried in the X-Tos-Date request header
+        :return: DeleteBucketEncryptionOutput
+        """
+        resp = self._req(bucket=bucket, method=HttpMethodType.Http_Method_Delete.value, params={'encryption': ''},
+                         generic_input=generic_input)
+        return DeleteBucketEncryptionOutput(resp)
+
     def _req(self, bucket=None, key=None, method=None, data=None, headers=None, params=None, func=None,
              generic_input=None):
         consume_body()
diff --git a/tos/consts.py b/tos/consts.py
index cc15493..c66c188 100644
--- a/tos/consts.py
+++ b/tos/consts.py
@@ -41,3 +41,5 @@
 SIGNATURE_QUERY_LOWER = "x-tos-signature"

 V4_PREFIX = "x-tos"
+BUCKET_TYPE_FNS = "fns"
+BUCKET_TYPE_HNS = "hns"
\ No newline at end of file
diff --git a/tos/enum.py b/tos/enum.py
index 5f47bea..e20d43d 100644
--- a/tos/enum.py
+++ b/tos/enum.py
@@ -250,3 +250,17 @@ def convert_tier_type(s: str):
         if t.value == s:
             return t
     return TierType.Tier_Unknown
+
+
+class ReplicationStatusType(Enum):
+    ReplicationStatusType_Pending = "PENDING"
+    ReplicationStatusType_Complete = "COMPLETE"
+    ReplicationStatusType_Failed = "FAILED"
+    ReplicationStatusType_Replica = "REPLICA"
+
+
+def convert_replication_status_type(s: str):
+    for t in ReplicationStatusType:
+        if t.value == s:
+            return t
+    return ""
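A quick usage sketch (illustrative only, not part of the patch): the new x-tos-replication-status header is surfaced on head/get results as the enum above; bucket and key are placeholders.

    from tos.enum import ReplicationStatusType

    out = client.head_object('your-bucket', 'your-key')
    if out.replication_status == ReplicationStatusType.ReplicationStatusType_Complete:
        print('CRR replica is up to date')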
diff --git a/tos/json_utils.py b/tos/json_utils.py
index f8f540f..8b02110 100644
--- a/tos/json_utils.py
+++ b/tos/json_utils.py
@@ -1,6 +1,6 @@
 from .consts import LAST_MODIFY_TIME_DATE_FORMAT
 from .models2 import Owner, RedirectAllRequestsTo, IndexDocument, ErrorDocument, RoutingRules, CustomDomainRule, \
-    RealTimeLogConfiguration, RestoreJobParameters
+    RealTimeLogConfiguration, RestoreJobParameters, BucketEncryptionRule
 from .utils import check_enum_type

@@ -86,6 +86,8 @@ def to_put_bucket_mirror_back(rules: []):
             info['Condition']['KeyPrefix'] = rule.condition.key_prefix
         if rule.condition.key_suffix:
             info['Condition']['KeySuffix'] = rule.condition.key_suffix
+        if rule.condition.http_method:
+            info['Condition']['HttpMethod'] = rule.condition.http_method
         if rule.redirect:
             info['Redirect'] = {}
             if rule.redirect.redirect_type:
@@ -130,7 +132,15 @@ def to_put_bucket_mirror_back(rules: []):
                 if rule.redirect.transform.replace_key_prefix.replace_with:
                     info['Redirect']['Transform']['ReplaceKeyPrefix'][
                         'ReplaceWith'] = rule.redirect.transform.replace_key_prefix.replace_with
-
+            if rule.redirect.fetch_header_to_meta_data_rules:
+                meta_data_rules = []
+                for r in rule.redirect.fetch_header_to_meta_data_rules:
+                    meta_data_rule = {
+                        'SourceHeader': r.source_header,
+                        'MetaDataSuffix': r.meta_data_suffix
+                    }
+                    meta_data_rules.append(meta_data_rule)
+                info['Redirect']['FetchHeaderToMetaDataRules'] = meta_data_rules
         arr.append(info)

     data['Rules'] = arr
@@ -161,9 +171,14 @@ def to_put_bucket_lifecycle(rules: []):
                 info['Expiration']['Date'] = rule.expiration.date.strftime(LAST_MODIFY_TIME_DATE_FORMAT)

         if rule.no_current_version_expiration:
-            info['NoncurrentVersionExpiration'] = {
-                'NoncurrentDays': rule.no_current_version_expiration.no_current_days
-            }
+            info['NoncurrentVersionExpiration'] = {}
+            if rule.no_current_version_expiration.no_current_days:
+                info['NoncurrentVersionExpiration'][
+                    'NoncurrentDays'] = rule.no_current_version_expiration.no_current_days
+            if rule.no_current_version_expiration.non_current_date:
+                info['NoncurrentVersionExpiration'][
+                    'NoncurrentDate'] = rule.no_current_version_expiration.non_current_date.strftime(
+                    LAST_MODIFY_TIME_DATE_FORMAT)

         if rule.abort_in_complete_multipart_upload and rule.abort_in_complete_multipart_upload.days_after_init:
             info['AbortIncompleteMultipartUpload'] = {
@@ -198,18 +213,29 @@ def to_put_bucket_lifecycle(rules: []):
                 non_current_version_transition_info = {}
                 if tr.non_current_days:
                     non_current_version_transition_info['NoncurrentDays'] = tr.non_current_days
-
                 if tr.storage_class:
                     non_current_version_transition_info['StorageClass'] = tr.storage_class.value
+                if tr.non_current_date:
+                    non_current_version_transition_info['NoncurrentDate'] = tr.non_current_date.strftime(
+                        LAST_MODIFY_TIME_DATE_FORMAT)
                 current_version_transition_arr.append(non_current_version_transition_info)
             info['NoncurrentVersionTransitions'] = current_version_transition_arr

+        if rule.filter:
+            info['Filter'] = {}
+            if rule.filter.object_size_greater_than:
+                info['Filter']['ObjectSizeGreaterThan'] = rule.filter.object_size_greater_than
+            if rule.filter.greater_than_include_equal:
+                info['Filter']['GreaterThanIncludeEqual'] = rule.filter.greater_than_include_equal.value
+            if rule.filter.object_size_less_than:
+                info['Filter']['ObjectSizeLessThan'] = rule.filter.object_size_less_than
+            if rule.filter.less_than_include_equal:
+                info['Filter']['LessThanIncludeEqual'] = rule.filter.less_than_include_equal.value
         arr.append(info)

     data['Rules'] = arr
-
     return data
@@ -226,18 +252,34 @@ def to_put_tagging(tags: []):
     return data


-def to_fetch_object(url: str, object: str = None, ignore_same_key=None, content_md5=None):
+def to_fetch_object(url: str, object: str = None, ignore_same_key=None, hex_md5=None, content_md5=None):
     info = {'URL': url}
     if object:
         info['Object'] = object
     if ignore_same_key:
         info['IgnoreSameKey'] = ignore_same_key
+    if not content_md5 and hex_md5:
+        content_md5 = hex_md5
     if content_md5:
         info['ContentMD5'] = content_md5

     return info


+def to_put_fetch_object(url: str, object: str = None, ignore_same_key=None, hex_md5=None, content_md5=None,
+                        callback_url=None, callback_host=None, callback_body=None, callback_body_type=None):
+    info = to_fetch_object(url, object, ignore_same_key, hex_md5, content_md5)
+    if callback_url:
+        info['CallbackUrl'] = callback_url
+    if callback_host:
+        info['CallbackHost'] = callback_host
+    if callback_body:
+        info['CallbackBody'] = callback_body
+    if callback_body_type:
+        info['CallbackBodyType'] = callback_body_type
+    return info
+
+
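A quick usage sketch (illustrative only, not part of the patch): an async fetch with a completion callback, exercising the new to_put_fetch_object fields end to end. URLs are placeholders, and the callback body is an arbitrary string; substitute whatever your callback endpoint expects.

    client.put_fetch_task('your-bucket', 'fetched/key', 'https://example.com/source.bin',
                          content_md5='<md5-of-source>',  # replaces the deprecated hex_md5
                          callback_url='https://callback.example.com/notify',
                          callback_body='{"status": "done"}',
                          callback_body_type='application/json')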
def to_put_replication(role: str, rules: []):
     info = {}
     if role:
@@ -430,3 +472,61 @@ def to_restore_object(days: int, tier: RestoreJobParameters):
         info['RestoreJobParameters'] = {"Tier": tier.tier.value}

     return info
+
+
+def to_bucket_encrypt(rule: BucketEncryptionRule):
+    info = {}
+    if rule and rule.apply_server_side_encryption_by_default:
+        info['Rule'] = {}
+        info['Rule']['ApplyServerSideEncryptionByDefault'] = {
+            'SSEAlgorithm': rule.apply_server_side_encryption_by_default.sse_algorithm,
+            'KMSMasterKeyID': rule.apply_server_side_encryption_by_default.kms_master_key_id
+        }
+    return info
+
+
+def to_put_bucket_notification_type2(rules: [], version: str):
+    info = {}
+    if version:
+        info['Version'] = version
+    if rules:
+        info['Rules'] = []
+        for rule in rules:
+            config = {'RuleID': rule.rule_id}
+            if rule.events:
+                config['Events'] = rule.events
+            if rule.filter and rule.filter.tos_key and rule.filter.tos_key.filter_rules:
+                config_filter_rules = []
+                for filter_rule in rule.filter.tos_key.filter_rules:
+                    config_filter_rules.append({
+                        'Name': filter_rule.name,
+                        'Value': filter_rule.value,
+                    })
+                config['Filter'] = {'TOSKey': {'FilterRules': config_filter_rules}}
+            if rule.destination:
+                config['Destination'] = {}
+                if rule.destination.rocket_mq:
+                    rocket_mqs = []
+                    for r in rule.destination.rocket_mq:
+                        rocket_mq = {}
+                        if r.role:
+                            rocket_mq['Role'] = r.role
+                        if r.instance_id:
+                            rocket_mq['InstanceID'] = r.instance_id
+                        if r.topic:
+                            rocket_mq['Topic'] = r.topic
+                        if r.access_key_id:
+                            rocket_mq['AccessKeyID'] = r.access_key_id
+                        rocket_mqs.append(rocket_mq)
+                    config['Destination']['RocketMQ'] = rocket_mqs
+                if rule.destination.ve_faas:
+                    ve_faas = []
+                    for r in rule.destination.ve_faas:
+                        ve_faas.append({
+                            'FunctionID': r.function_id
+                        })
+                    config['Destination']['VeFaaS'] = ve_faas
+
+            info['Rules'].append(config)
+    return info
diff --git a/tos/models2.py b/tos/models2.py
index d25fef7..7676dc3 100644
--- a/tos/models2.py
+++ b/tos/models2.py
@@ -1,20 +1,21 @@
+import json
 import urllib.parse
 from datetime import datetime
-
-from requests.structures import CaseInsensitiveDict
+from typing import List

 from . import utils
 from .enum import CannedType, GranteeType, PermissionType, StorageClassType, RedirectType, StatusType, \
     StorageClassInheritDirectiveType, VersioningStatusType, ProtocolType, CertStatus, AzRedundancyType, \
     convert_storage_class_type, convert_az_redundancy_type, convert_permission_type, convert_grantee_type, \
     convert_canned_type, convert_redirect_type, convert_status_type, convert_versioning_status_type, \
-    convert_protocol_type, convert_cert_status, TierType, convert_tier_type, ACLType
-from .consts import CHUNK_SIZE
+    convert_protocol_type, convert_cert_status, TierType, convert_tier_type, ACLType, convert_replication_status_type
+from .consts import CHUNK_SIZE, BUCKET_TYPE_HNS
 from .exceptions import TosClientError, make_server_error_with_exception
 from .models import CommonPrefixInfo, DeleteMarkerInfo
 from .utils import (get_etag, get_value, meta_header_decode, parse_gmt_time_to_utc_datetime,
-                    parse_modify_time_to_utc_datetime, _param_to_quoted_query, _make_virtual_host_url)
+                    parse_modify_time_to_utc_datetime, _param_to_quoted_query, _make_virtual_host_url,
+                    convert_meta)


 class ResponseInfo(object):
@@ -38,6 +39,25 @@ def __init__(self, resp):
         self.storage_class = get_value(self.header, "x-tos-storage-class", lambda x: convert_storage_class_type(x))
         self.az_redundancy = get_value(self.header, "x-tos-az-redundancy", lambda x: convert_az_redundancy_type(x))
         self.project_name = get_value(self.header, "x-tos-project-name")
+        self.bucket_type = get_value(self.header, "x-tos-bucket-type")
+
+
+class FileStatusOutput(ResponseInfo):
+    def __init__(self, key, bucket_type, resp):
+        super(FileStatusOutput, self).__init__(resp)
+        if bucket_type == BUCKET_TYPE_HNS:
+            self.key = key
+            self.size = get_value(resp.headers, 'Content-Length')
+            self.last_modified = get_value(resp.headers, 'Last-Modified')
+            self.crc64 = get_value(resp.headers, 'x-tos-hash-crc64ecma')
+            self.crc32 = get_value(resp.headers, 'x-tos-hash-crc32c')
+            return
+        data = json.loads(resp.read())
+        self.key = get_value(data, 'Key')
+        self.size = get_value(data, 'Size')
+        self.last_modified = get_value(data, 'LastModified')
+        self.crc32 = get_value(data, 'CRC32')
+        self.crc64 = get_value(data, 'CRC64')


 class DeleteBucketOutput(ResponseInfo):
@@ -198,7 +218,7 @@
 class HeadObjectOutput(ResponseInfo):
-    def __init__(self, resp):
+    def __init__(self, resp, disable_encoding_meta: bool = None):
         super(HeadObjectOutput, self).__init__(resp)
         self.etag = get_etag(resp.headers)
         self.version_id = get_value(resp.headers, "x-tos-version-id")
@@ -210,17 +230,17 @@ def __init__(self, resp):
         self.restore = get_value(resp.headers, "x-tos-restore")
         self.restore_expiry_days = get_value(resp.headers, "x-tos-restore-expiry-days", lambda x: int(x))
         self.restore_tier = get_value(resp.headers, "x-tos-restore-tier", lambda x: convert_tier_type(x))
-        self.meta = CaseInsensitiveDict()
+
         self.object_type = get_value(resp.headers, "x-tos-object-type")
         self.symlink_target_size = get_value(resp.headers, "x-tos-symlink-target-size", lambda x: int(x))
         if not self.object_type:
             self.object_type = "Normal"
-
-        meta = {}
+        self.meta = {}
         for k in resp.headers:
             if k.startswith('x-tos-meta-'):
-                meta[k[11:]] = resp.headers[k]
-        self.meta = meta_header_decode(meta)
+                self.meta[k[11:]] = resp.headers[k]
+        if not disable_encoding_meta:
+            self.meta = meta_header_decode(self.meta)

         self.last_modified = get_value(resp.headers, 'last-modified')
         if self.last_modified:
@@ -239,15 +259,20 @@ def __init__(self, resp):
         self.cache_control = get_value(resp.headers, "cache-control")
         content_dis_str = get_value(resp.headers, 'content-disposition')
         if content_dis_str:
-            self.content_disposition = urllib.parse.unquote(get_value(resp.headers, "content-disposition"))
+            if disable_encoding_meta:
+                self.content_disposition = get_value(resp.headers, "content-disposition")
+            else:
+                self.content_disposition = urllib.parse.unquote(get_value(resp.headers, "content-disposition"))
         else:
             self.content_disposition = ''

         self.content_encoding = get_value(resp.headers, "content-encoding")
         self.content_language = get_value(resp.headers, "content-language")
+        self.replication_status = get_value(resp.headers, "x-tos-replication-status",
+                                            lambda x: convert_replication_status_type(x))


 class ListObjectsOutput(ResponseInfo):
-    def __init__(self, resp):
+    def __init__(self, resp, disable_encoding_meta: bool = None):
         super(ListObjectsOutput, self).__init__(resp)

         data = resp.json_read()
@@ -285,7 +310,8 @@ def __init__(self, resp):
                 size=get_value(object, 'Size', int),
                 storage_class=get_value(object, 'StorageClass', lambda x: convert_storage_class_type(x)),
                 hash_crc64_ecma=get_value(object, "HashCrc64ecma", lambda x: int(x)),
-                object_type=get_value(object, 'Type')
+                object_type=get_value(object, 'Type'),
+                meta=convert_meta(get_value(object, "UserMeta"), disable_encoding_meta)
             )
             owner_info = get_value(object, 'Owner')
             if owner_info:
@@ -298,7 +324,7 @@
 class ListObjectsIterator(object):
     def __init__(self, req_func, max_key, bucket=None, key=None, method=None, data=None, headers=None, params=None,
-                 func=None):
+                 func=None, disable_encoding_meta: bool = None):
         self.req = req_func
         self.bucket = bucket
         self.key = key
@@ -310,6 +336,7 @@ def __init__(self, req_func, max_key, bucket=None, key=None, method=None, data=N
         self.max_key = max_key
         self.number = 0
         self.is_truncated = True
+        self.disable_encoding_meta = disable_encoding_meta

     def __iter__(self):
         return self
@@ -321,7 +348,7 @@ def next(self):
         if self.is_truncated and self.new_max_key > 0:
             resp = self.req(bucket=self.bucket, method=self.method, key=self.key, data=self.data,
                             headers=self.headers, params=self.params)
-            info = ListObjectType2Output(resp)
+            info = ListObjectType2Output(resp, self.disable_encoding_meta)
             self.is_truncated = info.is_truncated
             self.number += len(info.contents) + len(info.common_prefixes)
             self.params['max-keys'] = self.new_max_key
@@ -336,7 +363,7 @@ def new_max_key(self):

 class ListObjectType2Output(ResponseInfo):
-    def __init__(self, resp):
+    def __init__(self, resp, disable_encoding_meta: bool = None):
         super(ListObjectType2Output, self).__init__(resp)
         data = resp.json_read()
         self.name = get_value(data, 'Name')
@@ -374,7 +401,8 @@ def __init__(self, resp):
                 size=get_value(object, 'Size', int),
                 storage_class=get_value(object, 'StorageClass', lambda x: convert_storage_class_type(x)),
                 hash_crc64_ecma=get_value(object, "HashCrc64ecma", lambda x: int(x)),
-                object_type=get_value(object, "Type")
+                object_type=get_value(object, "Type"),
+                meta=convert_meta(get_value(object, "UserMeta"), disable_encoding_meta)
             )
             owner_info = get_value(object, 'Owner')
             if owner_info:
@@ -400,7 +428,7 @@ def combine(self, listObjectType2Outputs: []):

 class ListedObject(object):
     def __init__(self, key: str, last_modified: datetime, etag: str, size: int, storage_class: StorageClassType,
-                 hash_crc64_ecma: str, owner: Owner = None, object_type=None):
+                 hash_crc64_ecma: str, owner: Owner = None, object_type=None, meta=None):
         self.key = key
         self.last_modified = last_modified
         self.etag = etag
@@ -409,6 +437,7 @@ def __init__(self, key: str, last_modified: datetime, etag: str, size: int, stor
         self.storage_class = storage_class
         self.hash_crc64_ecma = hash_crc64_ecma
         self.object_type = object_type
+        self.meta = meta

     def __str__(self):
         info = {"key": self.key, "last_modified": self.last_modified, "etag": self.etag, "size": self.size,
@@ -426,15 +455,15 @@ def __init__(self, prefix: str):

 class ListedObjectVersion(ListedObject):
     def __init__(self, key: str, last_modified: datetime, etag: str, size: int, storage_class: StorageClassType,
                  hash_crc64_ecma, owner: Owner = None, version_id: str = None, is_latest: bool = None,
-                 object_type=None):
+                 object_type=None, meta=None):
         super(ListedObjectVersion, self).__init__(key, last_modified, etag, size, storage_class, hash_crc64_ecma,
                                                   owner,
-                                                  object_type)
+                                                  object_type, meta)
         self.version_id = version_id
         self.is_latest = is_latest


 class ListObjectVersionsOutput(ResponseInfo):
-    def __init__(self, resp):
+    def __init__(self, resp, disable_encoding_meta: bool = None):
         super(ListObjectVersionsOutput, self).__init__(resp)
         self.name = ''
         self.prefix = ''
@@ -488,7 +517,8 @@ def __init__(self, resp):
                 version_id=get_value(object, 'VersionId'),
                 hash_crc64_ecma=get_value(object, "HashCrc64ecma", lambda x: int(x)),
                 is_latest=get_value(object, "IsLatest", lambda x: bool(x)),
-                object_type=get_value(object, 'Type')
+                object_type=get_value(object, 'Type'),
+                meta=convert_meta(get_value(object, "UserMeta"), disable_encoding_meta)
             )
             owner_info = get_value(object, 'Owner')
             if owner_info:
@@ -562,8 +592,8 @@ def __init__(self, resp):

 class GetObjectOutput(HeadObjectOutput):
-    def __init__(self, resp, progress_callback=None, rate_limiter=None, enable_crc=False, discard=0):
-        super(GetObjectOutput, self).__init__(resp)
+    def __init__(self, resp, progress_callback=None, rate_limiter=None, enable_crc=False,
+                 disable_encoding_meta: bool = None):
+        super(GetObjectOutput, self).__init__(resp, disable_encoding_meta)
         self.enable_crc = enable_crc
         self.content_range = get_value(resp.headers, "content-range")
         self.content = resp
@@ -619,6 +649,14 @@ def __init__(self, resp):
         self.hash_crc64_ecma = get_value(resp.headers, "x-tos-hash-crc64ecma", lambda x: int(x))
+class ModifyObjectOutput(AppendObjectOutput):
+    def __init__(self, resp):
+        super(ModifyObjectOutput, self).__init__(resp)
+        self.version_id = get_value(resp.headers, "x-tos-version-id")
+        self.next_modify_offset = get_value(resp.headers, "x-tos-next-modify-offset", lambda x: int(x))
+        self.hash_crc64_ecma = get_value(resp.headers, "x-tos-hash-crc64ecma", lambda x: int(x))
+
+
 class CreateMultipartUploadOutput(ResponseInfo):
     def __init__(self, resp):
         super(CreateMultipartUploadOutput, self).__init__(resp)
@@ -830,10 +868,12 @@ def __init__(self, resp):

 class Condition(object):
-    def __init__(self, http_code: int = None, key_prefix: str = None, key_suffix: str = None):
+    def __init__(self, http_code: int = None, key_prefix: str = None, key_suffix: str = None,
+                 http_method: List[str] = None):
         self.http_code = http_code
         self.key_prefix = key_prefix
         self.key_suffix = key_suffix
+        self.http_method = http_method


 class ReplaceKeyPrefix(object):
@@ -872,7 +912,8 @@ def __init__(self, pass_all: bool = None, pass_headers: [] = None, remove: [] =

 class Redirect(object):
     def __init__(self, redirect_type: RedirectType = None, public_source: PublicSource = None,
                  fetch_source_on_redirect: bool = None, pass_query: bool = None, follow_redirect: bool = None,
-                 mirror_header: MirrorHeader = None, transform: Transform = None):
+                 mirror_header: MirrorHeader = None, transform: Transform = None,
+                 fetch_header_to_meta_data_rules: list = None):
         self.redirect_type = redirect_type
         self.fetch_source_on_redirect = fetch_source_on_redirect
         self.public_source = public_source
@@ -880,6 +921,13 @@ def __init__(self, redirect_type: RedirectType = None, public_source: PublicSour
         self.follow_redirect = follow_redirect
         self.mirror_header = mirror_header
         self.transform = transform
+        self.fetch_header_to_meta_data_rules = fetch_header_to_meta_data_rules
+
+
+class FetchHeaderToMetaDataRule(object):
+    def __init__(self, source_header: str = None, meta_data_suffix: str = None):
+        self.source_header = source_header
+        self.meta_data_suffix = meta_data_suffix


 class Rule(object):
@@ -1041,8 +1089,9 @@ def __init__(self, days: int = None, date: datetime = None):

 class BucketLifeCycleNoCurrentVersionExpiration(object):
-    def __init__(self, no_current_days: int = None):
+    def __init__(self, no_current_days: int = None, non_current_date: datetime = None):
         self.no_current_days = no_current_days
+        self.non_current_date = non_current_date


 class BucketLifeCycleAbortInCompleteMultipartUpload(object):
@@ -1058,9 +1107,20 @@ def __init__(self, storage_class: StorageClassType = None, days: int = None, dat

 class BucketLifeCycleNonCurrentVersionTransition(object):
-    def __init__(self, storage_class: StorageClassType = None, non_current_days: int = None):
+    def __init__(self, storage_class: StorageClassType = None, non_current_days: int = None,
+                 non_current_date: datetime = None):
         self.storage_class = storage_class
         self.non_current_days = non_current_days
+        self.non_current_date = non_current_date
+
+
+class BucketLifecycleFilter(object):
+    def __init__(self, object_size_greater_than: int = None, greater_than_include_equal: StatusType = None,
+                 object_size_less_than: int = None, less_than_include_equal: StatusType = None):
+        self.object_size_greater_than = object_size_greater_than
+        self.object_size_less_than = object_size_less_than
+        self.greater_than_include_equal = greater_than_include_equal
+        self.less_than_include_equal = less_than_include_equal


 class BucketLifeCycleRule(object):
@@ -1073,7 +1133,8 @@ def __init__(self, status: StatusType = None,
                  transitions: [] = None,
                  non_current_version_transitions: [] = None,
                  id: str = None,
-                 prefix: str = None):
+                 prefix: str = None,
+                 filter: BucketLifecycleFilter = None):
         self.id = id
         self.prefix = prefix
         self.status = status
@@ -1083,6 +1144,7 @@ def __init__(self, status: StatusType = None,
         self.tags = tags
         self.transitions = transitions
         self.non_current_version_transitions = non_current_version_transitions
+        self.filter = filter


 class PutBucketLifecycleOutput(ResponseInfo):
@@ -1095,6 +1157,7 @@ def __init__(self, resp):
         self.rules = []
         super(GetBucketLifecycleOutput, self).__init__(resp)
         data = resp.json_read()
+        self.allow_same_action_overlap = get_value(resp.headers, 'x-tos-allow-same-action-overlap', lambda x: bool(x))
         rules_json = get_value(data, 'Rules') or []
         for rule_json in rules_json:
             rule = BucketLifeCycleRule()
@@ -1109,6 +1172,7 @@ def __init__(self, resp):
             tags_json = get_value(rule_json, 'Tags') or []
             transitions_json = get_value(rule_json, 'Transitions') or []
             non_current_version_transitions_json = get_value(rule_json, 'NoncurrentVersionTransitions') or []
+            filter_json = get_value(rule_json, 'Filter')

             if expiration_json:
                 bucket_expiration = BucketLifeCycleExpiration()
@@ -1123,6 +1187,9 @@ def __init__(self, resp):
                 tr = BucketLifeCycleNonCurrentVersionTransition()
                 tr.storage_class = get_value(vt, 'StorageClass', lambda x: convert_storage_class_type(x))
                 tr.non_current_days = get_value(vt, 'NoncurrentDays', int)
+                if get_value(vt, 'NoncurrentDate'):
+                    tr.non_current_date = parse_modify_time_to_utc_datetime(
+                        get_value(vt, 'NoncurrentDate'))
                 rule.non_current_version_transitions.append(tr)

             if transitions_json:
@@ -1153,7 +1220,24 @@ def __init__(self, resp):
             if non_current_version_expiration_json:
                 exp = BucketLifeCycleNoCurrentVersionExpiration()
                 exp.no_current_days = get_value(non_current_version_expiration_json, 'NoncurrentDays', int)
+                if get_value(non_current_version_expiration_json, 'NoncurrentDate'):
+                    exp.non_current_date = parse_modify_time_to_utc_datetime(
+                        get_value(non_current_version_expiration_json, 'NoncurrentDate'))
                 rule.no_current_version_expiration = exp
+
+            if filter_json:
+                lifecycle_filter = BucketLifecycleFilter()
+                if get_value(filter_json, 'ObjectSizeGreaterThan'):
+                    lifecycle_filter.object_size_greater_than = get_value(filter_json, 'ObjectSizeGreaterThan', int)
+                if get_value(filter_json, 'ObjectSizeLessThan'):
+                    lifecycle_filter.object_size_less_than = get_value(filter_json, 'ObjectSizeLessThan', int)
+                if get_value(filter_json, 'GreaterThanIncludeEqual'):
+                    lifecycle_filter.greater_than_include_equal = get_value(filter_json, 'GreaterThanIncludeEqual',
+                                                                            lambda x: convert_status_type(x))
+                if get_value(filter_json, 'LessThanIncludeEqual'):
+                    lifecycle_filter.less_than_include_equal = get_value(filter_json, 'LessThanIncludeEqual',
+                                                                         lambda x: convert_status_type(x))
+                rule.filter = lifecycle_filter
             self.rules.append(rule)

@@ -1194,7 +1278,8 @@ def __init__(self, resp):
             condition = Condition(
                 http_code=get_value(cond, 'HttpCode', int),
                 key_prefix=get_value(cond, 'KeyPrefix', str),
-                key_suffix=get_value(cond, 'KeySuffix', str)
+                key_suffix=get_value(cond, 'KeySuffix', str),
+                http_method=get_value(cond, 'HttpMethod', list)
             )
             if red:
                 redirect = Redirect()
@@ -1229,6 +1314,14 @@ def __init__(self, resp):
                         replace_with=get_value(get_value(get_value(red, 'Transform'), 'ReplaceKeyPrefix'),
                                                'ReplaceWith')
                     )
+                if get_value(red, 'FetchHeaderToMetaDataRules'):
+                    meta_data_rules = []
+                    for r in get_value(red, 'FetchHeaderToMetaDataRules'):
+                        meta_data_rules.append(FetchHeaderToMetaDataRule(
+                            source_header=get_value(r, 'SourceHeader'),
+                            meta_data_suffix=get_value(r, 'MetaDataSuffix'),
+                        ))
+                    redirect.fetch_header_to_meta_data_rules = meta_data_rules
             r = Rule(id=id, condition=condition, redirect=redirect)
             self.rules.append(r)
@@ -1372,36 +1465,35 @@ def __init__(self, bucket: str = None, key: str = None, url: str = None, ignore_

 class GetFetchTaskOutput(ResponseInfo):
-    def __init__(self, resp):
+    def __init__(self, resp, disable_encoding_meta: bool = None):
         super(GetFetchTaskOutput, self).__init__(resp)
         data = resp.json_read()
         self.state = get_value(data, 'State')
         self.err = get_value(data, 'Err')
+        self.task = None
         task = get_value(data, 'Task')
-        meta = {}
-        for m in task.get('UserMeta', []):
-            meta[m['Key']] = m['Value']
-        meta = meta_header_decode(meta)
-        self.task = FetchTask(
-            bucket=get_value(task, 'Bucket'),
-            key=get_value(task, 'Key'),
-            url=get_value(task, 'URL'),
-            ignore_same_key=get_value(task, 'IgnoreSameKey', lambda x: bool(x)),
-            callback_url=get_value(task, 'CallBackURL'),
-            callback_host=get_value(task, 'CallbackHost'),
-            callback_body=get_value(task, 'CallBackBody'),
-            callback_body_type=get_value(task, 'CallBackBodyType'),
-            storage_class=get_value(task, 'StorageClass', lambda x: StorageClassType(x)),
-            acl=get_value(task, 'Acl', lambda x: ACLType(x)),
-            grant_full_control=get_value(task, 'GrantFullControl'),
-            grant_read=get_value(task, 'GrantRead'),
-            grant_read_acp=get_value(task, 'GrantReadAcp'),
-            grant_write_acp=get_value(task, 'GrantWriteAcp'),
-            ssec_algorithm=get_value(task, 'SSECAlgorithm'),
-            ssec_key=get_value(task, 'SSECKey'),
-            ssec_key_md5=get_value(task, 'SSECKeyMd5'),
-            meta=meta
-        )
+        if task:
+            meta = convert_meta(task.get('UserMeta'), disable_encoding_meta)
+            self.task = FetchTask(
+                bucket=get_value(task, 'Bucket'),
+                key=get_value(task, 'Key'),
+                url=get_value(task, 'URL'),
+                ignore_same_key=get_value(task, 'IgnoreSameKey', lambda x: bool(x)),
+                callback_url=get_value(task, 'CallbackURL'),
+                callback_host=get_value(task, 'CallbackHost'),
+                callback_body=get_value(task, 'CallbackBody'),
+                callback_body_type=get_value(task, 'CallbackBodyType'),
+                storage_class=get_value(task, 'StorageClass', lambda x: convert_storage_class_type(x)),
+                acl=get_value(task, 'Acl', lambda x: ACLType(x)),
+                grant_full_control=get_value(task, 'GrantFullControl'),
+                grant_read=get_value(task, 'GrantRead'),
+                grant_read_acp=get_value(task, 'GrantReadAcp'),
+                grant_write_acp=get_value(task, 'GrantWriteAcp'),
+                ssec_algorithm=get_value(task, 'SSECAlgorithm'),
+                ssec_key=get_value(task, 'SSECKey'),
+                ssec_key_md5=get_value(task, 'SSECKeyMd5'),
+                meta=meta
+            )
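A quick usage sketch (illustrative only, not part of the patch): polling the task created by put_fetch_task; it assumes the existing get_fetch_task client method referenced in the changelog takes the bucket and a task ID, both placeholders here.

    out = client.get_fetch_task('your-bucket', task_id='your-task-id')
    print(out.state, out.err)
    if out.task:  # task details are now optional, so guard before use
        print(out.task.url, out.task.callback_url)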

 class PutBucketReplicationOutput(ResponseInfo):
@@ -1896,3 +1988,135 @@ def __init__(self, resp):

 class GenericInput(object):
     def __init__(self, request_date: datetime = None):
         self.request_date = request_date
+
+
+class ApplyServerSideEncryptionByDefault(object):
+    def __init__(self, sse_algorithm: str = None, kms_master_key_id: str = None):
+        self.sse_algorithm = sse_algorithm
+        self.kms_master_key_id = kms_master_key_id
+
+
+class BucketEncryptionRule(object):
+    def __init__(self, apply_server_side_encryption_by_default: ApplyServerSideEncryptionByDefault = None):
+        self.apply_server_side_encryption_by_default = apply_server_side_encryption_by_default
+
+
+class PutBucketEncryptionOutput(ResponseInfo):
+    def __init__(self, resp):
+        super(PutBucketEncryptionOutput, self).__init__(resp)
+
+
+class GetBucketEncryptionOutput(ResponseInfo):
+    def __init__(self, resp):
+        super(GetBucketEncryptionOutput, self).__init__(resp)
+        self.rule = None
+        data = resp.json_read()
+        rule = get_value(get_value(data, 'Rule'), 'ApplyServerSideEncryptionByDefault')
+        if rule:
+            self.rule = BucketEncryptionRule(
+                apply_server_side_encryption_by_default=ApplyServerSideEncryptionByDefault(
+                    sse_algorithm=get_value(rule, 'SSEAlgorithm'),
+                    kms_master_key_id=get_value(rule, 'KMSMasterKeyID')
+                )
+            )
+
+
+class DeleteBucketEncryptionOutput(ResponseInfo):
+    def __init__(self, resp):
+        super(DeleteBucketEncryptionOutput, self).__init__(resp)
+
+
+class NotificationFilterRule(object):
+    def __init__(self, name: str = None, value: str = None):
+        self.name = name
+        self.value = value
+
+
+class NotificationFilterKey(object):
+    def __init__(self, filter_rules: [] = None):
+        self.filter_rules = filter_rules
+
+
+class NotificationFilter(object):
+    def __init__(self, tos_key: NotificationFilterKey = None):
+        self.tos_key = tos_key
+
+
+class DestinationRocketMQ(object):
+    def __init__(self, role: str = None, instance_id: str = None, topic: str = None, access_key_id: str = None):
+        self.role = role
+        self.instance_id = instance_id
+        self.topic = topic
+        self.access_key_id = access_key_id
+
+
+class DestinationVeFaaS(object):
+    def __init__(self, function_id: str = None):
+        self.function_id = function_id
+
+
+class NotificationDestination(object):
+    def __init__(self, rocket_mq: [] = None, ve_faas: [] = None):
+        self.rocket_mq = rocket_mq
+        self.ve_faas = ve_faas
+
+
+class NotificationRule(object):
+    def __init__(self, rule_id: str = None, events: [] = None, filter: NotificationFilter = None,
+                 destination: NotificationDestination = None):
+        self.rule_id = rule_id
+        self.events = events
+        self.filter = filter
+        self.destination = destination
+
+
+class PutBucketNotificationType2Output(ResponseInfo):
+    def __init__(self, resp):
+        super(PutBucketNotificationType2Output, self).__init__(resp)
+
+
+class GetBucketNotificationType2Output(ResponseInfo):
+    def __init__(self, resp):
+        super(GetBucketNotificationType2Output, self).__init__(resp)
+        data = resp.json_read()
+        self.version = get_value(data, 'Version')
+        self.rules = []
+        rules = get_value(data, 'Rules') or []
+        for rule in rules:
+            config = NotificationRule(
+                rule_id=get_value(rule, 'RuleId'),
+                events=get_value(rule, 'Events'),
+            )
+            if get_value(rule, 'Destination'):
+                config.destination = NotificationDestination()
+                destination_json = get_value(rule, 'Destination')
+                if get_value(destination_json, 'RocketMQ'):
+                    rocket_mqs = []
+                    for r in get_value(destination_json, 'RocketMQ'):
+                        rocket_mqs.append(DestinationRocketMQ(
+                            role=get_value(r, 'Role'),
+                            instance_id=get_value(r, 'InstanceId'),
+                            topic=get_value(r, 'Topic'),
+                            access_key_id=get_value(r, 'AccessKeyId')
+                        ))
+                    config.destination.rocket_mq = rocket_mqs
+                if get_value(destination_json, 'VeFaaS'):
+                    ve_faas = []
+                    for r in get_value(destination_json, 'VeFaaS'):
+                        ve_faas.append(DestinationVeFaaS(function_id=get_value(r, 'FunctionId')))
+                    config.destination.ve_faas = ve_faas
+            if get_value(rule, 'Filter') and get_value(get_value(rule, 'Filter'), 'TOSKey') and get_value(
+                    get_value(get_value(rule, 'Filter'), 'TOSKey'), 'FilterRules'):
+                filter_rules = get_value(get_value(get_value(rule, 'Filter'), 'TOSKey'), 'FilterRules')
+                config_rules = []
+                for r in filter_rules:
+                    config_rules.append(NotificationFilterRule(
+                        name=get_value(r, 'Name'),
+                        value=get_value(r, 'Value')
+                    ))
+                config.filter = NotificationFilter(
+                    tos_key=NotificationFilterKey(
+                        filter_rules=config_rules
+                    )
+                )
+            self.rules.append(config)
diff --git a/tos/safe_map.py b/tos/safe_map.py
new file mode 100644
index 0000000..ca44bd5
--- /dev/null
+++ b/tos/safe_map.py
@@ -0,0 +1,52 @@
+import threading
+import time
+
+
+class SafeMapFIFO:
+    def __init__(self, max_length: int = 100, default_expiration_sec: int = 60):
+        self.map = {}
+        self.lock = threading.Lock()
+        self.max_length = max_length
+        self.default_expiration_sec = default_expiration_sec
+
+    def _clean_expired_keys(self):
+        current_time = time.time()
+        with self.lock:
+            keys_to_delete = [key for key, value in self.map.items() if
+                              current_time - value['insert_time'] > value['expiration']]
+            for key in keys_to_delete:
+                del self.map[key]
+
+    def put(self, key, value, expiration_time=None):
+        with self.lock:
+            if len(self.map) >= self.max_length:
+                # At capacity: evict the earliest-inserted entry (FIFO).
+                oldest_key = min(self.map.keys(), key=lambda k: self.map[k]['insert_time'])
+                del self.map[oldest_key]
+            now = time.time()
+            expiration = expiration_time if expiration_time else self.default_expiration_sec
+            self.map[key] = {'value': value, 'insert_time': now, 'expiration': expiration}
+
+    def get(self, key):
+        with self.lock:
+            if key in self.map:
+                item = self.map[key]
+                if time.time() - item['insert_time'] <= item['expiration']:
+                    return item['value']
+                else:
+                    del self.map[key]  # expired: drop on read
+            return None
+
+    def delete(self, key):
+        with self.lock:
+            if key in self.map:
+                del self.map[key]
+
+    def has_key(self, key):
+        with self.lock:
+            return key in self.map
+
+    def items(self):
+        with self.lock:
+            current_time = time.time()
+            return [(k, v['value']) for k, v in self.map.items() if
+                    current_time - v['insert_time'] <= v['expiration']]
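A quick usage sketch (illustrative only, not part of the patch): SafeMapFIFO is a small thread-safe TTL cache with FIFO eviction; the client presumably uses it to memoize per-bucket lookups such as the bucket type consulted by append_object.

    cache = SafeMapFIFO(max_length=100, default_expiration_sec=60)
    cache.put('your-bucket', 'hns')
    bucket_type = cache.get('your-bucket')  # None once the 60 s TTL elapses or after FIFO eviction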
diff --git a/tos/utils.py b/tos/utils.py
index a883a2d..2785cca 100644
--- a/tos/utils.py
+++ b/tos/utils.py
@@ -8,7 +8,9 @@
 import sys
 import threading
 import time
+import urllib.parse
 from hashlib import sha256
+from io import StringIO
 from urllib.parse import unquote_to_bytes, quote

 import crcmod as crcmod
@@ -328,6 +331,33 @@ def meta_header_decode(headers):
     return decode_headers


+def convert_meta(meta_list, disable_encoding_meta=None):
+    if not meta_list:
+        return meta_list
+    meta = {}
+    for m in meta_list:
+        meta[m['Key']] = m['Value']
+    if disable_encoding_meta:
+        return meta
+    return meta_header_decode(meta)
+
+
+def content_disposition_encode(s):
+    if not s:
+        return s
+    content_dispositions = s.split(';')
+    result = []
+    for content_disposition in content_dispositions:
+        if "=" in content_disposition:
+            i = content_disposition.index("=")
+            if content_disposition[:i].strip().lower() == "filename":
+                value = urllib.parse.quote(content_disposition[i + 1:], safe=" \"\'")
+                result.append(content_disposition[:i] + "=" + value)
+                continue
+        result.append(content_disposition)
+    return ";".join(result)
+
+
 def makedir_p(dirpath):
     try:
         os.makedirs(dirpath)
@@ -587,6 +617,10 @@ def init_content(data, can_reset=None, init_offset=None):
         if not (hasattr(data, 'seek') and hasattr(data, 'tell')):
             return add_Background_func(data, can_reset=True)

+    if isinstance(data, StringIO):
+        data = data.read()
+        return add_Background_func(data, can_reset=True)
+
     # Fallback: treat the data as non-resettable.
     return add_Background_func(data, can_reset=False)

@@ -600,6 +634,7 @@ def add_Background_func(data, can_reset=False, init_offset=None, size=None):
       3. if size is empty but the object is iterable, wrap it in an _IterableAdapter and send it via HTTP chunked encoding
       4. if size is not empty, wrap it directly in a _ReaderAdapter
     """
+    data = to_bytes(data)
     if size is None:
         size = _get_size(data)
@@ -656,6 +691,8 @@ def add_crc_func(data, init_crc=0, discard=0, size=None, can_reset=False, is_res


 def _get_size(data):
+    if isinstance(data, StringIO):
+        return len(to_bytes(data.getvalue()[data.tell():]))
     if hasattr(data, '__len__') or hasattr(data, 'len') or (hasattr(data, 'seek') and hasattr(data, 'tell')) or hasattr(
             data, 'read'):
         if hasattr(data, '__len__'):
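A quick usage sketch (illustrative only, not part of the patch): with the StringIO handling above, text buffers can be passed straight to upload APIs; the adapter reads the text remaining after the current position and converts it to bytes. Bucket and key names are placeholders.

    from io import StringIO

    buf = StringIO('hello world')
    buf.read(6)  # only 'world' remains to be uploaded
    client.put_object('your-bucket', 'texts/hello.txt', content=buf)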