Files
wallpapers/analyze_images.py
T
2026-01-20 16:24:08 +00:00

248 lines
7.9 KiB
Python
Executable File

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "httpx",
# "pillow",
# "python-dotenv",
# "tqdm",
# ]
# ///
"""
Image analyzer script that uses OpenRouter AI to analyze and rename images.
Moves processed images into the output folder named by OUTPUT_FOLDER ("files").
"""
import base64
import httpx
import logging
import os
import re
import sys
from pathlib import Path
from PIL import Image
import io
from dotenv import load_dotenv
from tqdm import tqdm
# Pull variables from a local .env file into the process environment first,
# so the API-key lookup below can see them.
load_dotenv()

# Every log record goes to analyze_images.log; no console handler is installed.
logging.basicConfig(
    handlers=[logging.FileHandler("analyze_images.log")],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Configuration ---------------------------------------------------------
# API key taken from the environment; None when the variable is not set.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
# Chat-completions endpoint that analyze_image() posts to.
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# Model identifier sent with each request (a vision-capable model).
MODEL = "google/gemini-2.5-flash"
# Destination folder name; main() resolves it against the working directory.
OUTPUT_FOLDER = "files"
# Lower-case file suffixes that find_images() treats as images.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
def get_image_base64(image_path: Path) -> tuple[str, str]:
    """Encode an image file as base64 JPEG data, downscaling large images.

    The image is re-encoded as JPEG (quality 85). If its longest side exceeds
    1024 px it is scaled down proportionally first, which keeps the request
    payload (and token usage) small.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A ``(base64_data, media_type)`` tuple. ``media_type`` is always
        ``"image/jpeg"`` because the image is re-encoded.

    Raises:
        Exception: Whatever Pillow raises when the file cannot be opened,
            converted or encoded is propagated unchanged to the caller.
    """
    max_size = 1024  # longest allowed side, in pixels

    with Image.open(image_path) as img:
        # The JPEG encoder cannot write modes that carry alpha, a palette or
        # high-bit-depth data (RGBA, LA, P, PA, I, F, ...). Normalise anything
        # that is not already plain RGB or greyscale.
        if img.mode not in ("RGB", "L"):
            img = img.convert("RGB")

        longest_side = max(img.size)
        if longest_side > max_size:
            ratio = max_size / longest_side
            # Clamp to 1 px: with an extreme aspect ratio the short side
            # would otherwise truncate to 0, which resize() rejects.
            new_size = (
                max(1, int(img.width * ratio)),
                max(1, int(img.height * ratio)),
            )
            img = img.resize(new_size, Image.Resampling.LANCZOS)

        # Encode while the source file is still open; pixel data is loaded
        # lazily when no conversion or resize happened above.
        buffer = io.BytesIO()
        img.save(buffer, format="JPEG", quality=85)

    base64_data = base64.standard_b64encode(buffer.getvalue()).decode("utf-8")
    return base64_data, "image/jpeg"
def _sanitize_filename(raw: str) -> str:
    """Reduce model output to lowercase letters, digits and single underscores."""
    # Every run of characters outside [a-z0-9] (underscores included) collapses
    # to one underscore; leading/trailing underscores are then dropped.
    return re.sub(r"[^a-z0-9]+", "_", raw.strip().lower()).strip("_")


def analyze_image(image_path: Path, client: httpx.Client) -> str | None:
    """Ask the OpenRouter model for a short descriptive filename for an image.

    Args:
        image_path: Image file to analyse.
        client: HTTP client used to POST the chat-completion request.

    Returns:
        A sanitised base name (lowercase letters, digits and underscores,
        without extension), or ``None`` when the image cannot be read, the
        request fails, or the reply contains no usable name. Failures are
        logged rather than raised.
    """
    try:
        base64_data, media_type = get_image_base64(image_path)
    except Exception as e:
        logger.error(f"Error reading image {image_path}: {e}")
        return None

    prompt = """Analyze this image and provide a short, descriptive filename for it.
Rules for the filename:
1. Use lowercase letters only
2. Use underscores to separate words (no spaces or special characters)
3. Keep it concise but descriptive (3-6 words max)
4. Do NOT include the file extension
5. Describe the main subject/theme of the image
Respond with ONLY the filename, nothing else. Example responses:
- sunset_over_mountains
- anime_girl_with_sword
- abstract_blue_waves
- cyberpunk_city_night"""
    payload = {
        "model": MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{media_type};base64,{base64_data}"},
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ],
        "max_tokens": 50,
        "temperature": 0.3,
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/local-script",
        "X-Title": "Image Analyzer Script",
    }

    # Only the network round-trip and JSON decoding live in this try, so a
    # malformed-but-successful reply is not misreported as a transport error.
    try:
        response = client.post(
            OPENROUTER_URL, json=payload, headers=headers, timeout=60.0
        )
        response.raise_for_status()
        result = response.json()
    except httpx.HTTPStatusError as e:
        logger.error(
            f"API error for {image_path}: {e.response.status_code} - {e.response.text}"
        )
        return None
    except Exception as e:
        logger.error(f"Error calling API for {image_path}: {e}")
        return None

    # Missing keys, an empty "choices" list or a non-dict body all count as an
    # unexpected response.
    try:
        content = result["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        content = None
    if not isinstance(content, str):
        logger.warning(f"Unexpected API response for {image_path}: {result}")
        return None

    filename = _sanitize_filename(content)
    if not filename:
        # Nothing survived sanitising (e.g. the reply was only punctuation).
        logger.warning(
            f"Model returned no usable filename for {image_path}: {content!r}"
        )
        return None
    return filename
def find_images(root_dir: Path, exclude_folder: str) -> list[Path]:
    """Collect image files below ``root_dir``, skipping output and hidden paths.

    A file qualifies when its lower-cased suffix is in ``IMAGE_EXTENSIONS``.
    It is skipped when any component of its path *relative to* ``root_dir``
    equals ``exclude_folder`` or starts with a dot.

    Args:
        root_dir: Directory to search recursively.
        exclude_folder: Name of the folder whose contents must be ignored.

    Returns:
        The matching paths, sorted.
    """
    images = []
    for path in root_dir.rglob("*"):
        # Cheap suffix test first; is_file() has to touch the filesystem.
        if path.suffix.lower() not in IMAGE_EXTENSIONS or not path.is_file():
            continue
        # Judge only the components below root_dir. Using the absolute parts
        # would skip every image whenever root_dir itself lives under a
        # dot-directory or under a directory named like exclude_folder.
        rel_parts = path.relative_to(root_dir).parts
        if exclude_folder in rel_parts:
            continue
        if any(part.startswith(".") for part in rel_parts):
            continue
        images.append(path)
    return sorted(images)
def get_unique_filename(output_dir: Path, base_name: str, extension: str) -> Path:
    """Return a path in ``output_dir`` that does not exist yet.

    The first choice is ``<base_name><extension>``. If that is taken,
    ``<base_name>_1<extension>``, ``<base_name>_2<extension>``, ... are tried
    in order until a free name is found.
    """
    target = output_dir / f"{base_name}{extension}"
    attempt = 0
    # Keep bumping the numeric suffix while the current candidate is taken.
    while target.exists():
        attempt += 1
        target = output_dir / f"{base_name}_{attempt}{extension}"
    return target
def _remove_empty_dirs(root_dir: Path, output_dir: Path) -> None:
    """Best-effort removal of empty, non-hidden directories below ``root_dir``."""
    # Reverse-sorted order visits children before their parents, so a parent
    # emptied by removing its children is removed in the same pass. The listing
    # is materialised by sorted() before anything is deleted.
    for dirpath in sorted(root_dir.rglob("*"), reverse=True):
        if dirpath == output_dir:
            continue
        relative = dirpath.relative_to(root_dir)
        if any(p.startswith(".") for p in relative.parts):
            continue
        try:
            removable = dirpath.is_dir() and not any(dirpath.iterdir())
            if removable:
                dirpath.rmdir()
        except OSError as e:
            # Cleanup is optional: record the problem and carry on instead of
            # either crashing or hiding it.
            logger.warning(f"Could not remove directory {relative}: {e}")
            continue
        if removable:
            logger.info(f"Removed empty directory: {relative}")


def main() -> None:
    """Analyse every image under the working directory and move it to the output folder.

    Exits with status 1 when ``OPENROUTER_API_KEY`` is not set. Otherwise each
    image found by ``find_images`` is sent to ``analyze_image``; on success the
    file is renamed into ``OUTPUT_FOLDER`` under the suggested name, keeping its
    lower-cased original extension. A summary is printed, then empty directories
    are cleaned up.
    """
    if not OPENROUTER_API_KEY:
        print("Error: OPENROUTER_API_KEY environment variable is not set.")
        print("Please set it with: export OPENROUTER_API_KEY='your-api-key'")
        sys.exit(1)

    root_dir = Path.cwd()
    output_dir = root_dir / OUTPUT_FOLDER
    # Create output directory if it doesn't exist
    output_dir.mkdir(exist_ok=True)
    logger.info(f"Output directory: {output_dir}")

    images = find_images(root_dir, OUTPUT_FOLDER)
    if not images:
        print("No images found to process.")
        logger.info("No images found to process.")
        return
    print(f"Found {len(images)} images to analyze.")
    logger.info(f"Found {len(images)} images to analyze.")

    processed = 0
    failed = 0
    # One client is shared by all requests in the loop.
    with httpx.Client() as client:
        for image_path in tqdm(images, desc="Analyzing images", unit="img"):
            relative_path = image_path.relative_to(root_dir)
            logger.info(f"Processing: {relative_path}")

            new_name = analyze_image(image_path, client)
            if not new_name:
                logger.warning(f"Skipped {relative_path} (analysis failed)")
                failed += 1
                continue

            extension = image_path.suffix.lower()
            new_path = get_unique_filename(output_dir, new_name, extension)
            try:
                image_path.rename(new_path)
            except OSError as e:
                logger.error(f"Error moving file {relative_path}: {e}")
                failed += 1
            else:
                logger.info(f"Moved {relative_path} -> {new_path.name}")
                processed += 1

    print(f"\nDone! Processed: {processed}, Failed: {failed}")
    logger.info(f"Completed. Processed: {processed}, Failed: {failed}")

    print("Cleaning up empty directories...")
    _remove_empty_dirs(root_dir, output_dir)
    print("See analyze_images.log for detailed logs.")
# Run the analyzer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()