modalities.dataloader package

Subpackages

Submodules

modalities.dataloader.create_index module

class modalities.dataloader.create_index.IndexGenerator(src_file, drop_faulty_entries=False)[source]

Bases: object

Initializes an IndexGenerator object. Reads a JSONL file as a binary file, and iterates through it character by character. It builds the sample index by tracking the start and end positions of each JSON sample based on the positions of newline (

) characters.

Args:

src_file (Path): Path to a jsonl-file. drop_faulty_entries (bool): Allow broken json entries in src_file by just skipping them.

Otherwise, the index generation fails with an exception.

Returns:

None

Parameters:
  • src_file (Path)

  • drop_faulty_entries (bool)

create_index(target_path_for_index_file)[source]

Creates an index file where each item in the index represents the start and length of a JSON document within a JSONL file.

Args:

target_path_for_index_file (Path): The path where the index file will be created.

Raises:

Exception: If an exception occurs during the indexing process.

Returns:

None

Parameters:

target_path_for_index_file (Path)

modalities.dataloader.create_packed_data module

class modalities.dataloader.create_packed_data.EmbeddedStreamData(data_path, load_index=True)[source]

Bases: object

Initializes an EmbeddedStreamData object.

Args:

data_path (Path): The path to the packed data file. load_index (bool, optional): Whether to load the index. Defaults to True.

Raises:

FileNotFoundError: If the packed data file is not found at the specified path.

Parameters:
  • data_path (Path)

  • load_index (bool | None)

DATA_SECTION_LENGTH_IN_BYTES = 8
HEADER_SIZE_IN_BYTES = 12
TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES = 4
property data: ndarray
property index_base: list[tuple[int, int]]
exception modalities.dataloader.create_packed_data.EmptySampleError[source]

Bases: RuntimeError

class modalities.dataloader.create_packed_data.PackedDataGenerator(src_path, tokenizer, eod_token, number_of_processes, jq_pattern, processing_batch_size, raw_samples_queue_size, processed_samples_queue_size, index_path=None)[source]

Bases: object

Reads in a JSONL file and the corresponding index file and packs the dataset for LLM training.

Initializes a PackedDataGenerator object.

Args:

src_path (FilePath): Path to a JSONL file, which holds text data. tokenizer (TokenizerWrapper): PretrainedTokenizer object used to tokenize the provided data in src_path. eod_token (str): End-of-document token. number_of_processes (int): Number of processes used for parallel processing. jq_pattern (str): jq-pattern applied on every jsonl-entry. Results are afterwards tokenized and packed. processing_batch_size (int): Size of the batches that the workers process. raw_samples_queue_size (int): Maximum size of the raw samples queue. processed_samples_queue_size (int): Maximum size of the processed samples queue. index_path (Optional[FilePath]): Path to an index file,

which indicates the start character position and length of samples given in src_path. If not defined, an index file next to src_path is picked, by replacing its suffix with “.idx”. Defaults to None.

Returns:

None

Parameters:
  • src_path (Annotated[Path, PathType(path_type=file)])

  • tokenizer (TokenizerWrapper)

  • eod_token (str)

  • number_of_processes (int)

  • jq_pattern (str)

  • processing_batch_size (int)

  • raw_samples_queue_size (int)

  • processed_samples_queue_size (int)

  • index_path (Annotated[Path, PathType(path_type=file)] | None)

run(dst_path=None)[source]

Packs data and saves it to (default) dst_path.

Args:

dst_path (Optional[Path]): The destination path to save the packed data. If not provided, a default destination path will be used.

Raises:

ValueError: If the file already exists at the destination path. Exception: If an exception occurs during the data packing process.

Returns:

None

Parameters:

dst_path (Path | None)

modalities.dataloader.create_packed_data.join_embedded_stream_data(stream_data, target_file, chunk_size=2048)[source]

Joins the embedded stream data into a single file.

Args:

