"""
convert.py
Written by Tyler Sutterley (05/2024)
Utilities for converting ICESat-2 HDF5 files into different formats
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
h5py: Python interface for Hierarchal Data Format 5 (HDF5)
https://h5py.org
http://docs.h5py.org/en/stable/index.html
zarr: Chunked, compressed, N-dimensional arrays in Python
https://github.com/zarr-developers/zarr-python
https://zarr.readthedocs.io/en/stable/index.html
pandas: Python Data Analysis Library
https://pandas.pydata.org/
UPDATE HISTORY:
Updated 05/2024: use wrapper to importlib for optional dependencies
Updated 03/2024: use pathlib to define and operate on paths
Updated 12/2022: place some imports behind try/except statements
Updated 06/2022: place zarr and pandas imports behind try/except statements
Updated 04/2022: updated docstrings to numpy documentation format
Updated 01/2022: added ascii and dataframe outputs for ATL07
Updated 09/2021: added ground track and time to output dataframes
calculate a global reference point for ATL06/07/08 dataframes
Updated 07/2021: comment for column number in yaml headers
Updated 10/2020: added ascii output for ATL08
Updated 08/2020: added output in pandas dataframe for ATL06 and ATL08
Written 06/2020
"""
import re
import pathlib
import warnings
import itertools
import posixpath
import numpy as np
from icesat2_toolkit.utilities import import_dependency
# attempt imports
h5py = import_dependency('h5py')
pandas = import_dependency('pandas')
zarr = import_dependency('zarr')
[docs]class convert():
np.seterr(invalid='ignore')
def __init__(self, filename=None, reformat=None):
"""Utilities for converting ICESat-2 HDF5 files into different formats
Parameters
----------
filename: str, obj or NoneType, default None
input HDF5 filename or io.BytesIO object
reformat: str or NoneType
output format
- ``'csv'``: comma-separated values for each beam group
- ``'dataframe'``: Pandas dataframe
- ``'HDF5'``: rechunked HDF5
- ``'txt'``: tab-delimited ascii for each beam group
- ``'zarr'``: chunked zarr
"""
self.filename = filename
self.reformat = reformat
# gps-based epoch for delta times #
self.atlas_sdp_epoch = np.datetime64('2018-01-01T00:00:00')
# PURPOSE: wrapper function for converting HDF5 files to another type
[docs] def file_converter(self, **kwds):
"""
Convert a HDF5 file to another format
Parameters
----------
**kwds: dict
keyword arguments for output converter
"""
if (self.reformat == 'zarr'):
# output zarr file
self.HDF5_to_zarr(**kwds)
elif (self.reformat == 'HDF5'):
# output rechunked HDF5 file
self.HDF5_to_HDF5(**kwds)
# elif (reformat == 'JPL'):
# # output JPL captoolkit formatted HDF5 files
# self.HDF5_to_JPL_HDF5(**kwds)
elif self.reformat in ('csv','txt'):
# output reduced files to ascii formats
self.HDF5_to_ascii(**kwds)
elif self.reformat in ('dataframe',):
# output reduced files to pandas dataframe
return self.HDF5_to_dataframe(**kwds)
else:
raise ValueError(f'Unknown format {self.reformat}')
# PURPOSE: convert the HDF5 file to zarr copying all file data
[docs] def HDF5_to_zarr(self, **kwds):
"""
convert a HDF5 file to zarr copying all file data
Parameters
----------
**kwds: dict
keyword arguments for output zarr converter
"""
# output zarr file
if isinstance(self.filename, (str, pathlib.Path)):
zarr_file = pathlib.Path(self.filename).with_suffix('.zarr')
else:
zarr_file = pathlib.Path(self.filename.filename).with_suffix('.zarr')
# copy everything from the HDF5 file to the zarr file
with h5py.File(self.filename, mode='r') as source:
dest = zarr.open_group(zarr_file, mode='w')
# value checks on output zarr
if not hasattr(dest, 'create_dataset'):
raise ValueError('dest must be a group, got {!r}'.format(dest))
# for each key in the root of the hdf5 file structure
for k in source.keys():
self.copy_from_HDF5(source[k], dest, name=k, **kwds)
# PURPOSE: rechunk the HDF5 file copying all file data
[docs] def HDF5_to_HDF5(self, **kwds):
"""
rechunk a HDF5 file copying all file data
Parameters
----------
**kwds: dict
keyword arguments for output HDF5 converter
"""
# output HDF5 file
if isinstance(self.filename, (str, pathlib.Path)):
hdf5_file = pathlib.Path(self.filename).with_suffix('.h5')
else:
hdf5_file = pathlib.Path(self.filename.filename).with_suffix('.h5')
# copy everything from the HDF5 file
with h5py.File(self.filename,mode='r') as source:
dest = h5py.File(hdf5_file, mode='w')
# value checks on output HDF5
if not hasattr(dest, 'create_dataset'):
raise ValueError('dest must be a group, got {!r}'.format(dest))
# for each key in the root of the hdf5 file structure
for k in source.keys():
self.copy_from_HDF5(source[k], dest, name=k, **kwds)
# PURPOSE: Copy a named variable from the HDF5 file to the destination file
[docs] def copy_from_HDF5(self, source, dest, name=None, **kwds):
"""
Copy a named variable from the ``source`` HDF5 into the ``dest`` file
Parameters
----------
source: obj
open file object for input
dest: obj
open file object for output
name: str or NoneType, default None
variable or group name
chunks: int
chunk size for output
"""
if hasattr(source, 'shape') and bool(source.chunks):
# if data can be chunked
# copy a dataset/array
if dest is not None and name in dest:
raise RuntimeError('an object {!r} already exists in '
'destination {!r}'.format(name, dest.name))
# setup creation keyword arguments
create_kwds = {k:v for k,v in kwds.items() if v}
if 'chunks' in create_kwds.keys():
# setup chunks option to limit by dimensions
chunks = ()
for d in range(source.ndim):
chunks += (min([create_kwds.get('chunks'),source.shape[d]]),)
create_kwds['chunks'] = chunks
else:
# setup chunks option, preserve by default
chunks = ()
for d in range(source.ndim):
chunks += (min([source.chunks[d],source.shape[d]]),)
create_kwds.setdefault('chunks', chunks)
# setup compression options
# from h5py to zarr: use zarr default compression options
if (self.reformat == 'zarr') and source.fillvalue:
create_kwds.setdefault('fill_value', source.fillvalue)
elif (self.reformat == 'HDF5') and source.fillvalue:
create_kwds.setdefault('fillvalue', source.fillvalue)
# create new dataset in destination
ds = dest.create_dataset(name, shape=source.shape,
dtype=source.dtype, **create_kwds)
# copy data going chunk by chunk to avoid loading in entirety
shape = ds.shape
chunks = ds.chunks
chunk_offsets = [range(0, s, c) for s, c in zip(shape, chunks)]
for offset in itertools.product(*chunk_offsets):
sel = tuple(slice(o, min(s, o + c)) for o, s, c in
zip(offset, shape, chunks))
ds[sel] = source[sel]
# copy attributes
attrs = {key:self.attributes_encoder(source.attrs[key]) for key in
source.attrs.keys() if self.attributes_encoder(source.attrs[key])}
# update chunks attribute
attrs['_ChunkSizes'] = self.attributes_encoder(create_kwds['chunks'])
ds.attrs.update(attrs)
elif hasattr(source, 'shape'):
# if data cannot be chunked
# copy a dataset/array
if dest is not None and name in dest:
raise RuntimeError('an object {!r} already exists in '
'destination {!r}'.format(name, dest.name))
# setup creation keyword arguments
create_kwds = {}
# setup compression options
# from h5py to zarr: use zarr default compression options
if (self.reformat == 'zarr') and source.fillvalue:
create_kwds.setdefault('fill_value', source.fillvalue)
elif (self.reformat == 'HDF5') and source.fillvalue:
create_kwds.setdefault('fillvalue', source.fillvalue)
# create new dataset in destination
ds = dest.create_dataset(name, shape=source.shape,
dtype=source.dtype, **create_kwds)
ds[:] = source[:]
# copy attributes
attrs = {key:self.attributes_encoder(source.attrs[key]) for key in
source.attrs.keys() if self.attributes_encoder(source.attrs[key])}
ds.attrs.update(attrs)
else:
# copy a group
if (dest is not None and name in dest and hasattr(dest[name], 'shape')):
raise RuntimeError('an array {!r} already exists in '
'destination {!r}'.format(name, dest.name))
# require group in destination
grp = dest.require_group(name)
# copy attributes
attrs = {key:self.attributes_encoder(source.attrs[key]) for key in
source.attrs.keys() if self.attributes_encoder(source.attrs[key])}
grp.attrs.update(attrs)
# recursively copy from source
for k in source.keys():
self.copy_from_HDF5(source[k], grp, name=k, **kwds)
# PURPOSE: reduce HDF5 files to beam groups and output to ascii
[docs] def HDF5_to_ascii(self, **kwds):
"""
Convert a HDF5 file to beam-level ascii files copying reduced sets of data
Parameters
----------
**kwds: dict
keyword arguments for output ascii converter
"""
# compile regular expression operator for extracting info from ICESat2 files
rx = re.compile(r'(processed_)?(ATL\d+)(-\d{2})?_(\d{4})(\d{2})(\d{2})'
r'(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})_(\d{3})_(\d{2})(.*?).h5$')
# split extension from HDF5 file
# extract parameters from ICESat2 HDF5 file
if isinstance(self.filename, (str, pathlib.Path)):
hdf5_file = pathlib.Path(self.filename)
else:
hdf5_file = pathlib.Path(self.filename.filename)
# extract parameters from ICESat2 HDF5 file
SUB,PRD,HEM,YY,MM,DD,HH,MN,SS,TRK,CYCL,GRAN,RL,VERS,AUX = \
rx.findall(hdf5_file.name).pop()
# output file suffix for csv or tab-delimited text
delimiter = ',' if self.reformat == 'csv' else '\t'
# copy bare minimum variables from the HDF5 file to the ascii file
source = h5py.File(self.filename, mode='r')
# find valid beam groups by testing for particular variables
if (PRD == 'ATL06'):
VARIABLE_PATH = ['land_ice_segments','segment_id']
elif (PRD == 'ATL07'):
VARIABLE_PATH = ['sea_ice_segments','height_segment_id']
elif (PRD == 'ATL08'):
VARIABLE_PATH = ['land_segments','segment_id_beg']
elif (PRD == 'ATL10'):
VARIABLE_PATH = ['freeboard_beam_segments','delta_time']
elif (PRD == 'ATL12'):
VARIABLE_PATH = ['ssh_segments','delta_time']
# create list of valid beams within the HDF5 file
beams = []
for gtx in [k for k in source.keys() if bool(re.match(r'gt\d[lr]',k))]:
# check if subsetted beam contains data
try:
source['/'.join([gtx,*VARIABLE_PATH])]
except KeyError:
pass
else:
beams.append(gtx)
# for each valid beam within the HDF5 file
for gtx in sorted(beams):
# extract variables and attributes for each variable
values = {}
vattrs = {}
# create a column stack of valid output segment values
if (PRD == 'ATL06'):
# land ice height
var = source[gtx]['land_ice_segments']
valid, = np.nonzero(var['atl06_quality_summary'][:] == 0)
# variables for the output ascii file
vnames = ['segment_id','delta_time','latitude','longitude',
'h_li','h_li_sigma']
vformat = ('{1:0.0f}{0}{2:0.9f}{0}{3:0.9f}{0}{4:0.9f}{0}'
'{5:0.9f}{0}{6:0.9f}')
# for each output variable
for i,v in enumerate(vnames):
# convert data to numpy array for HDF5 compatibility
values[v] = np.copy(var[v][:])
# extract attributes
vattrs[v] = {atn:atv for atn,atv in var[v].attrs.items()}
# add precision attributes for ascii yaml header
if (v == 'segment_id'):
vattrs[v]['precision'] = 'integer'
vattrs[v]['units'] = 'count'
else:
vattrs[v]['precision'] = 'double_precision'
vattrs[v]['comment'] = f'column {i+1:d}'
elif (PRD == 'ATL07'):
# sea ice height
var = source[gtx]['sea_ice_segments']
valid, = np.nonzero(var['heights/height_segment_quality'][:] == 1)
# variables for the output ascii file
vnames = ['height_segment_id','delta_time',
'latitude','longitude','seg_dist_x',
'heights/height_segment_height',
'heights/height_segment_confidence',
'heights/height_segment_type',
'heights/height_segment_ssh_flag',
'heights/height_segment_w_gaussian',
'stats/photon_rate','stats/cloud_flag_asr',
'geophysical/height_segment_lpe',
'geophysical/height_segment_mss',
'geophysical/height_segment_ocean',
'geophysical/height_segment_ib']
vformat = ('{1:0.0f}{0}{2:0.9f}{0}{3:0.9f}{0}{4:0.9f}{0}'
'{5:0.9f}{0}{6:0.9f}{0}{7:0.9f}{0}{8:0.0f}{0}{9:0.0f}{0}'
'{10:0.9f}{0}{11:0.9f}{0}{12:0.0f}{0}{13:0.9f}{0}'
'{14:0.9f}{0}{15:0.9f}{0}{16:0.9f}')
# for each output variable
for i,v in enumerate(vnames):
# convert data to numpy array for HDF5 compatibility
values[v] = np.copy(var[v][:])
# extract attributes
vattrs[v] = {atn:atv for atn,atv in var[v].attrs.items()}
# add precision attributes for ascii yaml header
if v in ('height_segment_id','heights/height_segment_type',
'heights/height_segment_ssh_flag',
'stats/cloud_flag_asr'):
vattrs[v]['precision'] = 'integer'
else:
vattrs[v]['precision'] = 'double_precision'
vattrs[v]['comment'] = f'column {i+1:d}'
elif (PRD == 'ATL08'):
# land and vegetation height
var = source[gtx]['land_segments']
valid, = np.nonzero(var['terrain/h_te_best_fit'][:] !=
var['terrain/h_te_best_fit'].fillvalue)
# variables for the output ascii file
vnames = ['segment_id_beg','segment_id_end','delta_time',
'latitude','longitude','terrain/h_te_best_fit',
'terrain/h_te_uncertainty','terrain/terrain_slope',
'canopy/h_canopy','canopy/h_canopy_uncertainty']
vformat = ('{1:0.0f}{0}{2:0.0f}{0}{3:0.9f}{0}{4:0.9f}{0}'
'{5:0.9f}{0}{6:0.9f}{0}{7:0.9f}{0}{8:0.9f}{0}{9:0.9f}{0}'
'{10:0.9f}')
# for each output variable
for i,v in enumerate(vnames):
# convert data to numpy array for HDF5 compatibility
values[v] = np.copy(var[v][:])
# extract attributes
vattrs[v] = {atn:atv for atn,atv in var[v].attrs.items()}
# add precision attributes for ascii yaml header
if v in ('segment_id_beg','segment_id_end'):
vattrs[v]['precision'] = 'integer'
vattrs[v]['units'] = 'count'
else:
vattrs[v]['precision'] = 'double_precision'
vattrs[v]['comment'] = f'column {i+1:d}'
# column stack of valid output segment values
output = np.column_stack([values[v][valid] for v in vnames])
# output ascii file
granule = f'{hdf5_file.stem}_{gtx}.{self.reformat}'
ascii_file = hdf5_file.parent.joinpath(granule)
fid = ascii_file.open(mode='w', encoding='utf8')
# print YAML header to top of file
fid.write('{0}:\n'.format('header'))
# global attributes for file
fid.write(' {0}:\n'.format('global_attributes'))
for atn,atv in source.attrs.items():
if atn not in ('Conventions','Processing Parameters','hdfversion',
'history','identifier_file_uuid'):
fid.write(' {0:22}: {1}\n'.format(atn,self.attributes_encoder(atv)))
# beam attributes
fid.write('\n {0}:\n'.format('beam_attributes'))
for atn,atv in source[gtx].attrs.items():
if atn not in ('Description',):
fid.write(' {0:22}: {1}\n'.format(atn,self.attributes_encoder(atv)))
# data dimensions
fid.write('\n {0}:\n'.format('dimensions'))
nrow,ncol = np.shape(output)
fid.write(' {0:22}: {1:d}\n'.format('segments',nrow))
# non-standard attributes
fid.write('\n {0}:\n'.format('non-standard_attributes'))
# value to convert to GPS seconds (seconds since 1980-01-06T00:00:00)
fid.write(' {0:22}:\n'.format('atlas_sdp_gps_epoch'))
atlas_sdp_gps_epoch = source['ancillary_data']['atlas_sdp_gps_epoch']
for atn in ['units','long_name']:
atv = self.attributes_encoder(atlas_sdp_gps_epoch.attrs[atn])
fid.write(' {0:20}: {1}\n'.format(atn,atv))
fid.write(' {0:20}: {1:0.0f}\n'.format('value',atlas_sdp_gps_epoch[0]))
# print variable descriptions to YAML header
fid.write('\n {0}:\n'.format('variables'))
for v in vnames:
fid.write(' {0:22}:\n'.format(posixpath.basename(v)))
for atn in ['precision','units','long_name','comment']:
atv = self.attributes_encoder(vattrs[v][atn])
fid.write(' {0:20}: {1}\n'.format(atn,atv))
# end of header
fid.write('\n\n# End of YAML header\n')
# print data to file
for row in output:
print(vformat.format(delimiter,*row),file=fid)
# close the file
fid.close()
# close the source HDF5 file
source.close()
# PURPOSE: reduce HDF5 files to pandas dataframe
[docs] def HDF5_to_dataframe(self, **kwds):
"""
Convert a HDF5 file to a pandas dataframe copying reduced sets of data
Parameters
----------
**kwds: dict
keyword arguments for output dataframe converter
"""
# compile regular expression operator for extracting info from ICESat2 files
rx = re.compile(r'(processed_)?(ATL\d+)(-\d{2})?_(\d{4})(\d{2})(\d{2})'
r'(\d{2})(\d{2})(\d{2})_(\d{4})(\d{2})(\d{2})_(\d{3})_(\d{2})(.*?).h5$')
# split extension from HDF5 file
# extract parameters from ICESat2 HDF5 file
if isinstance(self.filename, (str, pathlib.Path)):
hdf5_file = pathlib.Path(self.filename)
else:
hdf5_file = pathlib.Path(self.filename.filename)
# extract parameters from ICESat2 HDF5 file
SUB,PRD,HEM,YY,MM,DD,HH,MN,SS,TRK,CYCL,GRAN,RL,VERS,AUX = \
rx.findall(hdf5_file.name).pop()
# copy bare minimum variables from the HDF5 file to pandas data frame
source = h5py.File(self.filename,mode='r')
# find valid beam groups by testing for particular variables
if (PRD == 'ATL06'):
VARIABLE_PATH = ['land_ice_segments','segment_id']
elif (PRD == 'ATL07'):
VARIABLE_PATH = ['sea_ice_segments','height_segment_id']
elif (PRD == 'ATL08'):
VARIABLE_PATH = ['land_segments','segment_id_beg']
elif (PRD == 'ATL10'):
VARIABLE_PATH = ['freeboard_beam_segments','delta_time']
elif (PRD == 'ATL12'):
VARIABLE_PATH = ['ssh_segments','delta_time']
# create list of valid beams within the HDF5 file
beams = []
for gtx in [k for k in source.keys() if bool(re.match(r'gt\d[lr]',k))]:
# check if subsetted beam contains data
try:
source['/'.join([gtx,*VARIABLE_PATH])]
except KeyError:
pass
else:
beams.append(gtx)
# for each valid beam within the HDF5 file
frames = []
gt = dict(gt1l=10,gt1r=20,gt2l=30,gt2r=40,gt3l=50,gt3r=60)
for gtx in sorted(beams):
# set variable parameters to read for specific products
if (PRD == 'ATL06'):
# land ice height
var = source[gtx]['land_ice_segments']
valid, = np.nonzero(var['h_li'][:] != var['h_li'].fillvalue)
# variables for the output dataframe
vnames = ['segment_id','delta_time','latitude','longitude',
'h_li','h_li_sigma','atl06_quality_summary',
'fit_statistics/dh_fit_dx',
'fit_statistics/dh_fit_dy',
'fit_statistics/dh_fit_dx_sigma',
'fit_statistics/n_fit_photons',
'fit_statistics/h_expected_rms',
'fit_statistics/h_robust_sprd',
'fit_statistics/w_surface_window_final']
elif (PRD == 'ATL07'):
# sea ice height
var = source[gtx]['sea_ice_segments']
valid, = np.nonzero(var['heights/height_segment_quality'][:] == 1)
# variables for the output ascii file
vnames = ['height_segment_id','seg_dist_x','delta_time',
'latitude','longitude',
'heights/height_segment_height',
'heights/height_segment_confidence',
'heights/height_segment_type',
'heights/height_segment_ssh_flag',
'heights/height_segment_w_gaussian',
'stats/photon_rate','stats/cloud_flag_asr',
'geophysical/height_segment_lpe',
'geophysical/height_segment_mss',
'geophysical/height_segment_ocean',
'geophysical/height_segment_ib']
elif (PRD == 'ATL08'):
# land and vegetation height
var = source[gtx]['land_segments']
valid, = np.nonzero(var['terrain/h_te_best_fit'][:] !=
var['terrain/h_te_best_fit'].fillvalue)
# variables for the output dataframe
vnames = ['segment_id_beg','segment_id_end','delta_time',
'latitude','longitude','brightness_flag','layer_flag',
'msw_flag','night_flag','terrain_flg','urban_flag',
'segment_landcover','segment_snowcover','segment_watermask',
'terrain/h_te_best_fit','terrain/h_te_uncertainty',
'terrain/terrain_slope','terrain/n_te_photons',
'canopy/h_canopy','canopy/h_canopy_uncertainty',
'canopy/canopy_flag','canopy/n_ca_photons']
# create a dictionary of valid output segment values
data = {}
# convert data to numpy array for backwards HDF5 compatibility
for v in vnames:
values = np.copy(var[v][:])
data[posixpath.basename(v)] = values[valid]
# Generate Time Column
delta_time = (data['delta_time']*1e9).astype('timedelta64[ns]')
data['time'] = pandas.to_datetime(self.atlas_sdp_epoch+delta_time)
# copy filename parameters
data['rgt'] = np.array([int(TRK)]*len(valid))
data['cycle'] = np.array([int(CYCL)]*len(valid))
data['gt'] = np.array([gt[gtx]]*len(valid))
# calculate global reference point
if PRD in ('ATL06','ATL07','ATL08'):
data['global_ref_pt'] = 6*1387*data[VARIABLE_PATH[-1]] + \
6*(data['rgt']-1) + (data['gt']/10)
# copy beam-level attributes
attrs = ['groundtrack_id','atlas_spot_number','atlas_beam_type',
'sc_orientation','atmosphere_profile','atlas_pce']
for att_name in attrs:
att_val=self.attributes_encoder(source[gtx].attrs[att_name])
data[att_name] = [att_val]*len(valid)
# pandas dataframe from compiled dictionary
frames.append(pandas.DataFrame.from_dict(data))
# return the concatenated pandas dataframe
return pandas.concat(frames)
# PURPOSE: encoder for copying the file attributes
[docs] def attributes_encoder(self, attr):
"""
Custom encoder for copying file attributes in Python 3
Parameters
----------
attr: obj
attribute to be converted for output
"""
if isinstance(attr, (bytes, bytearray)):
return attr.decode('utf-8')
if isinstance(attr, (np.int_, np.intc, np.intp, np.int8, np.int16, np.int32,
np.int64, np.uint8, np.uint16, np.uint32, np.uint64)):
return int(attr)
elif isinstance(attr, (np.float_, np.float16, np.float32, np.float64)):
return float(attr)
elif isinstance(attr, (np.ndarray)):
if not isinstance(attr[0], (object)):
return attr.tolist()
elif isinstance(attr, (np.bool_)):
return bool(attr)
elif isinstance(attr, (np.void)):
return None
else:
return attr