cvs-proxy/cvs_proxy/cvs_client.py

448 lines
No EOL
18 KiB
Python

import subprocess
import os
import re
import sys
import shutil
class CVSClient:
def __init__(self, repo_url=None, repos_checkout=None, cvs_module=None):
"""
Initialize CVS client with repository URL, checkout location, and module
:param repo_url: CVS repository URL in the format :pserver:username@hostname:/path/to/repository
:type repo_url: str, optional
:param repos_checkout: Path to the directory where repositories will be checked out
:type repos_checkout: str, optional
:param cvs_module: CVS module to work with
:type cvs_module: str, optional
:raises ValueError: If no repository URL is provided
"""
if repo_url is None:
raise ValueError("CVS repository URL must be provided")
self.repo_url = repo_url
self.cvs_module = cvs_module
# Use provided repos_checkout or fall back to environment variable
checkouts_base_dir = repos_checkout or os.getenv('REPO_CHECKOUTS', '/tmp/cvs_checkouts')
# Create checkouts directory if it doesn't exist
os.makedirs(checkouts_base_dir, exist_ok=True)
# Generate a safe directory name from the repo URL
# Remove :pserver: prefix and replace non-alphanumeric characters with single underscore
safe_repo_name = re.sub(r'^:pserver:', '', repo_url)
safe_repo_name = re.sub(r'[^a-zA-Z0-9]+', '_', safe_repo_name)
self.local_repo_path = os.path.join(checkouts_base_dir, safe_repo_name)
# Ensure clean checkout
self._checkout_repository()
def _run_cvs_command(self, command, cwd=None):
"""
Run a CVS command with the configured repository URL
:param command: List of CVS command arguments
:type command: list
:param cwd: Working directory for the command
:type cwd: str, optional
:return: Command output as string
:rtype: str
:raises subprocess.CalledProcessError: If the CVS command fails
"""
full_command = ['cvs', '-d', self.repo_url] + command
# Debug printout of the command to be executed
print(f"DEBUG: Executing CVS command: {' '.join(full_command)}", file=sys.stderr)
print(f"DEBUG: Working directory: {cwd or self.local_repo_path}", file=sys.stderr)
try:
result = subprocess.run(
full_command,
capture_output=True,
text=True,
check=True,
cwd=cwd or self.local_repo_path
)
# Debug printout of stdout and stderr
if result.stdout:
print(f"DEBUG: CVS Command STDOUT:\n{result.stdout}", file=sys.stderr)
if result.stderr:
print(f"DEBUG: CVS Command STDERR:\n{result.stderr}", file=sys.stderr)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
# Debug printout for command failure
print(f"DEBUG: CVS Command FAILED", file=sys.stderr)
print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr)
print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr)
print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr)
print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr)
# Re-raise to allow caller to handle specific errors
raise
def _run_cvsps_command(self, command, cwd=None):
"""
Run a cvsps command (standalone program, not a CVS subcommand)
:param command: List of cvsps command arguments
:type command: list
:param cwd: Working directory for the command
:type cwd: str, optional
:return: Command output as string
:rtype: str
:raises subprocess.CalledProcessError: If the cvsps command fails
"""
full_command = ['cvsps'] + command
# Debug printout of the command to be executed
print(f"DEBUG: Executing cvsps command: {' '.join(full_command)}", file=sys.stderr)
print(f"DEBUG: Working directory: {cwd or self.local_repo_path}", file=sys.stderr)
try:
result = subprocess.run(
full_command,
capture_output=True,
text=True,
check=True,
cwd=cwd or self.local_repo_path,
env={**os.environ, 'CVSROOT': self.repo_url}
)
# Debug printout of stdout and stderr
if result.stdout:
print(f"DEBUG: cvsps Command STDOUT:\n{result.stdout}", file=sys.stderr)
if result.stderr:
print(f"DEBUG: cvsps Command STDERR:\n{result.stderr}", file=sys.stderr)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
# Debug printout for command failure
print(f"DEBUG: cvsps Command FAILED", file=sys.stderr)
print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr)
print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr)
print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr)
print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr)
# Re-raise to allow caller to handle specific errors
raise
def _checkout_repository(self):
"""
Checkout or update the local repository
"""
# Remove existing checkout if it exists
if os.path.exists(self.local_repo_path):
shutil.rmtree(self.local_repo_path)
# Create directory
os.makedirs(self.local_repo_path, exist_ok=True)
try:
# Perform initial checkout with module if specified
checkout_command = ['checkout']
if self.cvs_module:
checkout_command.append(self.cvs_module)
else:
checkout_command.append('.')
self._run_cvs_command(checkout_command, cwd=self.local_repo_path)
print(f"DEBUG: Repository checked out to {self.local_repo_path}", file=sys.stderr)
except subprocess.CalledProcessError as e:
print(f"DEBUG: Repository checkout failed: {e}", file=sys.stderr)
raise
def list_repository_tree(self, module=None):
"""
List repository tree structure using local filesystem
:param module: Optional module or subdirectory to list
:type module: str, optional
:return: List of files and directories
:rtype: list
"""
try:
# Determine the path to list
list_path = os.path.join(self.local_repo_path, module) if module else self.local_repo_path
# Walk through the directory
tree = []
for root, dirs, files in os.walk(list_path):
# Get relative paths
rel_root = os.path.relpath(root, self.local_repo_path)
# Add files
for file_name in files:
# Skip hidden files and CVS directories
if not file_name.startswith('.') and 'CVS' not in rel_root:
full_path = os.path.normpath(os.path.join(rel_root, file_name))
if full_path != '.':
# If a module is specified, strip the module prefix
if module:
full_path = os.path.basename(full_path)
tree.append(full_path)
return sorted(tree)
except Exception as e:
print(f"DEBUG: Error listing repository tree: {e}", file=sys.stderr)
return []
def get_file_diff(self, file_path, rev1, rev2):
"""
Get diff between two revisions of a file
:param file_path: Path to the file
:type file_path: str
:param rev1: First revision
:type rev1: str
:param rev2: Second revision
:type rev2: str
:return: Diff output
:rtype: str
"""
try:
output = self._run_cvs_command([
'rdiff',
'-u',
'-r', rev1,
'-r', rev2,
file_path
])
return output
except subprocess.CalledProcessError:
return f"Error generating diff for {file_path} between {rev1} and {rev2}"
def get_file_history(self, file_path):
"""
Get revision history for a file using 'cvs log' command
:param file_path: Path to the file
:type file_path: str
:return: List of revision details
:rtype: list
"""
try:
# Use 'cvs log' to get revision history for the file
output = self._run_cvs_command(['log', file_path])
# First pass: collect all tags and their revisions
tags_by_revision = {}
for line in output.split('\n'):
# Look for tag lines (format: "\tTAG_NAME: X.X")
tag_match = re.match(r'^\s+(\S+):\s+(\S+)', line)
if tag_match:
tag_name = tag_match.group(1)
tag_revision = tag_match.group(2)
if tag_revision not in tags_by_revision:
tags_by_revision[tag_revision] = []
tags_by_revision[tag_revision].append(tag_name)
# Second pass: parse revisions and attach tags
revisions = []
current_revision = {}
in_log = False
for line in output.split('\n'):
# Look for revision lines (format: "revision X.X")
rev_match = re.match(r'^revision\s+(\S+)', line)
# Look for date/author/state line (format: "date: YYYY/MM/DD HH:MM:SS; author: NAME; state: STATE;")
date_match = re.match(r'^date:\s+(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2});\s+author:\s+(\S+);\s+state:\s+(\S+);', line)
if rev_match:
# Start of a new revision
if current_revision and 'revision' in current_revision:
revisions.append(current_revision)
current_revision = {'revision': rev_match.group(1)}
in_log = False
if date_match:
current_revision.update({
'date': date_match.group(1),
'author': date_match.group(2),
'state': date_match.group(3),
'lines_changed': 'N/A' # cvs log doesn't provide line counts
})
in_log = False
# Capture log message (lines after date/author/state until next revision or separator)
if in_log:
if line.strip() == '' or re.match(r'^---', line):
in_log = False
elif not re.match(r'^(revision|date|branches):', line):
if 'log' not in current_revision:
current_revision['log'] = ''
if current_revision['log']:
current_revision['log'] += '\n' + line
else:
current_revision['log'] = line
# Start capturing log after date line
if date_match:
in_log = True
# Add the last revision
if current_revision and 'revision' in current_revision:
revisions.append(current_revision)
# Clean up logs and attach tags
for rev in revisions:
if 'log' in rev:
rev['log'] = rev['log'].strip().split('\n')[0]
else:
rev['log'] = ''
# Attach tags from the first pass
rev['tags'] = tags_by_revision.get(rev['revision'], [])
return revisions
except subprocess.CalledProcessError as e:
print(f"DEBUG: Error getting file history: {e}", file=sys.stderr)
return []
def get_file_content(self, file_path, revision=None):
"""
Get raw file content, optionally at a specific revision
:param file_path: Path to the file
:type file_path: str
:param revision: Optional specific revision
:type revision: str, optional
:return: File content
:rtype: str
"""
try:
command = ['checkout', '-p']
if revision:
command.extend(['-r', revision])
command.append(file_path)
return self._run_cvs_command(command)
except subprocess.CalledProcessError:
return f"Error retrieving content for {file_path}"
def get_patchsets(self):
"""
Get repository patchset history using cvsps command
Note: cvsps must be run from the module directory, not the repository root
:return: List of patchset details
:rtype: list
"""
try:
# Determine the working directory for cvsps
# If a module is specified, use the module directory; otherwise use the repository root
if self.cvs_module:
module_path = os.path.join(self.local_repo_path, self.cvs_module)
cvsps_cwd = module_path if os.path.exists(module_path) else self.local_repo_path
else:
cvsps_cwd = self.local_repo_path
# Use 'cvsps' to get all patch sets in the repository
# cvsps must be run from the module directory
output = self._run_cvsps_command([], cwd=cvsps_cwd)
# Parse cvsps output to extract patchset details
patchsets = []
current_patchset = {}
in_log = False
for line in output.split('\n'):
# Look for PatchSet lines (format: "PatchSet XXXX")
ps_match = re.match(r'^PatchSet\s+(\d+)', line)
# Look for Date line (format: "Date: YYYY/MM/DD HH:MM:SS")
date_match = re.match(r'^Date:\s+(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})', line)
# Look for Author line (format: "Author: NAME")
author_match = re.match(r'^Author:\s+(\S+)', line)
# Look for Tag line (format: "Tag: TAG_NAME")
tag_match = re.match(r'^Tag:\s+(\S+)', line)
# Look for Log line (format: "Log:")
log_match = re.match(r'^Log:', line)
if ps_match:
# Start of a new patch set
if current_patchset and 'patchset' in current_patchset:
patchsets.append(current_patchset)
current_patchset = {'patchset': ps_match.group(1)}
in_log = False
if date_match:
current_patchset['date'] = date_match.group(1)
if author_match:
current_patchset['author'] = author_match.group(1)
if tag_match:
tag_value = tag_match.group(1)
# Don't store tags that are "(none)" or "(None)" - treat them as no tag (case-insensitive)
if tag_value.lower() != '(none)':
current_patchset['tag'] = tag_value
if log_match:
# Log section starts, capture until next PatchSet or end
current_patchset['log'] = ''
in_log = True
elif in_log:
# Capture log lines until we hit an empty line or next section
if line.strip() == '':
in_log = False
elif not re.match(r'^(PatchSet|Date|Author|Tag|Files):', line):
# Append to log if it's not a new section header
if current_patchset['log']:
current_patchset['log'] += '\n' + line
else:
current_patchset['log'] = line
# Add the last patchset
if current_patchset and 'patchset' in current_patchset:
patchsets.append(current_patchset)
# Ensure all patchsets have the required fields with defaults
for ps in patchsets:
if 'date' not in ps:
ps['date'] = 'N/A'
if 'author' not in ps:
ps['author'] = 'N/A'
if 'tag' not in ps:
ps['tag'] = 'N/A'
if 'log' not in ps:
ps['log'] = ''
# Clean up log - remove leading/trailing whitespace and limit to first line for compact display
ps['log'] = ps['log'].strip()
return patchsets
except subprocess.CalledProcessError as e:
print(f"DEBUG: Error getting patchsets: {e}", file=sys.stderr)
return []
def get_patchset_diff(self, patchset_number):
"""
Get diff for a specific patchset using cvsps command
:param patchset_number: Patchset number
:type patchset_number: str
:return: Diff output for the patchset
:rtype: str
"""
try:
# Determine the working directory for cvsps
# If a module is specified, use the module directory; otherwise use the repository root
if self.cvs_module:
module_path = os.path.join(self.local_repo_path, self.cvs_module)
cvsps_cwd = module_path if os.path.exists(module_path) else self.local_repo_path
else:
cvsps_cwd = self.local_repo_path
# Use 'cvsps' with -s flag to select the patchset and -g flag to generate the diff
output = self._run_cvsps_command(['-s', patchset_number, '-g'], cwd=cvsps_cwd)
return output
except subprocess.CalledProcessError as e:
print(f"DEBUG: Error getting patchset diff: {e}", file=sys.stderr)
return f"Error retrieving diff for patchset {patchset_number}"