stream_data (list[EmbeddedStreamData]): A list of EmbeddedStreamData objects representing the stream data. target_file (Path): The target file to write the joined data to. chunk_size (int, optional): The size of each data chunk. Defaults to 2048.

Raises:

FileExistsError: If the target file already exists.

Returns:

None

Parameters:

modalities.dataloader.dataloader module

class modalities.dataloader.dataloader.LLMDataLoader(dataloader_tag, batch_sampler, dataset, batch_size=1, sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, *, prefetch_factor=None, persistent_workers=False, pin_memory_device='')[source]

Bases: DataLoader[_T_co]

LLMDataLoader is a custom DataLoader class that extends the PyTorch DataLoader class.

Initializes a DataLoader object.

Args:

dataloader_tag (str): The tag for the dataloader. batch_sampler (BatchSampler): The batch sampler used for sampling batches. dataset (Dataset[T_co]): The dataset to load the data from. batch_size (Optional[int], optional): The number of samples per batch. Defaults to 1. sampler (Optional[Sampler | Iterable], optional): The sampler used for sampling data. Defaults to None. num_workers (int, optional): The number of worker processes to use for data loading. Defaults to 0. collate_fn (Optional[_collate_fn_t], optional): The function used to collate the data samples.

Defaults to None.

pin_memory (bool, optional): Flag indicating whether to pin the memory. Defaults to False. drop_last (bool, optional): Flag indicating whether to drop the last incomplete batch. Defaults to False. timeout (float, optional): The timeout value for collecting a batch from workers. Defaults to 0. worker_init_fn (Optional[_worker_init_fn_t], optional): The function used to initialize worker processes.

Defaults to None.

multiprocessing_context ([type], optional): The multiprocessing context to use. Defaults to None. generator ([type], optional): The random number generator. Defaults to None. prefetch_factor (Optional[int], optional): The number of batches to prefetch. Defaults to None. persistent_workers (bool, optional): Flag indicating whether to keep the workers alive

between data loading iterations. Defaults to False.

pin_memory_device (str, optional): The device to pin the memory to. Defaults to “”.

Returns:

None

Parameters:
property dataloader_tag: str

Returns the dataloader tag.

Returns:

str: The dataloader tag.

modalities.dataloader.dataloader_factory module

class modalities.dataloader.dataloader_factory.DataloaderFactory[source]

Bases: object

static get_dataloader(dataloader_tag, dataset, batch_sampler, collate_fn, num_workers, pin_memory)[source]

Factory method for the instantiation of LLMDataLoader.

Return type:

LLMDataLoader

Parameters:
Args:

dataloader_tag (str): Tag for the dataloader dataset (Dataset): Dataset to be used batch_sampler (BatchSampler): batch sampler for batch-wise sampling from the dataset collate_fn (Callable): Callable for shaping the batch num_workers (int): Number of workers for the dataloader pin_memory (bool): Flag indicating whether to pin memory

Returns:

LLMDataLoader: Instance of LLMDataLoader

modalities.dataloader.dataset module

class modalities.dataloader.dataset.CombinedDataset(datasets)[source]

Bases: Dataset

Combines multiple datasets into one large dataset at runtime.

Note: When using this class to combine multiple `PackedMemMapDataset`s, each packed sample is packed from a single dataset (i.e., the samples are not mixed between datasets). In the Dataloader, a batch will still contain packed samples from different datasets.

Initializes the CombinedDataset object, combining multiple datasets.

Args:

datasets (list[Dataset]): A list of datasets to combine.

Parameters:

datasets (list[Dataset])

class modalities.dataloader.dataset.Dataset(raw_data_path, sample_key)[source]

Bases: Dataset

Dataset class.

Initializes a Dataset object.

Args:

raw_data_path (Path): The path to the raw data. sample_key (str): The key used to access a sample in the dataset.

Parameters:
  • raw_data_path (Path)

  • sample_key (str)

class modalities.dataloader.dataset.DummyDataset(num_samples, sample_definition)[source]

Bases: Dataset

DummyDataset class.

Initializes a DummyDataset object with the given number of samples and sample definition. When calling the __getitem__ method, the dataset will return a random sample based on the sample definition.

Args:

num_samples (int): The number of samples in the dataset. sample_definition (tuple[DummySampleConfig]): A list of tuples defining the dataset output.

Each touple contains the sample key, shape and data type.

Returns:

None

Parameters:
class modalities.dataloader.dataset.DummyDatasetConfig(**data)[source]

Bases: BaseModel

DummyDatasetConfig is a configuration class for defining a dummy dataset.

Attributes:

num_samples (int): The number of samples in the dataset. sample_definition (list[DummySampleConfig]): The list of sample definitions in the dataset.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

num_samples: int
sample_definition: list[DummySampleConfig]
class modalities.dataloader.dataset.DummySampleConfig(**data)[source]

Bases: BaseModel

DummySampleConfig class represents the configuration for a dummy sample.

Attributes:

sample_key (str): The key of the sample. sample_shape (tuple[int, …]): The shape of the sample. sample_type (DummySampleDataType): The type of the sample.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

sample_key: str
sample_shape: tuple[int, ...]
sample_type: DummySampleDataType
class modalities.dataloader.dataset.DummySampleDataType(value)[source]

Bases: str, Enum

DummySampleDataType is an enumeration class that represents the data types for dummy samples.

Attributes:

FLOAT (str): Represents the float data type. INT (str): Represents the int data type.

FLOAT = 'float'
INT = 'int'
class modalities.dataloader.dataset.MemMapDataset(raw_data_path, tokenizer, sample_key, index_path=None, jq_pattern='.text')[source]

Bases: Dataset

Initializes the MemMapDataset object that represents a PyTorch Dataset with mmap support.

Args:

raw_data_path (Path): Path to a JSONL file, which holds text data. tokenizer (TokenizerWrapper): The tokenizer object that is required to tokenize text data. sample_key (str): The key to access the sample in the BatchEncoding. index_path (Optional[Path], optional): The path to the index file which indicates

the start character/byte position of documents. Defaults to None.

jq_pattern (str, optional): The jq pattern to filter the data. Results are afterwards tokenized and packed.

Defaults to “.text”.

Returns:

None

Parameters:
  • raw_data_path (Path)

  • tokenizer (TokenizerWrapper)

  • sample_key (str)

  • index_path (Optional[Path])

  • jq_pattern (str)

class modalities.dataloader.dataset.PackedMemMapDatasetBase(raw_data_path, sample_key, load_index=True)[source]

Bases: Dataset

PackedMemMapDatasetBase class.

Initializes the PackedMemMapDatasetBase object.

Args:
raw_data_path (Path): Path to a packed binary file (*.pbin).

Use modalities data pack_encoded_data to create one based on a JSONL-file.

sample_key (str): The key to access the sample in the BatchEncoding. load_index (bool, optional): Flag indicating whether to load the index. Defaults to True.

Raises:

RuntimeError: If the token representation with the given size is not supported.

Returns:

None

Note:
TODO: sample_key should support multi-modal features using separately encoded inputs,

this needs to get replaced with a list of sample keys!

Parameters:
  • raw_data_path (Path)

  • sample_key (str)

  • load_index (Optional[bool])

DATA_SECTION_LENGTH_IN_BYTES = 8
HEADER_SIZE_IN_BYTES = 12
TOKEN_SIZE_DESCRIPTOR_LENGTH_IN_BYTES = 4
np_dtype_of_tokens_on_disk_from_bytes = {1: dtype('uint8'), 2: dtype('<u2'), 4: dtype('<u4')}
property token_size_in_bytes: int
type_converter_for_torch = {1: <class 'numpy.uint8'>, 2: <class 'numpy.int32'>, 4: <class 'numpy.int64'>}
class modalities.dataloader.dataset.PackedMemMapDatasetContinuous(raw_data_path, sample_key, block_size, load_index=False)[source]

