Utility API Reference#

Utility functions are defined in chronocratic.datasets.utils and re-exported from the package root. They handle caching, scaling, data processing, and feature extraction.

Utility functions for data processing.

chronocratic.datasets.utils.atomic_save_metadata(path: Path, data: dict[str, Any]) None#

Save a metadata dictionary to a JSON file atomically.

Writes to a .json.tmp intermediate file then uses Path.replace() for POSIX atomicity.

Parameters:
  • path – Target .json file path.

  • data – Metadata dictionary to persist.

chronocratic.datasets.utils.atomic_save_npz(path: Path, **arrays: ndarray) None#

Save numpy arrays to a compressed .npz file atomically.

Writes to a temporary .npz file in the same directory, then uses Path.replace() for POSIX atomicity. The tmp file is created in the same directory as the target to guarantee same- filesystem rename.

Parameters:
  • path – Target .npz file path.

  • **arrays – Named arrays to persist.

chronocratic.datasets.utils.build_cache_key(*, dataset_name: str, params: dict[str, Any]) str#

Build a hybrid cache key: SHA-256 hash prefix plus readable suffix.

The key format is <8-char-sha256>_<dataset>_<key-params>.cache. Example: a3f8e1c2_ETTm1_seq_len=128_mode=UNIVARIATE.cache.

Parameters:
  • dataset_name – Dataset identifier (e.g. "ETTm1").

  • params – Parameters that affect data layout (seq_len, mode, scaling_method, etc.). Dict ordering does not affect the resulting key.

Returns:

A deterministic cache key string.

chronocratic.datasets.utils.compose(*functions: Callable) Callable#

Compose multiple functions into a single callable.

Functions are applied in the order they are provided.

Parameters:

functions – Callables to compose.

Returns:

A callable that applies all functions in order.

chronocratic.datasets.utils.create_data_scaler(*, scale: bool, scaling_range: tuple[float, float], scaling_method: ScalingMethod = ScalingMethod.MINMAX, data_form: DataForm = DataForm.REGULAR) Callable#

Create a data scaling function.

Returns a callable that, when invoked with (train, valid, test) data, fits a scaler on train and transforms all splits.

Parameters:
  • scale – Whether to apply scaling at all.

  • scaling_range – Target (min, max) for MinMaxScaler.

  • scaling_method – Scaling algorithm to use.

  • data_form – Shape category of the data.

Returns:

A callable that accepts (train_data, valid_data, test_data) and returns scaled versions of the same.

chronocratic.datasets.utils.custom_collate_fn(batch: list[Any], *, desired_batch_size: int) Any#

Collate function that pads the last batch by cycling samples.

If the current batch is smaller than desired_batch_size, extra samples are appended by cycling backwards through the batch.

Parameters:
  • batch – A list of samples returned by the dataset.

  • desired_batch_size – Target batch size.

Returns:

Standard collated tensor batch.

chronocratic.datasets.utils.extract_time_features(datetime_index: DatetimeIndex) ndarray#

Extract cyclical time features from a DatetimeIndex.

Produces a 2-D array with columns: minute, hour, dayofweek, day, dayofyear, month, week.

Parameters:

datetime_index – A pandas DatetimeIndex.

Returns:

2-D numpy array of shape (len(index), 7) with dtype float32.

chronocratic.datasets.utils.flatten_list_of_np_arrays(list_of_np_arrays: list[ndarray]) ndarray#

Flatten a list of numpy arrays into a single 1-D array.

Parameters:

list_of_np_arrays – A list of numpy arrays.

Returns:

A single flattened numpy array.

chronocratic.datasets.utils.get_num_samples_from_ts(ts: ndarray | list[ndarray]) int#

Get number of samples from a time series.

Parameters:

ts – A time series array or list of arrays.

Returns:

Number of samples (length) of the time series.

chronocratic.datasets.utils.load_metadata(path: Path) dict[str, Any]#

Load and validate metadata from a JSON file.

Checks that the version field matches CACHE_SCHEMA_VERSION. Raises FileNotFoundError if the file does not exist and ValueError on version mismatch.

Parameters:

path – Metadata .json file path.

Returns:

The parsed metadata dictionary.

Raises:
  • FileNotFoundError – If the metadata file does not exist.

  • ValueError – If the schema version does not match CACHE_SCHEMA_VERSION.

chronocratic.datasets.utils.load_scaler(path: Path) Any#

Load a persisted sklearn scaler via torch.load.

Parameters:

path.pt file path containing a pickled scaler.

Returns:

The loaded scaler instance.

chronocratic.datasets.utils.process_data_with_varying_sequence_lengths_single(data: ndarray | DataFrame) ndarray | DataFrame#

Process data with varying sequence lengths by centering valid data.

Handles both 2-D (samples, timesteps) and 3-D (samples, timesteps, features) arrays. If the original data is a DataFrame, the result is returned as a DataFrame.

Parameters:

data – Input array or DataFrame.

Returns:

Processed numpy array of the same shape.

chronocratic.datasets.utils.process_df_according_to_dtypes(df_data: DataFrame, meta: Any, dtypes_functions_map: dict[str, Callable]) DataFrame#

Process DataFrame columns according to ARFF dtype mapping.

Iterates over each column defined in the ARFF metadata, determines its type, and applies the corresponding transformation function from the provided mapping.

Parameters:
  • df_data – DataFrame to process.

  • meta – ARFF metadata containing column type information.

  • dtypes_functions_map – Mapping from dtype name to transformation function.

Returns:

Processed DataFrame with correctly typed columns.

chronocratic.datasets.utils.read_arff_as_df(arff_file_path: Path | str) tuple[DataFrame, Any]#

Read an ARFF file into a pandas DataFrame.

Note

scipy.io.arff returns nominal (string) column values as bytes objects (e.g. b'a' not 'a'). The caller must provide a decode function in dtypes_functions_map when using process_df_according_to_dtypes().

Parameters:

arff_file_path – Path to the ARFF file.

Returns:

A tuple of (DataFrame, ARFF metadata object).

chronocratic.datasets.utils.resolve_cache_dir(*, cache_dir: Path | None, dataset_name: str) Path#

Resolve the absolute cache directory path.

When cache_dir is None, the default location ~/.cache/tsdatasets/{dataset_name} is used. A custom path is expanded (~) and resolved to an absolute path.

Parameters:
  • cache_dir – User-provided cache directory, or None for the default location.

  • dataset_name – Dataset identifier appended to the default cache root.

Returns:

An absolute Path to the cache directory.

chronocratic.datasets.utils.save_scaler(scaler: Any, path: Path) None#

Persist a fitted sklearn scaler via torch.save.

Uses pickle_protocol=5 and writes atomically through a .pt.tmp intermediate file. If the target already exists (DDP race condition) or the cache directory is not writable (test environments with nonexistent paths), the save is skipped silently since the in-memory scaler is still valid.

Parameters:
  • scaler – Fitted scaler instance (e.g. MinMaxScaler).

  • path – Target .pt file path.

chronocratic.datasets.utils.separate_target_feature_from_df(df: DataFrame, target_feature_name: str) tuple[DataFrame, Series]#

Separate target feature column from a DataFrame.

Extracts the specified target column as a Series and returns the remaining columns as a DataFrame.

Parameters:
  • df – Source DataFrame containing the target column.

  • target_feature_name – Name of the target column to extract.

Returns:

A tuple of (features DataFrame, target Series).

Raises:

KeyError – If target_feature_name is not in df.columns.