from fastapi import FastAPI, HTTPException, UploadFile, File, Query
from fastapi.responses import JSONResponse, HTMLResponse
from pydantic import BaseModel, Field
import os
import shutil
import subprocess
import hashlib
import bcrypt
import base64
from pathlib import Path
from typing import Optional, List
import logging
from datetime import datetime
# Configure logging
# basicConfig sets the root logger threshold to INFO with the default handler.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the helpers and endpoints defined below.
logger = logging.getLogger(__name__)
# FastAPI application object. Every endpoint below registers itself on this
# instance; the keyword arguments only populate the generated OpenAPI document.
app = FastAPI(
    title="Arti-API",
    # Markdown text for the API description. The text itself is runtime data
    # and must stay flush-left so no extra whitespace leaks into it.
    description="""
# Artifactory Management API
A comprehensive REST API for managing artifactory server components including:
- **Debian Repository**: Upload, manage, and serve .deb packages for arm64 and amd64 architectures
- **Helm Chart Museum**: Store and manage Helm charts (.tgz packages)
- **Docker Registry**: Integration with Docker registry for image management
## Features
- ✅ Multi-architecture Debian package support (arm64/amd64)
- ✅ Automatic package index generation and refresh
- ✅ Helm chart repository management with index.yaml generation
- ✅ Docker registry integration
- ✅ Health monitoring and status endpoints
- ✅ Comprehensive error handling and logging
## Volume Structure
The API manages a shared PVC volume with the following structure:
```
/data/
├── docker/ # Docker registry data
├── debian/
│ ├── dist/
│ │ ├── Release
│ │ └── main/
│ │ ├── binary-arm64/
│ │ └── binary-amd64/
│ └── pool/ # Package storage
└── charts/ # Helm charts storage
```
""",
    version="1.0.0",
    contact={
        "name": "Arti-API Support",
        "url": "https://github.com/hexah/arti-api",
        # NOTE(review): looks like a placeholder address - confirm before release.
        "email": "support@your-org.com",
    },
    license_info={
        "name": "MIT",
        "url": "https://opensource.org/licenses/MIT",
    },
    # Server URLs listed in the OpenAPI document.
    servers=[
        {
            "url": "http://localhost:8000",
            "description": "Development server"
        },
        {
            "url": "https://api.aipice.fr",
            "description": "Production server"
        }
    ],
    # One entry per tag name used in the route decorators of this file.
    tags_metadata=[
        {
            "name": "health",
            "description": "Health check and status endpoints",
        },
        {
            "name": "debian",
            "description": "Debian package repository management",
        },
        {
            "name": "helm",
            "description": "Helm chart repository management",
        },
        {
            "name": "docker",
            "description": "Docker registry operations",
        },
        {
            "name": "refresh",
            "description": "Repository refresh and index generation operations",
        },
        {
            "name": "users",
            "description": "Docker registry user management",
        },
    ]
)
# Base paths for the shared PVC volume
# Everything the API reads or writes lives under BASE_PATH.
BASE_PATH = "/data"
# Tree walked by the Docker image listing endpoint.
DOCKER_PATH = f"{BASE_PATH}/docker"
# Root of the Debian repository (parent of dist/ and pool/).
DEBIAN_PATH = f"{BASE_PATH}/debian"
# Directory holding uploaded Helm chart archives and the generated index.yaml.
CHARTS_PATH = f"{BASE_PATH}/charts"
# Generated Debian metadata: Release plus main/binary-<arch>/Packages.
DEBIAN_DIST_PATH = f"{DEBIAN_PATH}/dist"
# Flat directory where uploaded .deb files are stored.
DEBIAN_POOL_PATH = f"{DEBIAN_PATH}/pool"
# htpasswd-format credentials file managed by the /users endpoints.
HTPASSWD_PATH = f"{BASE_PATH}/htpasswd"
class BinaryInfo(BaseModel):
    """Metadata describing one binary package.

    ``name``, ``version`` and ``architecture`` are required; ``description``
    is optional and defaults to ``None``.
    """

    name: str = Field(
        ...,
        description="Package name",
        example="my-app",
    )
    version: str = Field(
        ...,
        description="Package version",
        example="1.0.0",
    )
    architecture: str = Field(
        ...,
        description="Target architecture",
        example="amd64",
    )
    description: Optional[str] = Field(
        None,
        description="Package description",
        example="My awesome application",
    )
class ChartInfo(BaseModel):
    """Metadata describing one Helm chart.

    ``name`` and ``version`` are required; ``description`` is optional and
    defaults to ``None``.
    """

    name: str = Field(
        ...,
        description="Chart name",
        example="my-chart",
    )
    version: str = Field(
        ...,
        description="Chart version",
        example="0.1.0",
    )
    description: Optional[str] = Field(
        None,
        description="Chart description",
        example="A Helm chart for my application",
    )
