Academy / PrintPal API and Automation / Batch Processing and Automation

Batch Processing and Automation

12 min read

This is where the API really shines compared to the web app. Instead of uploading images one at a time and clicking buttons, you can write a script that processes an entire folder of images automatically - even in parallel.

In this lesson, we'll build a complete batch processing script from scratch.

The Plan

Our script will:
1. Scan a folder for image files
2. Check if we have enough credits for all of them
3. Process the images in parallel (up to 5 at a time)
4. Save all the resulting 3D models to an output folder
5. Print a summary of what succeeded and what failed

The Complete Batch Script (Python)

Here's the full script, broken into sections:

Setting up and finding images:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import sys
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from printpal import PrintPal, Quality, Format, CREDIT_COSTS

# Configuration
QUALITY = Quality.DEFAULT           # generation quality tier (drives per-image credit cost)
OUTPUT_FORMAT = Format.STL          # file format for the downloaded 3D models
MAX_CONCURRENT = 5  # API allows up to 5 concurrent generations
INPUT_DIR = "images"                # folder scanned for source images
OUTPUT_DIR = "models"               # folder where generated models are written

def find_images(directory):
    """Find all image files in *directory* (non-recursive).

    Matches .png/.jpg/.jpeg/.webp regardless of extension case
    (".PNG", ".Jpg", ...) without producing duplicates.

    Args:
        directory: Folder to scan (string or Path).

    Returns:
        Sorted list of pathlib.Path objects for the matching files.
    """
    extensions = {".png", ".jpg", ".jpeg", ".webp"}
    # One directory scan with a case-insensitive suffix check: the original
    # double-glob (lower + upper) missed mixed-case extensions like ".Png".
    return sorted(
        entry
        for entry in Path(directory).iterdir()
        if entry.is_file() and entry.suffix.lower() in extensions
    )

Processing a single image:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def process_single_image(client, image_path, output_dir):
    """Generate a 3D model for one image and download it.

    Returns a result dict carrying the image name, the output path (or
    None on failure), a "success"/"failed" status, and either the
    credits used or the error message.
    """
    destination = Path(output_dir) / f"{image_path.stem}.{OUTPUT_FORMAT.value}"

    try:
        generation = client.generate_from_image(
            image_path=image_path,
            quality=QUALITY,
            format=OUTPUT_FORMAT,
        )
        client.wait_for_completion(generation.generation_uid, poll_interval=5)
        client.download(generation.generation_uid, output_path=destination)
    except Exception as exc:
        # Broad catch is intentional: one bad image must not abort the batch.
        return {
            "image": image_path.name,
            "output": None,
            "status": "failed",
            "error": str(exc),
        }

    return {
        "image": image_path.name,
        "output": str(destination),
        "status": "success",
        "credits_used": generation.credits_used,
    }

Running the batch with parallel processing:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def main():
    """Batch-convert every image in INPUT_DIR into a 3D model in OUTPUT_DIR.

    Steps: find images, verify credits, shrink the batch if credits are
    short, run generations in parallel, then print a summary.
    """
    # Find images
    images = find_images(INPUT_DIR)
    if not images:
        print(f"No images found in '{INPUT_DIR}/'")
        sys.exit(1)

    credits_per_image = CREDIT_COSTS[QUALITY]
    total_cost = len(images) * credits_per_image

    print(f"Found {len(images)} images")
    print(f"Quality: {QUALITY.value} ({credits_per_image} credits each)")
    print(f"Total cost: {total_cost} credits")

    # Check credits before spending any of them.
    client = PrintPal()
    credits = client.get_credits()
    print(f"Available: {credits.credits} credits")

    if credits.credits < total_cost:
        can_process = credits.credits // credits_per_image
        print(f"Warning: Can only process {can_process} of {len(images)} images")
        images = images[:can_process]
        if not images:
            # Fix: the original fell through with an empty list and
            # "processed" a zero-image batch instead of stopping.
            print("Not enough credits for even one image - aborting.")
            sys.exit(1)

    # Create output directory
    Path(OUTPUT_DIR).mkdir(exist_ok=True)

    # Process in parallel; the pool caps in-flight generations at
    # MAX_CONCURRENT, matching the API's concurrency limit.
    print(f"\nProcessing {len(images)} images (up to {MAX_CONCURRENT} at a time)...\n")
    start_time = time.time()
    results = []

    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(process_single_image, client, img, OUTPUT_DIR): img
            for img in images
        }

        # as_completed yields results in finish order, not submit order.
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            icon = "[OK]" if result["status"] == "success" else "[FAIL]"
            print(f"  {icon} {result['image']}")

    # Summary
    elapsed = time.time() - start_time
    success_count = sum(1 for r in results if r["status"] == "success")
    fail_count = len(results) - success_count
    # Failed results carry no "credits_used" key, hence the .get default.
    total_credits = sum(r.get("credits_used", 0) for r in results)

    print(f"\nDone in {elapsed:.0f} seconds")
    print(f"Successful: {success_count}")
    print(f"Failed: {fail_count}")
    print(f"Credits used: {total_credits}")
    print(f"Output: {OUTPUT_DIR}/")

    if fail_count > 0:
        print("\nFailed images:")
        for r in results:
            if r["status"] == "failed":
                print(f"  {r['image']}: {r.get('error')}")

if __name__ == "__main__":
    main()
