# Source code for z4ai

# SPDX-License-Identifier: Apache-2.0

"""z4ai: a lossless compression codec tuned for neural-network model weights.

z4ai transposes a tensor into byte planes (concentrating the low-entropy
exponent bytes) and compresses each plane with Zstandard configured for
long-distance matching over a large window, with a sparse zero-aware path for
pruned checkpoints.  That LZ stage captures the long-range redundancy in real
checkpoints - repeated values, pruned-to-zero regions, shared quantization
levels - which ZipNN's order-0 entropy coder cannot see.  On a real BF16
checkpoint this is roughly a +70% ratio win over ZipNN, while remaining a strict
match on purely random data.  See FINDINGS.md for measurements.

The public surface is intentionally tiny and mirrors ZipNN so z4ai is a
drop-in alternative (see ``API_CONTRACT.md``)::

    >>> import z4ai
    >>> blob = z4ai.compress(weights_bytes, dtype="bf16")
    >>> assert bytes(z4ai.decompress(blob)) == weights_bytes

For NumPy arrays the dtype and shape are inferred and round-tripped exactly::

    >>> import numpy as np, z4ai
    >>> w = np.random.randn(1000, 1000).astype(np.float32)
    >>> blob = z4ai.compress_ndarray(w)
    >>> np.array_equal(w, z4ai.decompress_ndarray(blob))
    True
"""

from __future__ import annotations

import os
from typing import Optional, Union

from . import auto as _autocodec
from . import backend, codec, format, transforms
from .codec import (
    DEFAULT_ESCALATE_LEVEL,
    DEFAULT_LEVEL,
    compress_array,
    decompress_array,
)
from .format import peek_is_z4ai
from . import delta
from . import model_delta

# Declared public surface of the package: the names exported by
# ``from z4ai import *``.  Kept as a plain list literal so static tooling can
# read it.
__all__ = [
    # Buffer-level entry points defined in this module.
    "compress",
    "decompress",
    # NumPy helpers defined in this module.
    "compress_ndarray",
    "decompress_ndarray",
    # Array codec functions re-exported from ``.codec``.
    "compress_array",
    "decompress_array",
    # Cross-checkpoint delta wrappers and the submodules behind them.
    "compress_delta",
    "decompress_delta",
    "delta",
    "model_delta",
    # Frame sniffing helper re-exported from ``.format``.
    "peek_is_z4ai",
    # dtype-code table and its lookup helper.
    "dtype_width",
    "DTYPE_WIDTHS",
    # Level constants re-exported from ``.codec``.
    "DEFAULT_LEVEL",
    "DEFAULT_ESCALATE_LEVEL",
    "__version__",
]

# Package version string, exported through ``__all__``.
__version__ = "0.3.0"

#: Maps every dtype code understood by :func:`compress` to its element width
#: in bytes; the byte-plane transpose width is derived directly from it.
#: ``None`` and any 1-byte code are treated as an opaque buffer (no float-plane
#: split).  Codes are grouped by width below; insertion order is preserved.
DTYPE_WIDTHS = {
    code: width
    for width, codes in (
        (1, (None, "raw", "uint8", "int8", "bool", "fp8_e4m3", "fp8_e5m2")),
        (2, ("bf16", "fp16", "float16", "int16", "uint16")),
        (4, ("fp32", "float32", "int32", "uint32")),
        (8, ("fp64", "float64", "int64", "uint64")),
    )
    for code in codes
}


def dtype_width(dtype: Optional[str]) -> int:
    """Look up the byte-plane width of a dtype code in :data:`DTYPE_WIDTHS`.

    Args:
        dtype: a dtype code such as ``"bf16"``, or ``None``.

    Returns:
        The element width in bytes recorded for ``dtype``.

    Raises:
        ValueError: if ``dtype`` is not a key of :data:`DTYPE_WIDTHS`.  The
            message lists the known (non-``None``) codes in sorted order.
    """
    width = DTYPE_WIDTHS.get(dtype)
    if width is not None:
        return width
    known = sorted(code for code in DTYPE_WIDTHS if code is not None)
    # ``from None`` keeps the error free of any chained exception context.
    raise ValueError(
        f"unknown dtype code {dtype!r}; known codes: {known}"
    ) from None
def _as_bytes(data) -> bytes:
    """Return ``data`` as a ``bytes`` object.

    A ``bytes`` input is returned as-is (same object); a ``bytearray`` or
    ``memoryview`` is copied into new ``bytes``.

    Raises:
        TypeError: for any other input type.
    """
    if isinstance(data, bytes):
        return data
    if not isinstance(data, (bytearray, memoryview)):
        raise TypeError(
            "data must be bytes-like or a numpy.ndarray, got "
            f"{type(data).__name__}"
        )
    return bytes(data)


