Source code for chaosmagpy.data_utils

# Copyright (C) 2024 Clemens Kloss
#
# This file is part of ChaosMagPy.
#
# ChaosMagPy is released under the MIT license. See LICENSE in the root of the
# repository for full licensing details.

"""
`chaosmagpy.data_utils` provides functions for loading and writing data and
geomagnetic field models. It also offers functions to do common time
conversions.

.. autosummary::
    :toctree: functions

    load_matfile
    load_RC_datfile
    save_RC_h5file
    load_shcfile
    save_shcfile
    mjd2000
    timestamp
    is_leap_year
    dyear_to_mjd
    mjd_to_dyear
    memory_usage
    gauss_units

"""


import pandas as pd
import numpy as np
import hdf5storage as hdf
import warnings
import h5py
import os
import datetime as dt
import textwrap


[docs] def load_matfile(filepath, variable_names=None, **kwargs): """ Load MAT-file and return dictionary. Function loads MAT-file by traversing the structure converting data into low-level numpy arrays of different types. There is no guarantee that any kind of data is read in correctly. The data dtype can also vary depending on the MAT-file (v7.3 returns floats instead of integers). But it should work identically for v7.3 and prior MAT-files. Arrays are squeezed if possible. Relies on the :mod:`hdf5storage` package. Parameters ---------- filepath : str Filepath and name of MAT-file. variable_names : list of strings Top-level variables to be loaded. **kwargs : keywords Additional keyword arguments are passed to :func:`hdf5storage.loadmat`. Returns ------- data : dict Dictionary containing the data as dictionaries or numpy arrays. """ # define a recursively called function to traverse structure def traverse_struct(struct): # for dictionaries, iterate through keys if isinstance(struct, dict): out = dict() for key, value in struct.items(): out[key] = traverse_struct(value) return out # for ndarray, iterate through dtype names elif isinstance(struct, np.ndarray): # collect dtype names if available names = struct.dtype.names # if no fields in array if names is None: if struct.dtype == np.dtype('O') and struct.shape == (1, 1): return traverse_struct(struct[0, 0]) else: return struct.squeeze() else: # if there are fields, iterate through fields out = dict() for name in names: out[name] = traverse_struct(struct[name]) return out else: return struct output = hdf.loadmat(filepath, variable_names=variable_names, **kwargs) # loadmat returns dictionary, go through keys and call traverse_struct for key, value in output.items(): if key.startswith('__') and key.endswith('__'): pass else: output[key] = traverse_struct(value) return output
[docs] def load_RC_datfile(filepath=None, parse_dates=None): """ Load RC-index data file into pandas data frame. Parameters ---------- filepath : str, optional Filepath to RC index ``*.dat``. If ``None``, the RC index will be fetched from `spacecenter.dk <http://www.spacecenter.dk/\ files/magnetic-models/RC/current/>`_. parse_dates : bool, optional Replace index with datetime object for time-series manipulations. Default is ``False``. Returns ------- df : dataframe Pandas dataframe with names {'time', 'RC', 'RC_e', 'RC_i', 'flag'}, where ``'time'`` is given in modified Julian dates. """ if filepath is None: from lxml import html import requests link = "http://www.spacecenter.dk/files/magnetic-models/RC/current/" page = requests.get(link) print(f'Accessing {page.url}.') tree = html.fromstring(page.content) file = tree.xpath('//tr[5]//td[2]//a/@href')[0] # get name from list date = tree.xpath('//tr[5]//td[3]/text()')[0] print(f'Downloading RC-index file "{file}" ' f'(last modified on {date.strip()}).') filepath = link + file column_names = ['time', 'RC', 'RC_e', 'RC_i', 'flag'] column_types = {'time': 'float64', 'RC': 'float64', 'RC_e': 'float64', 'RC_i': 'float64', 'flag': 'category'} df = pd.read_csv(filepath, delim_whitespace=True, comment='#', dtype=column_types, names=column_names) parse_dates = False if parse_dates is None else parse_dates # set datetime as index if parse_dates: df.index = pd.to_datetime( df['time'].values, unit='D', origin=pd.Timestamp('2000-1-1')) df.drop(['time'], axis=1, inplace=True) # delete redundant time column return df
[docs] def save_RC_h5file(filepath, read_from=None): """ Return HDF5-file of the RC index. Parameters ---------- filepath : str Filepath and name of ``*.h5`` output file. read_from : str, optional Filepath of RC index ``*.dat``. If ``None``, the RC index will be fetched from :rc_url:`spacecenter.dk <>`. Notes ----- Saves an HDF5-file of the RC index with keywords ['time', 'RC', 'RC_e', 'RC_i', 'flag']. Time is given in modified Julian dates 2000. Examples -------- Save RC-index TXT-file (``RC_1997-2020_Aug_v4.dat``) as file in HDF5 format (``RC_index.h5``). >>> save_RC_h5file('RC_index.h5', read_from='RC_1997-2020_Aug_v4.dat') Successfully saved to RC_index.h5. """ try: df_rc = load_RC_datfile(read_from, parse_dates=False) with h5py.File(filepath, 'w') as f: for column in df_rc.columns: variable = df_rc[column].values if column == 'flag': dset = f.create_dataset(column, variable.shape, dtype="S1") dset[:] = variable.astype('bytes') else: f.create_dataset(column, data=variable) # just save floats print(f'Successfully saved to {f.filename}.') except Exception as err: warnings.warn(f"Can't save new RC index. Raised exception: '{err}'.")
[docs] def load_shcfile(filepath, leap_year=None, comment=None): """ Load SHC-file and return coefficient arrays. Parameters ---------- filepath : str File path to spherical harmonic coefficient SHC-file. leap_year : {True, False}, optional Take leap year in time conversion into account (default). Otherwise, use conversion factor of 365.25 days per year. comment : str, optional Character at the start of a line to indicate a comment (defaults to ``#``). This can also be a tuple of characters. Returns ------- time : ndarray, shape (N,) Array containing `N` times for each model snapshot in modified Julian dates with origin January 1, 2000 0:00 UTC. coeffs : ndarray, shape (nmax(nmax+2), N) Coefficients of model snapshots. Each column is a snapshot up to spherical degree and order `nmax`. parameters : dict, {'SHC', 'nmin', 'nmax', 'N', 'order', 'step'} Dictionary containing parameters of the model snapshots and the following keys: ``'SHC'`` SHC-file name, ``'nmin'`` minimum degree, ``'nmax'`` maximum degree, ``'N'`` number of snapshot models, ``'order'`` piecewise polynomial order and ``'step'`` number of snapshots until next break point. Extract break points of the piecewise polynomial with ``breaks = time[::step]``. """ leap_year = True if leap_year is None else leap_year comment = '#' if comment is None else comment first_line = True with open(filepath, 'r') as f: data = np.array([]) for line in f.readlines(): if line.strip().startswith(comment): continue newline = np.fromstring(line, sep=' ') if first_line: # first non-comment line contains shc params name = os.path.split(filepath)[1] # file name string values = [name] + newline.astype(int).tolist() first_line = False else: data = np.append(data, newline) # unpack parameter line keys = ['SHC', 'nmin', 'nmax', 'N', 'order', 'step'] parameters = dict(zip(keys, values)) time = data[:parameters['N']] coeffs = data[parameters['N']:].reshape((-1, parameters['N']+2)) coeffs = coeffs[:, 2:].copy() # discard columns with n and m mjd = dyear_to_mjd(time, leap_year=leap_year) return mjd, coeffs, parameters
[docs] def save_shcfile(time, coeffs, order=None, filepath=None, nmin=None, nmax=None, leap_year=None, header=None): """ Save Gauss coefficients as SHC-file. Parameters ---------- time : float, list, ndarray, shape (n,) Time of model coeffcients in modified Julian date. coeffs : ndarray, shape (N,) or (n, N) Gauss coefficients as vector or array. The first dimension of the array must be equal to the length `n` of the given ``time``. order : int, optional (defaults to 1) Order of the piecewise polynomial with which the coefficients are parameterized in time (breaks are given by ``time[::order]``). filepath : str, optional Filepath and name of the output file. Defaults to the current working directory and filename `model.shc`. nmin : int, optional Minimum spherical harmonic degree (defaults to 1). This will remove first values from coeffs if greater than 1. nmax : int, optional Maximum spherical harmonic degree (defaults to degree compatible with number of coeffcients, otherwise coeffcients are truncated). leap_year : {True, False}, optional Take leap years for decimal year conversion into account (defaults to ``True``). header : str, optional Optional header at beginning of file. Defaults to an empty string. """ time = np.asarray(time, dtype=float) order = 1 if order is None else int(order) nmin = 1 if nmin is None else int(nmin) if nmax is None: nmax = int(np.sqrt(coeffs.shape[-1] + 1) - 1) else: nmax = int(nmax) if nmin > nmax: raise ValueError('``nmin`` must be smaller than or equal to ``nmax``.') filepath = 'model.shc' if filepath is None else filepath header = '' if header is None else header leap_year = True if leap_year is None else bool(leap_year) if coeffs.ndim == 1: coeffs = coeffs.reshape((1, -1)) coeffs = coeffs[:, (nmin**2-1):((nmax+1)**2-1)] # compute all possible degree and orders deg = np.array([], dtype=int) ord = np.array([], dtype=int) for n in range(nmin, nmax+1): deg = np.append(deg, np.repeat(n, 2*n+1)) ord = np.append(ord, [0]) for m in range(1, n+1): ord = np.append(ord, [m, -m]) comment = header + textwrap.dedent(f"""\ # Created on {dt.datetime.now(dt.timezone.utc)} UTC. # Leap years are accounted for in decimal years format ({leap_year}). {nmin} {nmax} {time.size} {order} {order-1} """) with open(filepath, 'w') as f: # write comment line f.write(comment) f.write(' ') # to represent two missing values for t in time: f.write(' {:16.8f}'.format(mjd_to_dyear(t, leap_year=leap_year))) f.write('\n') # write coefficient table to 8 significants for row, (n, m) in enumerate(zip(deg, ord)): f.write('{:} {:}'.format(n, m)) for value in coeffs[:, row]: f.write(' {:16.8f}'.format(value)) f.write('\n') print('Created SHC-file {}.'.format( os.path.join(os.getcwd(), filepath)))
[docs] def mjd2000(year, month=1, day=1, hour=0, minute=0, second=0, microsecond=0): """ Computes the modified Julian date as floating point number (epoch 2000). It assigns 0 to 0h00 January 1, 2000. Leap seconds are not accounted for. Parameters ---------- time : :class:`datetime.datetime`, ndarray, shape (...) Datetime class instance, `OR ...` year : int, ndarray, shape (...) month : int, ndarray, shape (...), optional Month of the year `[1, 12]` (defaults to 1). day : int, ndarray, shape (...), optional Day of the corresponding month (defaults to 1). hour : int , ndarray, shape (...), optional Hour of the day (default is 0). minute : int, ndarray, shape (...), optional Minutes (default is 0). second : int, ndarray, shape (...), optional Seconds (default is 0). microsecond : int, ndarray, shape (...), optional Microseconds (default is 0). Returns ------- time : ndarray, shape (...) Modified Julian date (units of days). Examples -------- >>> a = np.array([datetime.datetime(2000, 1, 1), \ datetime.datetime(2002, 3, 4)]) >>> mjd2000(a) array([ 0., 793.]) >>> mjd2000(2003, 5, 3, 13, 52, 15) # May 3, 2003, 13:52:15 (hh:mm:ss) 1218.5779513888888 >>> mjd2000(np.arange(2000, 2005)) # January 1 in each year array([ 0., 366., 731., 1096., 1461.]) >>> mjd2000(np.arange(2000, 2005), 2, 1) # February 1 in each year array([ 31., 397., 762., 1127., 1492.]) >>> mjd2000(np.arange(2000, 2005), 2, np.arange(1, 6)) array([ 31., 398., 764., 1130., 1496.]) """ year = np.asarray(year) if (np.issubdtype(year.dtype, np.dtype(dt.datetime).type) or np.issubdtype(year.dtype, np.datetime64)): datetime = year.astype('datetime64[us]') else: # build iso datetime string with unicode year = np.asarray(year, dtype=np.unicode_) month = np.char.zfill(np.asarray(month, dtype=np.unicode_), 2) day = np.char.zfill(np.asarray(day, dtype=np.unicode_), 2) year_month = np.char.add(np.char.add(year, '-'), month) datetime = np.char.add(np.char.add(year_month, '-'), day) datetime = datetime.astype('datetime64[us]') # not use inplace add here because it doesn't broadcast arrays datetime = datetime + np.asarray(hour, dtype='timedelta64[h]') datetime = datetime + np.asarray(minute, dtype='timedelta64[m]') datetime = datetime + np.asarray(second, dtype='timedelta64[s]') datetime = datetime + np.asarray(microsecond, dtype='timedelta64[us]') microseconds = datetime - np.datetime64('2000-01-01', 'us') return microseconds / np.timedelta64(1, 'D') # fraction of days
[docs] def timestamp(time): """ Convert modified Julian date to NumPy's datetime format. Parameters ---------- time : ndarray, shape (...) Modified Julian date (units of days). Returns ------- time : ndarray, shape (...) Array of ``numpy.datetime64[us]``. Examples -------- >>> timestamp(0.53245) numpy.datetime64('2000-01-01T12:46:43.680000') >>> timestamp(np.linspace(0., 1.5, 2)) array(['2000-01-01T00:00:00.000000', '2000-01-02T12:00:00.000000'], \ dtype='datetime64[us]') """ # convert mjd2000 to timedelta64[us] us = np.asarray(time) * 86400e6 * np.timedelta64(1, 'us') # add datetime offset return us + np.datetime64('2000-01-01', 'us')
[docs] def is_leap_year(year): """ Determine if input year is a leap year. Parameters ---------- year : int, ndarray, shape (...) Years to test for leap year. Returns ------- leap_year : ndarray of bools, shape (...) ``True`` for leap year in array. Examples -------- >>> is_leap_year([2000, 2001, 2004]) array([ True, False, True]) Raises ------ TypeError if ``year`` is not of type integer. """ year = np.asarray(year) if not np.issubdtype(year.dtype, int): raise TypeError('Expected integer values as the input year. Use ' 'numpy.floor to extract the integer year ' 'from decimal years.') return np.logical_and(np.remainder(year, 4) == 0, np.logical_or(np.remainder(year, 100) != 0, np.remainder(year, 400) == 0))
[docs] def dyear_to_mjd(time, leap_year=None): """ Convert time from decimal years to modified Julian date 2000. Leap years are accounted for by default. Parameters ---------- time : float, ndarray, shape (...) Time in decimal years. leap_year : {True, False}, optional Take leap years into account by using a conversion factor of 365 or 366 days in a year (leap year, used by default). If ``False`` a conversion factor of 365.25 days in a year is used. Returns ------- time : ndarray, shape (...) Time in modified Julian date 2000. Examples -------- >>> dyear_to_mjd([2000.5, 2001.5]) # account for leap years array([183. , 548.5]) >>> dyear_to_mjd([2000.5, 2001.5], leap_year=False) array([182.625, 547.875]) """ leap_year = True if leap_year is None else leap_year if leap_year: year = np.asarray(np.floor(time), dtype=int) # note: -0.1 is year -1 frac_of_year = np.remainder(time, 1.) isleap = is_leap_year(year) # do provide integer years days_per_year = np.where(isleap, 366., 365.) days = frac_of_year * days_per_year mjd = mjd2000(year, 1, 1) + days elif not leap_year: days_per_year = 365.25 mjd = (np.asarray(time) - 2000.0) * days_per_year else: raise ValueError('Unknown leap year option: use either True or False') return mjd
[docs] def mjd_to_dyear(time, leap_year=None): """ Convert time from modified Julian date 2000 to decimal years. Leap years are accounted for by default. Parameters ---------- time : float, ndarray, shape (...) Time in modified Julian date 2000. leap_year : {True, False}, optional Take leap years into account by using a conversion factor of 365 or 366 days in a year (leap year, used by default). If ``False`` a conversion factor of 365.25 days in a year is used. Returns ------- time : ndarray, shape (...) Time in decimal years. Examples -------- >>> mjd_to_dyear([183. , 548.5]) # account for leap years array([2000.5, 2001.5]) >>> mjd_to_dyear([0. , -1., 365.]) array([2000., 1999.99726027, 2000.99726776]) >>> mjd_to_dyear([182.625, 547.875], leap_year=False) array([2000.5, 2001.5]) """ leap_year = True if leap_year is None else leap_year if leap_year: date = (np.asarray(np.floor(time), dtype=int)*np.timedelta64(1, 'D') + np.datetime64('2000-01-01')) # only precise to date year = date.astype('datetime64[Y]').astype(int) + 1970 days = np.asarray(time) - mjd2000(year, 1, 1) # days of that year isleap = is_leap_year(year) days_per_year = np.where(isleap, 366., 365.) dyear = year + days / days_per_year elif not leap_year: dyear = np.asarray(time) / 365.25 + 2000. else: raise ValueError('Unknown leap year option: use either True or False') return dyear
[docs] def memory_usage(pandas_obj): """ Compute memory usage of pandas object. For full report, use: ``df.info(memory_usage='deep')``. """ if isinstance(pandas_obj, pd.DataFrame): usage_b = pandas_obj.memory_usage(deep=True).sum() else: # we assume if not a df it's a series usage_b = pandas_obj.memory_usage(deep=True) usage_mb = usage_b / 1024 ** 2 # convert bytes to megabytes return "{:03.2f} MB".format(usage_mb)
[docs] def gauss_units(deriv=None): """ Return string of the magnetic field units given the derivative with time. String is meant to be used in plot labels. Parameters ---------- deriv : int, optional Derivative (defaults to 0). Returns ------- units : str Tex-style unit string. Examples -------- >>> gauss_units() 'nT' >>> gauss_units(1) '$\\\\mathrm{nT}/\\\\mathrm{yr}$' >>> gauss_units(2) '$\\\\mathrm{nT}/\\\\mathrm{yr}^{2}$' """ deriv = 0 if deriv is None else deriv if deriv == 0: units = 'nT' elif deriv == 1: units = '$\\mathrm{{nT}}/\\mathrm{{yr}}$' else: units = '$\\mathrm{{nT}}/\\mathrm{{yr}}^{{{:}}}$'.format(deriv) return units