Commit 1943efe1 authored by Julien Boccard
Browse files

Delete functions.py

parent 5b07b96e
import pickle
import numpy as np
import pandas as pd
from .filters import as_is
from .datatable import DataTable, MetaData
from .display import NullMonitor
# Module-level side effect: sets pandas' 'display.max_columns' option to 512
# for the whole process as soon as this module is imported.
pd.set_option('display.max_columns', 512)
def load(file_name):
    """Return the object stored in the pickle file ``file_name``.

    The file is opened in binary mode and closed again before returning.

    NOTE: unpickling can execute arbitrary code, so this must only be used
    on files that come from a trusted source.
    """
    with open(file_name, mode='rb') as stream:
        restored = pickle.load(stream)
    return restored
def save(obj, file_name):
    """Pickle ``obj`` into the file ``file_name``.

    The file is opened in binary write mode, so an existing file with the
    same name is overwritten. Nothing is returned.
    """
    with open(file_name, mode='wb') as stream:
        pickle.Pickler(stream).dump(obj)
def read(file_name, file_type=None, **pandas_kwargs):
    """Read a CSV/TSV or spreadsheet file into a raw pandas ``DataFrame``.

    By default the frame is loaded "as text": no header row, no index
    column, no NA conversion and every cell kept as ``str``.

    Parameters
    ----------
    file_name
        Path of the file to read (``str`` or path-like). Anything accepted
        by the pandas reader also works when ``file_type`` is given.
    file_type
        One of ``csv``, ``tsv``, ``xls``, ``xlsx``, ``xlsm``, ``xlsb``,
        ``odf``, ``ods`` or ``odt`` (case-insensitive). When ``None`` it is
        taken from the text after the last ``.`` of ``file_name``.
    **pandas_kwargs
        Extra keyword arguments forwarded to ``pandas.read_csv`` or
        ``pandas.read_excel``; they override the defaults set here.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If the file type is not one of the supported types.
    """
    if file_type is None:
        # str() lets pathlib.Path objects be used as file names too.
        file_type = str(file_name).rsplit('.', 1)[-1]
    file_type = file_type.lower()

    pandas_merged_kwargs = dict(
        header=None,
        index_col=False,
        keep_default_na=False,
        dtype=str,
    )

    if file_type in {'csv', 'tsv'}:
        reader = pd.read_csv
        # 'encoding' is only meaningful for text files; read_excel in
        # current pandas rejects it, so it is a CSV/TSV-only default.
        pandas_merged_kwargs['encoding'] = 'utf-8'
        # TSV files are tab separated. Only set the default when the caller
        # did not choose a separator, because pandas refuses conflicting
        # separator options.
        separator_options = {'sep', 'delimiter', 'delim_whitespace'}
        if file_type == 'tsv' and separator_options.isdisjoint(pandas_kwargs):
            pandas_merged_kwargs['sep'] = '\t'
    elif file_type in {'xls', 'xlsx', 'xlsm', 'xlsb', 'odf', 'ods', 'odt'}:
        reader = pd.read_excel
    else:
        raise ValueError(f'File type {file_type} is not a CSV or Excel file type')

    # Caller-supplied options always win over the defaults above.
    pandas_merged_kwargs.update(pandas_kwargs)
    return reader(file_name, **pandas_merged_kwargs)
def parse(dataframe,
          observation_data_filter, variable_data_filter,
          observation_metadata_filters, variable_metadata_filters,
          cell_data_filter=None, observations_in_rows=False, monitor=None):
    """Split a raw frame into observation metadata, variable metadata and data.

    Parameters
    ----------
    dataframe
        pandas ``DataFrame`` to parse.
    observation_data_filter, variable_data_filter
        Callables invoked as ``f(label, series)`` for every observation /
        variable line of the frame. Element ``[0]`` of the result decides
        whether that line is selected.
    observation_metadata_filters, variable_metadata_filters
        Iterables of ``(metadata_key, index, filter_function)`` triples.
        ``index`` is the positional row/column holding the metadata and
        ``filter_function`` is called as ``f(position, cell)`` and must
        return an ``(is_valid, value)`` pair. ``None`` means ``as_is``.
    cell_data_filter
        Callable invoked as ``f(None, cell)`` for every selected data cell;
        element ``[1]`` of its result is stored. Defaults to ``as_is``.
    observations_in_rows
        ``True`` when each row of ``dataframe`` is an observation, ``False``
        (default) when each column is.
    monitor
        Progress reporter providing ``log(message)`` and ``state(message)``.
        Defaults to a ``NullMonitor``.

    Returns
    -------
    DataTable
        Built from the two metadata dictionaries and the data array, whose
        rows correspond to observations.
    """
    if monitor is None:
        monitor = NullMonitor()
    if cell_data_filter is None:
        cell_data_filter = as_is

    # DataFrame.iteritems() was removed in pandas 2.0; items() is the
    # long-standing equivalent that yields (column label, Series) pairs.
    if observations_in_rows:
        frame_observations = dataframe.iterrows()
        frame_variables = dataframe.items()
    else:
        frame_observations = dataframe.items()
        frame_variables = dataframe.iterrows()

    # The filters receive labels, but the selections are kept as positions
    # because every lookup below goes through .iloc.
    monitor.log('Filtering observations')
    selected_observations = np.argwhere(
        [observation_data_filter(i, obs)[0] for i, obs in frame_observations]
    )[:, 0]
    monitor.log('Filtering variables')
    selected_variables = np.argwhere(
        [variable_data_filter(j, var)[0] for j, var in frame_variables]
    )[:, 0]

    def extract_metadata(filter_list, extractor):
        # Build {metadata_key: MetaData(valid flags, values)} for one axis.
        result = {}
        for metadata_key, index, filter_function in filter_list:
            monitor.state(f'Extracting {metadata_key}')
            the_filter_function = as_is if filter_function is None else filter_function
            data_slice = extractor(index)
            valids = []
            values = []
            for i, cell in enumerate(data_slice):
                is_valid, value = the_filter_function(i, cell)
                valids.append(is_valid)
                values.append(value)
            result[metadata_key] = MetaData(np.array(valids), np.array(values))
        return result

    if observations_in_rows:
        def observation_extractor(index):
            return dataframe.iloc[selected_observations, index]

        def variable_extractor(index):
            return dataframe.iloc[index, selected_variables]
    else:
        def observation_extractor(index):
            return dataframe.iloc[index, selected_observations]

        def variable_extractor(index):
            return dataframe.iloc[selected_variables, index]

    monitor.log('Extracting observation metadata')
    observation_metadata = extract_metadata(observation_metadata_filters, observation_extractor)
    monitor.log('Extracting variable metadata')
    variable_metadata = extract_metadata(variable_metadata_filters, variable_extractor)

    monitor.log('Extracting data')
    if observations_in_rows:
        data_selection = dataframe.iloc[selected_observations, selected_variables]
    else:
        data_selection = dataframe.iloc[selected_variables, selected_observations]

    data = []
    row_count = data_selection.shape[0]
    # Count rows by position: the labels yielded by iterrows() are those of
    # the original frame, so they neither run from 1 to row_count nor are
    # guaranteed to be integers.
    for position, (_, row) in enumerate(data_selection.iterrows(), start=1):
        monitor.state(f'Row {position}/{row_count}')
        data.append([cell_data_filter(None, cell)[1] for cell in row])

    monitor.log('Building data array')
    data = np.array(data)
    if not observations_in_rows:
        # Rows of the selection were variables; flip so rows are observations.
        data = np.transpose(data)
    monitor.state('')
    monitor.log('Done')
    return DataTable(observation_metadata, variable_metadata, data)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment