Commit c4ad50ee authored by Julien Boccard
Browse files

Upload New File

parent 1943efe1
import numpy as np
import pandas as pd
class DataTable:
    """Two-dimensional data matrix with per-observation and per-variable metadata.

    ``data`` is indexed as ``data[observation, variable]``. Each metadata
    dictionary maps a key to an object exposing ``mask`` and ``values``
    sequences aligned with the matching axis of ``data``; ``mask[i]`` tells
    whether ``values[i]`` is set for entry ``i``.
    """

    def __init__(s, observation_metadata, variable_metadata, data):
        """Store the metadata dictionaries and the data, then validate sizes.

        Raises:
            MetadataConsistencyException: if a metadata entry does not have
                the same length as the corresponding dimension of ``data``.
        """
        s.observation_metadata = observation_metadata
        s.variable_metadata = variable_metadata
        s.data = data
        s.validate()

    def flatten(s, list_dict, count=None):
        """Turn a key -> metadata mapping into one dictionary per entry.

        Args:
            list_dict: mapping of key to an object with ``mask`` and ``values``.
            count: number of entries to produce. When omitted, the length of
                the first metadata entry is used (0 for an empty mapping).

        Returns:
            A list of dicts; dict ``i`` contains ``key: values[i]`` for every
            key whose ``mask[i]`` is truthy.
        """
        if count is None:
            # Any entry gives the length; avoid max(keys), which fails on an
            # empty mapping and on keys that cannot be ordered.
            first = next(iter(list_dict.values()), None)
            count = 0 if first is None else len(first.values)
        return [
            {key: entry.values[i] for key, entry in list_dict.items() if entry.mask[i]}
            for i in range(count)
        ]

    def observation_metadata_dicts(s):
        """Return one metadata dictionary per observation (row of ``data``)."""
        return s.flatten(s.observation_metadata, s.data.shape[0])

    def variable_metadata_dicts(s):
        """Return one metadata dictionary per variable (column of ``data``)."""
        return s.flatten(s.variable_metadata, s.data.shape[1])

    def observation_mask(s, key, filter_function=None):
        """Build a mask over observations from metadata ``key`` (see ``filter_mask``)."""
        return s.filter_mask(s.observation_metadata, key, filter_function)

    def variable_mask(s, key, filter_function=None):
        """Build a mask over variables from metadata ``key`` (see ``filter_mask``)."""
        return s.filter_mask(s.variable_metadata, key, filter_function)

    def filter_mask(s, metadata, key, filter_function):
        """Return a mask of the entries of ``metadata[key]`` to keep.

        Args:
            metadata: mapping of key to an object with ``mask`` and ``values``.
            key: metadata key to inspect.
            filter_function: optional callable ``f(index, value)``. The first
                element of its return value (``result[0]``) is used as the
                keep flag, so it must return an indexable object.
                NOTE(review): confirm callers really return a sequence.

        Returns:
            The stored mask itself when ``filter_function`` is None; otherwise
            a new boolean array that is False wherever the stored mask is
            False and ``filter_function(i, value)[0]`` elsewhere.
        """
        mask = metadata[key].mask
        if filter_function is None:
            return mask
        values = metadata[key].values
        # dtype=bool keeps the result usable as a mask even when it is empty
        # (np.array([]) would otherwise default to float64).
        return np.array(
            [
                filter_function(i, value)[0] if is_set else False
                for i, (is_set, value) in enumerate(zip(mask, values))
            ],
            dtype=bool,
        )

    def select(s, variable_mask=None, observation_mask=None):
        """Return a new ``DataTable`` restricted to the masked rows and columns.

        Args:
            variable_mask: mask over columns; all columns when None.
            observation_mask: mask over rows; all rows when None.

        Returns:
            A new ``DataTable`` built from a copy of the selected data and
            from ``metadata.select(mask)`` of every metadata entry.
        """
        if variable_mask is None:
            variable_mask = np.ones(s.data.shape[1], dtype=bool)
        if observation_mask is None:
            observation_mask = np.ones(s.data.shape[0], dtype=bool)
        new_observation_metadata = {
            key: metadata.select(observation_mask)
            for key, metadata in s.observation_metadata.items()
        }
        new_variable_metadata = {
            key: metadata.select(variable_mask)
            for key, metadata in s.variable_metadata.items()
        }
        # Rows first, then columns: indexing both axes at once with two
        # index arrays would pair them up instead of taking the sub-matrix.
        new_data = np.copy(s.data[observation_mask, :][:, variable_mask])
        return DataTable(new_observation_metadata, new_variable_metadata, new_data)

    def map_metadata(s, over_variables, new_key, mapping_function, *input_keys):
        """Compute a new metadata entry and store it under ``new_key``.

        Args:
            over_variables: True to work on variable metadata, False for
                observation metadata.
            new_key: key under which the result is stored (in place).
            mapping_function: called as ``f(i, *values)`` where ``values`` are
                the entries at index ``i`` of each metadata in ``input_keys``.
                It is called for every index, including those whose inputs
                are masked out.
            *input_keys: metadata keys fed to ``mapping_function``.

        The resulting mask is True at index ``i`` only when every input mask
        is truthy at ``i``; with no input keys it is True everywhere.
        """
        metadata = s.variable_metadata if over_variables else s.observation_metadata
        n_metadata = s.data.shape[1] if over_variables else s.data.shape[0]
        result_values = []
        result_mask = []
        if input_keys:
            value_rows = zip(*[metadata[key].values for key in input_keys])
            mask_rows = zip(*[metadata[key].mask for key in input_keys])
            for i, input_masks, input_values in zip(range(n_metadata), mask_rows, value_rows):
                result_values.append(mapping_function(i, *input_values))
                result_mask.append(all(input_masks))
        else:
            for i in range(n_metadata):
                result_values.append(mapping_function(i))
                result_mask.append(True)
        metadata[new_key] = MetaData(np.array(result_mask, dtype=bool), np.array(result_values))
        s.validate()

    def map_observation_metadata(s, *args, **kwargs):
        """Shortcut for ``map_metadata(False, ...)``."""
        s.map_metadata(False, *args, **kwargs)

    def map_variable_metadata(s, *args, **kwargs):
        """Shortcut for ``map_metadata(True, ...)``."""
        s.map_metadata(True, *args, **kwargs)

    def add_data(s, as_variables, new_data, new_metadata=None):
        """Append rows or columns to the table, extending metadata to match.

        Args:
            as_variables: True to append columns (variables), False to append
                rows (observations).
            new_data: array concatenated to ``data`` along the chosen axis.
            new_metadata: iterable with one mapping per appended entry, giving
                the metadata values for that entry. Keys missing from a
                mapping are stored as masked-out placeholders; keys that are
                not already present in the table's metadata are ignored.
                When None, every appended entry gets no metadata.

        Raises:
            MetadataConsistencyException: if the number of metadata mappings
                does not match the number of appended entries.
        """
        axis = 1 if as_variables else 0
        metadata = s.variable_metadata if as_variables else s.observation_metadata

        # Compute everything before touching the instance so that a failure
        # (shape mismatch, bad metadata) leaves the table unchanged.
        combined = np.concatenate((s.data, new_data), axis=axis)
        n_new = combined.shape[axis] - s.data.shape[axis]
        if new_metadata is None:
            new_metadata = [{} for _ in range(n_new)]
        else:
            # Materialize once: the entries are scanned again for every key.
            new_metadata = list(new_metadata)
        if metadata and len(new_metadata) != n_new:
            raise MetadataConsistencyException(
                f'Expected {n_new} new metadata entries, got {len(new_metadata)}'
            )

        extended = {}
        for key, entry in metadata.items():
            present = [key in item for item in new_metadata]
            # NOTE: casting to the existing dtype means a None placeholder
            # must be representable in that dtype (it is not for integers).
            added_values = np.array(
                [item[key] if has_key else None for item, has_key in zip(new_metadata, present)],
                dtype=entry.values.dtype,
            )
            extended[key] = (
                np.concatenate((entry.values, added_values)),
                np.concatenate((entry.mask, np.array(present, dtype=bool))),
            )

        for key, (values, mask) in extended.items():
            metadata[key].values = values
            metadata[key].mask = mask
        s.data = combined
        s.validate()

    def add_observations(s, *args, **kwargs):
        """Shortcut for ``add_data(False, ...)``: append rows."""
        s.add_data(False, *args, **kwargs)

    def add_variables(s, *args, **kwargs):
        """Shortcut for ``add_data(True, ...)``: append columns."""
        s.add_data(True, *args, **kwargs)

    def to_dataframe(s, force_type=str, empty_value='', missing_value='', **pandas_args):
        """Lay the table and its metadata out in a single pandas DataFrame.

        Layout (keys are sorted):
          * one row per observation-metadata key: padding cells, the key, then
            that key's value for each observation;
          * one header row holding the variable-metadata keys;
          * one row per variable: its metadata values, a padding cell, then
            the variable's data for each observation.

        Args:
            force_type: dtype passed to ``pd.DataFrame``; None to let pandas
                infer it.
            empty_value: filler for padding cells.
            missing_value: filler for masked-out metadata values.
            **pandas_args: extra keyword arguments for ``pd.DataFrame``; they
                override ``dtype`` when both are given.
        """
        observation_keys = sorted(s.observation_metadata.keys())
        variable_keys = sorted(s.variable_metadata.keys())
        n_observations, n_variables = s.data.shape[0], s.data.shape[1]

        table = []
        for key in observation_keys:
            entry = s.observation_metadata[key]
            table.append(
                [empty_value] * len(s.variable_metadata)
                + [key]
                + [value if is_set else missing_value
                   for is_set, value in zip(entry.mask, entry.values)]
            )
        table.append(variable_keys + [empty_value] * (n_observations + 1))
        for j in range(n_variables):
            table.append(
                [
                    s.variable_metadata[key].values[j]
                    if s.variable_metadata[key].mask[j] else missing_value
                    for key in variable_keys
                ]
                + [empty_value]
                + [s.data[i, j] for i in range(n_observations)]
            )

        args = {}
        if force_type is not None:
            args['dtype'] = force_type
        args.update(**pandas_args)
        return pd.DataFrame(table, **args)

    def validate(s):
        """Check that every metadata entry matches the size of ``data``.

        Raises:
            MetadataConsistencyException: if the ``values`` or ``mask`` of a
                variable (resp. observation) metadata entry does not have
                ``data.shape[1]`` (resp. ``data.shape[0]``) elements.
        """
        for key, metadata in s.variable_metadata.items():
            if len(metadata.values) != s.data.shape[1] or len(metadata.mask) != s.data.shape[1]:
                raise MetadataConsistencyException(f'Variable metadata for key {key} does not have the same size as the data')
        for key, metadata in s.observation_metadata.items():
            if len(metadata.values) != s.data.shape[0] or len(metadata.mask) != s.data.shape[0]:
                raise MetadataConsistencyException(f'Observation metadata for key {key} does not have the same size as the data')
class MetaData:
    """Values of one metadata key together with a per-entry validity mask.

    ``mask[i]`` is True when ``values[i]`` is set for entry ``i``.
    """

    def __init__(s, mask, values):
        """Copy ``mask`` and ``values`` into numpy arrays.

        The mask is forced to a boolean dtype. Without it, an empty mask
        would become a float array and a 0/1 integer mask would be treated
        as index positions when used for selection.
        """
        s.mask = np.array(mask, dtype=bool)
        s.values = np.array(values)

    def select(s, submask=None):
        """Return a new ``MetaData`` restricted to the entries picked by ``submask``.

        Args:
            submask: index or boolean mask applied to both ``mask`` and
                ``values``; defaults to this object's own mask, i.e. keeps
                only the entries that are set.
        """
        if submask is None:
            submask = s.mask
        return MetaData(s.mask[submask], s.values[submask])
class MetadataConsistencyException(Exception):
    """Raised when a metadata entry does not have the same size as the data."""
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment