#!/usr/bin/env -S uv run --script # /// script # requires-python = ">=3.11" # dependencies = [ # "httpx", # "pillow", # "python-dotenv", # "tqdm", # ] # /// """ Image analyzer script that uses OpenRouter AI to analyze and rename images. Moves processed images to ANALYZED_DO_NOT_SCAN_THIS_ONE folder. """ import base64 import httpx import logging import os import re import sys from pathlib import Path from PIL import Image import io from dotenv import load_dotenv from tqdm import tqdm # Load environment variables from .env file load_dotenv() # Configure logging to file logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("analyze_images.log"), ], ) logger = logging.getLogger(__name__) # Configuration OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" # Using a vision-capable model MODEL = "google/gemini-2.5-flash" OUTPUT_FOLDER = "ANALYZED_DO_NOT_SCAN_THIS_ONE" IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} def get_image_base64(image_path: Path) -> tuple[str, str]: """Read image and convert to base64, resizing if needed to reduce tokens.""" with Image.open(image_path) as img: # Convert to RGB if necessary (for PNG with transparency, etc.) if img.mode in ("RGBA", "P"): img = img.convert("RGB") # Resize if too large (max 1024px on longest side to save tokens) max_size = 1024 if max(img.size) > max_size: ratio = max_size / max(img.size) new_size = (int(img.width * ratio), int(img.height * ratio)) img = img.resize(new_size, Image.Resampling.LANCZOS) # Save to bytes buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=85) buffer.seek(0) base64_data = base64.standard_b64encode(buffer.read()).decode("utf-8") return base64_data, "image/jpeg" def analyze_image(image_path: Path, client: httpx.Client) -> str | None: """Send image to OpenRouter for analysis and get a descriptive filename.""" try: base64_data, media_type = get_image_base64(image_path) except Exception as e: logger.error(f"Error reading image {image_path}: {e}") return None prompt = """Analyze this image and provide a short, descriptive filename for it. Rules for the filename: 1. Use lowercase letters only 2. Use underscores to separate words (no spaces or special characters) 3. Keep it concise but descriptive (3-6 words max) 4. Do NOT include the file extension 5. Describe the main subject/theme of the image Respond with ONLY the filename, nothing else. Example responses: - sunset_over_mountains - anime_girl_with_sword - abstract_blue_waves - cyberpunk_city_night""" payload = { "model": MODEL, "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": {"url": f"data:{media_type};base64,{base64_data}"}, }, {"type": "text", "text": prompt}, ], } ], "max_tokens": 50, "temperature": 0.3, } headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/local-script", "X-Title": "Image Analyzer Script", } try: response = client.post( OPENROUTER_URL, json=payload, headers=headers, timeout=60.0 ) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: filename = result["choices"][0]["message"]["content"].strip() # Clean up the filename filename = re.sub(r"[^a-z0-9_]", "_", filename.lower()) filename = re.sub(r"_+", "_", filename) # Remove multiple underscores filename = filename.strip("_") return filename else: logger.warning(f"Unexpected API response for {image_path}: {result}") return None except httpx.HTTPStatusError as e: logger.error( f"API error for {image_path}: {e.response.status_code} - {e.response.text}" ) return None except Exception as e: logger.error(f"Error calling API for {image_path}: {e}") return None def find_images(root_dir: Path, exclude_folder: str) -> list[Path]: """Find all image files in subdirectories, excluding the output folder.""" images = [] for path in root_dir.rglob("*"): if path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS: # Skip files in the output folder or hidden directories if exclude_folder not in path.parts and not any( p.startswith(".") for p in path.parts ): images.append(path) return sorted(images) def get_unique_filename(output_dir: Path, base_name: str, extension: str) -> Path: """Generate a unique filename by appending a number if needed.""" candidate = output_dir / f"{base_name}{extension}" if not candidate.exists(): return candidate counter = 1 while True: candidate = output_dir / f"{base_name}_{counter}{extension}" if not candidate.exists(): return candidate counter += 1 def main(): if not OPENROUTER_API_KEY: print("Error: OPENROUTER_API_KEY environment variable is not set.") print("Please set it with: export OPENROUTER_API_KEY='your-api-key'") sys.exit(1) root_dir = Path.cwd() output_dir = root_dir / OUTPUT_FOLDER # Create output directory if it doesn't exist output_dir.mkdir(exist_ok=True) logger.info(f"Output directory: {output_dir}") # Find all images images = find_images(root_dir, OUTPUT_FOLDER) if not images: print("No images found to process.") logger.info("No images found to process.") return print(f"Found {len(images)} images to analyze.") logger.info(f"Found {len(images)} images to analyze.") # Process images processed = 0 failed = 0 with httpx.Client() as client: for image_path in tqdm(images, desc="Analyzing images", unit="img"): relative_path = image_path.relative_to(root_dir) logger.info(f"Processing: {relative_path}") new_name = analyze_image(image_path, client) if new_name: extension = image_path.suffix.lower() new_path = get_unique_filename(output_dir, new_name, extension) # Move the file try: image_path.rename(new_path) logger.info(f"Moved {relative_path} -> {new_path.name}") processed += 1 except Exception as e: logger.error(f"Error moving file {relative_path}: {e}") failed += 1 else: logger.warning(f"Skipped {relative_path} (analysis failed)") failed += 1 print(f"\nDone! Processed: {processed}, Failed: {failed}") logger.info(f"Completed. Processed: {processed}, Failed: {failed}") # Clean up empty directories print("Cleaning up empty directories...") for dirpath in sorted(root_dir.rglob("*"), reverse=True): if dirpath.is_dir() and not any(dirpath.iterdir()) and dirpath != output_dir: if not any(p.startswith(".") for p in dirpath.relative_to(root_dir).parts): try: dirpath.rmdir() logger.info( f"Removed empty directory: {dirpath.relative_to(root_dir)}" ) except Exception: pass print(f"See analyze_images.log for detailed logs.") if __name__ == "__main__": main()