Skip to content

Cross Modal¤

Utilities for multi-modal data processing.

See Also¤


datarax.core.cross_modal ¤

Cross-modal operator base classes.

This module provides base classes for operators that work across multiple modalities (reading from multiple fields and producing combined outputs). CrossModalOperator enables fusion, cross-attention, contrastive learning, and alignment operations.

Key Features:

  • Multi-field input/output transformations
  • Learnable fusion and attention mechanisms
  • Compatible with JAX transformations (jit, vmap, grad)
  • End-to-end differentiable cross-modal pipelines
  • Support for contrastive learning and alignment

Examples:

Deterministic fusion operator:

config = CrossModalOperatorConfig(
    input_fields=["image_embedding", "text_embedding"],
    output_fields=["fused_embedding"],
    operation="fusion"
)
operator = FusionOperator(config, rngs=nnx.Rngs(0))

Stochastic contrastive operator:

config = CrossModalOperatorConfig(
    input_fields=["anchor_emb", "positive_emb"],
    output_fields=["similarity"],
    operation="contrastive",
    stochastic=True,
    stream_name="contrastive"
)
operator = ContrastiveOperator(config, rngs=nnx.Rngs(0, contrastive=1))

logger module-attribute ¤

logger = getLogger(__name__)

CrossModalOperatorConfig dataclass ¤

CrossModalOperatorConfig(cacheable: bool = False, batch_stats_fn: Callable | Module | None = None, precomputed_stats: dict[str, Any] | None = None, stochastic: bool = False, stream_name: str | None = None, batch_strategy: str = 'vmap', *, input_fields: list[str], output_fields: list[str], operation: str = 'fusion', validate_alignment: bool = True)

Bases: OperatorConfig

Configuration for cross-modal operators.

Cross-modal operators read from MULTIPLE fields and may produce new combined fields. They can have learnable parameters and must be compatible with JAX transformations.

Use Cases:

- **Fusion**: Combine embeddings from different modalities
  Examples: [image_emb, text_emb] → joint_emb
- **Cross-attention**: Attend from one modality to another
  Examples: [image_features, text_features] → attended_features
- **Contrastive**: Compute similarity across modalities
  Examples: [image_emb, text_emb] → similarity_score
- **Alignment**: Enforce cross-modal consistency
  Examples: [image_features, text_features] → alignment_loss

Attributes:

Name Type Description
input_fields list[str]

List of input field names to read from Examples: ["image_embedding", "text_embedding"]

output_fields list[str]

List of output field names to write to Examples: ["fused_embedding", "similarity_score"]

operation str

Cross-modal operation type Common values: "fusion", "cross_attention", "contrastive", "alignment"

validate_alignment bool

Whether to validate input field alignment (e.g., check batch dimensions match)

Validation Rules:

- input_fields must be non-empty list of non-empty strings
- output_fields must be non-empty list of non-empty strings
- Inherits stochastic validation from OperatorConfig
- Inherits statistics validation from DataraxModuleConfig

Examples:

Simple fusion:

config = CrossModalOperatorConfig(
    input_fields=["image_emb", "text_emb"],
    output_fields=["fused_emb"]
)

Cross-attention with multiple outputs:

config = CrossModalOperatorConfig(
    input_fields=["query", "key", "value"],
    output_fields=["attended_output", "attention_weights"],
    operation="cross_attention"
)

Stochastic contrastive learning:

config = CrossModalOperatorConfig(
    input_fields=["anchor", "positive", "negative"],
    output_fields=["similarity"],
    operation="contrastive",
    stochastic=True,
    stream_name="contrastive"
)

input_fields class-attribute instance-attribute ¤

input_fields: list[str] = field(kw_only=True)

output_fields class-attribute instance-attribute ¤

output_fields: list[str] = field(kw_only=True)

operation class-attribute instance-attribute ¤

operation: str = field(default='fusion', kw_only=True)

validate_alignment class-attribute instance-attribute ¤

validate_alignment: bool = field(default=True, kw_only=True)

cacheable class-attribute instance-attribute ¤

cacheable: bool = False

batch_stats_fn class-attribute instance-attribute ¤

batch_stats_fn: Callable | Module | None = None

precomputed_stats class-attribute instance-attribute ¤

precomputed_stats: dict[str, Any] | None = None

stochastic class-attribute instance-attribute ¤

stochastic: bool = False

stream_name class-attribute instance-attribute ¤

stream_name: str | None = None

