Initial commit
This commit is contained in:
commit
f2817bc1f1
17 changed files with 2785 additions and 0 deletions
2
cvs_proxy/__init__.py
Normal file
2
cvs_proxy/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
# CVS Proxy Package
|
||||
# This file makes the directory a Python package
|
||||
216
cvs_proxy/app.py
Normal file
216
cvs_proxy/app.py
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
from flask import Flask, request, jsonify, send_from_directory
|
||||
from .cvs_client import CVSClient
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
# Get the directory where this app.py is located
|
||||
app_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
# Get the parent directory (cvs-proxy root)
|
||||
project_root = os.path.dirname(app_dir)
|
||||
# UI directory path
|
||||
ui_dir = os.path.join(project_root, 'ui')
|
||||
|
||||
app = Flask(__name__, static_folder=ui_dir, static_url_path='')
|
||||
|
||||
# Global variables to store configuration
|
||||
cvs_client = None
|
||||
app_config = {}
|
||||
|
||||
# Initialize CVS Client using provided parameters
|
||||
def create_cvs_client(cvs_url, repos_checkout=None, cvs_module=None):
|
||||
"""
|
||||
Create CVS client from provided parameters
|
||||
|
||||
:param cvs_url: CVS repository URL
|
||||
:type cvs_url: str
|
||||
:param repos_checkout: Path to the directory where repositories will be checked out
|
||||
:type repos_checkout: str, optional
|
||||
:param cvs_module: CVS module to work with
|
||||
:type cvs_module: str, optional
|
||||
:return: Configured CVS Client
|
||||
:rtype: CVSClient
|
||||
:raises ValueError: If CVS URL is not provided
|
||||
"""
|
||||
if not cvs_url:
|
||||
raise ValueError("CVS_URL must be provided as a command-line argument")
|
||||
|
||||
return CVSClient(cvs_url, repos_checkout=repos_checkout, cvs_module=cvs_module)
|
||||
|
||||
@app.route('/v1/tree', methods=['GET'])
|
||||
def get_repository_tree():
|
||||
"""
|
||||
Get repository tree structure
|
||||
Optional query param: module
|
||||
"""
|
||||
if not cvs_client:
|
||||
return jsonify({"error": "CVS Client not initialized"}), 500
|
||||
|
||||
module = request.args.get('module')
|
||||
try:
|
||||
tree = cvs_client.list_repository_tree(module)
|
||||
return jsonify(tree)
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route('/v1/diff', methods=['GET'])
|
||||
def get_file_diff():
|
||||
"""
|
||||
Get diff between two revisions of a file
|
||||
Required query params: file, rev1, rev2
|
||||
"""
|
||||
if not cvs_client:
|
||||
return jsonify({"error": "CVS Client not initialized"}), 500
|
||||
|
||||
file_path = request.args.get('file')
|
||||
rev1 = request.args.get('rev1')
|
||||
rev2 = request.args.get('rev2')
|
||||
|
||||
if not all([file_path, rev1, rev2]):
|
||||
return jsonify({"error": "Missing required parameters"}), 400
|
||||
|
||||
try:
|
||||
diff = cvs_client.get_file_diff(file_path, rev1, rev2)
|
||||
return jsonify({"diff": diff})
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route('/v1/history', methods=['GET'])
|
||||
def get_file_history():
|
||||
"""
|
||||
Get revision history for a file
|
||||
Required query param: file
|
||||
"""
|
||||
if not cvs_client:
|
||||
return jsonify({"error": "CVS Client not initialized"}), 500
|
||||
|
||||
file_path = request.args.get('file')
|
||||
|
||||
if not file_path:
|
||||
return jsonify({"error": "Missing file parameter"}), 400
|
||||
|
||||
try:
|
||||
history = cvs_client.get_file_history(file_path)
|
||||
return jsonify(history)
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route('/v1/file', methods=['GET'])
|
||||
def get_file_content():
|
||||
"""
|
||||
Get raw file content at a specific revision
|
||||
Required param: file
|
||||
Optional param: revision
|
||||
"""
|
||||
if not cvs_client:
|
||||
return jsonify({"error": "CVS Client not initialized"}), 500
|
||||
|
||||
file_path = request.args.get('file')
|
||||
revision = request.args.get('revision')
|
||||
|
||||
if not file_path:
|
||||
return jsonify({"error": "Missing file parameter"}), 400
|
||||
|
||||
try:
|
||||
content = cvs_client.get_file_content(file_path, revision)
|
||||
return content
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route('/v1/health', methods=['GET'])
|
||||
def health_check():
|
||||
"""
|
||||
Simple health check endpoint
|
||||
"""
|
||||
return jsonify({
|
||||
"status": "healthy",
|
||||
"cvs_client": "initialized" if cvs_client else "not initialized"
|
||||
})
|
||||
|
||||
@app.route('/', methods=['GET'])
|
||||
def index():
|
||||
"""
|
||||
Serve the UI index.html
|
||||
"""
|
||||
return send_from_directory(ui_dir, 'index.html')
|
||||
|
||||
@app.route('/<path:filename>', methods=['GET'])
|
||||
def serve_static(filename):
|
||||
"""
|
||||
Serve static files (CSS, JS, etc.)
|
||||
"""
|
||||
return send_from_directory(ui_dir, filename)
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main entry point for the CVS Proxy application
|
||||
"""
|
||||
global cvs_client, app_config
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description='CVS Proxy - A web proxy for CVS repositories'
|
||||
)
|
||||
|
||||
# CVS configuration arguments
|
||||
parser.add_argument(
|
||||
'--cvs-url',
|
||||
required=True,
|
||||
help='CVS repository URL (e.g., :pserver:user@host:/path/to/repo)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--repo-checkouts',
|
||||
default='/tmp/cvs_checkouts',
|
||||
help='Path to the directory where repositories will be checked out (default: /tmp/cvs_checkouts)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--cvs-module',
|
||||
default=None,
|
||||
help='CVS module to work with (optional)'
|
||||
)
|
||||
|
||||
# Flask configuration arguments
|
||||
parser.add_argument(
|
||||
'--host',
|
||||
default='0.0.0.0',
|
||||
help='Flask host to bind to (default: 0.0.0.0)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
type=int,
|
||||
default=5000,
|
||||
help='Flask port to bind to (default: 5000)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--debug',
|
||||
action='store_true',
|
||||
help='Enable Flask debug mode'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Store configuration
|
||||
app_config = {
|
||||
'cvs_url': args.cvs_url,
|
||||
'repo_checkouts': args.repo_checkouts,
|
||||
'cvs_module': args.cvs_module,
|
||||
'host': args.host,
|
||||
'port': args.port,
|
||||
'debug': args.debug
|
||||
}
|
||||
|
||||
# Attempt to create CVS client at startup
|
||||
try:
|
||||
cvs_client = create_cvs_client(
|
||||
args.cvs_url,
|
||||
repos_checkout=args.repo_checkouts,
|
||||
cvs_module=args.cvs_module
|
||||
)
|
||||
except ValueError as e:
|
||||
print(f"Error initializing CVS Client: {e}", file=sys.stderr)
|
||||
cvs_client = None
|
||||
|
||||
print(f"Starting CVS Proxy on {args.host}:{args.port}")
|
||||
app.run(host=args.host, port=args.port, debug=args.debug)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
238
cvs_proxy/cvs_client.py
Normal file
238
cvs_proxy/cvs_client.py
Normal file
|
|
@ -0,0 +1,238 @@
|
|||
import subprocess
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import shutil
|
||||
|
||||
class CVSClient:
|
||||
def __init__(self, repo_url=None, repos_checkout=None, cvs_module=None):
|
||||
"""
|
||||
Initialize CVS client with repository URL, checkout location, and module
|
||||
|
||||
:param repo_url: CVS repository URL in the format :pserver:username@hostname:/path/to/repository
|
||||
:type repo_url: str, optional
|
||||
:param repos_checkout: Path to the directory where repositories will be checked out
|
||||
:type repos_checkout: str, optional
|
||||
:param cvs_module: CVS module to work with
|
||||
:type cvs_module: str, optional
|
||||
:raises ValueError: If no repository URL is provided
|
||||
"""
|
||||
if repo_url is None:
|
||||
raise ValueError("CVS repository URL must be provided")
|
||||
|
||||
self.repo_url = repo_url
|
||||
self.cvs_module = cvs_module
|
||||
|
||||
# Use provided repos_checkout or fall back to environment variable
|
||||
checkouts_base_dir = repos_checkout or os.getenv('REPO_CHECKOUTS', '/tmp/cvs_checkouts')
|
||||
|
||||
# Create checkouts directory if it doesn't exist
|
||||
os.makedirs(checkouts_base_dir, exist_ok=True)
|
||||
|
||||
# Generate a safe directory name from the repo URL
|
||||
# Remove :pserver: prefix and replace non-alphanumeric characters with single underscore
|
||||
safe_repo_name = re.sub(r'^:pserver:', '', repo_url)
|
||||
safe_repo_name = re.sub(r'[^a-zA-Z0-9]+', '_', safe_repo_name)
|
||||
|
||||
self.local_repo_path = os.path.join(checkouts_base_dir, safe_repo_name)
|
||||
|
||||
# Ensure clean checkout
|
||||
self._checkout_repository()
|
||||
|
||||
def _run_cvs_command(self, command, cwd=None):
|
||||
"""
|
||||
Run a CVS command with the configured repository URL
|
||||
|
||||
:param command: List of CVS command arguments
|
||||
:type command: list
|
||||
:param cwd: Working directory for the command
|
||||
:type cwd: str, optional
|
||||
:return: Command output as string
|
||||
:rtype: str
|
||||
:raises subprocess.CalledProcessError: If the CVS command fails
|
||||
"""
|
||||
full_command = ['cvs', '-d', self.repo_url] + command
|
||||
|
||||
# Debug printout of the command to be executed
|
||||
print(f"DEBUG: Executing CVS command: {' '.join(full_command)}", file=sys.stderr)
|
||||
print(f"DEBUG: Working directory: {cwd or self.local_repo_path}", file=sys.stderr)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
full_command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
cwd=cwd or self.local_repo_path
|
||||
)
|
||||
|
||||
# Debug printout of stdout and stderr
|
||||
if result.stdout:
|
||||
print(f"DEBUG: CVS Command STDOUT:\n{result.stdout}", file=sys.stderr)
|
||||
if result.stderr:
|
||||
print(f"DEBUG: CVS Command STDERR:\n{result.stderr}", file=sys.stderr)
|
||||
|
||||
return result.stdout.strip()
|
||||
except subprocess.CalledProcessError as e:
|
||||
# Debug printout for command failure
|
||||
print(f"DEBUG: CVS Command FAILED", file=sys.stderr)
|
||||
print(f"DEBUG: Return Code: {e.returncode}", file=sys.stderr)
|
||||
print(f"DEBUG: Command: {' '.join(e.cmd)}", file=sys.stderr)
|
||||
print(f"DEBUG: STDOUT:\n{e.stdout}", file=sys.stderr)
|
||||
print(f"DEBUG: STDERR:\n{e.stderr}", file=sys.stderr)
|
||||
|
||||
# Re-raise to allow caller to handle specific errors
|
||||
raise
|
||||
|
||||
def _checkout_repository(self):
|
||||
"""
|
||||
Checkout or update the local repository
|
||||
"""
|
||||
# Remove existing checkout if it exists
|
||||
if os.path.exists(self.local_repo_path):
|
||||
shutil.rmtree(self.local_repo_path)
|
||||
|
||||
# Create directory
|
||||
os.makedirs(self.local_repo_path, exist_ok=True)
|
||||
|
||||
try:
|
||||
# Perform initial checkout with module if specified
|
||||
checkout_command = ['checkout']
|
||||
if self.cvs_module:
|
||||
checkout_command.append(self.cvs_module)
|
||||
else:
|
||||
checkout_command.append('.')
|
||||
|
||||
self._run_cvs_command(checkout_command, cwd=self.local_repo_path)
|
||||
print(f"DEBUG: Repository checked out to {self.local_repo_path}", file=sys.stderr)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"DEBUG: Repository checkout failed: {e}", file=sys.stderr)
|
||||
raise
|
||||
|
||||
def list_repository_tree(self, module=None):
|
||||
"""
|
||||
List repository tree structure using local filesystem
|
||||
|
||||
:param module: Optional module or subdirectory to list
|
||||
:type module: str, optional
|
||||
:return: List of files and directories
|
||||
:rtype: list
|
||||
"""
|
||||
try:
|
||||
# Determine the path to list
|
||||
list_path = os.path.join(self.local_repo_path, module) if module else self.local_repo_path
|
||||
|
||||
# Walk through the directory
|
||||
tree = []
|
||||
for root, dirs, files in os.walk(list_path):
|
||||
# Get relative paths
|
||||
rel_root = os.path.relpath(root, self.local_repo_path)
|
||||
|
||||
# Add files
|
||||
for file_name in files:
|
||||
# Skip hidden files and CVS directories
|
||||
if not file_name.startswith('.') and 'CVS' not in rel_root:
|
||||
full_path = os.path.normpath(os.path.join(rel_root, file_name))
|
||||
if full_path != '.':
|
||||
# If a module is specified, strip the module prefix
|
||||
if module:
|
||||
full_path = os.path.basename(full_path)
|
||||
tree.append(full_path)
|
||||
|
||||
return sorted(tree)
|
||||
except Exception as e:
|
||||
print(f"DEBUG: Error listing repository tree: {e}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
def get_file_diff(self, file_path, rev1, rev2):
|
||||
"""
|
||||
Get diff between two revisions of a file
|
||||
|
||||
:param file_path: Path to the file
|
||||
:type file_path: str
|
||||
:param rev1: First revision
|
||||
:type rev1: str
|
||||
:param rev2: Second revision
|
||||
:type rev2: str
|
||||
:return: Diff output
|
||||
:rtype: str
|
||||
"""
|
||||
try:
|
||||
output = self._run_cvs_command([
|
||||
'rdiff',
|
||||
'-u',
|
||||
'-r', rev1,
|
||||
'-r', rev2,
|
||||
file_path
|
||||
])
|
||||
return output
|
||||
except subprocess.CalledProcessError:
|
||||
return f"Error generating diff for {file_path} between {rev1} and {rev2}"
|
||||
|
||||
def get_file_history(self, file_path):
|
||||
"""
|
||||
Get revision history for a file using 'cvs log' command
|
||||
|
||||
:param file_path: Path to the file
|
||||
:type file_path: str
|
||||
:return: List of revision details
|
||||
:rtype: list
|
||||
"""
|
||||
try:
|
||||
# Use 'cvs log' instead of 'rlog' as it's more widely supported
|
||||
output = self._run_cvs_command(['log', file_path])
|
||||
|
||||
# Parse log output to extract revision details
|
||||
revisions = []
|
||||
current_revision = {}
|
||||
|
||||
for line in output.split('\n'):
|
||||
# Look for revision lines (format: "revision X.X")
|
||||
rev_match = re.match(r'^revision\s+(\S+)', line)
|
||||
|
||||
# Look for date/author/state line (format: "date: YYYY/MM/DD HH:MM:SS; author: NAME; state: STATE;")
|
||||
date_match = re.match(r'^date:\s+(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2});\s+author:\s+(\S+);\s+state:\s+(\S+);', line)
|
||||
|
||||
if rev_match:
|
||||
# Start of a new revision
|
||||
if current_revision and 'revision' in current_revision:
|
||||
revisions.append(current_revision)
|
||||
current_revision = {'revision': rev_match.group(1)}
|
||||
|
||||
if date_match:
|
||||
current_revision.update({
|
||||
'date': date_match.group(1),
|
||||
'author': date_match.group(2),
|
||||
'state': date_match.group(3),
|
||||
'lines_changed': 'N/A' # cvs log doesn't provide line counts
|
||||
})
|
||||
|
||||
# Add the last revision
|
||||
if current_revision and 'revision' in current_revision:
|
||||
revisions.append(current_revision)
|
||||
|
||||
return revisions
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"DEBUG: Error getting file history: {e}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
def get_file_content(self, file_path, revision=None):
|
||||
"""
|
||||
Get raw file content, optionally at a specific revision
|
||||
|
||||
:param file_path: Path to the file
|
||||
:type file_path: str
|
||||
:param revision: Optional specific revision
|
||||
:type revision: str, optional
|
||||
:return: File content
|
||||
:rtype: str
|
||||
"""
|
||||
try:
|
||||
command = ['checkout', '-p']
|
||||
if revision:
|
||||
command.extend(['-r', revision])
|
||||
command.append(file_path)
|
||||
|
||||
return self._run_cvs_command(command)
|
||||
except subprocess.CalledProcessError:
|
||||
return f"Error retrieving content for {file_path}"
|
||||
189
cvs_proxy/test_cvs_client.py
Normal file
189
cvs_proxy/test_cvs_client.py
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
import unittest
|
||||
from unittest.mock import patch, MagicMock, mock_open
|
||||
from cvs_proxy.cvs_client import CVSClient
|
||||
import subprocess
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
class TestCVSClient(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Use a test repository URL directly
|
||||
self.test_url = ':pserver:testuser@test.repo.com:/path/to/repo'
|
||||
|
||||
# Create a temporary directory to simulate checkouts
|
||||
self.test_checkouts_dir = tempfile.mkdtemp()
|
||||
|
||||
# Patch the checkout method to prevent actual checkout
|
||||
CVSClient._checkout_repository = lambda self: None
|
||||
|
||||
# Create the CVS client with explicit repos_checkout and cvs_module
|
||||
self.cvs_client = CVSClient(self.test_url, repos_checkout=self.test_checkouts_dir, cvs_module='test_module')
|
||||
|
||||
# Override local_repo_path with a test directory
|
||||
self.test_repo_dir = os.path.join(self.test_checkouts_dir, 'testuser_test_repo_com_path_to_repo')
|
||||
os.makedirs(self.test_repo_dir, exist_ok=True)
|
||||
self.cvs_client.local_repo_path = self.test_repo_dir
|
||||
|
||||
def tearDown(self):
|
||||
# Clean up the temporary directory
|
||||
shutil.rmtree(self.test_checkouts_dir)
|
||||
|
||||
# Remove the environment variable
|
||||
if 'REPO_CHECKOUTS' in os.environ:
|
||||
del os.environ['REPO_CHECKOUTS']
|
||||
|
||||
def _create_test_files(self):
|
||||
"""
|
||||
Create a test directory structure for repository tree testing
|
||||
"""
|
||||
# Create test files and directories
|
||||
os.makedirs(os.path.join(self.test_repo_dir, 'src'), exist_ok=True)
|
||||
os.makedirs(os.path.join(self.test_repo_dir, 'docs'), exist_ok=True)
|
||||
|
||||
# Create some files
|
||||
with open(os.path.join(self.test_repo_dir, 'README.md'), 'w') as f:
|
||||
f.write('Test README')
|
||||
|
||||
with open(os.path.join(self.test_repo_dir, 'src', 'main.py'), 'w') as f:
|
||||
f.write('def main():\n pass')
|
||||
|
||||
with open(os.path.join(self.test_repo_dir, 'docs', 'manual.txt'), 'w') as f:
|
||||
f.write('Documentation')
|
||||
|
||||
def test_initialization(self):
|
||||
"""Test CVS client initialization"""
|
||||
# Verify repository URL
|
||||
self.assertEqual(self.cvs_client.repo_url, ':pserver:testuser@test.repo.com:/path/to/repo')
|
||||
|
||||
# Verify checkout directory is created in the specified REPO_CHECKOUTS location
|
||||
self.assertTrue(os.path.exists(self.test_checkouts_dir))
|
||||
self.assertTrue(os.path.exists(self.test_repo_dir))
|
||||
|
||||
def test_missing_url(self):
|
||||
"""Test initialization without URL"""
|
||||
with self.assertRaises(ValueError):
|
||||
CVSClient()
|
||||
|
||||
def test_list_repository_tree(self):
|
||||
"""Test listing repository tree structure"""
|
||||
# Create test files
|
||||
self._create_test_files()
|
||||
|
||||
# Test full repository tree
|
||||
tree = self.cvs_client.list_repository_tree()
|
||||
expected_tree = [
|
||||
'README.md',
|
||||
'docs/manual.txt',
|
||||
'src/main.py'
|
||||
]
|
||||
self.assertEqual(sorted(tree), sorted(expected_tree))
|
||||
|
||||
# Test subdirectory listing
|
||||
src_tree = self.cvs_client.list_repository_tree('src')
|
||||
self.assertEqual(src_tree, ['main.py'])
|
||||
|
||||
# Test non-existent directory
|
||||
empty_tree = self.cvs_client.list_repository_tree('nonexistent')
|
||||
self.assertEqual(empty_tree, [])
|
||||
|
||||
def test_checkout_directory_configuration(self):
|
||||
"""Test that checkout directory can be configured via environment variable"""
|
||||
# Create another test directory
|
||||
another_checkouts_dir = tempfile.mkdtemp()
|
||||
os.environ['REPO_CHECKOUTS'] = another_checkouts_dir
|
||||
|
||||
# Create a new CVS client
|
||||
another_client = CVSClient(':pserver:another@example.com:/another/repo')
|
||||
|
||||
# Verify the checkout is in the new directory
|
||||
expected_repo_path = os.path.join(another_checkouts_dir, 'another_example_com_another_repo')
|
||||
self.assertEqual(another_client.local_repo_path, expected_repo_path)
|
||||
|
||||
# Clean up
|
||||
shutil.rmtree(another_checkouts_dir)
|
||||
del os.environ['REPO_CHECKOUTS']
|
||||
|
||||
@patch('cvs_proxy.cvs_client.subprocess.run')
|
||||
def test_run_cvs_command(self, mock_run):
|
||||
"""Test the internal _run_cvs_command method"""
|
||||
mock_run.return_value = MagicMock(
|
||||
stdout='Command output',
|
||||
stderr='',
|
||||
returncode=0
|
||||
)
|
||||
|
||||
result = self.cvs_client._run_cvs_command(['rlog', 'test.txt'])
|
||||
self.assertEqual(result, 'Command output')
|
||||
|
||||
# Verify the command was constructed correctly
|
||||
mock_run.assert_called_once()
|
||||
call_args = mock_run.call_args[0][0]
|
||||
self.assertEqual(call_args, [
|
||||
'cvs',
|
||||
'-d',
|
||||
':pserver:testuser@test.repo.com:/path/to/repo',
|
||||
'rlog',
|
||||
'test.txt'
|
||||
])
|
||||
|
||||
@patch('cvs_proxy.cvs_client.subprocess.run')
|
||||
def test_get_file_diff(self, mock_run):
|
||||
"""Test getting file differences"""
|
||||
mock_run.return_value = MagicMock(
|
||||
stdout='--- a/file.txt\n+++ b/file.txt\n@@ -1,3 +1,4 @@\n line1\n-old line\n+new line\n line3',
|
||||
stderr='',
|
||||
returncode=0
|
||||
)
|
||||
|
||||
diff = self.cvs_client.get_file_diff('file.txt', '1.1', '1.2')
|
||||
self.assertIn('line1', diff)
|
||||
self.assertIn('-old line', diff)
|
||||
self.assertIn('+new line', diff)
|
||||
|
||||
@patch('cvs_proxy.cvs_client.subprocess.run')
|
||||
def test_get_file_history(self, mock_run):
|
||||
"""Test retrieving file history"""
|
||||
mock_run.return_value = MagicMock(
|
||||
stdout='''
|
||||
revision 1.2
|
||||
date: 2023/11/20 10:00:00; author: testuser; state: Exp; lines: +5 -2
|
||||
revision 1.1
|
||||
date: 2023/11/19 15:30:00; author: testuser; state: Exp; lines: +10 -0
|
||||
''',
|
||||
stderr='',
|
||||
returncode=0
|
||||
)
|
||||
|
||||
history = self.cvs_client.get_file_history('file.txt')
|
||||
self.assertIsInstance(history, list)
|
||||
self.assertEqual(len(history), 2)
|
||||
self.assertIn('revision', history[0])
|
||||
self.assertEqual(history[0]['revision'], '1.2')
|
||||
|
||||
@patch('cvs_proxy.cvs_client.subprocess.run')
|
||||
def test_get_file_content(self, mock_run):
|
||||
"""Test retrieving file content"""
|
||||
mock_run.return_value = MagicMock(
|
||||
stdout='File contents\nMultiple lines\nof text',
|
||||
stderr='',
|
||||
returncode=0
|
||||
)
|
||||
|
||||
content = self.cvs_client.get_file_content('file.txt', '1.2')
|
||||
self.assertEqual(content, 'File contents\nMultiple lines\nof text')
|
||||
|
||||
@patch('cvs_proxy.cvs_client.subprocess.run')
|
||||
def test_command_failure(self, mock_run):
|
||||
"""Test handling of CVS command failure"""
|
||||
mock_run.side_effect = subprocess.CalledProcessError(
|
||||
returncode=1,
|
||||
cmd=['cvs', 'rlog'],
|
||||
stderr='Error: Repository not found'
|
||||
)
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
self.cvs_client._run_cvs_command(['rlog'])
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue