Thumbnail.

Flat Earth

Description

Actual elevation data of Earth (from the National Centers for Environmental Information), crammed into a visual square.

# @title allRGB Earth Elevation (ETOPO1) — 4096×4096, uses all 24-bit RGB colors exactly once
# @markdown This cell downloads real ETOPO1 elevation data, caches it, and generates a 4096×4096 allRGB PNG
# @markdown where darkest colors map to deepest ocean and brightest to highest mountains.

import os, sys, io, zipfile, shutil, math, time, hashlib, textwrap, pathlib, tempfile, stat
import numpy as np
from PIL import Image

# Optional: use rasterio for robust GeoTIFF reading (faster/safer than PIL for big DEMs).
# If import fails, we fall back to PIL (works with this ETOPO1 TIFF).
try:
    import rasterio
    _HAS_RASTERIO = True
except Exception:
    _HAS_RASTERIO = False

# ---------- SETTINGS ----------
TARGET_W = 4096
TARGET_H = 4096
RANDOM_SEED = 1337  # affects only tie-break shuffles; final luminance/elevation ordering is deterministic otherwise

# ETOPO1 bedrock geotiff (global topography/bathymetry, 1 arc-minute ~ 60")
# NOAA NGDC legacy link (still widely mirrored); script tries these in order.
# Working NOAA mirrors (ETOPO1 Bedrock, GeoTIFF-in-a-zip)
ETOPO1_CANDIDATE_URLS = [
    # grid-registered (g) — preferred
    "https://www.ngdc.noaa.gov/mgg/global/relief/ETOPO1/data/bedrock/grid_registered/georeferenced_tiff/ETOPO1_Bed_g_geotiff.zip",
    # cell-registered (c) — fallback
    "https://www.ngdc.noaa.gov/mgg/global/relief/ETOPO1/data/bedrock/cell_registered/georeferenced_tiff/ETOPO1_Bed_c_geotiff.zip",
]

# Cache root: prefer Google Drive if available/authorized, else local /content cache
DEFAULT_LOCAL_CACHE = "/content/etopo1_cache"

# ---------- UTILITIES ----------
def human_bytes(n):
    for unit in ['B','KB','MB','GB','TB']:
        if n < 1024: return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"

def ensure_dir(p):
    os.makedirs(p, exist_ok=True)
    return p

def detect_drive_mount():
    # If the user mounted Drive, prefer a folder there for persistence.
    # We don't auto-mount; we simply detect and use it if already mounted.
    drive_root = "/content/drive"
    if os.path.isdir(drive_root) and any(x for x in os.listdir(drive_root) if x.lower().startswith('my')):
        # Choose a subfolder in MyDrive
        mydrives = [x for x in os.listdir(drive_root) if x.lower().startswith("my")]
        base = os.path.join(drive_root, mydrives[0], "allRGB_Earth")
        return ensure_dir(base)
    return None

def pick_cache_root():
    root = detect_drive_mount()
    if root:
        print(f"✔ Using Google Drive cache: {root}")
        return root
    print(f"ℹ Using local cache: {DEFAULT_LOCAL_CACHE}\n   (Mount Drive first if you want persistence.)")
    return ensure_dir(DEFAULT_LOCAL_CACHE)

CACHE_ROOT = pick_cache_root()

ZIP_PATH = os.path.join(CACHE_ROOT, "ETOPO1_Bed_g_geotiff.zip")
TIF_PATH = os.path.join(CACHE_ROOT, "ETOPO1_Bed_g.tif")
DOWNSAMPLED_NPY = os.path.join(CACHE_ROOT, f"etopo1_{TARGET_W}x{TARGET_H}.npy")
OUTPUT_PNG = os.path.join(CACHE_ROOT, f"allrgb_earth_{TARGET_W}x{TARGET_H}.png")

# ---------- DOWNLOAD ----------
def stream_download(url, dst, chunk=1<<20):
    import requests
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length") or 0)
        got = 0
        t0 = time.time()
        with open(dst, "wb") as f:
            for ch in r.iter_content(chunk_size=chunk):
                if ch:
                    f.write(ch)
                    got += len(ch)
                    if total:
                        done = got/total*100
                        sys.stdout.write(f"\r↓ {url}  {human_bytes(got)}/{human_bytes(total)}  ({done:5.1f}%)")
                        sys.stdout.flush()
        dt = time.time()-t0
        sys.stdout.write(f"\nDownloaded in {dt:.1f}s ({human_bytes(got/dt)}/s)\n")

def ensure_zip():
    if os.path.isfile(ZIP_PATH) and os.path.getsize(ZIP_PATH) > 10_000_000:
        print(f"✔ Cache hit: {ZIP_PATH} ({human_bytes(os.path.getsize(ZIP_PATH))})")
        return
    last_err = None
    for url in ETOPO1_CANDIDATE_URLS:
        try:
            print(f"Attempting download: {url}")
            stream_download(url, ZIP_PATH)
            if os.path.getsize(ZIP_PATH) > 10_000_000:
                print("✔ Download OK.")
                return
        except Exception as e:
            print(f"✖ Failed: {e}")
            last_err = e
    raise RuntimeError(f"Could not download ETOPO1 zip from any source. Last error: {last_err}")

# ---------- UNZIP ----------
def ensure_tif_from_zip():
    if os.path.isfile(TIF_PATH) and os.path.getsize(TIF_PATH) > 10_000_000:
        print(f"✔ Cache hit: {TIF_PATH} ({human_bytes(os.path.getsize(TIF_PATH))})")
        return
    print("Extracting GeoTIFF…")
    with zipfile.ZipFile(ZIP_PATH, 'r') as z:
        members = z.namelist()
        tif_candidates = [m for m in members if m.lower().endswith(".tif")]
        if not tif_candidates:
            raise RuntimeError("No .tif found in ETOPO1 zip.")
        # Most zips have ETOPO1_Bed_g.tif
        member = tif_candidates[0]
        with z.open(member) as zin, open(TIF_PATH, "wb") as fout:
            shutil.copyfileobj(zin, fout)
    print(f"✔ Extracted: {TIF_PATH} ({human_bytes(os.path.getsize(TIF_PATH))})")

# ---------- READ + RESAMPLE ----------
def read_etopo_tiff_as_array():
    """Return elevation in meters as float32 numpy array, shape (H, W)."""
    if _HAS_RASTERIO:
        with rasterio.open(TIF_PATH) as ds:
            arr = ds.read(1).astype(np.float32)  # first band = elevation (meters)
            # (Height, Width)
            return arr
    else:
        # Fallback via PIL (works with this TIFF, but no georeferencing needed here)
        im = Image.open(TIF_PATH)
        arr = np.array(im, dtype=np.float32)
        return arr

def resample_to_target(elev):
    """Resample to (TARGET_H, TARGET_W) with simple area-ish average using block aggregation or bilinear, depending on ratio."""
    h, w = elev.shape
    if (h == TARGET_H) and (w == TARGET_W):
        return elev.copy()

    # Use PIL bilinear (good quality, fast)
    # Normalize to float32, push through PIL, then back.
    # Note: Elevation can be negative; shift to positive for resize to avoid dtype issues; shift back.
    mn, mx = float(elev.min()), float(elev.max())
    shift = -mn if mn < 0 else 0.0
    scale = 1.0/(mx - mn + 1e-9)
    tmp = (elev + shift)*scale  # 0..1
    pil = Image.fromarray(np.clip(tmp, 0, 1), mode='F')  # 32-bit float image
    pil_res = pil.resize((TARGET_W, TARGET_H), resample=Image.BILINEAR)
    out = np.asarray(pil_res, dtype=np.float32)
    out = out*(mx - mn + 1e-9) - shift
    return out

def ensure_downsampled_npy():
    if os.path.isfile(DOWNSAMPLED_NPY):
        print(f"✔ Cache hit: {DOWNSAMPLED_NPY} ({human_bytes(os.path.getsize(DOWNSAMPLED_NPY))})")
        return np.load(DOWNSAMPLED_NPY)
    print("Reading ETOPO1 GeoTIFF…")
    arr = read_etopo_tiff_as_array()
    print(f"  Source DEM shape: {arr.shape}, range: [{arr.min():.1f}, {arr.max():.1f}] m")

    print(f"Resampling to {TARGET_W}×{TARGET_H}…")
    ds = resample_to_target(arr)
    print(f"  Downsampled range: [{ds.min():.1f}, {ds.max():.1f}] m")

    np.save(DOWNSAMPLED_NPY, ds.astype(np.float32))
    print(f"✔ Saved cache: {DOWNSAMPLED_NPY} ({human_bytes(os.path.getsize(DOWNSAMPLED_NPY))})")
    return ds

