modalities package

Submodules

modalities.api module

class modalities.api.FileExistencePolicy(value)[source]

Bases: Enum

ERROR = 'error'
OVERRIDE = 'override'
SKIP = 'skip'
modalities.api.convert_pytorch_to_hf_checkpoint(config_file_path, output_hf_checkpoint_dir, prediction_key)[source]

Converts a PyTorch checkpoint to a Hugging Face checkpoint.

Return type:

HFModelAdapter

Parameters:
  • config_file_path (Path)

  • output_hf_checkpoint_dir (Path)

  • prediction_key (str)

Args:

  • config_file_path (Path): Path to the config that generated the PyTorch checkpoint.

  • output_hf_checkpoint_dir (Path): Path to the output directory for the converted HF checkpoint.

  • prediction_key (str): The key in the model's output under which the predictions of interest are found.

Returns:

HFModelAdapter: The Hugging Face model adapter.

modalities.api.create_filtered_tokenized_dataset(input_data_path, filter_routine, output_data_path, file_existence_policy)[source]
modalities.api.create_raw_data_index(src_path, index_path, file_existence_policy=FileExistencePolicy.ERROR)[source]

Creates the index file for the content of a large jsonl-file. The index file contains the byte offset and length of each line in the jsonl-file. This makes it possible to process the file line by line later without loading it into memory. Indexing is required before further processing steps such as tokenization. It is needed only once per jsonl-file, so the same index can be reused for different tokenizations.

Args:

  • src_path (Path): The path to the jsonl-file.

  • index_path (Path): The path to the index file that will be created.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the index file already exists. Defaults to FileExistencePolicy.ERROR.

Raises:

ValueError: If the index file already exists.
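
The byte-offset indexing idea described for create_raw_data_index can be illustrated with a minimal standard-library sketch. This is not the implementation used by modalities: the helper names build_line_index and read_line, and the in-memory list used as the index, are assumptions made for illustration only.

```python
import json
import tempfile
from pathlib import Path


def build_line_index(src_path: Path) -> list[tuple[int, int]]:
    """Collect (byte_offset, byte_length) per line, excluding the trailing newline."""
    index = []
    offset = 0
    with src_path.open("rb") as f:
        for raw_line in f:
            index.append((offset, len(raw_line.rstrip(b"\n"))))
            offset += len(raw_line)
    return index


def read_line(src_path: Path, offset: int, length: int) -> str:
    """Read a single line through its index entry without scanning the file."""
    with src_path.open("rb") as f:
        f.seek(offset)
        return f.read(length).decode("utf-8")


# Hypothetical two-line jsonl file used only for this demonstration.
with tempfile.TemporaryDirectory() as tmp_dir:
    src = Path(tmp_dir) / "data.jsonl"
    src.write_bytes(b'{"text": "a"}\n{"text": "bc"}\n')
    index = build_line_index(src)
    second_doc = json.loads(read_line(src, *index[1]))
```

Because each entry stores an offset and a length, any document can be fetched with a single seek, which is what allows later steps to work on the file without loading it as a whole.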

modalities.api.create_shuffled_dataset_chunk(file_path_list, output_chunk_file_path, chunk_id, num_chunks, file_existence_policy, global_seed=None)[source]

Creates a shuffled dataset chunk. Given a dataset consisting of multiple tokenized pbin files, this function creates a shuffled dataset chunk for a given chunk id. From each tokenized pbin file, the respective chunk is extracted, shuffled and written to a new pbin file.

Args:

  • file_path_list (list[Path]): List of paths to the tokenized input pbin files.

  • output_chunk_file_path (Path): Path to the output chunk which will be stored in pbin format.

  • chunk_id (int): The id of the chunk to create.

  • num_chunks (int): The total number of chunks to create.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the output chunk file already exists.

  • global_seed (Optional[int]): The global seed to use for shuffling.

Raises:

ValueError: If the chunk has no samples.
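
The chunking scheme described for create_shuffled_dataset_chunk can be sketched with plain Python lists standing in for the pbin files. How the library computes chunk boundaries is not stated here, so the near-equal contiguous split in chunk_bounds is an assumption, and both function names are hypothetical.

```python
import random


def chunk_bounds(num_samples: int, chunk_id: int, num_chunks: int) -> tuple[int, int]:
    # Assumption: contiguous, near-equal split; the first `remainder` chunks get one extra sample.
    base, remainder = divmod(num_samples, num_chunks)
    start = chunk_id * base + min(chunk_id, remainder)
    end = start + base + (1 if chunk_id < remainder else 0)
    return start, end


def shuffled_chunk(files: list[list[int]], chunk_id: int, num_chunks: int, global_seed=None) -> list[int]:
    chunk = []
    for samples in files:
        # Take the slice belonging to `chunk_id` from every input file.
        start, end = chunk_bounds(len(samples), chunk_id, num_chunks)
        chunk.extend(samples[start:end])
    if not chunk:
        raise ValueError("Chunk has no samples.")
    # Shuffle the combined slices with a dedicated, seeded generator.
    random.Random(global_seed).shuffle(chunk)
    return chunk


# Two hypothetical "files" with 10 and 5 samples.
files = [list(range(10)), list(range(100, 105))]
chunk0 = shuffled_chunk(files, chunk_id=0, num_chunks=2, global_seed=42)
chunk1 = shuffled_chunk(files, chunk_id=1, num_chunks=2, global_seed=42)
```

In this sketch every sample lands in exactly one chunk, and a fixed seed makes a chunk reproducible across runs.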

modalities.api.create_shuffled_jsonl_dataset_chunk(file_path_list, output_chunk_file_path, chunk_id, num_chunks, file_existence_policy, global_seed=None)[source]

Creates a shuffled jsonl dataset chunk. Given a dataset consisting of multiple jsonl files, this function creates a shuffled dataset chunk for a given chunk id. From each jsonl file, the respective chunk is extracted, shuffled and written to a new jsonl file.

Args:

  • file_path_list (list[Path]): List of paths to the input jsonl files.

  • output_chunk_file_path (Path): Path to the output chunk which will be stored in jsonl format.

  • chunk_id (int): The id of the chunk to create.

  • num_chunks (int): The total number of chunks to create.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the output chunk file already exists.

  • global_seed (Optional[int]): The global seed to use for shuffling.

Raises:

ValueError: If the chunk has no samples.

modalities.api.enforce_file_existence_policy(file_path, file_existence_policy)[source]

Enforces the file existence policy. Returns True if processing should be stopped, otherwise False.

Return type:

bool

Args:

  • file_path (Path): File path to the file to check.

  • file_existence_policy (FileExistencePolicy): The file existence policy.

Raises:

ValueError: Raised if the file existence policy is unknown or if the policy requires raising a ValueError.

Returns:

bool: True if processing should be stopped, otherwise False.
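
A minimal sketch of how such a policy check could behave is shown below. The local FileExistencePolicy enum only mirrors the documented values, should_stop_processing is a hypothetical name, and the mapping of each policy to a result (in particular that OVERRIDE simply continues without touching the existing file) is an assumption, not the library's confirmed behavior.

```python
import tempfile
from enum import Enum
from pathlib import Path


class FileExistencePolicy(Enum):
    # Local stand-in mirroring the documented enum values.
    SKIP = "skip"
    ERROR = "error"
    OVERRIDE = "override"


def should_stop_processing(file_path: Path, policy: FileExistencePolicy) -> bool:
    if not file_path.exists():
        return False  # nothing to protect, processing may continue
    if policy is FileExistencePolicy.SKIP:
        return True  # keep the existing file and stop
    if policy is FileExistencePolicy.OVERRIDE:
        return False  # assumption: the caller overwrites the file afterwards
    if policy is FileExistencePolicy.ERROR:
        raise ValueError(f"File already exists: {file_path}")
    raise ValueError(f"Unknown file existence policy: {policy}")


with tempfile.TemporaryDirectory() as tmp_dir:
    existing = Path(tmp_dir) / "out.pbin"
    existing.touch()
    skip_result = should_stop_processing(existing, FileExistencePolicy.SKIP)
    override_result = should_stop_processing(existing, FileExistencePolicy.OVERRIDE)
    missing_result = should_stop_processing(Path(tmp_dir) / "missing.pbin", FileExistencePolicy.ERROR)
    try:
        should_stop_processing(existing, FileExistencePolicy.ERROR)
        error_raised = False
    except ValueError:
        error_raised = True
```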

modalities.api.generate_text(config_file_path)[source]

Inference function to generate text with a given model.

Args:

config_file_path (FilePath): Path to the YAML config file.

Parameters:

config_file_path (Annotated[Path, PathType(path_type=file)])

modalities.api.merge_packed_data_files(src_paths, target_path)[source]

Utility function for merging different pbin-files into one. This is especially useful if different datasets were processed at different points in time, or if one encoding takes so long that the overall process was done in chunks. It is important that the same tokenizer was used for all chunks.

Specify an arbitrary number of pbin-files and/or directories containing pbin-files as input.

Args:

  • src_paths (list[Path]): List of paths to the pbin-files or directories containing such.

  • target_path (Path): The path to the merged pbin-file that will be created.

modalities.api.pack_encoded_data(config_dict, file_existence_policy)[source]

Packs and encodes an indexed, large jsonl-file (see also create_raw_data_index for more information). Produces a .pbin-file that can be fed into a training process directly and no longer requires the original jsonl-file or the respective index file.

Args:

  • config_dict (dict): Dictionary containing the configuration for the packed data generation.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.

modalities.api.shuffle_jsonl_data(input_data_path, output_data_path, file_existence_policy, seed=None)[source]

Shuffles a JSONL file (.jsonl) and stores it on disk.

Args:

  • input_data_path (Path): File path to the jsonl data (.jsonl).

  • output_data_path (Path): File path to write the shuffled jsonl data.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.

  • seed (Optional[int]): The seed to use for shuffling.

modalities.api.shuffle_tokenized_data(input_data_path, output_data_path, batch_size, file_existence_policy, seed=None)[source]

Shuffles a tokenized file (.pbin) and stores it on disk.

Args:

  • input_data_path (Path): File path to the tokenized data (.pbin).

  • output_data_path (Path): File path to write the shuffled tokenized data.

  • batch_size (int): Number of documents to process per batch.

  • file_existence_policy (FileExistencePolicy): Policy to apply when the output file already exists.

  • seed (Optional[int]): The seed to use for shuffling.


modalities.batch module

class modalities.batch.Batch[source]

Bases: ABC

Abstract class that defines the necessary methods any Batch implementation needs to implement.

class modalities.batch.DatasetBatch(samples, targets, batch_dim=0)[source]

Bases: Batch, TorchDeviceMixin

A batch of samples and their targets, used to train a model batch-wise.

batch_dim: int = 0
detach()[source]
property device: device
samples: dict[str, Tensor]
targets: dict[str, Tensor]
to(device)[source]
Parameters:

device (device)

class modalities.batch.EvaluationResultBatch(dataloader_tag, num_train_steps_done, losses=<factory>, metrics=<factory>, throughput_metrics=<factory>)[source]

Bases: Batch

Data class for storing the results of one or more batches. Results for an entire epoch are stored here as well.

dataloader_tag: str
losses: dict[str, ResultItem]
metrics: dict[str, ResultItem]
num_train_steps_done: int
throughput_metrics: dict[str, ResultItem]
class modalities.batch.InferenceResultBatch(targets, predictions, batch_dim=0)[source]

Bases: Batch, TorchDeviceMixin

Stores targets and predictions of an entire batch.

batch_dim: int = 0
detach()[source]
property device: device
get_predictions(key)[source]
Return type:

Tensor

Parameters:

key (str)

get_targets(key)[source]
Return type:

Tensor

Parameters:

key (str)

predictions: dict[str, Tensor]
targets: dict[str, Tensor]
to(device)[source]
Parameters:

device (device)

to_cpu()[source]
class modalities.batch.ResultItem(value, decimal_places=None)[source]

Bases: object

decimal_places: Optional[int] = None
value: Tensor
class modalities.batch.TorchDeviceMixin[source]

Bases: ABC

abstractmethod detach()[source]
abstract property device: device
abstractmethod to(device)[source]
Parameters:

device (device)

modalities.evaluator module

class modalities.evaluator.Evaluator(progress_publisher, evaluation_result_publisher)[source]

Bases: object

Evaluator class responsible for evaluating the model on a set of datasets.

Initializes the Evaluator class.

Args:

  • progress_publisher (MessagePublisher[ProgressUpdate]): Publisher for progress updates.

  • evaluation_result_publisher (MessagePublisher[EvaluationResultBatch]): Publisher for evaluation results.

evaluate(model, data_loaders, loss_fun, num_train_steps_done)[source]

Evaluate the model on a set of datasets.

Return type:

dict[str, EvaluationResultBatch]

Args:

  • model (nn.Module): The model to evaluate.

  • data_loaders (list[LLMDataLoader]): List of dataloaders to evaluate the model on.

  • loss_fun (Callable[[InferenceResultBatch], torch.Tensor]): The loss function to calculate the loss.

  • num_train_steps_done (int): The number of training steps done so far, for logging purposes.

Returns:

dict[str, EvaluationResultBatch]: A dictionary containing the evaluation results for each dataloader

evaluate_batch(batch, model, loss_fun)[source]

Evaluate a single batch by forwarding it through the model and calculating the loss.

Return type:

Tensor

Args:

  • batch (DatasetBatch): The batch to evaluate.

  • model (nn.Module): The model to evaluate.

  • loss_fun (Callable[[InferenceResultBatch], torch.Tensor]): The loss function to calculate the loss.

Returns:

torch.Tensor: The loss of the batch

modalities.exceptions module

exception modalities.exceptions.BatchStateError[source]

Bases: Exception

exception modalities.exceptions.CheckpointingError[source]

Bases: Exception

exception modalities.exceptions.ConfigError[source]

Bases: Exception

exception modalities.exceptions.DatasetNotFoundError[source]

Bases: Exception

exception modalities.exceptions.ModelStateError[source]

Bases: Exception

exception modalities.exceptions.OptimizerError[source]

Bases: Exception

exception modalities.exceptions.RunningEnvError[source]

Bases: Exception

exception modalities.exceptions.TimeRecorderStateError[source]

Bases: Exception

modalities.gym module

class modalities.gym.Gym(trainer, evaluator, loss_fun, num_ranks)[source]

Bases: object

Class to perform the model training, including evaluation and checkpointing.

Initializes a Gym object.

Args:

  • trainer (Trainer): Trainer object to perform the training.

  • evaluator (Evaluator): Evaluator object to perform the evaluation.

  • loss_fun (Loss): Loss function applied during training and evaluation.

  • num_ranks (int): Number of ranks used for distributed training.

run(app_state, training_log_interval_in_steps, checkpointing_interval_in_steps, evaluation_interval_in_steps, train_data_loader, evaluation_data_loaders, checkpoint_saving)[source]

Runs the model training, including evaluation and checkpointing.

Args:

  • app_state (AppState): Application state containing the model, optimizer and lr scheduler.

  • training_log_interval_in_steps (int): Interval in steps to log training progress.

  • checkpointing_interval_in_steps (int): Interval in steps to save checkpoints.

  • evaluation_interval_in_steps (int): Interval in steps to perform evaluation.

  • train_data_loader (LLMDataLoader): Data loader with the training data.

  • evaluation_data_loaders (list[LLMDataLoader]): List of data loaders with the evaluation data.

  • checkpoint_saving (CheckpointSaving): Routine for saving checkpoints.
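
To make the three step intervals of Gym.run concrete, the following sketch lists which actions would be due at a given training step. The rule used here (an action fires whenever the number of completed steps is a positive multiple of its interval) is an assumption about the scheduling, and actions_due is a hypothetical helper, not part of modalities.

```python
def actions_due(
    num_train_steps_done: int,
    training_log_interval_in_steps: int,
    checkpointing_interval_in_steps: int,
    evaluation_interval_in_steps: int,
) -> list[str]:
    intervals = {
        "log": training_log_interval_in_steps,
        "checkpoint": checkpointing_interval_in_steps,
        "evaluate": evaluation_interval_in_steps,
    }
    # Assumption: an action is due when the step count is a positive multiple of its interval.
    return [
        name
        for name, interval in intervals.items()
        if num_train_steps_done > 0 and num_train_steps_done % interval == 0
    ]


# Log every 2 steps, checkpoint every 8 steps, evaluate every 4 steps.
schedule = {step: actions_due(step, 2, 8, 4) for step in range(1, 9)}
```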


modalities.loss_functions module

class modalities.loss_functions.CLMCrossEntropyLoss(target_key, prediction_key, tag='CLMCrossEntropyLoss')[source]

Bases: Loss

Parameters:
  • target_key (str)

  • prediction_key (str)

  • tag (str)

class modalities.loss_functions.Loss(tag)[source]

Bases: ABC

Parameters:

tag (str)

property tag: str
class modalities.loss_functions.NCELoss(prediction_key1, prediction_key2, is_asymmetric=True, temperature=1.0, tag='NCELoss')[source]

Bases: Loss

Noise Contrastive Estimation Loss

Args:

  • prediction_key1 (str): key to access embedding 1.

  • prediction_key2 (str): key to access embedding 2.

  • is_asymmetric (bool, optional): specifies symmetric or asymmetric calculation of NCEloss. Defaults to True.

  • temperature (float, optional): temperature. Defaults to 1.0.

  • tag (str, optional): Defaults to “NCELoss”.

Parameters:
  • prediction_key1 (str)

  • prediction_key2 (str)

  • is_asymmetric (bool)

  • temperature (float)

  • tag (str)

modalities.loss_functions.nce_loss(embedding1, embedding2, device, is_asymmetric, temperature)[source]

Calculates the noise contrastive estimation loss between embeddings of two different modalities. The implementation is slightly adapted from https://arxiv.org/pdf/1912.06430.pdf and https://github.com/antoine77340/MIL-NCE_HowTo100M. Changes include adding a temperature value and the choice of calculating an asymmetric loss w.r.t. one modality. The implementation is adapted to the contrastive loss of the CoCa model (https://arxiv.org/pdf/2205.01917.pdf).

Return type:

Tensor

Args:

  • embedding1 (torch.Tensor): embeddings from modality 1 of size batch_size x embed_dim.

  • embedding2 (torch.Tensor): embeddings from modality 2 of size batch_size x embed_dim.

  • device (torch.device): torch device for calculating loss.

  • is_asymmetric (bool): boolean value to specify if the loss is calculated in one direction or both directions.

  • temperature (float): temperature value for regulating loss.

Returns:

torch.Tensor: loss tensor.
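
For intuition, a generic contrastive (InfoNCE-style) loss with a temperature and an optional second direction can be sketched in pure Python. This is not the formula implemented by nce_loss, which follows the referenced papers: the function name contrastive_loss, the use of plain dot products as similarities, and the choice to sum the two directions in the symmetric case are all assumptions made for illustration.

```python
import math


def contrastive_loss(embedding1, embedding2, is_asymmetric=True, temperature=1.0):
    # similarity[i][j]: scaled dot product between sample i of modality 1 and sample j of modality 2
    similarity = [
        [sum(a * b for a, b in zip(e1, e2)) / temperature for e2 in embedding2]
        for e1 in embedding1
    ]

    def direction_loss(matrix):
        # Mean over rows of -log softmax(row)[i]; matching pairs sit on the diagonal.
        total = 0.0
        for i, row in enumerate(matrix):
            row_max = max(row)
            log_denominator = row_max + math.log(sum(math.exp(v - row_max) for v in row))
            total += log_denominator - row[i]
        return total / len(matrix)

    loss = direction_loss(similarity)
    if not is_asymmetric:
        # Second direction: modality 2 -> modality 1 (transpose of the similarity matrix).
        transposed = [list(column) for column in zip(*similarity)]
        loss += direction_loss(transposed)
    return loss


# Two perfectly aligned, orthogonal embedding pairs.
embeddings = [[1.0, 0.0], [0.0, 1.0]]
one_direction = contrastive_loss(embeddings, embeddings, is_asymmetric=True)
both_directions = contrastive_loss(embeddings, embeddings, is_asymmetric=False)
```

Lowering the temperature sharpens the softmax, so well-aligned pairs are rewarded more strongly and the loss for the example above moves closer to zero.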

modalities.trainer module

class modalities.trainer.ThroughputAggregationKeys(value)[source]

Bases: Enum

FORWARD_BACKWARD_TIME = 'FORWARD_BACKWARD_TIME'
NUM_SAMPLES = 'NUM_SAMPLES'
class modalities.trainer.Trainer(global_rank, progress_publisher, evaluation_result_publisher, gradient_acc_steps, global_num_tokens_per_train_step, num_seen_train_steps, global_num_seen_tokens, num_target_steps, num_target_tokens, gradient_clipper, mfu_calculator=None)[source]

Bases: object

Initializes the Trainer object.

Args:

  • global_rank (int): The global rank on which the trainer object operates.

  • progress_publisher (MessagePublisher[ProgressUpdate]): The publisher for progress updates.

  • evaluation_result_publisher (MessagePublisher[EvaluationResultBatch]): The publisher for evaluation result batches.

  • gradient_acc_steps (int): The number of gradient accumulation steps.

  • global_num_tokens_per_train_step (int): The number of global tokens per training step.

  • num_seen_train_steps (int): The number of training steps already seen.

  • global_num_seen_tokens (int): The number of tokens already seen.

  • num_target_steps (int): The target number of training steps.

  • num_target_tokens (int): The target number of tokens.

  • gradient_clipper (GradientClipperIF): The gradient clipper.

  • mfu_calculator (Optional[MFUCalculatorABC]): The MFU calculator.

Returns:

None

train(app_state, train_loader, loss_fun, training_log_interval_in_steps, evaluation_callback, checkpointing_callback)[source]

Trains the model.

Args:

  • app_state (AppState): The application state containing the model, optimizer and lr scheduler.

  • train_loader (LLMDataLoader): The data loader containing the training data.

  • loss_fun (Loss): The loss function used for training.

  • training_log_interval_in_steps (int): The interval at which training progress is logged.

  • evaluation_callback (Callable[[TrainingProgress], None]): A callback function for evaluation.

  • checkpointing_callback (Callable[[TrainingProgress], None]): A callback function for checkpointing.

Returns:

None


modalities.util module

class modalities.util.Aggregator[source]

Bases: Generic[T]

add_value(key, value)[source]
get_all_reduced_value(key, reduce_operation=<RedOpType.SUM: 0>, postprocessing_fun=None)[source]
Return type:

Tensor

remove_key(key)[source]
Parameters:

key (T)

remove_keys()[source]
class modalities.util.TimeRecorder[source]

Bases: object

Class with context manager to record execution time

reset()[source]
start()[source]
stop()[source]
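
A recorder with start, stop, reset and context-manager support could look like the sketch below. SimpleTimeRecorder and RecorderState are hypothetical names, and the attribute names, the accumulation of time across several start/stop cycles, and the use of RuntimeError for invalid state transitions are assumptions rather than the behavior of modalities.util.TimeRecorder.

```python
import time
from enum import Enum


class RecorderState(Enum):
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"


class SimpleTimeRecorder:
    def __init__(self):
        self.delta_t = 0.0  # accumulated time in seconds
        self.status = RecorderState.STOPPED
        self._start_time = None

    def start(self):
        if self.status is RecorderState.RUNNING:
            raise RuntimeError("Recorder is already running.")
        self.status = RecorderState.RUNNING
        self._start_time = time.perf_counter()

    def stop(self):
        if self.status is RecorderState.STOPPED:
            raise RuntimeError("Recorder is not running.")
        self.delta_t += time.perf_counter() - self._start_time
        self.status = RecorderState.STOPPED

    def reset(self):
        self.delta_t = 0.0
        self.status = RecorderState.STOPPED
        self._start_time = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()


recorder = SimpleTimeRecorder()
with recorder:
    sum(range(1000))  # the work being timed
elapsed = recorder.delta_t
```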
class modalities.util.TimeRecorderStates(value)[source]

Bases: Enum

RUNNING = 'RUNNING'
STOPPED = 'STOPPED'
modalities.util.format_metrics_to_gb(item)[source]

Formats a number as gigabytes, rounded to 4 digits of precision.
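
A conversion of this kind can be sketched as follows. The helper name bytes_to_gb is hypothetical, and treating one gigabyte as 1024**3 bytes and the input as a byte count are assumptions.

```python
BYTES_PER_GIGABYTE = 1024**3  # assumption: binary gigabytes


def bytes_to_gb(num_bytes: float) -> float:
    # Convert to gigabytes and round to 4 digits after the decimal point.
    return round(num_bytes / BYTES_PER_GIGABYTE, ndigits=4)


half_gb = bytes_to_gb(512 * 1024**2)
```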

modalities.util.get_experiment_id_of_run(config_file_path, hash_length=8, max_experiment_id_byte_length=1024)[source]

Creates a unique experiment ID for the current run on rank 0 and broadcasts it to all ranks. Internally, the experiment ID is generated by hashing the configuration file path and appending the current date and time. The experiment ID is then converted to a byte array (with a maximum length of max_experiment_id_byte_length) and broadcast to all ranks. In the unlikely case that the experiment ID is too long, a ValueError is raised and max_experiment_id_byte_length must be increased. Each rank then decodes the byte array back to the original string representation and returns it. A globally synced experiment ID is mandatory for saving files and checkpoints in a distributed training setup.

Return type:

str

Parameters:
  • config_file_path (Path)

  • hash_length (int | None)

  • max_experiment_id_byte_length (int | None)

Args:

  • config_file_path (Path): Path to the configuration file.

  • hash_length (Optional[int], optional): Defines the char length of the commit hash. Defaults to 8.

  • max_experiment_id_byte_length (Optional[int]): Defines max byte length of the experiment_id to be shared to other ranks. Defaults to 1024.

Returns:

str: The experiment ID.
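
The generate, encode and decode steps described for get_experiment_id_of_run can be sketched without torch.distributed. The three helper names, the SHA-256 hash, the timestamp format, and the zero-byte padding are assumptions; the actual broadcast between ranks is left out.

```python
import hashlib
from datetime import datetime
from pathlib import Path


def make_experiment_id(config_file_path: Path, hash_length: int = 8) -> str:
    # Assumption: hash of the path string, truncated, followed by a timestamp.
    digest = hashlib.sha256(str(config_file_path).encode("utf-8")).hexdigest()[:hash_length]
    timestamp = datetime.now().strftime("%Y-%m-%d__%H-%M-%S")
    return f"{digest}_{timestamp}"


def encode_fixed_length(experiment_id: str, max_byte_length: int = 1024) -> bytes:
    # A fixed-size buffer lets every rank allocate the receive buffer in advance.
    encoded = experiment_id.encode("utf-8")
    if len(encoded) > max_byte_length:
        raise ValueError("Experiment ID is too long; increase max_byte_length.")
    return encoded.ljust(max_byte_length, b"\x00")


def decode_fixed_length(buffer: bytes) -> str:
    # Strip the padding to recover the original string.
    return buffer.rstrip(b"\x00").decode("utf-8")


experiment_id = make_experiment_id(Path("config.yaml"))
buffer = encode_fixed_length(experiment_id)
decoded_id = decode_fixed_length(buffer)
```

Padding to a fixed length is one way to explain why a maximum byte length is needed at all: a receiving rank has to know the buffer size before the ID arrives.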

modalities.util.get_local_number_of_trainable_parameters(model)[source]

Returns the number of trainable parameters that are materialized on the current rank. The model can be sharded with FSDP1 or FSDP2 or not sharded at all.

Return type:

int

Parameters:

model (Module)

Args:

model (nn.Module): The model for which to calculate the number of trainable parameters.

Returns:

int: The number of trainable parameters materialized on the current rank.

modalities.util.get_module_class_from_name(module, name)[source]

Gets a class from a module by its name. Taken from the Accelerate source code (https://github.com/huggingface/accelerate/blob/1f7a79b428749f45187ec69485f2c966fe21926e/src/accelerate/utils/dataclasses.py#L1902).

Return type:

Optional[Type[Module]]

Args:

  • module (torch.nn.Module): The module to get the class from.

  • name (str): The name of the class.

modalities.util.get_total_number_of_trainable_parameters(model)[source]

Returns the total number of trainable parameters across all ranks. The model must be sharded with FSDP1 or FSDP2.

Return type:

Union[int, float, bool]

Parameters:

model (FullyShardedDataParallel | FSDPModule)

Args:

model (FSDPX): The model for which to calculate the number of trainable parameters.

Returns:

Number: The total number of trainable parameters across all ranks.

modalities.util.parse_enum_by_name(name, enum_type)[source]
Return type:

Enum

modalities.util.print_rank_0(message)[source]

If torch.distributed is initialized, print only on rank 0.

Parameters:

message (str)

modalities.util.warn_rank_0(message)[source]

If torch.distributed is initialized, print only on rank 0.

Parameters:

message (str)