batch_strategy class-attribute instance-attribute ¤

batch_strategy: str = 'vmap'

CrossModalOperator ¤

CrossModalOperator(config: CrossModalOperatorConfig, *, rngs: Rngs | None = None, name: str | None = None)

Bases: OperatorModule

Base class for cross-modal operators with learnable parameters.

Operates across multiple fields within an Element, enabling:

- Multi-modal fusion
- Cross-modal attention
- Contrastive learning
- Cross-modal alignment

Key Features:

- Compatible with nnx.jit, jax.vmap, jax.grad
- Supports learnable parameters via nnx.Param
- End-to-end differentiable
- Can be optimized jointly with model
- Operates on Batch[Element] (inherited from OperatorModule)

Inherited Features from OperatorModule:

- **apply_batch()**: Automatically handles batched operations by calling apply()

on each element. Override only if you need custom batch-level logic (e.g., batch-level contrastive loss, cross-element attention). Default is sufficient for most element-wise cross-modal operations.

- **Statistics system**: Optionally collect and use batch statistics via stats
  parameter in apply(). Useful for adaptive cross-modal operations (e.g.,
  batch-aware normalization of fused embeddings).

- **Caching system**: Results can be cached based on operator configuration

and input characteristics. Inherited from base OperatorModule, helps avoid redundant computation for deterministic cross-modal operations.

Subclass Implementation Pattern
class FusionOperator(CrossModalOperator):
    def __init__(self, config: CrossModalOperatorConfig, *, rngs: nnx.Rngs | None = None):
        super().__init__(config, rngs=rngs)
        # Add learnable fusion parameters
        self.fusion_weights = nnx.Param(jnp.ones(len(config.input_fields)))

    def apply(self, data, state, metadata, random_params=None, stats=None):
        # Extract inputs
        inputs = self._extract_inputs(data)

        # Fuse with learnable weights
        fused = sum(w * emb for w, emb in zip(self.fusion_weights[...], inputs))

        # Store outputs
        outputs = [fused]
        result = self._store_outputs(data, outputs)

        return result, state, metadata

    def generate_random_params(self, rng, data_shapes):
        # For stochastic operators only
        batch_size = data_shapes[self.config.input_fields[0]][0]
        return jax.random.normal(rng, (batch_size,))

Subclasses provide specific cross-modal operations:

- **FusionOperator**: Learned combination of embeddings
- **CrossAttentionOperator**: Learnable query/key/value projections
- **ContrastiveOperator**: Learnable projection heads and temperature

Examples:

Deterministic fusion:

fusion_op = FusionOperator(config, rngs=nnx.Rngs(0))

Learnable cross-attention:

class LearnedCrossAttention(CrossModalOperator):
    def __init__(self, config, *, rngs, dim, num_heads):
        super().__init__(config, rngs=rngs)
        self.q_proj = nnx.Linear(dim, dim, rngs=rngs)
        self.k_proj = nnx.Linear(dim, dim, rngs=rngs)
        self.v_proj = nnx.Linear(dim, dim, rngs=rngs)
        self.num_heads = num_heads

Batch-level contrastive operator:

class BatchContrastiveOperator(CrossModalOperator):
    def apply_batch(self, batch, stats=None):
        # Override for batch-level contrastive loss
        # Compute pairwise similarities across entire batch
        # Call apply() for final per-element outputs
        pass

Parameters:

Name Type Description Default
config CrossModalOperatorConfig

Cross-modal operator configuration (already validated)

required
rngs Rngs | None

Random number generators (required if stochastic=True)

None
name str | None

Optional operator name

None

Raises:

Type Description
ValueError

If stochastic=True but rngs is None

config instance-attribute ¤

config: CrossModalOperatorConfig = config

rngs instance-attribute ¤

rngs = rngs

name instance-attribute ¤

name = static(name)

stochastic instance-attribute ¤

stochastic = static(stochastic)

stream_name instance-attribute ¤

stream_name = static(stream_name)

apply ¤

apply(data: PyTree, state: PyTree, metadata: dict[str, Any] | None, random_params: Any = None, stats: dict[str, Any] | None = None) -> tuple[PyTree, PyTree, dict[str, Any] | None]

Apply cross-modal operation to element.

MUST be implemented by subclasses to provide cross-modal behavior.

This is a PURE FUNCTION that transforms a single data element. It should not access self.rngs or generate random numbers. All randomness comes through random_params argument.

Parameters:

Name Type Description Default
data PyTree

Element data PyTree (contains fields specified by config.input_fields) Typically dict[str, Array] with no batch dimension

required
state PyTree

Element state PyTree (typically dict[str, Any])

required
metadata dict[str, Any] | None

Element metadata dict

required
random_params Any

Random parameters for this element (from generate_random_params)

None
stats dict[str, Any] | None

Optional batch statistics (from get_statistics() or passed explicitly)

None

Returns:

Type Description
PyTree

Tuple of (transformed_data, new_state, new_metadata)

PyTree
  • transformed_data: PyTree with original fields + new output fields
dict[str, Any] | None
  • new_state: Updated state PyTree
tuple[PyTree, PyTree, dict[str, Any] | None]
  • new_metadata: Updated metadata dict
Implementation Pattern
def apply(self, data, state, metadata, random_params=None, stats=None):
    # 1. Extract input fields
    inputs = self._extract_inputs(data)

    # 2. Perform cross-modal operation
    outputs = self._cross_modal_transform(inputs, random_params, stats)

    # 3. Store outputs in data
    result = self._store_outputs(data, outputs)

    return result, state, metadata

Raises:

Type Description
NotImplementedError

If not implemented by subclass

generate_random_params ¤

generate_random_params(rng: Array, data_shapes: PyTree) -> PyTree

Generate random parameters for stochastic cross-modal operations.

MUST be implemented by stochastic operators (config.stochastic=True). Deterministic operators can use default implementation (returns None).

Generates PyTree of random parameters for cross-modal operations. For example, contrastive learning might generate per-element noise for augmentation.

This method is impure (uses RNG) and called once per batch. The generated parameters are then passed to apply() for each element via vmap.

Parameters:

Name Type Description Default
rng Array

JAX random key for this batch

required
data_shapes PyTree

PyTree with same structure as batch.data, containing shapes Examples: {"image_emb": (batch_size, dim), "text_emb": (batch_size, dim)}

required

Returns:

Type Description
PyTree

PyTree of random parameters for this batch.

PyTree

Structure depends on operator needs.

PyTree

For deterministic operators, returns None.

Examples:

# Stochastic contrastive operator with noise augmentation
def generate_random_params(self, rng, data_shapes):
    batch_size = data_shapes[self.config.input_fields[0]][0]
    # Generate per-element noise scales
    return jax.random.uniform(rng, (batch_size,), minval=0.0, maxval=0.1)

Raises:

Type Description
NotImplementedError

If stochastic=True but not implemented

get_operation_stats ¤

get_operation_stats() -> dict[str, int]

Get operation statistics.

Note: This method converts JAX arrays to Python ints for introspection. It is intended for use outside of JIT-compiled functions.

Returns:

Type Description
dict[str, int]

Dictionary with 'applied_count' and 'skipped_count'

reset_operation_stats ¤

reset_operation_stats() -> None

Reset operation statistics to zero.

Note: Creates new JAX arrays to reset the counters.

compute_statistics ¤

compute_statistics(data: Any) -> dict[str, Any] | None

Compute statistics from data using batch_stats_fn.

If batch_stats_fn is not configured, returns None. Computed statistics are cached in _computed_stats.

Parameters:

Name Type Description Default
data Any

Input data to compute statistics from

required

Returns:

Type Description
dict[str, Any] | None

Dictionary of statistics, or None if no batch_stats_fn configured

get_statistics ¤

get_statistics() -> dict[str, Any] | None

Get current statistics.

Returns precomputed_stats if configured (unless reset was called), otherwise returns cached computed statistics, or None if no statistics available.

Returns:

Type Description
dict[str, Any] | None

Dictionary of statistics, or None if no statistics available

set_statistics ¤

set_statistics(stats: dict[str, Any]) -> None

Manually set statistics.

This overwrites any previously computed statistics and clears reset flag.

Parameters:

Name Type Description Default
stats dict[str, Any]

Dictionary of statistics to set

required

reset_statistics ¤

reset_statistics() -> None

Reset all statistics to None.

This clears both computed statistics and marks that precomputed_stats should be ignored (via internal flag). After reset, get_statistics() will return None until new statistics are set or computed.

reset_cache ¤

reset_cache() -> None

Clear the cache.

Only has effect if cacheable=True in config.

copy ¤

copy(*, config: DataraxModuleConfig | None = None, rngs: Rngs | None = None, name: str | None = None) -> DataraxModule

Create a copy of this module with optional config/parameter changes.

This allows creating a new module instance with modified configuration while preserving other attributes. Useful for hyperparameter tuning.

Parameters:

Name Type Description Default
config DataraxModuleConfig | None

New config (if None, uses current config)

None
rngs Rngs | None

New RNG state (if None, uses current rngs)

None
name str | None

New name (if None, uses current name)

None

Returns:

Type Description
DataraxModule

New module instance with updated parameters

Examples:

Change configuration¤

new_config = DataraxModuleConfig(cacheable=True) new_module = module.copy(config=new_config)

Change name only¤

renamed = module.copy(name="new_name")

Note

Subclasses can override this method to provide more fine-grained control over copying, such as allowing individual config field updates without requiring dataclass replace().

get_state ¤

get_state() -> dict[str, Any]

Get module state for checkpointing.

This method implements the Checkpointable protocol using NNX state management. It extracts all state variables from the module and converts them to a serializable format.

Returns:

Type Description
dict[str, Any]

A dictionary containing the internal state of the component.

set_state ¤

set_state(state: dict[str, Any]) -> None

Restore module state from a checkpoint.

This method implements the Checkpointable protocol using NNX state management. It restores the module state from a serialized format. Restoration is strict: checkpoint structure must match module state.

Parameters:

Name Type Description Default
state dict[str, Any]

A dictionary containing the internal state to restore.

required

Raises:

Type Description
TypeError

If state is not a dictionary.

ValueError

If checkpoint structure does not match module state.

clone ¤

clone() -> DataraxModule

Create a new instance with the same state as this module.

Uses NNX's clone function for proper deep cloning of all state.

Returns:

Type Description
DataraxModule

A new module instance with the same state.

requires_rng_streams ¤

requires_rng_streams() -> list[str] | None

Get the list of RNG streams required by this module.

Returns:

Type Description
list[str] | None

A list of required RNG stream names, or None if no RNG streams

list[str] | None

are required.

ensure_rng_streams ¤

ensure_rng_streams(stream_names: list[str]) -> None

Ensure that the required RNG streams are available.

Parameters:

Name Type Description Default
stream_names list[str]

A list of available RNG stream names.

required

Raises:

Type Description
ValueError

If a required RNG stream is not available.

get_output_structure ¤

get_output_structure(sample_data: PyTree, sample_state: PyTree) -> tuple[PyTree, PyTree]

Declare output PyTree structure for vmap axis specification.

Default uses jax.eval_shape to discover structure automatically. Override for efficiency or when eval_shape doesn't work (e.g., data-dependent shapes).

Parameters:

Name Type Description Default
sample_data PyTree

Single element data (not batched)

required
sample_state PyTree

Single element state (not batched)

required

Returns:

Type Description
PyTree

Tuple of (output_data_structure, output_state_structure) with None leaves.

PyTree

The structure (keys/nesting) matters, leaf values are ignored.

Example override for operator that adds keys

def get_output_structure(self, sample_data, sample_state): out_data = { **jax.tree.map(lambda _: None, sample_data), "score": None, "alignment": None, } return out_data, sample_state

apply_batch ¤

apply_batch(batch: Batch, stats: dict[str, Any] | None = None) -> Batch

Process entire batch with vmap and optional RNG generation.

This method implements the batch processing logic for both stochastic and deterministic modes. It uses static branching on self.stochastic for JIT compilation efficiency.

The implementation delegates to _vmap_apply() for the shared computational core, then wraps the result in a Batch object.

Parameters:

Name Type Description Default
batch Batch

Input batch (Batch[Element] structure)

required
stats dict[str, Any] | None

Optional statistics (if None, uses get_statistics())

None

Returns:

Type Description
Batch

Transformed batch with same structure

Note

This method is concrete (not abstract). Subclasses typically don't override it, but can if they need custom batch processing logic.

output_spec ¤

output_spec(input_spec: PyTree) -> PyTree

Return the operator's output spec given an input spec.

Most operators (normalization, additive noise, simple element-wise transforms) do not change shape; the default returns input_spec unchanged. Shape-changing operators (Resize, Crop, Reshape) MUST override this method.

Parameters:

Name Type Description Default
input_spec PyTree

PyTree of jax.ShapeDtypeStruct describing the input element (matching the upstream DataSourceModule.element_spec() or another operator's output_spec).

required

Returns:

Type Description
PyTree

PyTree of jax.ShapeDtypeStruct describing the operator's output.

PyTree

By default, equal to input_spec.