Commit

update readme
huangzbaws committed Mar 25, 2020
1 parent 76eea16 commit 46a16bd
Showing 9 changed files with 76 additions and 11 deletions.
9 changes: 5 additions & 4 deletions cluster/README.md
@@ -53,11 +53,12 @@ New objects in Amazon S3 can also directly trigger Amazon SQS
* Set SSM Parameter Store / Lambda Env to adjust the Bucket/Prefix of the Jobs to be sent, with no need to log in to any server (see the sketch below)
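For illustration, a minimal sketch of reading such a setting with boto3; the parameter name `s3_migration_bucket_para` and its format are assumptions for this example, not the project's actual definitions:

```python
import boto3

# Hypothetical parameter name; the real stack defines its own SSM key
ssm = boto3.client('ssm')
value = ssm.get_parameter(Name='s3_migration_bucket_para')['Parameter']['Value']
print(value)  # assumed to hold the Bucket/Prefix job setting, e.g. as JSON
```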

#### Performance Test
![SingleNodePerformance](./img/07.png)
Tests show that for large numbers of GB-scale files, a single node running 25 or more threads (5 files x 5 threads) can sustain 1Gbps of cross-border bandwidth throughput. For MB-scale files, a single node can be configured to process more files concurrently.
![Performance](./img/09.png)
Tests show that for large numbers of GB-scale files, a single node running 25 or more threads (5 files x 5 threads) can sustain 800Mbps - 1Gbps of cross-border bandwidth throughput. For MB-scale files, a single node can be configured to process more files concurrently (a concurrency sketch follows the image below). In the chart above, the Auto Scaling Group gradually adds EC2 instances and the per-node throughput stacks up: transferring 1.2TB of data took only 1 hour, and when the transfer completed the instances were shut down automatically, leaving just one running.
![1.2TB](./img/08.png)
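As an illustration of the "5 files x 5 threads" model mentioned above, here is a minimal sketch using boto3's managed transfer: an outer thread pool keeps 5 files in flight while `TransferConfig(max_concurrency=5)` gives each upload 5 multipart threads. The bucket and file names are placeholders; this shows the concurrency idea, not this project's actual worker code.

```python
from concurrent.futures import ThreadPoolExecutor

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')
# 5 multipart threads per file; bucket and file names below are placeholders
config = TransferConfig(max_concurrency=5, multipart_chunksize=8 * 1024 * 1024)

def upload_one(filename):
    s3.upload_file(filename, 'my-destination-bucket', filename, Config=config)

# 5 files in flight at once -> 5 files x 5 threads = 25 threads total
files = ['file1.bin', 'file2.bin', 'file3.bin', 'file4.bin', 'file5.bin']
with ThreadPoolExecutor(max_workers=5) as pool:
    list(pool.map(upload_one, files))  # list() forces completion and surfaces errors
```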

#### Dashboard Automatically Deployed by CDK
![dashboard](./img/06.png)

**The Dashboard shown above is deployed automatically by AWS CDK**

### Reliability and Security
* Every part that finishes transferring is verified with an MD5 integrity check on Amazon S3, as sketched below.
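For illustration, a minimal sketch of how a per-part MD5 check against Amazon S3 can work (placeholder bucket/key, not necessarily this project's exact implementation): S3 rejects the part if the supplied Content-MD5 does not match the bytes it received, and the returned ETag can also be compared locally.

```python
import base64
import hashlib

import boto3

s3 = boto3.client('s3')
bucket, key = 'my-destination-bucket', 'my-object-key'  # placeholders
part_body = b'0' * (5 * 1024 * 1024)  # parts other than the last must be >= 5MB

mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
md5 = hashlib.md5(part_body)
response = s3.upload_part(
    Bucket=bucket, Key=key, UploadId=mpu['UploadId'], PartNumber=1,
    Body=part_body,
    # S3 rejects the part if the received bytes do not hash to this value
    ContentMD5=base64.b64encode(md5.digest()).decode(),
)
# For unencrypted uploads the returned ETag is the part's hex MD5,
# so it can also be compared locally
assert response['ETag'].strip('"') == md5.hexdigest()

# abort so the sketch does not leave an unfinished multipart upload behind
s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu['UploadId'])
```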
3 changes: 2 additions & 1 deletion cluster/cdk-cluster/app.py
@@ -52,6 +52,7 @@
                         resource_stack.sqs_queue_DLQ,
                         resource_stack.ssm_bucket_para,
                         resource_stack.ssm_credential_para,
-                        resource_stack.s3bucket)
+                        # resource_stack.s3bucket
+                        )

app.synth()
7 changes: 4 additions & 3 deletions cluster/cdk-cluster/cdk/cdk_ec2_stack.py
@@ -33,7 +33,8 @@ class CdkEc2Stack(core.Stack):

     def __init__(self, scope: core.Construct, _id: str, vpc, bucket_para,
                  # key_name,
-                 ddb_file_list, sqs_queue, sqs_queue_DLQ, ssm_bucket_para, ssm_credential_para, s3bucket,
+                 ddb_file_list, sqs_queue, sqs_queue_DLQ, ssm_bucket_para, ssm_credential_para,
+                 # s3bucket,
                  **kwargs) -> None:
         super().__init__(scope, _id, **kwargs)

@@ -106,8 +107,8 @@ def __init__(self, scope: core.Construct, _id: str, vpc, bucket_para,
         ssm_credential_para.grant_read(worker_asg)
 
         # Allow EC2 access new s3 bucket
-        s3bucket.grant_read(jobsender)
-        s3bucket.grant_read(worker_asg)
+        # s3bucket.grant_read(jobsender)
+        # s3bucket.grant_read(worker_asg)
 
         # Allow EC2 access exist s3 bucket
         bucket_name = ''
6 changes: 3 additions & 3 deletions cluster/cdk-cluster/cdk/cdk_resource_stack.py
@@ -53,6 +53,6 @@ def __init__(self, scope: core.Construct, _id: str, bucket_para, **kwargs) -> None:
         # This is not for existing S3 buckets. Jobsender will scan the existing bucket and create sqs jobs.
         # A new S3 bucket is created here; creating a new Object in it will trigger SQS to start the migration job.
         # Existing S3 buckets are not configured here; jobsender scans them and generates the SQS jobs.
-        self.s3bucket = s3.Bucket(self, "newbucket")
-        self.s3bucket.add_event_notification(s3.EventType.OBJECT_CREATED,
-                                             s3n.SqsDestination(self.sqs_queue))
+        # self.s3bucket = s3.Bucket(self, "newbucket")
+        # self.s3bucket.add_event_notification(s3.EventType.OBJECT_CREATED,
+        #                                      s3n.SqsDestination(self.sqs_queue))
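For context, the comments above describe the path for existing buckets: jobsender scans the bucket and generates SQS jobs instead of relying on S3 event notifications. A rough sketch of that flow, with a placeholder queue URL and bucket name (the repo's actual jobsender logic is more involved):

```python
import json

import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
# placeholder queue URL and bucket name
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/s3-migration-queue'

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='existing-source-bucket', Prefix=''):
    for obj in page.get('Contents', []):
        # one job message per object
        sqs.send_message(QueueUrl=queue_url,
                         MessageBody=json.dumps({'Key': obj['Key'], 'Size': obj['Size']}))
```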
Binary file removed cluster/img/06.png
Binary file removed cluster/img/07.png
Binary file added cluster/img/08.png
Binary file added cluster/img/09.png
62 changes: 62 additions & 0 deletions tools/clean_unfinished_multipart_upload.py
@@ -0,0 +1,62 @@
import time

import boto3


# Get unfinished multipart upload ids from s3
def get_uploaded_list(s3_client, Des_bucket, Des_key, MaxRetry):
    NextKeyMarker = ''
    IsTruncated = True
    multipart_uploaded_list = []
    while IsTruncated:
        IsTruncated = False
        for retry in range(MaxRetry + 1):
            try:
                print(f'Getting unfinished upload id list {retry} retry {Des_bucket}/{Des_key}...')
                list_multipart_uploads = s3_client.list_multipart_uploads(
                    Bucket=Des_bucket,
                    Prefix=Des_key,
                    MaxUploads=1000,
                    KeyMarker=NextKeyMarker
                )
                IsTruncated = list_multipart_uploads["IsTruncated"]
                # NextKeyMarker may be absent when the list is not truncated
                NextKeyMarker = list_multipart_uploads.get("NextKeyMarker", '')
                if "Uploads" in list_multipart_uploads:
                    for i in list_multipart_uploads["Uploads"]:
                        if i["Key"].startswith(Des_key):  # prefix match, so an empty Des_key covers the whole bucket
                            multipart_uploaded_list.append({
                                "Key": i["Key"],
                                "Initiated": i["Initiated"],
                                "UploadId": i["UploadId"]
                            })
                            print(f'Unfinished upload, Key: {i["Key"]}, Time: {i["Initiated"]}')
                break  # exit the retry loop
            except Exception as e:
                print(f'Fail to list multipart upload {str(e)}')
                if retry >= MaxRetry:
                    print(f'Fail MaxRetry list multipart upload {str(e)}')
                    return []
                else:
                    time.sleep(5 * retry)

    return multipart_uploaded_list


# Main
if __name__ == '__main__':

    session = boto3.session.Session(profile_name='zhy')
    s3_des_client = session.client('s3')
    Des_bucket = 's3-migration-test-nx'

    multipart_uploaded_list = get_uploaded_list(s3_des_client, Des_bucket, "", 3)
    for clean_i in multipart_uploaded_list:
        try:
            s3_des_client.abort_multipart_upload(
                Bucket=Des_bucket,
                Key=clean_i["Key"],
                UploadId=clean_i["UploadId"]
            )
        except Exception as e:
            print(f'Fail to clean {str(e)}')
    print('CLEAN UNFINISHED UPLOAD FINISHED: ', Des_bucket)