class PackageResponse(BaseModel):
    """One entry of the Debian package listing (all fields required)."""

    # Filename of the stored package.
    name: str = Field(
        ...,
        description="Package filename",
    )
    # Size of the stored file.
    size: int = Field(
        ...,
        description="File size in bytes",
    )
    # Modification time, carried as a string.
    modified: str = Field(
        ...,
        description="Last modification timestamp",
    )
class ChartResponse(BaseModel):
    """One entry of the Helm chart listing (all fields required)."""

    # Filename of the stored chart archive.
    name: str = Field(
        ...,
        description="Chart filename",
    )
    # Size of the stored file.
    size: int = Field(
        ...,
        description="File size in bytes",
    )
    # Modification time, carried as a string.
    modified: str = Field(
        ...,
        description="Last modification timestamp",
    )
class SuccessResponse(BaseModel):
    """Generic success payload returned by the mutating endpoints.

    ``message`` is required; ``path`` is optional and defaults to ``None``.
    """

    message: str = Field(
        ...,
        description="Success message",
    )
    path: Optional[str] = Field(
        None,
        description="File path if applicable",
    )
class HealthResponse(BaseModel):
    """Payload of the health-check endpoint (both fields required)."""

    status: str = Field(
        ...,
        description="Health status",
        example="healthy",
    )
    timestamp: str = Field(
        ...,
        description="Current timestamp",
    )
class StatusResponse(BaseModel):
    """Status payload made of a message and a timestamp (both required)."""

    message: str = Field(
        ...,
        description="Status message",
    )
    timestamp: str = Field(
        ...,
        description="Current timestamp",
    )
class UserInfo(BaseModel):
    """Request body used to create or update a registry user.

    Both fields are required. ``password`` arrives in clear text.
    """

    username: str = Field(
        ...,
        description="Username",
        example="john_doe",
    )
    password: str = Field(
        ...,
        description="Password",
        example="secure_password123",
    )
class UserResponse(BaseModel):
    """Details returned for a single registry user.

    ``username`` is required; ``created`` is optional and defaults to ``None``.
    """

    username: str = Field(
        ...,
        description="Username",
    )
    created: Optional[str] = Field(
        None,
        description="Creation timestamp",
    )
class UserListResponse(BaseModel):
    """Payload listing the usernames of all registry users."""

    # Required list of plain username strings.
    users: List[str] = Field(
        ...,
        description="List of usernames",
    )
# Utility functions for htpasswd management
def _generate_bcrypt_hash(password: str) -> str:
    """Return a bcrypt hash of ``password`` suitable for an htpasswd entry.

    The password is UTF-8 encoded and hashed with a freshly generated salt at
    cost factor 12.

    Args:
        password: Clear-text password to hash.

    Returns:
        The bcrypt hash decoded to ``str``.

    Raises:
        Exception: Whatever ``bcrypt`` raises is propagated unchanged.
    """
    # There is deliberately no fallback. The previous one went through the
    # ``crypt`` module (removed in Python 3.13) with an 8-character salt drawn
    # from ``random``, which is not a valid bcrypt salt, so it could store an
    # unusable hash. Failing loudly lets the caller report the error.
    salt = bcrypt.gensalt(rounds=12)
    hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
    return hashed.decode('utf-8')
