22import  json 
33import  logging 
44import  os 
5+ import  shutil 
56import  sys 
67import  tempfile 
78import  uuid 
8- from  pathlib  import  Path 
99from  typing  import  Annotated , Any 
1010
1111import  typer 
12+ from  pathvalidate  import  sanitize_filepath 
1213from  pydantic  import  Field 
1314
1415from  cycode .cli .cli_types  import  McpTransportOption , ScanTypeOption 
2627
2728_logger  =  get_logger ('Cycode MCP' )
2829
29- _DEFAULT_RUN_COMMAND_TIMEOUT  =  5  *  60 
30+ _DEFAULT_RUN_COMMAND_TIMEOUT  =  10  *  60 
3031
3132_FILES_TOOL_FIELD  =  Field (description = 'Files to scan, mapping file paths to their content' )
3233
@@ -91,48 +92,76 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TI
9192        return  {'error' : f'Failed to run command: { e !s}  }
9293
9394
94- def  _create_temp_files (files_content : dict [str , str ]) ->  list [str ]:
95-     """Create temporary files from content and return their paths.""" 
96-     temp_dir  =  tempfile .mkdtemp (prefix = 'cycode_mcp_' )
97-     temp_files  =  []
95+ def  _sanitize_file_path (file_path : str ) ->  str :
96+     """Sanitize file path to prevent path traversal and other security issues. 
9897
99-     _logger .debug ('Creating temporary files in directory: %s' , temp_dir )
98+     Args: 
99+         file_path: The file path to sanitize 
100100
101-     for  file_path , content  in  files_content .items ():
102-         safe_filename  =  f'{ _gen_random_id ()} { Path (file_path ).name }  
103-         temp_file_path  =  os .path .join (temp_dir , safe_filename )
101+     Returns: 
102+         Sanitized file path safe for use in temporary directory 
104103
105-         os .makedirs (os .path .dirname (temp_file_path ), exist_ok = True )
104+     Raises: 
105+         ValueError: If the path is invalid or potentially dangerous 
106+     """ 
107+     if  not  file_path  or  not  isinstance (file_path , str ):
108+         raise  ValueError ('File path must be a non-empty string' )
106109
107-         _logger .debug ('Creating temp file: %s' , temp_file_path )
108-         with  open (temp_file_path , 'w' , encoding = 'UTF-8' ) as  f :
109-             f .write (content )
110+     return  sanitize_filepath (file_path , platform = 'auto' , validate_after_sanitize = True )
110111
111-         temp_files .append (temp_file_path )
112112
113-     return  temp_files 
113+ class  _TempFilesManager :
114+     """Context manager for creating and cleaning up temporary files. 
114115
116+     Creates a temporary directory structure that preserves original file paths 
117+     inside a call_id as a suffix. Automatically cleans up all files and directories 
118+     when exiting the context. 
119+     """ 
115120
116- def  _cleanup_temp_files (temp_files : list [str ]) ->  None :
117-     """Clean up temporary files and directories.""" 
121+     def  __init__ (self , files_content : dict [str , str ], call_id : str ) ->  None :
122+         self .files_content  =  files_content 
123+         self .call_id  =  call_id 
124+         self .temp_base_dir  =  None 
125+         self .temp_files  =  []
118126
119-     temp_dirs  =  set ()
120-     for  temp_file  in  temp_files :
121-         try :
122-             if  os .path .exists (temp_file ):
123-                 _logger .debug ('Removing temp file: %s' , temp_file )
124-                 os .remove (temp_file )
125-                 temp_dirs .add (os .path .dirname (temp_file ))
126-         except  OSError  as  e :
127-             _logger .warning ('Failed to remove temp file %s: %s' , temp_file , e )
128- 
129-     for  temp_dir  in  temp_dirs :
130-         try :
131-             if  os .path .exists (temp_dir ) and  not  os .listdir (temp_dir ):
132-                 _logger .debug ('Removing temp directory: %s' , temp_dir )
133-                 os .rmdir (temp_dir )
134-         except  OSError  as  e :
135-             _logger .warning ('Failed to remove temp directory %s: %s' , temp_dir , e )
127+     def  __enter__ (self ) ->  list [str ]:
128+         self .temp_base_dir  =  tempfile .mkdtemp (prefix = 'cycode_mcp_' , suffix = self .call_id )
129+         _logger .debug ('Creating temporary files in directory: %s' , self .temp_base_dir )
130+ 
131+         for  file_path , content  in  self .files_content .items ():
132+             try :
133+                 sanitized_path  =  _sanitize_file_path (file_path )
134+                 temp_file_path  =  os .path .join (self .temp_base_dir , sanitized_path )
135+ 
136+                 # Ensure the normalized path is still within our temp directory 
137+                 normalized_temp_path  =  os .path .normpath (temp_file_path )
138+                 normalized_base_path  =  os .path .normpath (self .temp_base_dir )
139+                 if  not  normalized_temp_path .startswith (normalized_base_path  +  os .sep ):
140+                     raise  ValueError (f'Path escapes temporary directory: { file_path }  )
141+ 
142+                 os .makedirs (os .path .dirname (temp_file_path ), exist_ok = True )
143+ 
144+                 _logger .debug ('Creating temp file: %s (from: %s)' , temp_file_path , file_path )
145+                 with  open (temp_file_path , 'w' , encoding = 'UTF-8' ) as  f :
146+                     f .write (content )
147+ 
148+                 self .temp_files .append (temp_file_path )
149+             except  ValueError  as  e :
150+                 _logger .error ('Invalid file path rejected: %s - %s' , file_path , str (e ))
151+                 continue 
152+             except  Exception  as  e :
153+                 _logger .error ('Failed to create temp file for %s: %s' , file_path , str (e ))
154+                 continue 
155+ 
156+         if  not  self .temp_files :
157+             raise  ValueError ('No valid files provided after sanitization' )
158+ 
159+         return  self .temp_files 
160+ 
161+     def  __exit__ (self , * _ ) ->  None :
162+         if  self .temp_base_dir  and  os .path .exists (self .temp_base_dir ):
163+             _logger .debug ('Removing temp directory recursively: %s' , self .temp_base_dir )
164+             shutil .rmtree (self .temp_base_dir , ignore_errors = True )
136165
137166
138167async  def  _run_cycode_scan (scan_type : ScanTypeOption , temp_files : list [str ]) ->  dict [str , Any ]:
@@ -153,21 +182,36 @@ async def _cycode_scan_tool(scan_type: ScanTypeOption, files: dict[str, str] = _
153182        _logger .error ('No files provided for scan' )
154183        return  json .dumps ({'error' : 'No files provided' })
155184
156-     temp_files  =  _create_temp_files (files )
157- 
158185    try :
159-         _logger .info (
160-             'Running Cycode scan, %s' ,
161-             {'scan_type' : scan_type , 'files_count' : len (temp_files ), 'call_id' : _tool_call_id },
162-         )
163-         result  =  await  _run_cycode_scan (scan_type , temp_files )
164-         _logger .info ('Scan completed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
165-         return  json .dumps (result , indent = 2 )
186+         with  _TempFilesManager (files , _tool_call_id ) as  temp_files :
187+             original_count  =  len (files )
188+             processed_count  =  len (temp_files )
189+ 
190+             if  processed_count  <  original_count :
191+                 _logger .warning (
192+                     'Some files were rejected during sanitization, %s' ,
193+                     {
194+                         'scan_type' : scan_type ,
195+                         'original_count' : original_count ,
196+                         'processed_count' : processed_count ,
197+                         'call_id' : _tool_call_id ,
198+                     },
199+                 )
200+ 
201+             _logger .info (
202+                 'Running Cycode scan, %s' ,
203+                 {'scan_type' : scan_type , 'files_count' : processed_count , 'call_id' : _tool_call_id },
204+             )
205+             result  =  await  _run_cycode_scan (scan_type , temp_files )
206+ 
207+             _logger .info ('Scan completed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id })
208+             return  json .dumps (result , indent = 2 )
209+     except  ValueError  as  e :
210+         _logger .error ('Invalid input files, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id , 'error' : str (e )})
211+         return  json .dumps ({'error' : f'Invalid input files: { e !s}  }, indent = 2 )
166212    except  Exception  as  e :
167213        _logger .error ('Scan failed, %s' , {'scan_type' : scan_type , 'call_id' : _tool_call_id , 'error' : str (e )})
168214        return  json .dumps ({'error' : f'Scan failed: { e !s}  }, indent = 2 )
169-     finally :
170-         _cleanup_temp_files (temp_files )
171215
172216
173217async  def  cycode_secret_scan (files : dict [str , str ] =  _FILES_TOOL_FIELD ) ->  str :
@@ -197,6 +241,10 @@ async def cycode_sca_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
197241      - verify software supply chain security 
198242      - review package.json, requirements.txt, pom.xml and other dependency files 
199243
244+     Important: 
245+         You must also include lock files (like package-lock.json, Pipfile.lock, etc.) to get accurate results. 
246+         You must provide manifest and lock files together. 
247+ 
200248    Args: 
201249        files: Dictionary mapping file paths to their content 
202250
0 commit comments