"""CODEOWNERS Checker Parses and validates CODEOWNERS files for compliance enforcement. """ import fnmatch import logging import os import re from dataclasses import dataclass from pathlib import Path @dataclass class CodeOwnerRule: """A CODEOWNERS rule.""" pattern: str owners: list[str] line_number: int is_negation: bool = False def matches(self, path: str) -> bool: """Check if a path matches this rule. Args: path: File path to check. Returns: True if the path matches. """ path = path.lstrip("/") pattern = self.pattern.lstrip("/") # Handle directory patterns if pattern.endswith("/"): return path.startswith(pattern) or fnmatch.fnmatch(path, pattern + "*") # Handle ** patterns if "**" in pattern: regex = pattern.replace("**", ".*").replace("*", "[^/]*") return bool(re.match(f"^{regex}$", path)) # Standard fnmatch return fnmatch.fnmatch(path, pattern) or fnmatch.fnmatch(path, f"**/{pattern}") class CodeownersChecker: """Checker for CODEOWNERS file compliance.""" CODEOWNERS_LOCATIONS = [ "CODEOWNERS", ".github/CODEOWNERS", ".gitea/CODEOWNERS", "docs/CODEOWNERS", ] def __init__(self, repo_root: str | None = None): """Initialize CODEOWNERS checker. Args: repo_root: Repository root path. """ self.repo_root = repo_root or os.getcwd() self.rules: list[CodeOwnerRule] = [] self.codeowners_path: str | None = None self.logger = logging.getLogger(__name__) self._load_codeowners() def _load_codeowners(self): """Load CODEOWNERS file from repository.""" for location in self.CODEOWNERS_LOCATIONS: path = os.path.join(self.repo_root, location) if os.path.exists(path): self.codeowners_path = path self._parse_codeowners(path) break def _parse_codeowners(self, path: str): """Parse a CODEOWNERS file. Args: path: Path to CODEOWNERS file. """ with open(path) as f: for line_num, line in enumerate(f, 1): line = line.strip() # Skip empty lines and comments if not line or line.startswith("#"): continue # Parse pattern and owners parts = line.split() if len(parts) < 2: continue pattern = parts[0] owners = parts[1:] # Check for negation (optional syntax) is_negation = pattern.startswith("!") if is_negation: pattern = pattern[1:] self.rules.append( CodeOwnerRule( pattern=pattern, owners=owners, line_number=line_num, is_negation=is_negation, ) ) def get_owners(self, path: str) -> list[str]: """Get owners for a file path. Args: path: File path to check. Returns: List of owner usernames/teams. """ owners = [] # Apply rules in order (later rules override earlier ones) for rule in self.rules: if rule.matches(path): if rule.is_negation: owners = [] # Clear owners for negation else: owners = rule.owners return owners def get_owners_for_files(self, files: list[str]) -> dict[str, list[str]]: """Get owners for multiple files. Args: files: List of file paths. Returns: Dict mapping file paths to owner lists. """ return {f: self.get_owners(f) for f in files} def get_required_reviewers(self, files: list[str]) -> set[str]: """Get all required reviewers for a set of files. Args: files: List of file paths. Returns: Set of all required reviewer usernames/teams. """ reviewers = set() for f in files: reviewers.update(self.get_owners(f)) return reviewers def check_approval( self, files: list[str], approvers: list[str], ) -> dict: """Check if files have required approvals. Args: files: List of changed files. approvers: List of users who approved. Returns: Dict with approval status and missing approvers. """ required = self.get_required_reviewers(files) approvers_set = set(approvers) # Normalize @ prefixes required_normalized = {r.lstrip("@") for r in required} approvers_normalized = {a.lstrip("@") for a in approvers_set} missing = required_normalized - approvers_normalized # Check for team approvals (simplified - actual implementation # would need API calls to check team membership) teams = {r for r in missing if "/" in r} missing_users = missing - teams return { "approved": len(missing_users) == 0, "required_reviewers": list(required_normalized), "actual_approvers": list(approvers_normalized), "missing_approvers": list(missing_users), "pending_teams": list(teams), } def get_coverage_report(self, files: list[str]) -> dict: """Generate a coverage report for files. Args: files: List of file paths. Returns: Coverage report with owned and unowned files. """ owned = [] unowned = [] for f in files: owners = self.get_owners(f) if owners: owned.append({"file": f, "owners": owners}) else: unowned.append(f) return { "total_files": len(files), "owned_files": len(owned), "unowned_files": len(unowned), "coverage_percent": (len(owned) / len(files) * 100) if files else 0, "owned": owned, "unowned": unowned, } def validate_codeowners(self) -> dict: """Validate the CODEOWNERS file. Returns: Validation result with warnings and errors. """ if not self.codeowners_path: return { "valid": False, "errors": ["No CODEOWNERS file found"], "warnings": [], } errors = [] warnings = [] # Check for empty rules for rule in self.rules: if not rule.owners: errors.append( f"Line {rule.line_number}: Pattern '{rule.pattern}' has no owners" ) # Check for invalid owner formats for rule in self.rules: for owner in rule.owners: if not owner.startswith("@") and "/" not in owner: warnings.append( f"Line {rule.line_number}: Owner '{owner}' should start with @ or be a team (org/team)" ) # Check for overlapping patterns patterns_seen = {} for rule in self.rules: if rule.pattern in patterns_seen: warnings.append( f"Line {rule.line_number}: Pattern '{rule.pattern}' duplicates line {patterns_seen[rule.pattern]}" ) patterns_seen[rule.pattern] = rule.line_number return { "valid": len(errors) == 0, "errors": errors, "warnings": warnings, "rules_count": len(self.rules), "file_path": self.codeowners_path, } @classmethod def from_content(cls, content: str) -> "CodeownersChecker": """Create checker from CODEOWNERS content string. Args: content: CODEOWNERS file content. Returns: CodeownersChecker instance. """ checker = cls.__new__(cls) checker.repo_root = None checker.rules = [] checker.codeowners_path = "" checker.logger = logging.getLogger(__name__) for line_num, line in enumerate(content.split("\n"), 1): line = line.strip() if not line or line.startswith("#"): continue parts = line.split() if len(parts) < 2: continue pattern = parts[0] owners = parts[1:] is_negation = pattern.startswith("!") if is_negation: pattern = pattern[1:] checker.rules.append( CodeOwnerRule( pattern=pattern, owners=owners, line_number=line_num, is_negation=is_negation, ) ) return checker