import subprocess import os import re import sys import shutil class CVSClient: def __init__(self, repo_url=None, repos_checkout=None, cvs_module=None): """ Initialize CVS client with repository URL, checkout location, and module :param repo_url: CVS repository URL in the format :pserver:username@hostname:/path/to/repository :type repo_url: str, optional :param repos_checkout: Path to the directory where repositories will be checked out :type repos_checkout: str, optional :param cvs_module: CVS module to work with :type cvs_module: str, optional :raises ValueError: If no repository URL is provided """ if repo_url is None: raise ValueError("CVS repository URL must be provided") self.repo_url = repo_url self.cvs_module = cvs_module # Use provided repos_checkout or fall back to environment variable checkouts_base_dir = repos_checkout or os.getenv('REPO_CHECKOUTS', '/tmp/cvs_checkouts') # Create checkouts directory if it doesn't exist os.makedirs(checkouts_base_dir, exist_ok=True) # Generate a safe directory name from the repo URL # Remove :pserver: prefix and replace non-alphanumeric characters with single underscore safe_repo_name = re.sub(r'^:pserver:', '', repo_url) safe_repo_name = re.sub(r'[^a-zA-Z0-9]+', '_', safe_repo_name) self.local_repo_path = os.path.join(checkouts_base_dir, safe_repo_name) # Ensure clean checkout self._checkout_repository() def _run_cvs_command(self, command, cwd=None): """ Run a CVS command with the configured repository URL :param command: List of CVS command arguments :type command: list :param cwd: Working directory for the command :type cwd: str, optional :return: Command output as string :rtype: str :raises subprocess.CalledProcessError: If the CVS command fails """ full_command = ['cvs', '-d', self.repo_url] + command # Debug printout of the command to be executed print(f"DEBUG: Executing CVS command: {' '.join(full_command)}", file=sys.stderr) print(f"DEBUG: Working directory: {cwd or self.local_repo_path}", file=sys.stderr) try: result = subprocess.run( full_command, capture_output=True, text=True, check=True, cwd=cwd or self.local_repo_path ) # Debug printout of stdout and stderr if result.stdout: print(f"DEBUG: CVS Command STDOUT:\n{result.stdout}", file=sys.stderr) if result.stderr: print(f"DEBUG: CVS Command STDERR:\n{result.stderr}", file=sys.stderr) return result.stdout.strip() except subprocess.CalledProcessError as e: # Debug printout for command failure print(f"DEBUG: CVS Command FAILED", file=sys.stderr) print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr) print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr) print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr) print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr) # Re-raise to allow caller to handle specific errors raise def _run_cvsps_command(self, command, cwd=None): """ Run a cvsps command (standalone program, not a CVS subcommand) :param command: List of cvsps command arguments :type command: list :param cwd: Working directory for the command :type cwd: str, optional :return: Command output as string :rtype: str :raises subprocess.CalledProcessError: If the cvsps command fails """ full_command = ['cvsps'] + command # Debug printout of the command to be executed print(f"DEBUG: Executing cvsps command: {' '.join(full_command)}", file=sys.stderr) print(f"DEBUG: Working directory: {cwd or self.local_repo_path}", file=sys.stderr) try: result = subprocess.run( full_command, capture_output=True, text=True, check=True, cwd=cwd or self.local_repo_path, env={**os.environ, 'CVSROOT': self.repo_url} ) # Debug printout of stdout and stderr if result.stdout: print(f"DEBUG: cvsps Command STDOUT:\n{result.stdout}", file=sys.stderr) if result.stderr: print(f"DEBUG: cvsps Command STDERR:\n{result.stderr}", file=sys.stderr) return result.stdout.strip() except subprocess.CalledProcessError as e: # Debug printout for command failure print(f"DEBUG: cvsps Command FAILED", file=sys.stderr) print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr) print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr) print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr) print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr) # Re-raise to allow caller to handle specific errors raise def _checkout_repository(self): """ Checkout or update the local repository """ # Remove existing checkout if it exists if os.path.exists(self.local_repo_path): shutil.rmtree(self.local_repo_path) # Create directory os.makedirs(self.local_repo_path, exist_ok=True) try: # Perform initial checkout with module if specified checkout_command = ['checkout'] if self.cvs_module: checkout_command.append(self.cvs_module) else: checkout_command.append('.') self._run_cvs_command(checkout_command, cwd=self.local_repo_path) print(f"DEBUG: Repository checked out to {self.local_repo_path}", file=sys.stderr) except subprocess.CalledProcessError as e: print(f"DEBUG: Repository checkout failed: {e}", file=sys.stderr) raise def list_repository_tree(self, module=None): """ List repository tree structure using local filesystem :param module: Optional module or subdirectory to list :type module: str, optional :return: List of files and directories :rtype: list """ try: # Determine the path to list list_path = os.path.join(self.local_repo_path, module) if module else self.local_repo_path # Walk through the directory tree = [] for root, dirs, files in os.walk(list_path): # Get relative paths rel_root = os.path.relpath(root, self.local_repo_path) # Add files for file_name in files: # Skip hidden files and CVS directories if not file_name.startswith('.') and 'CVS' not in rel_root: full_path = os.path.normpath(os.path.join(rel_root, file_name)) if full_path != '.': # If a module is specified, strip the module prefix if module: full_path = os.path.basename(full_path) tree.append(full_path) return sorted(tree) except Exception as e: print(f"DEBUG: Error listing repository tree: {e}", file=sys.stderr) return [] def get_file_diff(self, file_path, rev1, rev2): """ Get diff between two revisions of a file :param file_path: Path to the file :type file_path: str :param rev1: First revision :type rev1: str :param rev2: Second revision :type rev2: str :return: Diff output :rtype: str """ try: output = self._run_cvs_command([ 'rdiff', '-u', '-r', rev1, '-r', rev2, file_path ]) return output except subprocess.CalledProcessError: return f"Error generating diff for {file_path} between {rev1} and {rev2}" def get_file_history(self, file_path): """ Get revision history for a file using 'cvs log' command :param file_path: Path to the file :type file_path: str :return: List of revision details :rtype: list """ try: # Use 'cvs log' to get revision history for the file output = self._run_cvs_command(['log', file_path]) # Parse log output to extract revision details revisions = [] current_revision = {} in_log = False in_tags = False for line in output.split('\n'): # Look for revision lines (format: "revision X.X") rev_match = re.match(r'^revision\s+(\S+)', line) # Look for date/author/state line (format: "date: YYYY/MM/DD HH:MM:SS; author: NAME; state: STATE;") date_match = re.match(r'^date:\s+(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2});\s+author:\s+(\S+);\s+state:\s+(\S+);', line) # Look for branches line (format: "branches: ...") branches_match = re.match(r'^branches:', line) # Look for tag lines (format: "\tTAG_NAME: X.X") tag_match = re.match(r'^\s+(\S+):\s+(\S+)', line) if rev_match: # Start of a new revision if current_revision and 'revision' in current_revision: revisions.append(current_revision) current_revision = {'revision': rev_match.group(1)} in_log = False in_tags = False if date_match: current_revision.update({ 'date': date_match.group(1), 'author': date_match.group(2), 'state': date_match.group(3), 'lines_changed': 'N/A' # cvs log doesn't provide line counts }) in_log = False in_tags = False if branches_match: # Branches section starts, tags follow in_tags = True in_log = False continue # Capture tags (lines with indentation after branches line) if in_tags and tag_match and not re.match(r'^---', line): tag_name = tag_match.group(1) tag_revision = tag_match.group(2) # Only add tags that match the current revision if tag_revision == current_revision.get('revision'): if 'tags' not in current_revision: current_revision['tags'] = [] current_revision['tags'].append(tag_name) # Capture log message (lines after date/author/state until next revision or separator) if in_log: if line.strip() == '' or re.match(r'^---', line): in_log = False in_tags = False elif not re.match(r'^(revision|date|branches):', line): if 'log' not in current_revision: current_revision['log'] = '' if current_revision['log']: current_revision['log'] += '\n' + line else: current_revision['log'] = line # Start capturing log after date line if date_match: in_log = True # Add the last revision if current_revision and 'revision' in current_revision: revisions.append(current_revision) # Clean up logs - strip whitespace and take first line only for rev in revisions: if 'log' in rev: rev['log'] = rev['log'].strip().split('\n')[0] else: rev['log'] = '' # Ensure tags field exists if 'tags' not in rev: rev['tags'] = [] return revisions except subprocess.CalledProcessError as e: print(f"DEBUG: Error getting file history: {e}", file=sys.stderr) return [] def get_file_content(self, file_path, revision=None): """ Get raw file content, optionally at a specific revision :param file_path: Path to the file :type file_path: str :param revision: Optional specific revision :type revision: str, optional :return: File content :rtype: str """ try: command = ['checkout', '-p'] if revision: command.extend(['-r', revision]) command.append(file_path) return self._run_cvs_command(command) except subprocess.CalledProcessError: return f"Error retrieving content for {file_path}" def get_patchsets(self): """ Get repository patchset history using cvsps command Note: cvsps must be run from the module directory, not the repository root :return: List of patchset details :rtype: list """ try: # Determine the working directory for cvsps # If a module is specified, use the module directory; otherwise use the repository root if self.cvs_module: module_path = os.path.join(self.local_repo_path, self.cvs_module) cvsps_cwd = module_path if os.path.exists(module_path) else self.local_repo_path else: cvsps_cwd = self.local_repo_path # Use 'cvsps' to get all patch sets in the repository # cvsps must be run from the module directory output = self._run_cvsps_command([], cwd=cvsps_cwd) # Parse cvsps output to extract patchset details patchsets = [] current_patchset = {} in_log = False for line in output.split('\n'): # Look for PatchSet lines (format: "PatchSet XXXX") ps_match = re.match(r'^PatchSet\s+(\d+)', line) # Look for Date line (format: "Date: YYYY/MM/DD HH:MM:SS") date_match = re.match(r'^Date:\s+(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})', line) # Look for Author line (format: "Author: NAME") author_match = re.match(r'^Author:\s+(\S+)', line) # Look for Tag line (format: "Tag: TAG_NAME") tag_match = re.match(r'^Tag:\s+(\S+)', line) # Look for Log line (format: "Log:") log_match = re.match(r'^Log:', line) if ps_match: # Start of a new patch set if current_patchset and 'patchset' in current_patchset: patchsets.append(current_patchset) current_patchset = {'patchset': ps_match.group(1)} in_log = False if date_match: current_patchset['date'] = date_match.group(1) if author_match: current_patchset['author'] = author_match.group(1) if tag_match: tag_value = tag_match.group(1) # Don't store tags that are "(none)" or "(None)" - treat them as no tag (case-insensitive) if tag_value.lower() != '(none)': current_patchset['tag'] = tag_value if log_match: # Log section starts, capture until next PatchSet or end current_patchset['log'] = '' in_log = True elif in_log: # Capture log lines until we hit an empty line or next section if line.strip() == '': in_log = False elif not re.match(r'^(PatchSet|Date|Author|Tag|Files):', line): # Append to log if it's not a new section header if current_patchset['log']: current_patchset['log'] += '\n' + line else: current_patchset['log'] = line # Add the last patchset if current_patchset and 'patchset' in current_patchset: patchsets.append(current_patchset) # Ensure all patchsets have the required fields with defaults for ps in patchsets: if 'date' not in ps: ps['date'] = 'N/A' if 'author' not in ps: ps['author'] = 'N/A' if 'tag' not in ps: ps['tag'] = 'N/A' if 'log' not in ps: ps['log'] = '' # Clean up log - remove leading/trailing whitespace and limit to first line for compact display ps['log'] = ps['log'].strip() return patchsets except subprocess.CalledProcessError as e: print(f"DEBUG: Error getting patchsets: {e}", file=sys.stderr) return [] def get_patchset_diff(self, patchset_number): """ Get diff for a specific patchset using cvsps command :param patchset_number: Patchset number :type patchset_number: str :return: Diff output for the patchset :rtype: str """ try: # Determine the working directory for cvsps # If a module is specified, use the module directory; otherwise use the repository root if self.cvs_module: module_path = os.path.join(self.local_repo_path, self.cvs_module) cvsps_cwd = module_path if os.path.exists(module_path) else self.local_repo_path else: cvsps_cwd = self.local_repo_path # Use 'cvsps' with -s flag to select the patchset and -g flag to generate the diff output = self._run_cvsps_command(['-s', patchset_number, '-g'], cwd=cvsps_cwd) return output except subprocess.CalledProcessError as e: print(f"DEBUG: Error getting patchset diff: {e}", file=sys.stderr) return f"Error retrieving diff for patchset {patchset_number}"