def _read_htpasswd() -> dict:
    """Read the htpasswd file and return ``{username: password_hash}``.

    Blank lines and lines without a ``:`` are skipped. Each remaining line is
    split on its first ``:`` only.

    Returns:
        Mapping of username to stored hash; empty when the file does not exist.

    Raises:
        Exception: Any error while opening or reading an existing file is
            logged and re-raised.
    """
    users = {}
    if not os.path.exists(HTPASSWD_PATH):
        return users
    try:
        with open(HTPASSWD_PATH, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or ':' not in line:
                    continue
                username, _, password_hash = line.partition(':')
                users[username] = password_hash
    except Exception as e:
        logger.error(f"Error reading htpasswd file: {str(e)}")
        # Returning an empty or partial mapping here would let the create/update
        # endpoint rewrite the file from it and drop every other user.
        raise
    return users
def _write_htpasswd(users: dict) -> None:
    """Overwrite the htpasswd file with the given ``{username: hash}`` mapping.

    The parent directory is created when missing, one ``username:hash`` line
    is written per entry, and the file mode is set to 0o644 afterwards.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        parent_dir = os.path.dirname(HTPASSWD_PATH)
        os.makedirs(parent_dir, exist_ok=True)
        with open(HTPASSWD_PATH, 'w') as handle:
            handle.writelines(
                f"{name}:{digest}\n" for name, digest in users.items()
            )
        # World-readable so the Docker registry can read the file.
        os.chmod(HTPASSWD_PATH, 0o644)
    except Exception as e:
        logger.error(f"Error writing htpasswd file: {str(e)}")
        raise
@app.on_event("startup")
async def startup_event():
    """Create the on-disk directory layout when the application starts.

    Every required directory is created (including parents) if missing. A
    failure is logged and re-raised, which aborts startup.
    """
    main_component = f"{DEBIAN_DIST_PATH}/main"
    required_dirs = (
        DOCKER_PATH,
        DEBIAN_DIST_PATH,
        main_component,
        f"{main_component}/binary-arm64",
        f"{main_component}/binary-amd64",
        DEBIAN_POOL_PATH,
        CHARTS_PATH,
    )
    logger.info("Initializing directory structure...")
    for target in required_dirs:
        try:
            Path(target).mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"✗ Failed to create directory {target}: {str(e)}")
            raise
        else:
            logger.info(f"✓ Ensured directory exists: {target}")
    logger.info("Directory structure initialization completed successfully")
@app.get(
    "/",
    tags=["health"],
    response_class=HTMLResponse,
    summary="Service Status Page",
    description="Returns a minimal HTML page showing that the service is running"
)
async def root():
    """
    **Service Status Page**

    Return a minimal page stating that the Arti-API service is running,
    together with the current time. Intended for browser access.

    **Returns:**
    - An ``HTMLResponse`` with status code 200
    """
    # Local import: the file only imports ``datetime`` from the datetime module.
    from datetime import timezone

    # The text is labelled "UTC", so take the time in UTC rather than in the
    # server's local zone.
    current_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    html_content = f"""
Arti-API Status
Service is running
Arti-API - {current_time}
"""
    return HTMLResponse(content=html_content, status_code=200)
@app.get(
    "/health",
    tags=["health"],
    response_model=HealthResponse,
    summary="Health Check",
    description="Health check endpoint for monitoring and load balancer probes"
)
async def health_check():
    """
    **Health Check Endpoint**

    Lightweight probe target for liveness/readiness checks, load balancers
    and monitoring systems.

    **Returns:**
    - ``status`` fixed to ``"healthy"`` and the current time in ISO format
    """
    checked_at = datetime.now().isoformat()
    return HealthResponse(status="healthy", timestamp=checked_at)
# Debian Package Management
@app.post(
    "/debian/upload",
    tags=["debian"],
    response_model=SuccessResponse,
    summary="Upload Debian Package",
    description="Upload a .deb package to the Debian repository"
)
async def upload_debian_package(
    file: UploadFile = File(..., description="The .deb package file to upload"),
    architecture: str = Query(
        default="amd64",
        description="Target architecture for the package",
        regex="^(amd64|arm64)$",
        example="amd64"
    )
):
    """
    **Upload Debian Package**

    Store an uploaded .deb file in the package pool and regenerate the
    Debian indexes.

    **Parameters:**
    - **file**: The .deb package file (name must end in ``.deb``)
    - **architecture**: Target architecture (``amd64`` or ``arm64``); it is
      validated and logged

    **Returns:**
    - Success message with the path where the package was stored

    **Raises:**
    - 400 when the filename is missing or does not end in ``.deb``, or the
      architecture is not supported
    - 500 when storing the file or refreshing the indexes fails

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/debian/upload?architecture=amd64" -F "file=@my-package_1.0.0_amd64.deb"
    ```
    """
    # The multipart filename is chosen by the client. Keep only its final path
    # component so a name such as "../../x.deb" cannot escape the pool
    # directory, and tolerate a missing filename.
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if not filename.endswith('.deb'):
        raise HTTPException(status_code=400, detail="File must be a .deb package")
    if architecture not in ["amd64", "arm64"]:
        raise HTTPException(status_code=400, detail="Architecture must be amd64 or arm64")
    try:
        # Save file to pool directory
        file_path = f"{DEBIAN_POOL_PATH}/{filename}"
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Update package index
        await refresh_debian_packages()
        logger.info(f"Uploaded Debian package: {filename} for {architecture}")
        return SuccessResponse(
            message=f"Package {filename} uploaded successfully",
            path=file_path
        )
    except HTTPException:
        # Already a well-formed API error (e.g. from the refresh step).
        raise
    except Exception as e:
        logger.error(f"Error uploading Debian package: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@app.delete(
    "/debian/package/{package_name}",
    tags=["debian"],
    response_model=SuccessResponse,
    summary="Delete Debian Package",
    description="Delete a specific Debian package from the repository"
)
async def delete_debian_package(
    package_name: str
):
    """
    **Delete Debian Package**

    Remove a package file from the pool directory and regenerate the Debian
    indexes.

    **Parameters:**
    - **package_name**: The exact filename of the package to delete

    **Returns:**
    - Success message confirming deletion

    **Raises:**
    - 400 when ``package_name`` is not a bare filename
    - 404 when no such file exists in the pool
    - 500 when deletion or the index refresh fails

    **Example usage:**
    ```bash
    curl -X DELETE "http://localhost:8000/debian/package/my-package_1.0.0_amd64.deb"
    ```
    """
    # Only a bare filename may address something inside the pool directory.
    if package_name in (".", "..") or os.path.basename(package_name) != package_name:
        raise HTTPException(status_code=400, detail="Invalid package name")
    try:
        file_path = f"{DEBIAN_POOL_PATH}/{package_name}"
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="Package not found")
        os.remove(file_path)
        await refresh_debian_packages()
        logger.info(f"Deleted Debian package: {package_name}")
        return SuccessResponse(message=f"Package {package_name} deleted successfully")
    except HTTPException:
        # Without this clause the 404 above was caught by the generic handler
        # below and reported to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error deleting Debian package: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Deletion failed: {str(e)}")
@app.get(
    "/debian/packages",
    tags=["debian"],
    response_model=dict,
    summary="List Debian Packages",
    description="Get a list of all Debian packages in the repository"
)
async def list_debian_packages():
    """
    **List Debian Packages**

    Report every ``.deb`` file found in the pool directory.

    **Returns:**
    - ``{"packages": [...]}`` where each item carries:
      - **name**: Package filename
      - **size**: File size in bytes
      - **modified**: Last modification time in ISO format

    **Raises:**
    - 500 when the directory cannot be listed or a file cannot be inspected

    **Example usage:**
    ```bash
    curl -X GET "http://localhost:8000/debian/packages"
    ```
    """
    try:
        packages = []
        if os.path.exists(DEBIAN_POOL_PATH):
            deb_names = [
                entry for entry in os.listdir(DEBIAN_POOL_PATH)
                if entry.endswith('.deb')
            ]
            for deb_name in deb_names:
                info = os.stat(f"{DEBIAN_POOL_PATH}/{deb_name}")
                modified_at = datetime.fromtimestamp(info.st_mtime).isoformat()
                record = PackageResponse(
                    name=deb_name,
                    size=info.st_size,
                    modified=modified_at,
                )
                packages.append(record.dict())
        return {"packages": packages}
    except Exception as e:
        logger.error(f"Error listing Debian packages: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Listing failed: {str(e)}")
# Helm Chart Management
@app.post(
    "/helm/upload",
    tags=["helm"],
    response_model=SuccessResponse,
    summary="Upload Helm Chart",
    description="Upload a Helm chart package (.tgz) to the chart repository"
)
async def upload_helm_chart(
    file: UploadFile = File(..., description="The .tgz Helm chart package to upload")
):
    """
    **Upload Helm Chart**

    Store an uploaded chart archive in the charts directory and regenerate
    the chart index.

    **Parameters:**
    - **file**: The .tgz chart package file (name must end in ``.tgz``)

    **Returns:**
    - Success message with the path where the chart was stored

    **Raises:**
    - 400 when the filename is missing or does not end in ``.tgz``
    - 500 when storing the file or refreshing the index fails

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/helm/upload" -F "file=@my-chart-0.1.0.tgz"
    ```
    """
    # The multipart filename is chosen by the client. Keep only its final path
    # component so a name such as "../../x.tgz" cannot escape the charts
    # directory, and tolerate a missing filename.
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if not filename.endswith('.tgz'):
        raise HTTPException(status_code=400, detail="File must be a .tgz chart package")
    try:
        file_path = f"{CHARTS_PATH}/{filename}"
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Update chart index
        await refresh_helm_charts()
        logger.info(f"Uploaded Helm chart: {filename}")
        return SuccessResponse(
            message=f"Chart {filename} uploaded successfully",
            path=file_path
        )
    except HTTPException:
        # Already a well-formed API error (e.g. from the refresh step).
        raise
    except Exception as e:
        logger.error(f"Error uploading Helm chart: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@app.delete(
    "/helm/chart/{chart_name}",
    tags=["helm"],
    response_model=SuccessResponse,
    summary="Delete Helm Chart",
    description="Delete a specific Helm chart from the repository"
)
async def delete_helm_chart(
    chart_name: str
):
    """
    **Delete Helm Chart**

    Remove a chart archive from the charts directory and regenerate the
    chart index.

    **Parameters:**
    - **chart_name**: The exact filename of the chart to delete

    **Returns:**
    - Success message confirming deletion

    **Raises:**
    - 400 when ``chart_name`` is not a bare filename
    - 404 when no such file exists in the charts directory
    - 500 when deletion or the index refresh fails

    **Example usage:**
    ```bash
    curl -X DELETE "http://localhost:8000/helm/chart/my-chart-0.1.0.tgz"
    ```
    """
    # Only a bare filename may address something inside the charts directory.
    if chart_name in (".", "..") or os.path.basename(chart_name) != chart_name:
        raise HTTPException(status_code=400, detail="Invalid chart name")
    try:
        file_path = f"{CHARTS_PATH}/{chart_name}"
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="Chart not found")
        os.remove(file_path)
        await refresh_helm_charts()
        logger.info(f"Deleted Helm chart: {chart_name}")
        return SuccessResponse(message=f"Chart {chart_name} deleted successfully")
    except HTTPException:
        # Without this clause the 404 above was caught by the generic handler
        # below and reported to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error deleting Helm chart: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Deletion failed: {str(e)}")
@app.get(
    "/helm/charts",
    tags=["helm"],
    response_model=dict,
    summary="List Helm Charts",
    description="Get a list of all Helm charts in the repository"
)
async def list_helm_charts():
    """
    **List Helm Charts**

    Report every ``.tgz`` file found in the charts directory.

    **Returns:**
    - ``{"charts": [...]}`` where each item carries:
      - **name**: Chart filename
      - **size**: File size in bytes
      - **modified**: Last modification time in ISO format

    **Raises:**
    - 500 when the directory cannot be listed or a file cannot be inspected

    **Example usage:**
    ```bash
    curl -X GET "http://localhost:8000/helm/charts"
    ```
    """
    try:
        charts = []
        if os.path.exists(CHARTS_PATH):
            archive_names = [
                entry for entry in os.listdir(CHARTS_PATH)
                if entry.endswith('.tgz')
            ]
            for archive_name in archive_names:
                info = os.stat(f"{CHARTS_PATH}/{archive_name}")
                modified_at = datetime.fromtimestamp(info.st_mtime).isoformat()
                record = ChartResponse(
                    name=archive_name,
                    size=info.st_size,
                    modified=modified_at,
                )
                charts.append(record.dict())
        return {"charts": charts}
    except Exception as e:
        logger.error(f"Error listing Helm charts: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Listing failed: {str(e)}")
# Docker Registry Management
@app.get(
    "/docker/images",
    tags=["docker"],
    response_model=dict,
    summary="List Docker Images",
    description="Get a list of Docker images in the registry"
)
async def list_docker_images():
    """
    **List Docker Images**

    Simplified view of the registry storage: walk the Docker data directory
    and report every ``.json`` file by its path relative to that directory.
    The Docker Registry HTTP API is not consulted.

    **Returns:**
    - ``{"images": [{"path": "<relative path>"}, ...]}``

    **Raises:**
    - 500 when walking the directory fails

    **Example usage:**
    ```bash
    curl -X GET "http://localhost:8000/docker/images"
    ```
    """
    try:
        images = []
        if os.path.exists(DOCKER_PATH):
            for current_dir, _subdirs, filenames in os.walk(DOCKER_PATH):
                for filename in filenames:
                    # "manifest.json" itself ends in ".json", so a single
                    # suffix test selects the same files as before.
                    if not filename.endswith(".json"):
                        continue
                    absolute = os.path.join(current_dir, filename)
                    images.append({"path": os.path.relpath(absolute, DOCKER_PATH)})
        return {"images": images}
    except Exception as e:
        logger.error(f"Error listing Docker images: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Listing failed: {str(e)}")
# Docker Registry User Management
@app.get(
    "/users",
    tags=["users"],
    response_model=UserListResponse,
    summary="List Registry Users",
    description="Get a list of all Docker registry users"
)
async def list_users():
    """
    **List Docker Registry Users**

    Return the usernames currently present in the htpasswd file.

    **Returns:**
    - ``{"users": ["admin", "developer", ...]}``

    **Raises:**
    - 500 when the user list cannot be produced

    **Example usage:**
    ```bash
    curl -X GET "http://localhost:8000/users"
    ```
    """
    try:
        # Iterating the mapping yields its keys, i.e. the usernames.
        return UserListResponse(users=list(_read_htpasswd()))
    except Exception as e:
        logger.error(f"Error listing users: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to list users: {str(e)}")
@app.get(
    "/users/{username}",
    tags=["users"],
    response_model=UserResponse,
    summary="Get User Info",
    description="Get information about a specific Docker registry user"
)
async def get_user(
    username: str
):
    """
    **Get User Information**

    Look up one user in the htpasswd file.

    **Parameters:**
    - **username**: The username to look up

    **Returns:**
    - The username plus a ``created`` timestamp. The timestamp is the
      modification time of the htpasswd file as a whole, not of the
      individual entry.

    **Raises:**
    - 404 when the user is not present
    - 500 on any other failure

    **Example usage:**
    ```bash
    curl -X GET "http://localhost:8000/users/admin"
    ```
    """
    try:
        known_users = _read_htpasswd()
        if username not in known_users:
            raise HTTPException(status_code=404, detail="User not found")
        # Fall back to None when the file has vanished in the meantime.
        created_time = (
            datetime.fromtimestamp(os.stat(HTPASSWD_PATH).st_mtime).isoformat()
            if os.path.exists(HTPASSWD_PATH)
            else None
        )
        return UserResponse(username=username, created=created_time)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting user {username}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get user: {str(e)}")
@app.post(
    "/users",
    tags=["users"],
    response_model=SuccessResponse,
    summary="Create/Update User",
    description="Create a new user or update an existing user's password"
)
async def set_user(user: UserInfo):
    """
    **Create or Update Docker Registry User**

    Hash the supplied password and store it under the given username,
    replacing the previous entry if one exists. The whole htpasswd file is
    rewritten.

    **Parameters:**
    - **user**: Username and clear-text password

    **Returns:**
    - Success message saying whether the user was created or updated

    **Raises:**
    - 400 when username or password is empty, or the username contains
      characters other than alphanumerics, ``_``, ``-`` and ``.``
    - 500 on any other failure

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/users" -H "Content-Type: application/json" -d '{"username": "newuser", "password": "secure_password123"}'
    ```
    """
    try:
        if not (user.username and user.password):
            raise HTTPException(status_code=400, detail="Username and password are required")
        # Strip the three permitted punctuation characters; what is left must
        # be purely alphanumeric (and non-empty).
        alnum_part = user.username
        for allowed_char in ('_', '-', '.'):
            alnum_part = alnum_part.replace(allowed_char, '')
        if not alnum_part.isalnum():
            raise HTTPException(status_code=400, detail="Username can only contain letters, numbers, hyphens, underscores, and dots")
        existing = _read_htpasswd()
        new_hash = _generate_bcrypt_hash(user.password)
        # Decide the wording before the entry is inserted.
        if user.username in existing:
            action = "updated"
        else:
            action = "created"
        existing[user.username] = new_hash
        _write_htpasswd(existing)
        logger.info(f"User {user.username} {action} successfully")
        return SuccessResponse(message=f"User {user.username} {action} successfully")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error setting user {user.username}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to set user: {str(e)}")
@app.delete(
    "/users/{username}",
    tags=["users"],
    response_model=SuccessResponse,
    summary="Delete User",
    description="Delete a Docker registry user"
)
async def delete_user(
    username: str
):
    """
    **Delete Docker Registry User**

    Remove one entry from the htpasswd file and rewrite the file.

    **Parameters:**
    - **username**: The username to delete

    **Returns:**
    - Success message confirming user deletion

    **Raises:**
    - 404 when the user is not present
    - 500 on any other failure

    **Example usage:**
    ```bash
    curl -X DELETE "http://localhost:8000/users/olduser"
    ```
    """
    try:
        remaining = _read_htpasswd()
        if username not in remaining:
            raise HTTPException(status_code=404, detail="User not found")
        remaining.pop(username)
        _write_htpasswd(remaining)
        logger.info(f"User {username} deleted successfully")
        return SuccessResponse(message=f"User {username} deleted successfully")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting user {username}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to delete user: {str(e)}")
# Refresh Operations
@app.post(
    "/refresh/debian",
    tags=["refresh"],
    response_model=SuccessResponse,
    summary="Refresh Debian Repository",
    description="Regenerate Debian package indexes and metadata"
)
async def refresh_debian_packages():
    """
    **Refresh Debian Repository**

    Rewrite the ``Packages`` file of each architecture (amd64, arm64) and the
    ``Release`` file from the current contents of the pool directory.

    This is a simplified index: the .deb files are not parsed, so the package
    name is the filename without its ``.deb`` suffix and every package is
    listed under both architectures.

    **Returns:**
    - Success message confirming the refresh operation

    **Raises:**
    - 500 when any index file cannot be written

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/refresh/debian"
    ```
    """
    # Local import: the file only imports ``datetime`` from the datetime module.
    from datetime import timezone

    suffix = '.deb'
    try:
        # Generate Packages files for each architecture
        for arch in ["amd64", "arm64"]:
            arch_path = f"{DEBIAN_DIST_PATH}/main/binary-{arch}"
            packages_file = f"{arch_path}/Packages"
            with open(packages_file, "w") as f:
                if os.path.exists(DEBIAN_POOL_PATH):
                    for deb_file in os.listdir(DEBIAN_POOL_PATH):
                        if not deb_file.endswith(suffix):
                            continue
                        deb_path = f"{DEBIAN_POOL_PATH}/{deb_file}"
                        # Strip only the trailing suffix; str.replace would
                        # also remove ".deb" from the middle of a name.
                        package_name = deb_file[:-len(suffix)]
                        # Real newlines: the previous "\\n" wrote a literal
                        # backslash followed by "n" and produced one long line.
                        f.write(f"Package: {package_name}\n")
                        f.write(f"Filename: pool/{deb_file}\n")
                        f.write(f"Size: {os.path.getsize(deb_path)}\n")
                        f.write(f"Architecture: {arch}\n")
                        f.write("\n")
        # Generate Release file
        release_file = f"{DEBIAN_DIST_PATH}/Release"
        with open(release_file, "w") as f:
            # %Z is empty for a naive datetime; a UTC-aware one renders "UTC".
            now_utc = datetime.now(timezone.utc)
            f.write(f"Date: {now_utc.strftime('%a, %d %b %Y %H:%M:%S %Z')}\n")
            f.write("Architectures: amd64 arm64\n")
            f.write("Components: main\n")
        logger.info("Refreshed Debian package indexes")
        return SuccessResponse(message="Debian packages refreshed successfully")
    except Exception as e:
        logger.error(f"Error refreshing Debian packages: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
@app.post(
    "/refresh/helm",
    tags=["refresh"],
    response_model=SuccessResponse,
    summary="Refresh Helm Repository",
    description="Regenerate Helm chart repository index"
)
async def refresh_helm_charts():
    """
    **Refresh Helm Repository**

    Rewrite ``index.yaml`` in the charts directory from the ``.tgz`` files
    currently stored there.

    This is a simplified index: chart archives are not opened, so each entry
    is keyed by the filename without its ``.tgz`` suffix and only carries
    ``name`` and ``urls``.

    **Returns:**
    - Success message confirming the refresh operation

    **Raises:**
    - 500 when the index cannot be written

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/refresh/helm"
    ```
    """
    suffix = '.tgz'
    try:
        index_file = f"{CHARTS_PATH}/index.yaml"
        with open(index_file, "w") as f:
            # Real newlines: the previous "\\n" wrote a literal backslash
            # followed by "n", leaving the whole index on one line.
            f.write("apiVersion: v1\n")
            f.write("entries:\n")
            if os.path.exists(CHARTS_PATH):
                for chart_file in os.listdir(CHARTS_PATH):
                    if not chart_file.endswith(suffix):
                        continue
                    # Strip only the trailing suffix; str.replace would also
                    # remove ".tgz" from the middle of a name.
                    chart_name = chart_file[:-len(suffix)]
                    # "urls" is indented under the list item so it belongs to
                    # the chart version instead of parsing as a sibling key.
                    f.write(f"  {chart_name}:\n")
                    f.write(f"  - name: {chart_name}\n")
                    f.write("    urls:\n")
                    f.write(f"    - {chart_file}\n")
        logger.info("Refreshed Helm chart index")
        return SuccessResponse(message="Helm charts refreshed successfully")
    except Exception as e:
        logger.error(f"Error refreshing Helm charts: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
@app.post(
    "/refresh/all",
    tags=["refresh"],
    response_model=SuccessResponse,
    summary="Refresh All Repositories",
    description="Regenerate indexes for all repository types (Debian, Helm)"
)
async def refresh_all():
    """
    **Refresh All Repositories**

    Convenience endpoint that runs the Debian refresh followed by the Helm
    refresh in a single call.

    **Returns:**
    - Success message confirming all refresh operations

    **Raises:**
    - 500 when either refresh fails

    **Example usage:**
    ```bash
    curl -X POST "http://localhost:8000/refresh/all"
    ```
    """
    try:
        # Order matters only in that Debian is attempted first; a failure
        # there skips the Helm refresh.
        for refresh_step in (refresh_debian_packages, refresh_helm_charts):
            await refresh_step()
        logger.info("Refreshed all repositories")
        return SuccessResponse(message="All repositories refreshed successfully")
    except Exception as e:
        logger.error(f"Error refreshing all repositories: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
# Refresh Operations
# NOTE: these two definitions rebind the module-level names
# ``refresh_debian_packages`` and ``refresh_helm_charts``, so they are what the
# upload/delete endpoints and ``refresh_all`` actually call. They used to be
# registered a second time on /refresh/debian and /refresh/helm; the duplicate
# route decorators are removed so each path is registered exactly once (by the
# documented endpoints earlier in the file). The duplicate ``refresh_all`` that
# followed them had no callers and has been dropped; the name stays bound to
# the documented endpoint.
async def refresh_debian_packages():
    """Rewrite the Debian ``Packages`` and ``Release`` files from the pool.

    Simplified index: .deb files are not parsed, the package name is the
    filename without its ``.deb`` suffix, and every package is listed under
    both architectures.

    Returns:
        ``{"message": ...}`` on success.

    Raises:
        HTTPException: 500 when any index file cannot be written.
    """
    # Local import: the file only imports ``datetime`` from the datetime module.
    from datetime import timezone

    suffix = '.deb'
    try:
        # Generate Packages files for each architecture
        for arch in ["amd64", "arm64"]:
            arch_path = f"{DEBIAN_DIST_PATH}/main/binary-{arch}"
            packages_file = f"{arch_path}/Packages"
            with open(packages_file, "w") as f:
                if os.path.exists(DEBIAN_POOL_PATH):
                    for deb_file in os.listdir(DEBIAN_POOL_PATH):
                        if not deb_file.endswith(suffix):
                            continue
                        deb_path = f"{DEBIAN_POOL_PATH}/{deb_file}"
                        # Strip only the trailing suffix; str.replace would
                        # also remove ".deb" from the middle of a name.
                        package_name = deb_file[:-len(suffix)]
                        f.write(f"Package: {package_name}\n")
                        f.write(f"Filename: pool/{deb_file}\n")
                        f.write(f"Size: {os.path.getsize(deb_path)}\n")
                        f.write(f"Architecture: {arch}\n")
                        f.write("\n")
        # Generate Release file
        release_file = f"{DEBIAN_DIST_PATH}/Release"
        with open(release_file, "w") as f:
            # %Z is empty for a naive datetime; a UTC-aware one renders "UTC".
            now_utc = datetime.now(timezone.utc)
            f.write(f"Date: {now_utc.strftime('%a, %d %b %Y %H:%M:%S %Z')}\n")
            f.write("Architectures: amd64 arm64\n")
            f.write("Components: main\n")
        logger.info("Refreshed Debian package indexes")
        return {"message": "Debian packages refreshed successfully"}
    except Exception as e:
        logger.error(f"Error refreshing Debian packages: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")


async def refresh_helm_charts():
    """Rewrite ``index.yaml`` from the ``.tgz`` files in the charts directory.

    Simplified index: archives are not opened; each entry is keyed by the
    filename without its ``.tgz`` suffix and only carries ``name`` and ``urls``.

    Returns:
        ``{"message": ...}`` on success.

    Raises:
        HTTPException: 500 when the index cannot be written.
    """
    suffix = '.tgz'
    try:
        index_file = f"{CHARTS_PATH}/index.yaml"
        with open(index_file, "w") as f:
            f.write("apiVersion: v1\n")
            f.write("entries:\n")
            if os.path.exists(CHARTS_PATH):
                for chart_file in os.listdir(CHARTS_PATH):
                    if not chart_file.endswith(suffix):
                        continue
                    # Strip only the trailing suffix; str.replace would also
                    # remove ".tgz" from the middle of a name.
                    chart_name = chart_file[:-len(suffix)]
                    # "urls" is indented under the list item so it belongs to
                    # the chart version instead of parsing as a sibling key.
                    f.write(f"  {chart_name}:\n")
                    f.write(f"  - name: {chart_name}\n")
                    f.write("    urls:\n")
                    f.write(f"    - {chart_file}\n")
        logger.info("Refreshed Helm chart index")
        return {"message": "Helm charts refreshed successfully"}
    except Exception as e:
        logger.error(f"Error refreshing Helm charts: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")
# Script entry point: serve the app with uvicorn when run directly.
if __name__ == "__main__":
    # Guard only the import. Wrapping the server call as well would report any
    # ImportError raised while serving as "uvicorn not available".
    try:
        import uvicorn
    except ImportError:
        print("uvicorn not available, run with: uvicorn app:app --host 0.0.0.0 --port 8000")
    else:
        uvicorn.run(app, host="0.0.0.0", port=8000)