def _codec_kwargs(level: Optional[int], threads: int) -> dict:
    """Build the codec keyword arguments for the public ``level``/``threads`` knobs.

    Returns a dict that always carries ``"plane_workers"`` and carries
    ``"level"`` only when ``level`` is not ``None``.
    """
    # ``threads`` follows the ZipNN convention: 0 = auto (all cores), 1 =
    # single-threaded, n = n workers.  The fan-out is across planes (each plane
    # is independent); zstd's own threads stay at 0 for deterministic output.
    if threads == 1:
        workers = 1
    elif threads and threads > 1:
        workers = threads
    else:
        # 0, None (or anything below 1) -> auto: one worker per reported core.
        workers = max(1, os.cpu_count() or 1)

    opts: dict = {} if level is None else {"level": level}
    opts["plane_workers"] = workers
    return opts
def compress(
    data: Union[bytes, bytearray, memoryview, "object"],
    *,
    dtype: Optional[str] = None,
    level: Optional[int] = None,
    threads: int = 0,
    auto: bool = True,
    effort: Optional[str] = None,
) -> bytes:
    """Compress a buffer (or NumPy array) into a self-describing z4ai frame.

    Args:
        data: raw ``bytes``/``bytearray``/``memoryview``, or a
            ``numpy.ndarray`` (its dtype and shape are then inferred and
            round-tripped).
        dtype: element type code for a raw buffer, e.g. ``"bf16"``,
            ``"fp16"``, ``"fp32"``, ``"fp64"`` (see :data:`DTYPE_WIDTHS`).
            ``None`` treats the buffer as opaque bytes (still compressed, no
            float-plane split).  Ignored when ``data`` is a NumPy array.
        level: Zstd level (1..22).  ``None`` uses the codec default.
        threads: ``0`` = auto (all cores), ``1`` = single-threaded, ``n`` = n
            workers.
        auto: when ``True`` (default), also try plain whole-buffer Zstd, the
            byte-plane transpose, and the sparse zero-aware path, keeping the
            smallest frame.  ``False`` forces the dtype's primary transform
            only (fastest compress; skips the best-of passes).
        effort: ratio/speed tier.

            * ``None`` (default): the fast streaming codec only.
            * ``"fast"``: the streaming codec with the per-plane rANS entropy
              candidates disabled, trading a small ratio win for faster
              multi-core decode.
            * any other value, e.g. ``"max"`` or the intermediate
              ``"balanced"``: additionally builds the high-ratio AUTO frame
              (field split plus a context-modeling brotli backend) and returns
              whichever of the two frames is smaller.  Intended for model
              *distribution* (compress once, download many); the project
              reports +3% to +12% smaller output than ZipNN on real
              transformers at ~3-12 MB/s compress and ~1 GB/s decompress.  See
              ``README.md`` for the comparison table.

    Returns:
        A ``bytes`` frame.  It is the streaming codec's frame unless a
        high-ratio tier was requested *and* its ``b"ZARA"`` frame came out
        strictly smaller.  :func:`decompress` accepts either; decompression
        needs no side information.

    Raises:
        TypeError: if ``data`` is neither array-like nor bytes/bytearray/
            memoryview.
        ValueError: if ``dtype`` is not a known code (raw-buffer path only).
    """
    # Duck-typed ndarray detection, so this module does not have to import
    # numpy itself just to recognise an array.
    array_like = all(
        hasattr(data, attr) for attr in ("dtype", "shape", "tobytes")
    )

    # ``effort="fast"`` drops the per-plane rANS entropy candidates: rANS
    # reaches the entropy floor (a small ratio win) but its streams decode ~6x
    # slower than the block-parallel Zstd path.  Every other tier leaves it on.
    keep_rans = effort != "fast"

    # The fast streaming frame is always built first: it is the floor, and on
    # long-range-duplicate weights its whole-buffer LDM beats the AUTO codec's
    # chunk-parallel brotli (which forfeits cross-chunk dedup).
    if array_like:
        raw = None
        streaming_frame = compress_array(
            data, auto=auto, rans=keep_rans, **_codec_kwargs(level, threads)
        )
    else:
        raw = _as_bytes(data)
        dtype_width(dtype)  # validate the dtype code early for a friendly error
        streaming_frame = codec.compress(
            raw,
            dtype=dtype,
            auto=auto,
            rans=keep_rans,
            **_codec_kwargs(level, threads),
        )

    if effort is None or effort == "fast":
        return streaming_frame

    # High-ratio tier: also build the AUTO frame and keep the smaller of the
    # two, so this tier is never worse in ratio than the default.  Where the
    # AUTO backend does not help, the (faster-decoding) streaming frame wins.
    if array_like:
        high_ratio_frame = _autocodec.compress(data, effort=effort)
    else:
        high_ratio_frame = _autocodec.compress(raw, dtype=dtype, effort=effort)

    if len(high_ratio_frame) < len(streaming_frame):
        return high_ratio_frame
    return streaming_frame
