JSONLDataset

Bases: JSONLMixin, BaseTextDataset

Source code in src/llm_datasets/datasets/jsonl_dataset.py
class JSONLDataset(JSONLMixin, BaseTextDataset):  # TODO rename to JSONLTextDataset
    def get_text_from_item(self, item) -> str:
        """Return the text field from the item (dataset classes can override this to implement filtering, etc.)."""
        return item[self.raw_jsonl_text_field]

    def get_document_from_item(self, item) -> Document:
        """Return a Document with the text field taken from the item (dataset classes can override this to implement filtering, etc.)."""
        return Document(text=item[self.raw_jsonl_text_field])

    def get_texts_from_file_handler(self, file_handler):
        if hasattr(self.config, "use_documents") and self.config.use_documents:
            getter_func = self.get_document_from_item
        else:
            getter_func = self.get_text_from_item

        for line in file_handler:
            item = json.loads(line)
            text = getter_func(item)  # a str, or a Document if use_documents is set

            if text:  # skip falsy results (e.g., empty text)
                yield text

    def get_texts_from_file_path(self, file_path: str | Path):
        logger.info(f"Reading from {file_path}")

        if str(file_path).endswith(".zst"):  # zstd compression
            with open(file_path, "rb") as zf:
                dctx = zstd.ZstdDecompressor()  # uncompress zstd
                with dctx.stream_reader(zf) as reader:
                    f = io.BufferedReader(reader)
                    yield from self.get_texts_from_file_handler(f)
        else:
            with open(file_path) as f:  # jsonl or jsonl.gz (via smart_open)
                yield from self.get_texts_from_file_handler(f)

    def get_texts(self):
        """Iterate over all input files and read JSON from each line."""
        # if self.workers == 1:
        yield from self.get_texts_with_single_proc()
        # else:
        #     yield from self.get_texts_with_multi_proc()

    def get_texts_with_multi_proc(self):
        """Iterate over all input files in parallel and read JSON from each line."""
        raise NotImplementedError()
        # # with multiprocessing.Pool(self.workers) as pool:
        # with multiprocess.Pool(self.workers) as pool:
        #     for text in flatmap(pool, self.get_texts_from_file_path, self.get_raw_jsonl_paths()):
        #         yield text

        # print("all files done")

    def get_texts_with_single_proc(self):
        """Iterate over all input files and read JSON from each line."""
        processed_files = 0
        for file_path in self.get_raw_jsonl_paths():
            yield from self.get_texts_from_file_path(file_path)

            processed_files += 1

        if processed_files == 0:
            logger.warning("No file has been processed.")

get_document_from_item(item)

Return a Document with the text field taken from the item (dataset classes can override this to implement filtering, etc.).

Source code in src/llm_datasets/datasets/jsonl_dataset.py
def get_document_from_item(self, item) -> Document:
    """This simply returns the document with a text field from item (but dataset classes can override this to implement filtering etc.)"""
    return Document(text=item[self.raw_jsonl_text_field])
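Both getter hooks are intended to be overridden. A hypothetical filtering subclass (the class name and length threshold are made up) can return None, which the if text: check in get_texts_from_file_handler silently drops:

class FilteredJSONLDataset(JSONLDataset):
    def get_document_from_item(self, item) -> Document | None:
        text = item[self.raw_jsonl_text_field]
        if len(text) < 100:  # drop very short records
            # Falsy return values are skipped by get_texts_from_file_handler.
            return None
        return Document(text=text)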

get_text_from_item(item)

Return the text field from the item (dataset classes can override this to implement filtering, etc.).

Source code in src/llm_datasets/datasets/jsonl_dataset.py
def get_text_from_item(self, item) -> str:
    """This simply returns the text field from item (but dataset classes can override this to implement filtering etc.)"""
    return item[self.raw_jsonl_text_field]
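The text getter can likewise be overridden to assemble the text from several JSONL fields. In this sketch, title is a hypothetical field name, not something the base class knows about:

class TitledJSONLDataset(JSONLDataset):
    def get_text_from_item(self, item) -> str:
        # Prepend an optional title to the raw text field.
        title = item.get("title")
        body = item[self.raw_jsonl_text_field]
        return f"{title}\n\n{body}" if title else body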

get_texts()

Iterate over all input files and read JSON from each line.

Source code in src/llm_datasets/datasets/jsonl_dataset.py
def get_texts(self):
    """Iterate over all input files and read JSON from each line."""
    # if self.workers == 1:
    yield from self.get_texts_with_single_proc()
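Since get_texts is a generator, it can be consumed lazily like any iterator. A hypothetical helper (not part of the library):

from llm_datasets.datasets.jsonl_dataset import JSONLDataset


def count_texts(dataset: JSONLDataset) -> int:
    """Count the texts (or Documents) yielded across all input files."""
    return sum(1 for _ in dataset.get_texts())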

get_texts_with_multi_proc()

Iterate over all input files in parallel and read JSON from each line.

Source code in src/llm_datasets/datasets/jsonl_dataset.py
def get_texts_with_multi_proc(self):
    """Iterate over all input files in parallel and read JSON from each line."""
    raise NotImplementedError()
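The commented-out code in the class source hints at a multiprocess.Pool plus flatmap design. One possible shape, using the standard library instead, is sketched below; it assumes the dataset object is picklable (the dill-based multiprocess package in the comments relaxes exactly that constraint) and trades memory for parallelism by materializing each file's texts in the worker, since generators cannot be sent across process boundaries:

import multiprocessing


def _texts_for_path(args):
    dataset, file_path = args
    # Each worker returns one file's texts as a list.
    return list(dataset.get_texts_from_file_path(file_path))


def get_texts_with_multi_proc(self):
    """Iterate over all input files in parallel and read JSON from each line."""
    with multiprocessing.Pool(self.workers) as pool:
        jobs = ((self, path) for path in self.get_raw_jsonl_paths())
        # imap preserves file order while workers read files in parallel.
        for texts in pool.imap(_texts_for_path, jobs):
            yield from texts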

get_texts_with_single_proc()

Iterate over all input files and read JSON from each line.

Source code in src/llm_datasets/datasets/jsonl_dataset.py
def get_texts_with_single_proc(self):
    """Iterate over all input files and read JSON from each line."""
    processed_files = 0
    for file_path in self.get_raw_jsonl_paths():
        yield from self.get_texts_from_file_path(file_path)

        processed_files += 1

    if processed_files == 0:
        logger.warning("No file has been processed.")