How many generations can run in parallel at the same time?
1 - only one at a time
5 - the API allows up to 5 concurrent generations
Unlimited - there is no concurrency limit
10 - the API allows up to 10 concurrent generations

Batch Processing in JavaScript/TypeScript

The same approach works in JavaScript. Here we run images in sequential batches of MAX_CONCURRENT using Promise.all, so no more than five generations are in flight at once:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import { PrintPal, Quality, Format, CREDIT_COSTS } from 'printpal';
import * as fs from 'fs';
import * as path from 'path';

const QUALITY = Quality.DEFAULT;   // generation quality tier
const MAX_CONCURRENT = 5;          // API allows up to 5 concurrent generations
const INPUT_DIR = './images';      // folder scanned for source images
const OUTPUT_DIR = './models';     // folder where generated models are written

// Generate a model for one image, download it, and report the outcome
// as { image, status } (plus an error message on failure).
async function processSingleImage(client, imagePath, outputDir) {
    const ext = path.extname(imagePath);
    const stem = path.basename(imagePath, ext);
    const outputPath = path.join(outputDir, `${stem}.stl`);

    try {
        const result = await client.generateFromImage(imagePath, {
            quality: QUALITY,
            format: Format.STL,
        });
        await client.waitForCompletion(result.generationUid);
        await client.download(result.generationUid, outputPath);
        return { image: path.basename(imagePath), status: 'success' };
    } catch (error) {
        // A single failure is reported, not thrown, so the batch continues.
        return { image: path.basename(imagePath), status: 'failed', error: error.message };
    }
}

// Batch-convert every image in INPUT_DIR, MAX_CONCURRENT at a time.
async function main() {
    const client = new PrintPal({ apiKey: process.env.PRINTPAL_API_KEY });

    // Find images (top-level files with a known image extension)
    const extensions = ['.png', '.jpg', '.jpeg', '.webp'];
    const images = [];
    for (const entry of fs.readdirSync(INPUT_DIR)) {
        if (extensions.includes(path.extname(entry).toLowerCase())) {
            images.push(path.join(INPUT_DIR, entry));
        }
    }

    console.log(`Found ${images.length} images`);

    // Create output directory
    if (!fs.existsSync(OUTPUT_DIR)) fs.mkdirSync(OUTPUT_DIR);

    // Sequential batches keep at most MAX_CONCURRENT generations in flight.
    const results = [];
    let offset = 0;
    while (offset < images.length) {
        const batch = images.slice(offset, offset + MAX_CONCURRENT);
        offset += MAX_CONCURRENT;
        const batchResults = await Promise.all(
            batch.map(img => processSingleImage(client, img, OUTPUT_DIR))
        );
        for (const r of batchResults) {
            results.push(r);
            const icon = r.status === 'success' ? '[OK]' : '[FAIL]';
            console.log(`  ${icon} ${r.image}`);
        }
    }

    const successCount = results.filter(r => r.status === 'success').length;
    console.log(`\nDone: ${successCount}/${results.length} succeeded`);
}

main().catch(console.error);

Making It Production-Ready

Here are some improvements you might add for a production batch processing pipeline:

Retry logic - If a generation fails due to a transient error (network issue, rate limit), retry it automatically with a delay:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time
from printpal import RateLimitError, PrintPalError

def generate_with_retry(client, image_path, max_retries=3):
    """Generate a model for *image_path*, retrying transient failures.

    Rate-limit errors wait for the server-suggested delay (falling back
    to 60s); other API errors retry after 10s. Raises the last error
    once max_retries attempts are exhausted.

    Args:
        client: PrintPal API client.
        image_path: Path to the source image.
        max_retries: Total number of attempts before giving up.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            result = client.generate_from_image(image_path, quality=Quality.DEFAULT)
            return client.wait_and_download(result.generation_uid, f"{image_path.stem}.stl")
        except RateLimitError as e:
            last_error = e
            wait_time = int(e.retry_after or 60)
            print(f"  Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
            time.sleep(wait_time)
        except PrintPalError as e:
            if attempt < max_retries - 1:
                print(f"  Error: {e.message}, retrying in 10s...")
                time.sleep(10)
            else:
                raise
    # Fix: the original fell off the loop and silently returned None when
    # the final attempt was rate-limited; re-raise the last error instead.
    raise last_error

Skip already-processed files - Check if the output file already exists before generating:

1
2
3
4
# Skip-if-done guard. This belongs inside the per-image loop: "continue"
# advances to the next image when the model file is already on disk.
output_path = Path(OUTPUT_DIR) / f"{image_path.stem}.stl"
if output_path.exists():
    print(f"  Skipping {image_path.name} (already processed)")
    continue

Save a manifest - Write a JSON file listing what was processed, what failed, and when:

1
2
3
4
5
6
7
8
import json

# Record what this run produced so later runs (or humans) can audit it.
manifest = {
    "processed_at": time.strftime("%Y-%m-%d %H:%M:%S"),  # local time of the run
    "quality": QUALITY.value,
    "results": results,  # per-image result dicts from the batch run
}
Path(OUTPUT_DIR, "manifest.json").write_text(json.dumps(manifest, indent=2))
Why is it a good practice to check if an output file already exists before generating?
The API requires it or the request will fail
It makes the script run faster by skipping the credit check
The file format might be different if it already exists
It avoids re-processing images that are already done, saving credits and time
Have you finished this lesson?