11using System . Text . Json ;
22using System . Text . Json . Nodes ;
3+ using System . Threading . Tasks ;
34using Azure . Core ;
45using Microsoft . Azure . Functions . Worker ;
56using Microsoft . Extensions . Logging ;
@@ -54,14 +55,16 @@ public async Async.Task Run(
5455 return ;
5556 }
5657
58+ var storageAccount = new ResourceIdentifier ( topicElement . GetString ( ) ! ) ;
59+
5760 try {
5861 // Setting isLastRetryAttempt to false will rethrow any exceptions
5962 // With the intention that the azure functions runtime will handle requeing
6063 // the message for us. The difference is for the poison queue, we're handling the
6164 // requeuing ourselves because azure functions doesn't support retry policies
6265 // for queue based functions.
6366
64- var result = await FileAdded ( fileChangeEvent , isLastRetryAttempt : false ) ;
67+ var result = await FileAdded ( storageAccount , fileChangeEvent , isLastRetryAttempt : false ) ;
6568 if ( ! result . IsOk && result . ErrorV . Code == ErrorCode . ADO_WORKITEM_PROCESSING_DISABLED ) {
6669 await RequeueMessage ( msg , TimeSpan . FromDays ( 1 ) ) ;
6770 }
@@ -71,16 +74,47 @@ public async Async.Task Run(
7174 }
7275 }
7376
74- private async Async . Task < OneFuzzResultVoid > FileAdded ( JsonDocument fileChangeEvent , bool isLastRetryAttempt ) {
77+ private async Async . Task < OneFuzzResultVoid > FileAdded ( ResourceIdentifier storageAccount , JsonDocument fileChangeEvent , bool isLastRetryAttempt ) {
7578 var data = fileChangeEvent . RootElement . GetProperty ( "data" ) ;
7679 var url = data . GetProperty ( "url" ) . GetString ( ) ! ;
7780 var parts = url . Split ( "/" ) . Skip ( 3 ) . ToList ( ) ;
7881
79- var container = parts [ 0 ] ;
82+ var container = Container . Parse ( parts [ 0 ] ) ;
8083 var path = string . Join ( '/' , parts . Skip ( 1 ) ) ;
8184
82- _log . LogInformation ( "file added : {Container} - {Path}" , container , path ) ;
83- return await _notificationOperations . NewFiles ( Container . Parse ( container ) , path , isLastRetryAttempt ) ;
85+ _log . LogInformation ( "file added : {Container} - {Path}" , container . String , path ) ;
86+
87+ var ( _, result ) = await (
88+ ApplyRetentionPolicy ( storageAccount , container , path ) ,
89+ _notificationOperations . NewFiles ( container , path , isLastRetryAttempt ) ) ;
90+
91+ return result ;
92+ }
93+
94+ private async Async . Task < bool > ApplyRetentionPolicy ( ResourceIdentifier storageAccount , Container container , string path ) {
95+ if ( await _context . FeatureManagerSnapshot . IsEnabledAsync ( FeatureFlagConstants . EnableContainerRetentionPolicies ) ) {
96+ // default retention period can be applied to the container
97+ // if one exists, we will set the expiry date on the newly-created blob, if it doesn't already have one
98+ var account = await _storage . GetBlobServiceClientForAccount ( storageAccount ) ;
99+ var containerClient = account . GetBlobContainerClient ( container . String ) ;
100+ var containerProps = await containerClient . GetPropertiesAsync ( ) ;
101+ var retentionPeriod = RetentionPolicyUtils . GetContainerRetentionPeriodFromMetadata ( containerProps . Value . Metadata ) ;
102+ if ( ! retentionPeriod . IsOk ) {
103+ _log . LogError ( "invalid retention period: {Error}" , retentionPeriod . ErrorV ) ;
104+ } else if ( retentionPeriod . OkV is TimeSpan period ) {
105+ var blobClient = containerClient . GetBlobClient ( path ) ;
106+ var tags = ( await blobClient . GetTagsAsync ( ) ) . Value . Tags ;
107+ var expiryDate = DateTime . UtcNow + period ;
108+ var tag = RetentionPolicyUtils . CreateExpiryDateTag ( DateOnly . FromDateTime ( expiryDate ) ) ;
109+ if ( tags . TryAdd ( tag . Key , tag . Value ) ) {
110+ _ = await blobClient . SetTagsAsync ( tags ) ;
111+ _log . LogInformation ( "applied container retention policy ({Policy}) to {Path}" , period , path ) ;
112+ return true ;
113+ }
114+ }
115+ }
116+
117+ return false ;
84118 }
85119
86120 private async Async . Task RequeueMessage ( string msg , TimeSpan ? visibilityTimeout = null ) {
0 commit comments