import subprocess
import os
import re
import sys
import shutil


class CVSClient:
    """
    Small wrapper around the ``cvs`` command-line client.

    Constructing an instance performs a fresh checkout of the repository (or
    of a single module) into a local directory; the remaining methods either
    inspect that checkout on disk or run further ``cvs`` commands inside it.
    Diagnostic output is written to ``sys.stderr`` with a ``DEBUG:`` prefix.
    """

    # Revision header of a ``cvs log`` entry, e.g. "revision 1.4" or
    # "revision 1.2.2.1". Only dotted numbers are accepted so that free text
    # such as "revision of the parser" is not mistaken for a header.
    _REVISION_RE = re.compile(r'^revision\s+(\d+(?:\.\d+)+)(?:\s|$)')

    # Metadata line that follows a revision header. Accepts both date layouts
    # ("YYYY/MM/DD HH:MM:SS" and "YYYY-MM-DD HH:MM:SS +ZZZZ") and the optional
    # "lines: +N -M" field.
    _DATE_RE = re.compile(
        r'^date:\s+'
        r'(\d{4}[/-]\d{2}[/-]\d{2}\s+\d{2}:\d{2}:\d{2}(?:\s+[+-]\d{4})?);'
        r'\s+author:\s+(\S+);'
        r'\s+state:\s+(\S+);'
        r'(?:\s+lines:\s+(\+\d+\s+-\d+))?'
    )

    def __init__(self, repo_url=None, repos_checkout=None, cvs_module=None):
        """
        Initialize CVS client with repository URL, checkout location, and module

        :param repo_url: CVS repository URL in the format
            :pserver:username@hostname:/path/to/repository
        :type repo_url: str, optional
        :param repos_checkout: Path to the directory where repositories will be
            checked out; falls back to the ``REPO_CHECKOUTS`` environment
            variable and then to ``/tmp/cvs_checkouts``
        :type repos_checkout: str, optional
        :param cvs_module: CVS module to work with
        :type cvs_module: str, optional
        :raises ValueError: If no repository URL is provided, or if no
            directory name can be derived from it
        :raises subprocess.CalledProcessError: If the initial checkout fails
        """
        # Reject the empty string as well as None: an empty URL would produce
        # an empty directory name below.
        if not repo_url:
            raise ValueError("CVS repository URL must be provided")

        self.repo_url = repo_url
        self.cvs_module = cvs_module

        # Generate a safe directory name from the repo URL: drop the
        # :pserver: prefix and collapse every run of non-alphanumeric
        # characters into a single underscore.
        safe_repo_name = re.sub(r'^:pserver:', '', repo_url)
        safe_repo_name = re.sub(r'[^a-zA-Z0-9]+', '_', safe_repo_name)
        if not safe_repo_name:
            # Without a name, local_repo_path would be the shared base
            # directory itself, which _checkout_repository() deletes.
            raise ValueError(
                f"Cannot derive a checkout directory name from repository URL {repo_url!r}"
            )

        # Use provided repos_checkout or fall back to environment variable
        checkouts_base_dir = repos_checkout or os.getenv('REPO_CHECKOUTS', '/tmp/cvs_checkouts')

        # Create checkouts directory if it doesn't exist
        os.makedirs(checkouts_base_dir, exist_ok=True)

        # NOTE: the directory name depends only on the URL, so two clients for
        # the same repository share (and overwrite) the same checkout.
        self.local_repo_path = os.path.join(checkouts_base_dir, safe_repo_name)

        # Ensure clean checkout
        self._checkout_repository()

    def _run_cvs_command(self, command, cwd=None, strip=True):
        """
        Run a CVS command with the configured repository URL

        :param command: List of CVS command arguments
        :type command: list
        :param cwd: Working directory for the command; defaults to the local
            checkout directory
        :type cwd: str, optional
        :param strip: Strip leading/trailing whitespace from the output. Pass
            ``False`` when the output is file content that must stay intact
        :type strip: bool, optional
        :return: Command output as string
        :rtype: str
        :raises subprocess.CalledProcessError: If the CVS command fails
        """
        full_command = ['cvs', '-d', self.repo_url] + command
        working_dir = cwd or self.local_repo_path

        # Debug printout of the command to be executed
        print(f"DEBUG: Executing CVS command: {' '.join(full_command)}", file=sys.stderr)
        print(f"DEBUG: Working directory: {working_dir}", file=sys.stderr)

        try:
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                # Output may not be valid in the locale encoding; replace
                # undecodable bytes instead of raising UnicodeDecodeError.
                errors='replace',
                check=True,
                cwd=working_dir,
            )
        except subprocess.CalledProcessError as e:
            # Debug printout for command failure
            print("DEBUG: CVS Command FAILED", file=sys.stderr)
            print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr)
            print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr)
            print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr)
            print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr)

            # Re-raise to allow caller to handle specific errors
            raise

        # Debug printout of stdout and stderr
        if result.stdout:
            print(f"DEBUG: CVS Command STDOUT:\n{result.stdout}", file=sys.stderr)
        if result.stderr:
            print(f"DEBUG: CVS Command STDERR:\n{result.stderr}", file=sys.stderr)

        return result.stdout.strip() if strip else result.stdout

    def _checkout_repository(self):
        """
        Replace the local checkout directory with a fresh checkout

        Any existing directory at ``local_repo_path`` is deleted first. The
        configured module is checked out, or ``.`` when no module is set.

        :raises subprocess.CalledProcessError: If the checkout command fails
        """
        # Remove existing checkout if it exists
        if os.path.exists(self.local_repo_path):
            shutil.rmtree(self.local_repo_path)

        # Create directory
        os.makedirs(self.local_repo_path, exist_ok=True)

        # Perform initial checkout with module if specified
        checkout_command = ['checkout', self.cvs_module if self.cvs_module else '.']

        try:
            self._run_cvs_command(checkout_command, cwd=self.local_repo_path)
            print(f"DEBUG: Repository checked out to {self.local_repo_path}", file=sys.stderr)
        except subprocess.CalledProcessError as e:
            print(f"DEBUG: Repository checkout failed: {e}", file=sys.stderr)
            raise

    def list_repository_tree(self, module=None):
        """
        List repository tree structure using local filesystem

        Files whose name starts with ``.`` and everything inside a directory
        named exactly ``CVS`` are omitted.

        :param module: Optional module or subdirectory to list
        :type module: str, optional
        :return: Sorted file paths, relative to the listed directory (the
            module directory when ``module`` is given, else the checkout root)
        :rtype: list
        """
        try:
            # Determine the path to list
            list_path = os.path.join(self.local_repo_path, module) if module else self.local_repo_path

            tree = []
            for root, dirs, files in os.walk(list_path):
                # Prune CVS directories in place so os.walk never descends
                # into them.
                dirs[:] = [d for d in dirs if d != 'CVS']

                # Compare whole path components: a substring test would also
                # hide directories such as "CVSNotes". This check still
                # matters when list_path itself lies inside a CVS directory.
                rel_root = os.path.relpath(root, self.local_repo_path)
                if 'CVS' in rel_root.split(os.sep):
                    continue

                for file_name in files:
                    # Skip hidden files
                    if file_name.startswith('.'):
                        continue
                    # Relative to list_path: strips the module prefix while
                    # keeping nested subdirectories.
                    tree.append(os.path.relpath(os.path.join(root, file_name), list_path))

            return sorted(tree)

        except (OSError, TypeError, ValueError) as e:
            print(f"DEBUG: Error listing repository tree: {e}", file=sys.stderr)
            return []

    def get_file_diff(self, file_path, rev1, rev2):
        """
        Get diff between two revisions of a file

        :param file_path: Path to the file
        :type file_path: str
        :param rev1: First revision
        :type rev1: str
        :param rev2: Second revision
        :type rev2: str
        :return: Unified diff output, or an error message string if the
            command fails
        :rtype: str
        """
        try:
            return self._run_cvs_command(['rdiff', '-u', '-r', rev1, '-r', rev2, file_path])
        except subprocess.CalledProcessError:
            return f"Error generating diff for {file_path} between {rev1} and {rev2}"

    def get_file_history(self, file_path):
        """
        Get revision history for a file using 'cvs log' command

        :param file_path: Path to the file
        :type file_path: str
        :return: List of revision details in the order printed by ``cvs log``;
            empty if the command fails. Each entry has a ``revision`` key and,
            when the metadata line was recognized, ``date``, ``author``,
            ``state`` and ``lines_changed``
        :rtype: list
        """
        try:
            # Use 'cvs log' instead of 'rlog' as it's more widely supported
            output = self._run_cvs_command(['log', file_path])
        except subprocess.CalledProcessError as e:
            print(f"DEBUG: Error getting file history: {e}", file=sys.stderr)
            return []

        return self._parse_log_output(output)

    @classmethod
    def _parse_log_output(cls, output):
        """
        Parse the text printed by ``cvs log`` into a list of revision dicts

        :param output: Raw ``cvs log`` output
        :type output: str
        :return: One dict per revision header found
        :rtype: list
        """
        revisions = []
        current_revision = None
        previous_was_separator = False

        for line in output.splitlines():
            # A revision header directly follows a line of hyphens; requiring
            # that keeps commit-message text from starting a new entry.
            rev_match = cls._REVISION_RE.match(line) if previous_was_separator else None

            if rev_match:
                if current_revision is not None:
                    revisions.append(current_revision)
                current_revision = {'revision': rev_match.group(1)}
            elif current_revision is not None and 'date' not in current_revision:
                # Only the first metadata line after a header counts, so a
                # similar-looking line in the commit message cannot overwrite it.
                date_match = cls._DATE_RE.match(line)
                if date_match:
                    current_revision.update({
                        'date': date_match.group(1),
                        'author': date_match.group(2),
                        'state': date_match.group(3),
                        # The "lines:" field is absent on some revisions
                        # (e.g. the initial one).
                        'lines_changed': date_match.group(4) or 'N/A',
                    })

            stripped = line.strip()
            previous_was_separator = len(stripped) >= 10 and set(stripped) == {'-'}

        # Add the last revision
        if current_revision is not None:
            revisions.append(current_revision)

        return revisions

    def get_file_content(self, file_path, revision=None):
        """
        Get raw file content, optionally at a specific revision

        :param file_path: Path to the file
        :type file_path: str
        :param revision: Optional specific revision
        :type revision: str, optional
        :return: File content exactly as printed by ``cvs checkout -p``, or an
            error message string if the command fails
        :rtype: str
        """
        command = ['checkout', '-p']
        if revision:
            command.extend(['-r', revision])
        command.append(file_path)

        try:
            # strip=False: leading indentation and the trailing newline are
            # part of the file content.
            return self._run_cvs_command(command, strip=False)
        except subprocess.CalledProcessError:
            return f"Error retrieving content for {file_path}"