egrecho.utils.io.files#

Utilities for parsing file paths.

egrecho.utils.io.files.is_remote_url(url_or_filename)[source]#

Whether a path is remote.

Return type:

bool

egrecho.utils.io.files.get_filename(filepath)[source]#

Get a file’s name from a remote URL or local path.

class egrecho.utils.io.files.DataFilesList(data_files, origin_metadata)[source]#

Bases: List[Union[Path, Url]]

List of data files (absolute local paths or URLs).

  • from_local_or_remote: resolve patterns from a local path

Moreover, DataFilesList has an additional attribute origin_metadata. It can store:

  • the last modified time of local files.

  • URL metadata (not currently implemented).

egrecho.utils.io.files.sanitize_patterns(patterns)[source]#

Take the data_files patterns from the user and format them into a dictionary. Each key is the name of a split, and each value is a list of data file patterns (paths or URLs). The default split is “train”.

Returns:

dictionary of split_name -> list of file_patterns

Return type:

Dict[str, List[str]]
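
For illustration (a sketch of the documented behavior; exact container types may differ):

>>> sanitize_patterns(['*.csv'])
{'train': ['*.csv']}
>>> sanitize_patterns({'train': ['train/*.jsonl'], 'dev': ['dev/*.jsonl']})
{'train': ['train/*.jsonl'], 'dev': ['dev/*.jsonl']}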

class egrecho.utils.io.files.DataFilesDict[source]#

Bases: Dict[str, DataFilesList]

Dict of split_name -> list of data files (absolute local paths or URLs).

  • from_local_or_remote: resolve patterns from a local path

Moreover each list is a DataFilesList. For more info, see DataFilesList.

class egrecho.utils.io.files.DataFoldersDict[source]#

Bases: Dict[str, List[str]]

Dict of split_name -> list of data folders (absolute local paths or URLs).

  • from_local_or_remote: resolve patterns from a local path

egrecho.utils.io.files.resolve_patterns_locally_or_by_urls(base_path, patterns, allowed_extensions=None)[source]#

Resolve the paths and URLs of the data files from the patterns passed by the user. URLs are just returned as is.

You can use patterns to resolve multiple local files. Here are a few examples:

  • *.csv to match all the CSV files at the first level

  • **.csv to match all the CSV files at any level

  • data/* to match all the files inside “data”

  • data/** to match all the files inside “data” and its subdirectories

The patterns are resolved using the fsspec glob. Here are some behaviors specific to fsspec glob that are different from glob.glob, Path.glob, Path.match or fnmatch:

  • '*' matches only first level items

  • '**' matches all items

  • '**/*' matches all at least second level items

More generally:

  • '*' matches any character except a forward-slash (to match just the file or directory name)

  • '**' matches any character including a forward-slash /

Hidden files and directories (i.e., whose names start with a dot) are ignored unless they are explicitly requested. The same applies to special directories that start with a double underscore like “__pycache__”. You can still include one if the pattern explicitly mentions it:

  • to include a hidden file: "*/.hidden.txt" or "*/.*"

  • to include a hidden directory: ".hidden/*" or ".*/*"

  • to include a special directory: "__special__/*" or "__*/*"

    For contrast, in glob.glob('**/*', recursive=True) the trailing /* does not restrict matches to deeper levels, since the leading '**' already matches greedily.

Parameters:
  • base_path (str) -- Base path to use when resolving relative paths.

  • patterns (List[str]) -- Unix patterns or paths or URLs of the data files to resolve. The paths can be absolute or relative to base_path.

  • allowed_extensions (Optional[list], optional) -- Whitelist of file extensions to use. Defaults to None (all extensions). For example: allowed_extensions=["csv", "json", "txt", "parquet"]

Returns:

List of paths or URLs to the local or remote files that match the patterns.

Return type:

List[Union[Path, Url]]
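
For illustration, suppose base_path is a local directory containing a.csv and data/b.csv (hypothetical files); per the rules above:

>>> resolve_patterns_locally_or_by_urls('/base', ['*.csv'])
[PosixPath('/base/a.csv')]
>>> resolve_patterns_locally_or_by_urls('/base', ['**.csv'])
[PosixPath('/base/a.csv'), PosixPath('/base/data/b.csv')]

URL patterns are passed through unchanged.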

egrecho.utils.io.files.resolve_folders_patterns(base_path, patterns)[source]#

Resolve all matching folder paths based on the given base path and patterns.

This function searches the specified base path for all folders that match the given patterns. It returns a list of Path objects representing the matched folders.

Parameters:
  • base_path (str) -- Base path to use when resolving relative paths.

  • patterns (List[str]) -- A list of pattern strings used to match folder names. The paths can be absolute or relative to base_path.

Returns:

A list of Path objects representing all matched folders.

Return type:

List[Path]
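
A sketch with hypothetical folder names, assuming /base contains shard_000 and shard_001:

>>> resolve_folders_patterns('/base', ['shard_*'])
[PosixPath('/base/shard_000'), PosixPath('/base/shard_001')]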

egrecho.utils.io.files.resolve_patterns(base_path, patterns, allowed_extensions=None)[source]#

Resolve the paths and URLs of the data files from the patterns passed by the user. URLs are just returned as is.

You can use patterns to resolve multiple local files. Here are a few examples:

  • *.csv to match all the CSV files at the first level

  • **.csv to match all the CSV files at any level

  • data/* to match all the files inside “data”

  • data/** to match all the files inside “data” and its subdirectories

The patterns are resolved using the fsspec glob. Here are some behaviors specific to fsspec glob that are different from glob.glob, Path.glob, Path.match or fnmatch:

  • '*' matches only first level items

  • '**' matches all items

  • '**/*' matches all at least second level items

More generally:

  • '*' matches any character except a forward-slash (to match just the file or directory name)

  • '**' matches any character including a forward-slash /

Hidden files and directories (i.e., whose names start with a dot) are ignored unless they are explicitly requested. The same applies to special directories that start with a double underscore like “__pycache__”. You can still include one if the pattern explicitly mentions it:

  • to include a hidden file: "*/.hidden.txt" or "*/.*"

  • to include a hidden directory: ".hidden/*" or ".*/*"

  • to include a special directory: "__special__/*" or "__*/*"

    For contrast, in glob.glob('**/*', recursive=True) the trailing /* does not restrict matches to deeper levels, since the leading '**' already matches greedily.

Parameters:
  • base_path (str) -- Base path to use when resolving relative paths.

  • patterns (List[str]) -- Unix patterns or paths or URLs of the data files to resolve. The paths can be absolute or relative to base_path.

  • allowed_extensions (Optional[list], optional) -- Whitelist of file extensions to use. Defaults to None (all extensions). For example: allowed_extensions=["csv", "json", "txt", "parquet"]

Returns:

List of paths or URLs to the local or remote files that match the patterns.

Return type:

List[Union[Path, Url]]

egrecho.utils.io.files.resolve_file(fname, base_path=None)[source]#

Resolve a single file.

If base_path is given and fname is relative, the file is resolved against base_path.
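
A sketch (paths hypothetical; whether a str or Path is returned is not specified here):

>>> resolve_file('wav.scp', base_path='/base')
'/base/wav.scp'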

egrecho.utils.io.kaldi#

egrecho.utils.io.kaldi.valid_kaldi_storage_name(name)[source]#

Since the writer appends the .scp / .ark suffixes itself, name should not already contain them.

egrecho.utils.io.kaldi.close_cached_kaldi_handles()[source]#

Closes the cached file handles in lookup_matrix_reader_cache_or_open and lookup_vector_reader_cache_or_open (see their respective docs for more details).

Return type:

None

egrecho.utils.io.kaldi.lookup_matrix_reader_cache_or_open(storage_path)#

Helper internal function used in KaldiMatrixReader. It opens kaldi scp files and keeps their handles open in a global program cache, to avoid an excessive number of syscalls when the Reader class is repeatedly instantiated and destroyed in a loop (a frequent use-case).

The file handles can be freed at any time by calling close_cached_file_handles().

egrecho.utils.io.kaldi.lookup_vector_reader_cache_or_open(storage_path)#

Helper internal function used in KaldiVectorReader. It opens kaldi scp files and keeps their handles open in a global program cache, to avoid an excessive number of syscalls when the Reader class is repeatedly instantiated and destroyed in a loop (a frequent use-case).

The file handles can be freed at any time by calling close_cached_file_handles().

class egrecho.utils.io.kaldi.KaldiVectorReader(storage_path, *args, **kwargs)[source]#

Bases: object

Reads Kaldi’s vector (1-D float32) file (e.g., “xvector.scp”) using kaldi_native_io. storage_path corresponds to the path to file with suffix .scp. storage_key corresponds to the utterance-id in Kaldi.

Caution

Requires kaldi_native_io to be installed (pip install kaldi_native_io).

class egrecho.utils.io.kaldi.KaldiVectorWriter(storage_path, storage_name='xvector', **kwargs)[source]#

Bases: object

Write vector data (1-D float32) to Kaldi’s “.scp” and “.ark” files using kaldi_native_io. storage_path corresponds to a directory where we’ll create “xvector.scp” and “xvector.ark” files. storage_key corresponds to the utterance-id in Kaldi. storage_name specifies the stem name, i.e., “xvector”.

Example:

>>> data = np.random.randn(192).astype(np.float32)
>>> with KaldiVectorWriter('xvectordir') as w:
...     w.write('utt1', data)
>>> reader = KaldiVectorReader('xvectordir/xvector.scp')
>>> read_data = reader.read('utt1')
>>> np.testing.assert_equal(data, read_data)

Caution

Requires kaldi_native_io to be installed (pip install kaldi_native_io).

class egrecho.utils.io.kaldi.KaldiMatrixReader(storage_path, *args, **kwargs)[source]#

Bases: object

Reads Kaldi’s “feats.scp” file using kaldi_native_io. storage_path corresponds to the path to feats.scp. storage_key corresponds to the utterance-id in Kaldi.

Reference:

https://github.com/lhotse-speech/lhotse/blob/master/lhotse/features/io.py

Caution

Requires kaldi_native_io to be installed (pip install kaldi_native_io).

class egrecho.utils.io.kaldi.KaldiMatrixWriter(storage_path, storage_name='feats', *, compression_method=1, **kwargs)[source]#

Bases: object

Write data to Kaldi’s “feats.scp” and “feats.ark” files using kaldi_native_io. storage_path corresponds to a directory where we’ll create “feats.scp” and “feats.ark” files. storage_key corresponds to the utterance-id in Kaldi. storage_name specifies the stem name, i.e., “feats”.

Reference:

https://github.com/lhotse-speech/lhotse/blob/master/lhotse/features/io.py

The following compression_method values are supported by kaldi_native_io:

kAutomaticMethod = 1
kSpeechFeature = 2
kTwoByteAuto = 3
kTwoByteSignedInteger = 4
kOneByteAuto = 5
kOneByteUnsignedInteger = 6
kOneByteZeroOne = 7

Note

Setting compression_method works only with 2D arrays.

Example:

>>> data = np.random.randn(131, 80)
>>> with KaldiMatrixWriter('featdir') as w:
...     w.write('utt1', data)
>>> reader = KaldiMatrixReader('featdir/feats.scp')
>>> read_data = reader.read('utt1')
>>> np.testing.assert_equal(data, read_data)
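
Since compression applies only to 2-D arrays, a compressed variant of the example might look like the sketch below (kSpeechFeature = 2); note that compression is lossy, so exact equality on read-back is not guaranteed:

>>> with KaldiMatrixWriter('featdir', compression_method=2) as w:
...     w.write('utt1', data)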

Caution

Requires kaldi_native_io to be installed (pip install kaldi_native_io).

egrecho.utils.io.reader#

class egrecho.utils.io.reader.JsonlIterable(path, **kwargs)[source]#

Bases: object

Get an example iterator from a JSON Lines file.

class egrecho.utils.io.reader.JsonIterable(path, **kwargs)[source]#

Bases: object

Get an example iterator from a JSON file.

class egrecho.utils.io.reader.CsvIterable(path, **csv_reader_kwargs)[source]#

Bases: object

Get an example iterator from a CSV file.
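
A minimal consumption sketch for these iterables ('egs.jsonl' is a hypothetical file; each iteration is assumed to yield one example dict):

>>> examples = JsonlIterable('egs.jsonl')
>>> first = next(iter(examples))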

egrecho.utils.io.resolve_ckpt#

egrecho.utils.io.resolve_ckpt.resolve_ckpt(checkpoint='last.ckpt', dirpath=None, version='version', best_k_fname='best_k_models.yaml', best_k_mode='min', ckpt_subdir='checkpoints')[source]#

Resolve checkpoint path from local or remote.

Automatically searches for a checkpoint. All parameters except checkpoint apply to the local filesystem. checkpoint can be either:

  • a remote URL (e.g., starting with “http”): returned directly; otherwise local mode is used.

  • an absolute file path: returned if it exists, otherwise an error is raised.

  • a relative file name: resolved against dirpath; returned if it exists, otherwise falls back to auto-matching.

  • auto-matching: checkpoint must be a one-level relative path (a bare file name is recommended) to avoid ambiguous matching. In this mode the search is controlled by:

  • dirpath: the base directory.

  • ckpt_subdir: the checkpoints subdirectory name, defaults to “checkpoints”.

  • version: the version subdirectory; if specified it is checked, otherwise the maximum version number (i.e., the latest training run) is chosen.

  • See resolve_version_ckpt() and resolve_rel_ckpt() for details.

The minimal matching unit looks like:

./dirpath/
├── best_k_models.yaml
├── last.ckpt
└── abc.ckpt

or

./dirpath/
└── checkpoints
    ├── best_k_models.yaml
    ├── last.ckpt
    └── abc.ckpt

With a version subdir, the structure can be:

./dirpath/
└── version_1
    └── checkpoints
        ├── best_k_models.yaml
        ├── last.ckpt
        └── abc.ckpt
Parameters:
  • checkpoint (str, optional) -- The file name of the checkpoint to resolve; a local file needs a suffix like “.ckpt” / “.pt”. checkpoint="best" is a reserved key meaning it will look up best_k_fname, a file containing Dict[BEST_K_MODEL_PATH, BEST_K_SCORE], and sort by score to pick the best ckpt. Defaults to “last.ckpt”.

  • dirpath (Path or str, optional) -- The root path. Defaults to None, which means the current directory.

  • version (str, optional) -- The versioned subdir name. Commonly subdirs are named “version_0”/“version_1”; if you specify a name with a concrete version number, that version dir is searched, otherwise the highest version number (“version_1” above) is chosen. Defaults to “version”.

  • best_k_fname (str, optional) -- The filename of the best-k map file. Note that the best model path recorded in it may not be in this directory, since it was stored during training, so we assume its basename matches a ckpt at the same level. Defaults to best_k_models.yaml.

  • best_k_mode (Literal["max", "min"], optional) -- The mode for selecting the best_k checkpoint. Defaults to “min”.

  • ckpt_subdir (str, optional) -- The name of the checkpoints subdir. Defaults to “checkpoints”.

Return type:

str
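
For illustration, given a hypothetical layout ./exp/version_2/checkpoints/ containing last.ckpt, abc.ckpt and best_k_models.yaml, the calls below would resolve roughly as follows (returned paths illustrative):

>>> resolve_ckpt(dirpath='exp')  # picks the latest version dir, default 'last.ckpt'
'/abs/exp/version_2/checkpoints/last.ckpt'
>>> resolve_ckpt('best', dirpath='exp')  # consults best_k_models.yaml
'/abs/exp/version_2/checkpoints/abc.ckpt'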

egrecho.utils.io.resolve_ckpt.resolve_version_ckpt(dirpath=None, checkpoint='last.ckpt', version='version', best_k_fname='best_k_models.yaml', best_k_mode='min', ckpt_subdir='checkpoints')[source]#

Search for a local version directory.

Cares about structure like:

./dirpath/
└── version_1
    └── checkpoints
        ├── best_k_models.yaml
        ├── last.ckpt
        └── abc.ckpt

Note: the actual matching is delegated to resolve_rel_ckpt(); see it for more details.

Parameters:
  • version (str, optional) -- The versioned subdir name. Commonly subdirs are named “version_0”/“version_1”; if you specify a name with a concrete version number, that version dir is searched, otherwise the highest version number (“version_1” above) is chosen. Defaults to “version”.

  • dirpath (Path or str, optional) -- The root path. Defaults to None, which means the current directory.

  • checkpoint (str, optional) -- The file name of the checkpoint to resolve; needs a suffix like “.ckpt” / “.pt”. checkpoint="best" is a reserved key meaning it will look up best_k_fname, a file containing Dict[BEST_K_MODEL_PATH, BEST_K_SCORE], and sort by score to pick the best ckpt. Defaults to "last.ckpt".

  • best_k_fname (str, optional) -- The filename of the best-k map file. Note that the best model path recorded in it may not be in this directory, since it was stored during training, so we assume its basename matches a ckpt at the same level. Defaults to "best_k_models.yaml".

  • best_k_mode (Literal["max", "min"], optional) -- The mode for selecting the best_k checkpoint. Defaults to "min".

  • ckpt_subdir (str, optional) -- The name of the checkpoints subdir. Defaults to "checkpoints".

Return type:

Optional[str]

egrecho.utils.io.resolve_ckpt.resolve_rel_ckpt(dirpath=None, checkpoint='last.ckpt', best_k_fname='best_k_models.yaml', best_k_mode='min', ckpt_subdir='checkpoints')[source]#

Resolve checkpoint path rel to dirpath.

Automatically searches for a checkpoint in a directory’s checkpoints subdir, normally named "checkpoints". The dirpath may have the following default structure:

./dirpath/
├── best_k_models.yaml
├── last.ckpt
└── abc.ckpt

or

./dirpath/
└── checkpoints
    ├── best_k_models.yaml
    ├── last.ckpt
    └── abc.ckpt

Searches dirpath first, then falls back to matching a valid checkpoint path in its ckpt_subdir (checkpoints) subdirectory. If both fail, returns None.

Note: checkpoint must be a one-level relative path to avoid ambiguous matching; deeper relative-path matching belongs in the top-level function, not here.

  • valid: (last.ckpt, best, ./last.ckpt)

  • invalid: (/last.ckpt, mypath/last.ckpt)

Parameters:
  • dirpath (Path or str, optional) -- The root path. Defaults to None, which means the current directory.

  • checkpoint (str, optional) -- The file name of the checkpoint to resolve; needs a suffix like “.ckpt” / “.pt”. checkpoint="best" is a reserved key meaning it will look up best_k_fname, a file containing Dict[BEST_K_MODEL_PATH, BEST_K_SCORE], and sort by score to pick the best ckpt. Defaults to “last.ckpt”.

  • best_k_fname (str, optional) -- The filename of the best-k map file. Note that the best model path recorded in it may not be in this directory, since it was stored during training, so we assume its basename matches a ckpt at the same level. Defaults to best_k_models.yaml.

  • best_k_mode (Literal["max", "min"], optional) -- The mode for selecting the best_k checkpoint. Defaults to “min”.

  • ckpt_subdir (str, optional) -- The name of the checkpoints subdir. Defaults to “checkpoints”.

Returns:

The resolved checkpoint path or None.

Return type:

Optional[str]

Examples

>>> resolve_rel_ckpt('./dirpath', checkpoint='best')
'/path/to/xxxl.ckpt'

egrecho.utils.io.utils#

egrecho.utils.io.utils.auto_open(path, mode='r', **kwargs)[source]#

Open a path; if it ends with “gz”, gzip.open is used under the hood.

Note: only local paths are supported for now.
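
A short sketch ('egs.jsonl.gz' is a hypothetical file); the “gz” suffix routes the call through gzip.open, so the caller reads transparently:

>>> with auto_open('egs.jsonl.gz', 'r') as f:
...     first_line = f.readline()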

class egrecho.utils.io.utils.JsonMixin[source]#

Bases: object

Load/save JSON mixin.

class egrecho.utils.io.utils.YamlMixin[source]#

Bases: object

Load/save YAML mixin.

class egrecho.utils.io.utils.ConfigFileMixin[source]#

Bases: JsonMixin, YamlMixin

Serialize/deserialize local config files; supports JSON and YAML.

class egrecho.utils.io.utils.SerializationFn[source]#

Bases: object

Serialization fn mixin.

egrecho.utils.io.utils.load_jsonl_lazy(path, **kwargs)[source]#

Load json lines in a lazy way.

Return type:

Generator

egrecho.utils.io.utils.save_jsonl(data, path, **kwargs)[source]#

Save json lines.

Return type:

None
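
A round-trip sketch using save_jsonl() and load_jsonl_lazy() (file name hypothetical):

>>> save_jsonl([{'id': 'utt1'}, {'id': 'utt2'}], 'egs.jsonl')
>>> list(load_jsonl_lazy('egs.jsonl'))
[{'id': 'utt1'}, {'id': 'utt2'}]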

egrecho.utils.io.utils.load_csv_lazy(path, **fmtparams)[source]#

Load csv lines in a lazy way.

Return type:

Generator

egrecho.utils.io.utils.save_csv(data, path, fieldnames, **fmtparams)[source]#

Save csv lines.

Return type:

None

egrecho.utils.io.utils.repr_dict(data, sort_keys=False, inline_list=True, **kwds)[source]#

Make dict more readable.

Return type:

str

egrecho.utils.io.utils.buf_count_newlines(fname)[source]#

Count the number of lines in a file.

Return type:

int

egrecho.utils.io.utils.read_key_first_lists(file_path, vector=False, every_bytes=10000000)[source]#

Reads a txt file line by line; items within a line are separated by whitespace.

Returns a list of two-item tuples: the first column, and the remaining columns formatted as the second item (a list of strings when vector=True).

Return type:

List[Tuple[str, Union[str, List[str]]]]
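
For a Kaldi-style file wav.scp whose lines look like "utt1 /path/a.wav" (hypothetical content):

>>> read_key_first_lists('wav.scp')
[('utt1', '/path/a.wav'), ('utt2', '/path/b.wav')]

With vector=True, the second element of each tuple would instead be a list of the remaining columns.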

egrecho.utils.io.utils.read_lists(file_path, vector=False, every_bytes=10000000)[source]#

Reads a txt file line by line; items within a line are separated by whitespace.

Returns a list of strings (or a list of lists containing the split items when vector=True).

Return type:

List[Union[str, List[str]]]

egrecho.utils.io.utils.read_key_first_lists_lazy(file_path, vector=False)[source]#

Lazily reads a txt file line by line; items within a line are separated by whitespace.

Generates two-item tuples: the first column, and the remaining columns formatted as the second item (a list of strings when vector=True).

Return type:

Generator[Tuple[str, Union[str, List[str]]], None, None]

egrecho.utils.io.utils.read_lists_lazy(file_path, vector=False)[source]#

Lazily reads a txt file line by line; items within a line are separated by whitespace.

Generates strings (or lists containing the split items when vector=True).

Return type:

Generator[Union[str, List[str]], None, None]

egrecho.utils.io.writer#

class egrecho.utils.io.writer.SequentialDewWriter(path, overwrite=True)[source]#

Bases: object

Sequentially stores dews (manifest items); supports JSON Lines (jsonl).

This implementation is mostly based on lhotse: https://github.com/lhotse-speech/lhotse/blob/master/lhotse/serialization.py#SequentialJsonlWriter
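
A usage sketch (file name and fields hypothetical), assuming the writer supports the context-manager protocol like the lhotse writer it is based on:

>>> with SequentialDewWriter('egs.jsonl') as writer:
...     for dew in [{'id': 'utt1'}, {'id': 'utt2'}]:
...         ok = writer.write(dew)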

write(manifest, flush=False)[source]#

Serializes a manifest item to JSON and stores it in a JSONL file.

Parameters:
  • manifest (Any) -- the manifest to be written.

  • flush (bool) -- should we flush the file after writing (ensures the changes are synced with the disk and not just buffered for later writing).

Return type:

bool

class egrecho.utils.io.writer.ShardWriter(pattern, shard_size=None)[source]#

Bases: object

Create a ShardWriter; data should be in webdataset format.

Parameters:
  • pattern (Union[str, Path]) -- output file pattern.

  • shard_size (Optional[int]) -- maximum number of records per shard; None means unlimited.

Note

  • If pattern is a concrete filepath, everything is written to that single tarfile.

  • If shard_size is given, items are streamed into multiple tarfiles, each holding at most shard_size records; in this case the pattern must contain a format specifier such as ‘%06d’.

Example

>>> samples = [
...    {'__key__': 'tom', 'txt': 'i want eat.'},
...    {'__key__': 'jimmy', 'txt': 'i want sleep.'}
... ]
>>> with ShardWriter('./test_fake.tar') as writer:
...     for item in samples:
...         writer.write(item)
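
A sharded variant sketch: with shard_size set, the pattern must contain a format specifier, producing files such as shards-000000.tar, shards-000001.tar, and so on (names hypothetical):

>>> with ShardWriter('./shards-%06d.tar', shard_size=1000) as writer:
...     for item in samples:
...         writer.write(item)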
next_stream()[source]#

Close the current stream and move to the next.

write(obj)[source]#

Write a sample.

Parameters:

obj -- sample to be written

Return type:

int

close()[source]#

Finish all writing.

class egrecho.utils.io.writer.TextBoxWriter(path, overwrite=True, box_end='\\n--------------------|boxend|--------------------\\n')[source]#

Bases: object

Sequentially store text boxes.

Parameters:
  • path (Union[str, Path]) -- output file path.

  • overwrite (bool) -- set False for append (‘a’) mode.

  • box_end (str) -- the string that marks the end of a text box.

Example:

from egrecho.utils.io.writer import TextBoxWriter, TXT_BOXEND

text = f'''REF:    # short one here
HYP: shoe order one    *
        I     S        D
{TXT_BOXEND}
REF: quite a bit of  #    #  longer sentence    #
HYP: quite * bit of an even longest sentence here
           D         I    I       S             I
{TXT_BOXEND}
REF: there is ** another    one
HYP: there is an   other sample
            I       S      S
{TXT_BOXEND}'''

texts = text.split(TXT_BOXEND)[:-1]
with TextBoxWriter('text.txt') as writer:
    for box in texts:
        writer.write(box)
with open('text.txt') as fr:
    rs = fr.read()
assert text == rs
write(box, flush=False)[source]#

Write a text box.

Parameters:
  • box (str) -- the string to be written.

  • flush (bool) -- should we flush the file after writing (ensures the changes are synced with the disk and not just buffered for later writing).

Return type:

bool