88
99from  .file_utils  import  carve , is_safe_path 
1010from  .models  import  Chunk , File , PaddingChunk , TaskResult , UnknownChunk , ValidChunk 
11- from  .report  import  MaliciousSymlinkRemoved 
1211
1312logger  =  get_logger ()
1413
1514FILE_PERMISSION_MASK  =  0o644 
1615DIR_PERMISSION_MASK  =  0o775 
1716
17+ _  =  is_safe_path   # it is re-exported 
18+ 
1819
1920def  carve_chunk_to_file (carve_path : Path , file : File , chunk : Chunk ):
2021    """Extract valid chunk to a file, which we then pass to another tool to extract it.""" 
@@ -57,8 +58,11 @@ def sanitize_symlink_target(base_dir, current_dir, target):
5758    # Normalize all paths to their absolute forms 
5859    base_dir_abs  =  os .path .abspath (base_dir )
5960    current_dir_abs  =  os .path .abspath (current_dir )
60-     target_abs  =  os .path .abspath (os .path .join (current_dir , target )) \
61-                     if  not  os .path .isabs (target ) else  os .path .abspath (target )
61+     target_abs  =  (
62+         os .path .abspath (os .path .join (current_dir , target ))
63+         if  not  os .path .isabs (target )
64+         else  os .path .abspath (target )
65+     )
6266
6367    # Check if the target is absolute and within the base_dir 
6468    if  os .path .isabs (target ):
@@ -67,7 +71,13 @@ def sanitize_symlink_target(base_dir, current_dir, target):
6771        else :
6872            # Target is absolute but outside base_dir - we'll pretend base_dir is our root 
6973            # and adjust the target to be within base_dir 
70-             abs  =  base_dir  +  "/"  +  os .path .relpath (target_abs , os .path .commonpath ([target_abs , base_dir_abs ]))
74+             abs  =  (
75+                 base_dir 
76+                 +  "/" 
77+                 +  os .path .relpath (
78+                     target_abs , os .path .commonpath ([target_abs , base_dir_abs ])
79+                 )
80+             )
7181            # We want to return the relative path from current_dir to the adjusted target 
7282            return  os .path .relpath (abs , current_dir_abs )
7383    else :
@@ -82,18 +92,21 @@ def sanitize_symlink_target(base_dir, current_dir, target):
8292            # relative path from /host/test_archive/foo to /host/test_archive/etc/passwd 
8393            # without escaping /host/test_archive 
8494
85-             for  drop_count  in  range (0 ,  len (target .split ('/' ))):
95+             for  drop_count  in  range (len (target .split ("/" ))):
8696                # We drop '..'s from the target by prepending placeholder directories until we get something valid 
8797                abs  =  current_dir  +  "/"  +  "/" .join (["foo" ] *  drop_count ) +  target 
8898                resolved  =  os .path .abspath (abs )
8999                if  resolved .startswith (base_dir_abs ):
90100                    break 
91101            else :
92-                 raise  ValueError (f"Could not resolve symlink target { target }   within base_dir { base_dir }  " )
102+                 raise  ValueError (
103+                     f"Could not resolve symlink target { target }   within base_dir { base_dir }  " 
104+                 )
93105
94106            # We need to add the /placeholder to the relative path because we need 
95107            # to act like a file within base_dir is our root (as opposed to base_dir itself) 
96-             return  os .path .relpath (resolved , base_dir_abs  +  '/placeholder' )
108+             return  os .path .relpath (resolved , base_dir_abs  +  "/placeholder" )
109+ 
97110
98111def  fix_extracted_directory (outdir : Path , task_result : TaskResult ):
99112    def  _fix_extracted_directory (directory : Path ):
@@ -103,7 +116,7 @@ def _fix_extracted_directory(directory: Path):
103116        base_dir  =  os .path .abspath (outdir )
104117        for  root , dirs , files  in  os .walk (base_dir , topdown = True ):
105118            fix_permission (Path (root ))
106-             for  name  in  dirs + files :
119+             for  name  in  dirs   +   files :
107120                try :
108121                    full_path  =  os .path .join (root , name )
109122                    if  os .path .islink (full_path ):
@@ -113,9 +126,15 @@ def _fix_extracted_directory(directory: Path):
113126                        if  new_target  !=  target :
114127                            os .remove (full_path )
115128                            os .symlink (new_target , full_path )
116-                             logger .info ("Updated symlink" , path = full_path , target = new_target )
129+                             logger .info (
130+                                 "Updated symlink" , path = full_path , target = new_target 
131+                             )
117132                        else :
118-                             logger .debug ("Symlink is already sanitized" , path = full_path , target = new_target )
133+                             logger .debug (
134+                                 "Symlink is already sanitized" ,
135+                                 path = full_path ,
136+                                 target = new_target ,
137+                             )
119138                except  OSError  as  e :
120139                    if  e .errno  ==  errno .ENAMETOOLONG :
121140                        continue 
0 commit comments