Bases: PackedMemMapDatasetBase

PackedMemMapDatasetContinuous class.

Initializes the PackedMemMapDatasetContinuous object.

Args:
raw_data_path (Path): Path to a packed binary file (*.pbin).

Use modalities data pack_encoded_data to create one based on a JSONL-file.

sample_key (str): The key to access the sample in the BatchEncoding. block_size (int): The size of the block. load_index (bool, optional): Flag indicating whether to load the index.

This is only needed for debugging purposes to index the original documents. The continuous packing does not need to load the index and should be deactivated as it significantly increases the instantiation time. Defaults to False.

Returns:

None

Parameters:
  • raw_data_path (Path)

  • sample_key (str)

  • block_size (int)

  • load_index (Optional[bool])

class modalities.dataloader.dataset.PackedMemMapDatasetMegatron(raw_data_path, sample_key, block_size)[source]

Bases: PackedMemMapDatasetBase

Initializes the PackedMemMapDatasetBase object.

Args:
raw_data_path (Path): Path to a packed binary file (*.pbin).

Use modalities data pack_encoded_data to create one based on a JSONL-file.

sample_key (str): The key to access the sample in the BatchEncoding. load_index (bool, optional): Flag indicating whether to load the index. Defaults to True.

Raises:

RuntimeError: If the token representation with the given size is not supported.

Returns:

None

Note:
TODO: sample_key should support multi-modal features using separately encoded inputs,

this needs to get replaced with a list of sample keys!

Parameters:
  • raw_data_path (Path)

  • sample_key (str)

  • block_size (int)

modalities.dataloader.dataset_factory module

class modalities.dataloader.dataset_factory.DatasetFactory[source]

Bases: object

DatasetFactory for building the different dataset types.

static get_combined_dataset(datasets)[source]

Factory method for creating a combined datset .

Return type:

Dataset

Parameters:

datasets (list[Dataset])

Args:

datasets (list[Dataset]): List of datasets to combine.

Returns:

Dataset: CombinedDataset object.

static get_dummy_dataset(num_samples, sample_definition)[source]

Returns a DummyDataset object.

Return type:

DummyDataset

Parameters:
Args:

num_samples (int): The number of samples the dataset should generate. sample_definition (tuple[DummySampleConfig]): A list of tuples defining the dataset output.

Each tuple contains the sample key, shape and data type.

Returns:

DummyDataset: The generated DummyDataset object.

static get_mem_map_dataset(raw_data_path, tokenizer, sample_key, index_path=None, jq_pattern='.text')[source]

Returns a MemMapDataset object.

Return type:

MemMapDataset

Parameters:
  • raw_data_path (Path)

  • tokenizer (PreTrainedTokenizer)

  • sample_key (str)

  • index_path (Path | None)

  • jq_pattern (str)

Args:

raw_data_path (Path): The path to the raw data. tokenizer (PreTrainedTokenizer): The tokenizer used to tokenize the data. sample_key (str): The key used to retrieve the samples from the dataset. index_path (Optional[Path], optional): The path to the index file. Defaults to None. jq_pattern (str, optional): The pattern used to extract the text from the data. Defaults to “.text”.

Returns:

MemMapDataset: The MemMapDataset object.

static get_packed_mem_map_dataset_continuous(raw_data_path, sequence_length, sample_key)[source]

Returns a PackedMemMapDatasetContinuous object.

Return type:

PackedMemMapDatasetContinuous

Parameters:
  • raw_data_path (Path)

  • sequence_length (int)

  • sample_key (str)

Args:

raw_data_path (Path): The path to the raw data. sequence_length (int): The length of each sequence. sample_key (str): The key used to retrieve the samples from the dataset.

Returns:

PackedMemMapDatasetContinuous: The packed memory-mapped dataset.