# ---------- allRGB PALETTE ORDERING ----------
def build_allrgb_sorted_by_luminance():
    """
    Generate every 24-bit RGB color exactly once (0..16,777,215),
    sort by linear luminance Y = 0.2126R + 0.7152G + 0.0722B,
    then break ties with G, R, B (stable lexicographic).
    Returns uint8 array of shape (N, 3), ordered dark->bright.
    """
    N = 256*256*256
    print("Generating full 24-bit RGB palette…")
    # Using uint32 then bit ops is memory-efficient and very fast.
    vals = np.arange(N, dtype=np.uint32)
    R = ((vals >> 16) & 0xFF).astype(np.uint16)  # use u16 to avoid float upcast cost early
    G = ((vals >> 8)  & 0xFF).astype(np.uint16)
    B = (vals & 0xFF).astype(np.uint16)

    # Linear luminance (no gamma correction; lightweight and good enough for ordering)
    # float32 to control memory.
    Y = (0.2126*R.astype(np.float32) + 0.7152*G.astype(np.float32) + 0.0722*B.astype(np.float32))

    # lexsort sorts by last key primary; we want Y primary, with G,R,B tie-breakers
    # So provide keys in order: (B, R, G, Y)
    print("Sorting colors by luminance… (this can take ~10–40s in Colab)")
    idx = np.lexsort((B, R, G, Y))
    # Build ordered palette
    out = np.empty((N,3), dtype=np.uint8)
    out[:,0] = ( (vals[idx] >> 16) & 0xFF ).astype(np.uint8)
    out[:,1] = ( (vals[idx] >> 8)  & 0xFF ).astype(np.uint8)
    out[:,2] = (  vals[idx]        & 0xFF ).astype(np.uint8)
    return out

# ---------- MAPPING: elevation rank -> color rank ----------
def _pack24(rgb_u8):
    # rgb_u8: (N,3) uint8 -> uint32 0xRRGGBB
    return (rgb_u8[:,0].astype(np.uint32) << 16) | \
           (rgb_u8[:,1].astype(np.uint32) << 8)  | \
            rgb_u8[:,2].astype(np.uint32)

def make_allrgb_elevation_image(elev_2d, palette_1d_rgb):
    """
    Map pixels sorted by elevation (low->high) onto colors sorted by luminance (dark->bright).
    Ensures a 1:1 mapping: every pixel gets a unique color exactly once.
    """
    H, W = elev_2d.shape
    N = H*W
    assert N == 256*256*256, "Image must be exactly 4096×4096 to use all 16,777,216 colors once."

    print("Ranking elevation…")
    flat = elev_2d.reshape(-1)
    elev_rank = np.argsort(flat, kind='stable')  # low -> high

    print("Assigning colors to pixels…")
    colors = palette_1d_rgb  # (N,3) uint8, dark -> bright
    out = np.empty((N, 3), dtype=np.uint8)
    out[elev_rank] = colors

    # ---- Correct multiset verification (order-independent) ----
    print("Verifying color accounting (this takes a few seconds)…")
    pal_codes = _pack24(colors)
    img_codes = _pack24(out)

    # Count occurrences for each 24-bit code; must be exactly 1 everywhere
    # Memory note: bincount with minlength=16,777,216 uses ~128–135 MB (OK in Colab).
    pal_counts = np.bincount(pal_codes, minlength=256*256*256)
    img_counts = np.bincount(img_codes, minlength=256*256*256)

    assert pal_counts.dtype == np.int64 and img_counts.dtype == np.int64
    # Both should be vectors of all 1s
    if not (pal_counts == img_counts).all():
        raise AssertionError("Color multiset mismatch — mapping error.")
    if pal_counts.min() != 1 or pal_counts.max() != 1 or img_counts.min() != 1 or img_counts.max() != 1:
        raise AssertionError("Some colors are missing or duplicated.")

    print("✔ Color accounting validated (each of 16,777,216 colors used exactly once).")
    return out.reshape(H, W, 3)

# ---------- MAIN ----------
def main():
    print("=== allRGB Earth Elevation (ETOPO1 bedrock) ===")
    print(f"Target image: {TARGET_W}×{TARGET_H}  (pixels: {TARGET_W*TARGET_H:,})")

    # 1) Ensure elevation data is cached and downsampled
    ensure_zip()
    ensure_tif_from_zip()
    elev = ensure_downsampled_npy()

    # 2) Prepare palette (all 24-bit RGB sorted by luminance)
    palette = build_allrgb_sorted_by_luminance()

    # 3) Map elevation rank -> color rank
    img = make_allrgb_elevation_image(elev, palette)

    # 4) Save PNG
    Image.fromarray(img, mode="RGB").save(OUTPUT_PNG, optimize=True)
    print(f"✔ Saved: {OUTPUT_PNG}")

    # 5) Tiny preview (downscaled) for quick sanity check inside Colab output
    try:
        display(Image.fromarray(img).resize((1024,1024), Image.NEAREST))
    except Exception:
        pass

if __name__ == "__main__":
    main()

Author

ACJ
69 entries

Stats

Date
Colors16,777,216
Pixels16,777,216
Dimensions4,096 × 4,096
Bytes50,331,713