# File-viewer header captured with the source: 111 lines, 4.3 KiB, Python.
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch

import pytest
from jose import jwt

from src.core.config import settings
from src.core.exceptions import AuthenticationException
from src.models.auth import TokenData
from src.routers.auth import authenticate_user, create_access_token, verify_token


class TestAuthService:
    """Tests for the JWT helpers and ``authenticate_user`` from ``src.routers.auth``."""

    def test_create_access_token(self):
        """A created token decodes back to its claims and carries an ``exp`` claim."""
        data = {"sub": "testuser", "scopes": ["read", "write"]}
        token = create_access_token(data)

        # Decode with the application's own key and algorithm to inspect the claims.
        decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])

        assert decoded["sub"] == "testuser"
        assert decoded["scopes"] == ["read", "write"]
        assert "exp" in decoded

    def test_create_access_token_with_expiry(self):
        """A custom ``expires_delta`` places ``exp`` about that far in the future."""
        data = {"sub": "testuser"}
        expires_delta = timedelta(minutes=60)
        token = create_access_token(data, expires_delta)

        decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])

        # Interpret ``exp`` explicitly as UTC. A naive ``datetime.fromtimestamp``
        # yields local time, which would be off by the host's UTC offset when
        # compared against a UTC "now" and make the test fail outside UTC.
        exp_time = datetime.fromtimestamp(decoded["exp"], tz=timezone.utc)
        expected_time = datetime.now(timezone.utc) + expires_delta
        time_diff = abs((exp_time - expected_time).total_seconds())

        assert time_diff < 10  # Within 10 seconds

    def test_verify_token_valid(self):
        """``verify_token`` returns a ``TokenData`` carrying the token's subject and scopes."""
        data = {"sub": "testuser", "scopes": ["read"]}
        token = create_access_token(data)

        token_data = verify_token(token)

        assert isinstance(token_data, TokenData)
        assert token_data.username == "testuser"
        assert token_data.scopes == ["read"]

    def test_verify_token_invalid(self):
        """``verify_token`` raises ``AuthenticationException`` for a malformed token."""
        invalid_token = "invalid.jwt.token"

        with pytest.raises(AuthenticationException):
            verify_token(invalid_token)

    def test_verify_token_expired(self):
        """``verify_token`` raises ``AuthenticationException`` for an expired token."""
        # Encode the token directly so ``exp`` can be set one minute in the past.
        data = {"sub": "testuser", "exp": datetime.now(timezone.utc) - timedelta(minutes=1)}
        expired_token = jwt.encode(data, settings.SECRET_KEY, algorithm=settings.ALGORITHM)

        with pytest.raises(AuthenticationException):
            verify_token(expired_token)

    # NOTE(review): the LDAP patches below replace names on the ``ldap3`` module;
    # confirm ``src.routers.auth`` looks them up there rather than via a
    # ``from ldap3 import ...`` binding, otherwise the mocks are bypassed.
    @pytest.mark.asyncio
    @patch('src.routers.auth.user_service.get_user')
    @patch('ldap3.Connection')
    @patch('ldap3.Server')
    async def test_authenticate_user_success(self, mock_server, mock_connection, mock_get_user):
        """A bound LDAP connection plus a known user yields the user's details.

        Mock arguments arrive bottom-up from the ``@patch`` stack:
        ``ldap3.Server``, ``ldap3.Connection``, then ``user_service.get_user``.
        """
        # Mock LDAP connection that reports a successful bind.
        mock_conn_instance = MagicMock()
        mock_conn_instance.bound = True
        mock_connection.return_value = mock_conn_instance

        # Mock user data returned by the user service.
        from src.models.users import UserResponse, UserStatus
        mock_user = UserResponse(
            username="testuser",
            first_name="Test",
            last_name="User",
            email="testuser@example.com",
            dn="CN=testuser,CN=Users,DC=example,DC=com",
            status=UserStatus.ACTIVE,
            groups=["Domain Users"],
        )
        mock_get_user.return_value = mock_user

        result = await authenticate_user("testuser", "password123")

        assert result is not None
        assert result["username"] == "testuser"
        assert result["full_name"] == "Test User"
        assert result["email"] == "testuser@example.com"
        assert "read" in result["permissions"]
        assert "write" in result["permissions"]

    @pytest.mark.asyncio
    @patch('ldap3.Connection')
    @patch('ldap3.Server')
    async def test_authenticate_user_invalid_credentials(self, mock_server, mock_connection):
        """An LDAP connection that fails to bind makes ``authenticate_user`` return ``None``."""
        # Mock LDAP connection that reports a failed bind.
        mock_conn_instance = MagicMock()
        mock_conn_instance.bound = False
        mock_connection.return_value = mock_conn_instance

        result = await authenticate_user("testuser", "wrongpassword")

        assert result is None