def decompress(blob: bytes, *, out: Optional[Union[bytearray, memoryview]] = None):
    """Decompress a z4ai frame back to the original buffer (byte-identical).

    Args:
        blob: a frame produced by :func:`compress`.
        out: optional writable buffer to decode into.  Its first
            ``len(result)`` bytes are overwritten and ``out`` itself is
            returned.  Any C-contiguous buffer is accepted; buffers whose
            items are wider than one byte, or that are multi-dimensional, are
            filled through a flat byte view of their memory.

    Returns:
        The decoded buffer, or ``out`` if it was provided.

    Raises:
        ValueError: if ``out`` holds fewer bytes than the decoded data, or if
            it cannot be viewed as a flat run of bytes.
    """
    # Frames starting with ``b"ZARA"`` come from the high-ratio AUTO codec;
    # everything else is handed to the core streaming codec.
    if len(blob) >= 4 and bytes(blob[:4]) == b"ZARA":
        result = _autocodec.decompress(blob)
    else:
        result = codec.decompress(blob)

    if out is None:
        return result

    target = memoryview(out)
    if target.ndim != 1 or target.format != "B":
        # ``len()`` of such a view counts items (or rows), not bytes, and byte
        # data cannot be slice-assigned into it.  Reinterpret the same memory
        # as flat unsigned bytes so capacity and copy are both byte-based.
        try:
            target = target.cast("B")
        except TypeError:
            raise ValueError(
                "out must be a C-contiguous buffer so it can be filled byte-wise"
            ) from None

    if len(target) < len(result):
        raise ValueError(
            f"out buffer too small: {len(target)} < {len(result)}"
        )
    target[: len(result)] = result
    return out
# --- NumPy convenience helpers ------------------------------------------------


def compress_ndarray(
    arr,
    *,
    level: Optional[int] = None,
    threads: int = 0,
    effort: Optional[str] = None,
) -> bytes:
    """Compress a NumPy array so that dtype and shape survive the round trip.

    Thin wrapper that forwards ``arr`` and the ``level``/``threads``/``effort``
    knobs to :func:`compress`; pair it with :func:`decompress_ndarray`.
    ``effort="max"`` selects the high-ratio AUTO codec (see :func:`compress`).
    """
    forwarded = {"level": level, "threads": threads, "effort": effort}
    return compress(arr, **forwarded)


def decompress_ndarray(blob: bytes):
    """Rebuild the original NumPy array (dtype + shape) from a z4ai frame.

    Frames starting with ``b"ZARA"`` are decoded by the AUTO codec's array
    decoder; all other frames go to the core codec's ``decompress_array``.
    """
    is_auto_frame = len(blob) >= 4 and bytes(blob[:4]) == b"ZARA"
    if not is_auto_frame:
        return decompress_array(blob)
    return _autocodec.decompress_array(blob)


# --- Cross-checkpoint delta -------------------------------------------------
def compress_delta(
    current,
    reference,
    *,
    dtype: Optional[str] = None,
    level: Optional[int] = None,
    threads: int = 0,
) -> bytes:
    """Losslessly compress ``current`` as a delta against a ``reference`` checkpoint.

    Consecutive training/fine-tuning checkpoints differ only slightly, so
    storing the bit-exact XOR delta (``current ^ reference``) leaves most bytes
    zero and compresses far below the from-scratch float-entropy ceiling.

    This is a pass-through to :func:`z4ai.delta.compress`.  Decode the result
    with :func:`decompress_delta` and the *same* ``reference``.
    """
    delta_opts = {"dtype": dtype, "level": level, "threads": threads}
    return delta.compress(current, reference, **delta_opts)
def decompress_delta(blob: bytes, reference, *, out: Optional[bytearray] = None):
    """Rebuild a checkpoint from a delta frame plus its ``reference``.

    Pass-through to :func:`z4ai.delta.decompress`; ``reference`` must be the
    one that was given to :func:`compress_delta`.  ``out`` is forwarded
    unchanged.
    """
    restored = delta.decompress(blob, reference, out=out)
    return restored
# The optional HuggingFace integration (needs torch + safetensors) is
# re-exported lazily: ``z4ai.enable_hf`` / ``z4ai.load_file`` etc. are resolved
# on attribute access, so torch is not imported at package import time.
_HF_EXPORTS = {"enable_hf", "disable_hf", "is_enabled", "load_file", "save_file"}


def __getattr__(name: str):
    """Module-level attribute hook that serves the lazy HuggingFace exports.

    Names listed in ``_HF_EXPORTS`` are fetched from the ``integrations``
    submodule, which is imported only at that point; any other name raises
    ``AttributeError``.
    """
    if name not in _HF_EXPORTS:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    from . import integrations

    return getattr(integrations, name)