Initialisation depot
This commit is contained in:
2
samba-api/tests/__init__.py
Normal file
2
samba-api/tests/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
# Tests package initialization
|
||||
# This file makes the tests directory a Python package
|
||||
51
samba-api/tests/conftest.py
Normal file
51
samba-api/tests/conftest.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import pytest
|
||||
import asyncio
|
||||
from httpx import AsyncClient
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from src.main import app
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
"""Test client fixture"""
|
||||
return TestClient(app)
|
||||
|
||||
@pytest.fixture
|
||||
async def async_client():
|
||||
"""Async test client fixture"""
|
||||
async with AsyncClient(app=app, base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
@pytest.fixture
|
||||
def event_loop():
|
||||
"""Event loop fixture for async tests"""
|
||||
loop = asyncio.new_event_loop()
|
||||
yield loop
|
||||
loop.close()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_data():
|
||||
"""Mock user data for testing"""
|
||||
return {
|
||||
"username": "testuser",
|
||||
"password": "TestPassword123!",
|
||||
"first_name": "Test",
|
||||
"last_name": "User",
|
||||
"email": "testuser@example.com",
|
||||
"description": "Test user account"
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_group_data():
|
||||
"""Mock group data for testing"""
|
||||
return {
|
||||
"name": "testgroup",
|
||||
"description": "Test group",
|
||||
"group_type": "security",
|
||||
"scope": "global"
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_jwt_token():
|
||||
"""Mock JWT token for authentication tests"""
|
||||
return "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0dXNlciIsImV4cCI6MTcwMDAwMDAwMCwic2NvcGVzIjpbInJlYWQiLCJ3cml0ZSJdfQ.test_token"
|
||||
111
samba-api/tests/test_auth.py
Normal file
111
samba-api/tests/test_auth.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from jose import jwt
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from src.routers.auth import create_access_token, verify_token, authenticate_user
|
||||
from src.models.auth import TokenData
|
||||
from src.core.config import settings
|
||||
from src.core.exceptions import AuthenticationException
|
||||
|
||||
class TestAuthService:
|
||||
"""Test authentication functionality"""
|
||||
|
||||
def test_create_access_token(self):
|
||||
"""Test JWT token creation"""
|
||||
data = {"sub": "testuser", "scopes": ["read", "write"]}
|
||||
token = create_access_token(data)
|
||||
|
||||
# Decode token to verify content
|
||||
decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
||||
|
||||
assert decoded["sub"] == "testuser"
|
||||
assert decoded["scopes"] == ["read", "write"]
|
||||
assert "exp" in decoded
|
||||
|
||||
def test_create_access_token_with_expiry(self):
|
||||
"""Test JWT token creation with custom expiry"""
|
||||
data = {"sub": "testuser"}
|
||||
expires_delta = timedelta(minutes=60)
|
||||
token = create_access_token(data, expires_delta)
|
||||
|
||||
decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
||||
|
||||
# Check that expiry is approximately 60 minutes from now
|
||||
exp_time = datetime.fromtimestamp(decoded["exp"])
|
||||
expected_time = datetime.utcnow() + expires_delta
|
||||
time_diff = abs((exp_time - expected_time).total_seconds())
|
||||
|
||||
assert time_diff < 10 # Within 10 seconds
|
||||
|
||||
def test_verify_token_valid(self):
|
||||
"""Test token verification with valid token"""
|
||||
data = {"sub": "testuser", "scopes": ["read"]}
|
||||
token = create_access_token(data)
|
||||
|
||||
token_data = verify_token(token)
|
||||
|
||||
assert isinstance(token_data, TokenData)
|
||||
assert token_data.username == "testuser"
|
||||
assert token_data.scopes == ["read"]
|
||||
|
||||
def test_verify_token_invalid(self):
|
||||
"""Test token verification with invalid token"""
|
||||
invalid_token = "invalid.jwt.token"
|
||||
|
||||
with pytest.raises(AuthenticationException):
|
||||
verify_token(invalid_token)
|
||||
|
||||
def test_verify_token_expired(self):
|
||||
"""Test token verification with expired token"""
|
||||
# Create token with past expiry
|
||||
data = {"sub": "testuser", "exp": datetime.utcnow() - timedelta(minutes=1)}
|
||||
expired_token = jwt.encode(data, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
||||
|
||||
with pytest.raises(AuthenticationException):
|
||||
verify_token(expired_token)
|
||||
|
||||
@patch('src.routers.auth.user_service.get_user')
|
||||
@patch('ldap3.Connection')
|
||||
@patch('ldap3.Server')
|
||||
async def test_authenticate_user_success(self, mock_server, mock_connection, mock_get_user):
|
||||
"""Test successful user authentication"""
|
||||
# Mock LDAP connection
|
||||
mock_conn_instance = MagicMock()
|
||||
mock_conn_instance.bound = True
|
||||
mock_connection.return_value = mock_conn_instance
|
||||
|
||||
# Mock user data
|
||||
from src.models.users import UserResponse, UserStatus
|
||||
mock_user = UserResponse(
|
||||
username="testuser",
|
||||
first_name="Test",
|
||||
last_name="User",
|
||||
email="testuser@example.com",
|
||||
dn="CN=testuser,CN=Users,DC=example,DC=com",
|
||||
status=UserStatus.ACTIVE,
|
||||
groups=["Domain Users"]
|
||||
)
|
||||
mock_get_user.return_value = mock_user
|
||||
|
||||
result = await authenticate_user("testuser", "password123")
|
||||
|
||||
assert result is not None
|
||||
assert result["username"] == "testuser"
|
||||
assert result["full_name"] == "Test User"
|
||||
assert result["email"] == "testuser@example.com"
|
||||
assert "read" in result["permissions"]
|
||||
assert "write" in result["permissions"]
|
||||
|
||||
@patch('ldap3.Connection')
|
||||
@patch('ldap3.Server')
|
||||
async def test_authenticate_user_invalid_credentials(self, mock_server, mock_connection):
|
||||
"""Test authentication with invalid credentials"""
|
||||
# Mock failed LDAP connection
|
||||
mock_conn_instance = MagicMock()
|
||||
mock_conn_instance.bound = False
|
||||
mock_connection.return_value = mock_conn_instance
|
||||
|
||||
result = await authenticate_user("testuser", "wrongpassword")
|
||||
|
||||
assert result is None
|
||||
66
samba-api/tests/test_main.py
Normal file
66
samba-api/tests/test_main.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
def test_app_startup(client):
|
||||
"""Test application startup and health endpoint"""
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "healthy", "service": "samba-api"}
|
||||
|
||||
def test_root_endpoint(client):
|
||||
"""Test root endpoint"""
|
||||
response = client.get("/")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["message"] == "Samba API is running"
|
||||
assert data["version"] == "1.0.0"
|
||||
|
||||
def test_docs_endpoint(client):
|
||||
"""Test API documentation endpoint"""
|
||||
response = client.get("/docs")
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_openapi_schema(client):
|
||||
"""Test OpenAPI schema endpoint"""
|
||||
response = client.get("/openapi.json")
|
||||
assert response.status_code == 200
|
||||
schema = response.json()
|
||||
assert schema["info"]["title"] == "Samba API"
|
||||
assert schema["info"]["version"] == "1.0.0"
|
||||
|
||||
class TestAPIEndpoints:
|
||||
"""Test API endpoint structure"""
|
||||
|
||||
def test_user_endpoints_exist(self, client):
|
||||
"""Test that user endpoints exist (will return 401 without auth)"""
|
||||
# These should return 401/403 without authentication, not 404
|
||||
response = client.get("/api/v1/users")
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
response = client.post("/api/v1/users", json={})
|
||||
assert response.status_code in [401, 403, 422]
|
||||
|
||||
def test_auth_endpoints_exist(self, client):
|
||||
"""Test that auth endpoints exist"""
|
||||
# Login endpoint should exist and return 422 for invalid data
|
||||
response = client.post("/api/v1/auth/login", json={})
|
||||
assert response.status_code == 422 # Validation error
|
||||
|
||||
# Me endpoint should return 401 without auth
|
||||
response = client.get("/api/v1/auth/me")
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
def test_group_endpoints_exist(self, client):
|
||||
"""Test that group endpoints exist"""
|
||||
response = client.get("/api/v1/groups")
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
def test_ou_endpoints_exist(self, client):
|
||||
"""Test that OU endpoints exist"""
|
||||
response = client.get("/api/v1/ous")
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
def test_computer_endpoints_exist(self, client):
|
||||
"""Test that computer endpoints exist"""
|
||||
response = client.get("/api/v1/computers")
|
||||
assert response.status_code in [401, 403]
|
||||
78
samba-api/tests/test_user_service.py
Normal file
78
samba-api/tests/test_user_service.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock, MagicMock
|
||||
from src.services.user_service import user_service
|
||||
from src.models.users import UserCreate, UserUpdate, UserStatus
|
||||
from src.core.exceptions import UserNotFoundException, SambaAPIException
|
||||
|
||||
class TestUserService:
|
||||
"""Test user service functionality"""
|
||||
|
||||
@patch('src.services.user_service.samba_service._run_samba_tool')
|
||||
@patch('src.services.user_service.user_service.get_user')
|
||||
async def test_create_user_success(self, mock_get_user, mock_run_samba_tool):
|
||||
"""Test successful user creation"""
|
||||
# Mock data
|
||||
user_data = UserCreate(
|
||||
username="testuser",
|
||||
password="TestPassword123!",
|
||||
first_name="Test",
|
||||
last_name="User",
|
||||
email="testuser@example.com"
|
||||
)
|
||||
|
||||
# Mock samba-tool command
|
||||
mock_run_samba_tool.return_value = {"returncode": 0, "stdout": "", "stderr": ""}
|
||||
|
||||
# Mock get_user response
|
||||
from src.models.users import UserResponse
|
||||
mock_user_response = UserResponse(
|
||||
username="testuser",
|
||||
first_name="Test",
|
||||
last_name="User",
|
||||
email="testuser@example.com",
|
||||
dn="CN=testuser,CN=Users,DC=example,DC=com",
|
||||
status=UserStatus.ACTIVE,
|
||||
groups=[]
|
||||
)
|
||||
mock_get_user.return_value = mock_user_response
|
||||
|
||||
# Test user creation
|
||||
result = await user_service.create_user(user_data)
|
||||
|
||||
assert result.username == "testuser"
|
||||
assert result.first_name == "Test"
|
||||
assert result.last_name == "User"
|
||||
mock_run_samba_tool.assert_called_once()
|
||||
|
||||
@patch('src.services.user_service.samba_service._get_connection')
|
||||
async def test_get_user_not_found(self, mock_get_connection):
|
||||
"""Test get user when user doesn't exist"""
|
||||
# Mock LDAP connection with empty results
|
||||
mock_conn = MagicMock()
|
||||
mock_conn.entries = []
|
||||
mock_get_connection.return_value = mock_conn
|
||||
|
||||
with pytest.raises(UserNotFoundException):
|
||||
await user_service.get_user("nonexistent")
|
||||
|
||||
@patch('src.services.user_service.samba_service._run_samba_tool')
|
||||
async def test_delete_user_success(self, mock_run_samba_tool):
|
||||
"""Test successful user deletion"""
|
||||
mock_run_samba_tool.return_value = {"returncode": 0, "stdout": "", "stderr": ""}
|
||||
|
||||
result = await user_service.delete_user("testuser")
|
||||
|
||||
assert result is True
|
||||
mock_run_samba_tool.assert_called_once_with(['user', 'delete', 'testuser'])
|
||||
|
||||
@patch('src.services.user_service.samba_service._run_samba_tool')
|
||||
async def test_change_password_success(self, mock_run_samba_tool):
|
||||
"""Test successful password change"""
|
||||
mock_run_samba_tool.return_value = {"returncode": 0, "stdout": "", "stderr": ""}
|
||||
|
||||
result = await user_service.change_password("testuser", "NewPassword123!")
|
||||
|
||||
assert result is True
|
||||
mock_run_samba_tool.assert_called_once_with([
|
||||
'user', 'setpassword', 'testuser', '--newpassword', 'NewPassword123!'
|
||||
])
|
||||
Reference in New Issue
Block a user