static get_packed_mem_map_dataset_megatron(raw_data_path, sequence_length, sample_key)[source]
Return type:

PackedMemMapDatasetMegatron

Parameters:
  • raw_data_path (Path)

  • sequence_length (int)

  • sample_key (str)

static get_raw_index(raw_index_path)[source]
Return type:

list[tuple[int, int]]

Parameters:

raw_index_path (Path)

modalities.dataloader.large_file_lines_reader module

class modalities.dataloader.large_file_lines_reader.BaseReader[source]

Bases: ABC

class modalities.dataloader.large_file_lines_reader.LargeFileLinesReader(raw_data_path, index_path=None, encoding='utf-8', use_sample_length_from_index=True)[source]

Bases: BaseReader

LargeFileLinesReader class that read lines from a large file efficiently.

Initializes a LargeFileLinesReader object.

Args:

raw_data_path (Path): Path to a jsonl file, which holds text data. index_path (Optional[Path]): Path to an index file, which indicates the start character/byte position

and length of samples given in raw_data_path. If not defined, an index next to raw_data_path is picked, by replacing its suffix with “.idx”.

encoding (Optional[str]): The encoding of the file (default: “utf-8”).

If encoding is None, the raw data is read as bytes.

use_sample_length_from_index (bool): If True, the sample length is taken from the index file

i.e., the (offset, sample_length) pairs. If False, the sample length is calculated as the difference between the starting point of the next and the current sample.

Returns:

None

Parameters:
  • raw_data_path (Path)

  • index_path (Path | None)

  • encoding (str | None)

  • use_sample_length_from_index (bool)

close()[source]
static default_index_path(raw_data_path, index_path=None)[source]

Returns the default index path for the given raw data path.

Return type:

Path

Parameters:
  • raw_data_path (Path)

  • index_path (Path | None)

Args:

raw_data_path (Path): The path to the raw data file. index_path (Optional[Path]): The path to the index file (default: None).

Returns:

Path: The default index path.

Note:
If index_path is not provided, the default index path is generated by

appending the extension “.idx” to the stem of the raw_data_path.

modalities.dataloader.samplers module

class modalities.dataloader.samplers.ResumableDistributedSampler(dataset, rank, num_replicas=None, epoch=0, shuffle=False, seed=0, drop_last=False, skip_num_global_samples=0)[source]

Bases: Sampler[T_co]

Sampler that restricts data loading to a subset of the dataset. We adopted this class from pytorch’s DistributedSampler class and added the ability to resume from a specific index. source: https://github.com/pytorch/pytorch/blob/main/torch/utils/data/distributed.py

It is especially useful in conjunction with torch.nn.parallel.DistributedDataParallel. In such a case, each process can pass a DistributedSampler instance as a DataLoader sampler, and load a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size and that any instance of it always returns the same elements in the same order.

Instantiates a distributed and resumable Sampler object.

Args:

dataset (Dataset): The dataset to sample from. rank (int): The global rank of the current process. num_replicas (int, optional): Number of replicas.

This usually equals the world size. Defaults to None.

epoch (int, optional): Current epoch. Defaults to 0. shuffle (bool, optional): Boolean flag whether to shuffle the data. Defaults to False. seed (int, optional): Seed for the shuffling. Defaults to 0. drop_last (bool, optional): Boolean flag indicating whether to drop the last samples

that cannot be distributed over all ranks (i.e., maximum world size - samples). If drop_last is false padding is applied for these samples, by resampling the initial samples. Defaults to False.

skip_num_global_samples (int, optional): Number of samples to skip, e.g., due to warmstart.

Defaults to 0.

Raises:

RuntimeError: Requires distributed package to be available if num_replicas is None.

Parameters:
  • dataset (Dataset)

  • rank (int)

  • num_replicas (int | None)

  • epoch (int | None)

  • shuffle (bool | None)

  • seed (int | None)

  • drop_last (bool | None)

  • skip_num_global_samples (int | None)

Module contents