1+ #!/usr/bin/env python3
2+ """
3+ Password Reset Script for BrainDrive
4+ Enhanced script with username lookup, password validation, and confirmation
5+ """
6+
7+ import asyncio
8+ import argparse
9+ import getpass
10+ import sys
11+ import os
12+ from datetime import datetime
13+ from pathlib import Path
14+
15+ # Add the backend directory to the Python path
16+ backend_dir = Path (__file__ ).parent .parent
17+ sys .path .insert (0 , str (backend_dir ))
18+
19+ from sqlalchemy .ext .asyncio import AsyncSession
20+ from app .core .database import get_db , db_factory
21+ from app .models .user import User
22+ from app .core .security import hash_password
23+
24+
25+ class PasswordResetTool :
26+ """Enhanced password reset tool with validation and confirmation"""
27+
28+ def __init__ (self ):
29+ self .session : AsyncSession = None
30+
31+ async def initialize_db (self ):
32+ """Initialize database connection"""
33+ try :
34+ # Get database session
35+ db_gen = get_db ()
36+ self .session = await db_gen .__anext__ ()
37+ print ("✓ Database connection established" )
38+ return True
39+ except Exception as e :
40+ print (f"✗ Failed to connect to database: { e } " )
41+ return False
42+
43+ async def find_user_by_username (self , username : str ) -> User | None :
44+ """Find user by username with detailed feedback"""
45+ try :
46+ # Try to find by username first (if the field exists)
47+ # For now, we'll skip this since get_by_username doesn't exist
48+ print (f"✗ No user found with username: { username } " )
49+ return None
50+ except Exception as e :
51+ print (f"✗ Error searching for user: { e } " )
52+ return None
53+
54+ async def find_user_by_email (self , email : str ) -> User | None :
55+ """Find user by email as fallback"""
56+ try :
57+ user = await User .get_by_email (self .session , email )
58+ if user :
59+ print (f"✓ User found by email: { user .username } ({ user .email } )" )
60+ return user
61+ else :
62+ print (f"✗ No user found with email: { email } " )
63+ return None
64+ except Exception as e :
65+ print (f"✗ Error searching for user by email: { e } " )
66+ return None
67+
68+ def validate_password_strength (self , password : str ) -> tuple [bool , list [str ]]:
69+ """Validate password strength with detailed feedback"""
70+ suggestions = []
71+
72+ if len (password ) < 8 :
73+ suggestions .append ("Use at least 8 characters" )
74+
75+ if not any (c .islower () for c in password ):
76+ suggestions .append ("Include lowercase letters" )
77+
78+ if not any (c .isupper () for c in password ):
79+ suggestions .append ("Include uppercase letters" )
80+
81+ if not any (c .isdigit () for c in password ):
82+ suggestions .append ("Include numbers" )
83+
84+ if not any (c in "!@#$%^&*(),.?\" :{}|<>" for c in password ):
85+ suggestions .append ("Include special characters" )
86+
87+ is_valid = len (suggestions ) == 0
88+ return is_valid , suggestions
89+
90+ def get_secure_password (self ) -> str | None :
91+ """Get password with validation and confirmation"""
92+ max_attempts = 3
93+
94+ for attempt in range (max_attempts ):
95+ print (f"\n Password attempt { attempt + 1 } /{ max_attempts } " )
96+
97+ # Get password
98+ password = getpass .getpass ("Enter new password: " )
99+
100+ if not password :
101+ print ("✗ Password cannot be empty" )
102+ continue
103+
104+ # Validate password strength
105+ is_valid , suggestions = self .validate_password_strength (password )
106+
107+ if not is_valid :
108+ print ("✗ Password does not meet requirements:" )
109+ for suggestion in suggestions :
110+ print (f" - { suggestion } " )
111+ continue
112+
113+ # Confirm password
114+ confirm_password = getpass .getpass ("Confirm new password: " )
115+
116+ if password != confirm_password :
117+ print ("✗ Passwords do not match" )
118+ continue
119+
120+ print ("✓ Password meets all requirements" )
121+ return password
122+
123+ print (f"✗ Maximum password attempts ({ max_attempts } ) exceeded" )
124+ return None
125+
126+ async def update_user_password (self , user : User , new_password : str ) -> bool :
127+ """Update user password with proper hashing"""
128+ try :
129+ # Hash the new password
130+ hashed_password = hash_password (new_password )
131+
132+ # Update user password
133+ user .password = hashed_password
134+ await user .save (self .session )
135+
136+ print ("✓ Password updated successfully" )
137+ return True
138+ except Exception as e :
139+ print (f"✗ Failed to update password: { e } " )
140+ return False
141+
142+ def log_password_reset (self , user : User , success : bool ):
143+ """Log password reset activity"""
144+ timestamp = datetime .now ().isoformat ()
145+ log_entry = {
146+ "timestamp" : timestamp ,
147+ "action" : "password_reset" ,
148+ "user_id" : user .id ,
149+ "username" : user .username ,
150+ "email" : user .email ,
151+ "success" : success ,
152+ "performed_by" : "admin_script"
153+ }
154+
155+ # Create logs directory if it doesn't exist
156+ logs_dir = backend_dir / "logs"
157+ logs_dir .mkdir (exist_ok = True )
158+
159+ # Write to audit log
160+ log_file = logs_dir / "password_reset_audit.log"
161+ with open (log_file , "a" ) as f :
162+ f .write (f"{ timestamp } - Password reset for { user .username } ({ 'SUCCESS' if success else 'FAILED' } )\n " )
163+
164+ print (f"✓ Activity logged to { log_file } " )
165+
166+ async def reset_password_interactive (self , username : str ):
167+ """Interactive password reset process"""
168+ print (f"\n { '=' * 50 } " )
169+ print ("BrainDrive Password Reset Tool" )
170+ print (f"{ '=' * 50 } " )
171+
172+ # Initialize database
173+ if not await self .initialize_db ():
174+ return False
175+
176+ # Find user
177+ print (f"\n Searching for user: { username } " )
178+ user = await self .find_user_by_username (username )
179+
180+ # If not found by username, try email
181+ if not user and "@" in username :
182+ print ("\n Trying to find user by email..." )
183+ user = await self .find_user_by_email (username )
184+
185+ if not user :
186+ print ("\n ✗ User not found. Please check the username/email and try again." )
187+ return False
188+
189+ # Confirm user details
190+ print (f"\n User Details:" )
191+ print (f" Username: { user .username } " )
192+ print (f" Email: { user .email } " )
193+ print (f" Active: { 'Yes' if user .is_active else 'No' } " )
194+
195+ confirm = input (f"\n Reset password for this user? (y/N): " ).lower ().strip ()
196+ if confirm != 'y' :
197+ print ("✗ Password reset cancelled" )
198+ return False
199+
200+ # Get new password
201+ print ("\n Password Requirements:" )
202+ print (" - At least 8 characters" )
203+ print (" - Include uppercase and lowercase letters" )
204+ print (" - Include numbers" )
205+ print (" - Include special characters" )
206+
207+ new_password = self .get_secure_password ()
208+ if not new_password :
209+ return False
210+
211+ # Update password
212+ print ("\n Updating password..." )
213+ success = await self .update_user_password (user , new_password )
214+
215+ # Log the activity
216+ self .log_password_reset (user , success )
217+
218+ if success :
219+ print (f"\n ✓ Password successfully reset for user: { user .username } " )
220+ print ("✓ User can now log in with the new password" )
221+ else :
222+ print (f"\n ✗ Failed to reset password for user: { user .username } " )
223+
224+ return success
225+
226+ async def cleanup (self ):
227+ """Clean up database connection"""
228+ if self .session :
229+ try :
230+ await self .session .close ()
231+ except Exception as e :
232+ print (f"Note: Database session cleanup completed with minor issues: { e } " )
233+
234+
235+ async def main ():
236+ """Main function with command line argument parsing"""
237+ parser = argparse .ArgumentParser (
238+ description = "Reset password for a BrainDrive user" ,
239+ formatter_class = argparse .RawDescriptionHelpFormatter ,
240+ epilog = """
241+ Examples:
242+ python reset_password.py --username john_doe
243+ python reset_password.py --username user@example.com
244+ python reset_password.py -u admin_user
245+ """
246+ )
247+
248+ parser .add_argument (
249+ "--username" , "-u" ,
250+ required = True ,
251+ help = "Username or email address of the user"
252+ )
253+
254+ parser .add_argument (
255+ "--version" , "-v" ,
256+ action = "version" ,
257+ version = "BrainDrive Password Reset Tool v1.0"
258+ )
259+
260+ args = parser .parse_args ()
261+
262+ # Create password reset tool
263+ reset_tool = PasswordResetTool ()
264+
265+ try :
266+ # Run password reset
267+ success = await reset_tool .reset_password_interactive (args .username )
268+
269+ # Exit with appropriate code
270+ sys .exit (0 if success else 1 )
271+
272+ except KeyboardInterrupt :
273+ print ("\n \n ✗ Operation cancelled by user" )
274+ sys .exit (1 )
275+ except Exception as e :
276+ print (f"\n ✗ Unexpected error: { e } " )
277+ sys .exit (1 )
278+ finally :
279+ await reset_tool .cleanup ()
280+
281+
282+ if __name__ == "__main__" :
283+ # Run the async main function
284+ asyncio .run (main ())
0 commit comments