Commit d9eb02f8 authored by Julien Boccard

Update npca/__init__.py, npca/common.py, npca/data.py, npca/model.py, npca/solve.py, npca/utils.py files
parent 1426d104
# npca/__init__.py
from .common import *
import re as regex
from . import data
from . import utils
def new_data_net():
"""
Wrapper around the data.DataNet constructor
"""
return data.DataNet()
def inspect(obj, max_level=0):
"""
Returns a string with a tree description of <obj>
with at most <max_level> depth
"""
return utils.inspect(obj, max_level=max_level)
def re(pattern):
"""
Convenience function to compile <pattern> and return the regex object
"""
return regex.compile(pattern)
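# Example (illustrative sketch, not part of the package): typical use of the
# helpers above, assuming the package is importable as `npca`:
#   import npca
#   net = npca.new_data_net()        # same as npca.data.DataNet()
#   pattern = npca.re(r'plasma_.*')  # compiled regex, usable for key filtering
#   print(npca.inspect(net, max_level=1))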

# npca/data.py
import re
import numpy as np
from .utils import to_regex, InvalidBlockException, DataFormatException
from .solve import find_components
from .model import Model
class DataNet:
"""
Contains a collection of references to blocks (and indirectly to the groups they represent)
Blocks must be added to the blocks member, as
<dataset>.blocks.<blockname> = <Block object>
"""
def __init__(s):
s.blocks = DatasetContainer() # All blocks must be declared here
s.groups = DatasetContainer() # Used to keep named references to Group objects
s.plot_sets = DatasetContainer() # Used to keep named references to PlotSet objects
# Generation methods
def add_multiple(s, method, common_args, *args):
"""
Calls <method> once per argument dict in <args>, merging each dict
into a copy of <common_args> (per-call values take precedence)
"""
if common_args is None:
common_args = {}
for next_args in args:
group_args = {}
group_args.update(**common_args)
group_args.update(**next_args)
method(**group_args)
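# Example (hypothetical values; `table` is a data table object and `f` a
# predicate): per-call arguments override the common ones, so
#   s.add_groups({'key': 'g1', 'label': 'Group 1'},
#                {'key': 'g2', 'label': 'Group 2', 'variables': False},
#                common_args={'data_table': table, 'id_property': 'id',
#                             'filter_property': 'type', 'filter_function': f,
#                             'variables': True})
# calls add_group twice, with 'variables' flipped to False for 'g2' only.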
def add_plot_sets(s, *args, common_args=None):
s.add_multiple(s.add_plot_set, common_args, *args)
def add_groups(s, *args, common_args=None):
s.add_multiple(s.add_group, common_args, *args)
def add_blocks(s, *args, common_args=None):
s.add_multiple(s.add_block, common_args, *args)
def add_plot_set(s, key, label, color=(100, 100, 100)):
s.plot_sets.put(key, PlotSet(key, label, color))
def add_group(s, key, label, data_table, id_property, filter_property, filter_function, variables=True, **group_parameters):
"""
Builds a Group named <key> with display <label> from <data_table>,
selecting variables (if <variables>) or observations otherwise
whose <filter_property> metadata satisfies <filter_function>;
<id_property> names the metadata column holding the ids
"""
if variables:
source = data_table.variable_metadata
else:
source = data_table.observation_metadata
filter_selection = data_table.filter_mask(source, filter_property, filter_function)
ids = source[id_property].values[filter_selection]
metadata = {
key: source[key].values[filter_selection]
for key in source
if key != id_property
}
group = Group(key, label, ids, metadata, filter_property, filter_function, **group_parameters)
s.groups.put(key, group)
def add_block(s, key, observation_group, variable_group, plot_set, data_table,
override_observation_filter_property=None, override_variable_filter_property=None,
override_observation_filter_function=None, override_variable_filter_function=None,
relative_block_charge=1.0):
"""
Extracts a data matrix from <data_table> using the group filters
(or the override_* arguments when given) and registers it as a Block under <key>;
if <key> is None, a unique identifier is derived from the group keys
"""
if override_observation_filter_function is None:
override_observation_filter_function = observation_group.filter_function
if override_observation_filter_property is None:
override_observation_filter_property = observation_group.filter_property
if override_variable_filter_function is None:
override_variable_filter_function = variable_group.filter_function
if override_variable_filter_property is None:
override_variable_filter_property = variable_group.filter_property
variable_selection = data_table.variable_mask(override_variable_filter_property, override_variable_filter_function)
observation_selection = data_table.observation_mask(override_observation_filter_property, override_observation_filter_function)
block_data = data_table.data[observation_selection][:, variable_selection]
if key is None:
base_key = re.sub(r'(\A[^A-Za-z_])|[^A-Za-z0-9_]', '', f'{observation_group.key}_{variable_group.key}').lower()
index = 0
key = base_key
while key in s.blocks.__dict__:
index += 1
key = f'{base_key}_{index}'
block = Block(key, observation_group, variable_group, plot_set, block_data, relative_block_charge)
s.blocks.put(key, block)
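# Example (hypothetical group keys): when <key> is None the block key is built
# from the group keys, stripped to a valid identifier and lowercased, e.g.
# observation_group.key='Plasma-1' and variable_group.key='NMR' give
# 'plasma1_nmr', with '_1', '_2', ... appended on collision with existing blocks.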
# Access methods
def get_groups(s, regex=None, observations=True, variables=True):
"""
Retrieves the set of all groupd referenced by the blocks of the dataset,
optionally switching off <observations> or <variables>
Possibly filters their labels by <regex>, either a string to match exactly, or a compiled regex
"""
blocks = s.get_blocks()
all = []
if observations:
all = all + [block.observations for block in blocks]
if variables:
all = all + [block.variables for block in blocks]
all = list(set(all)) # Remove duplicates
if regex is None:
return all
else:
regex_obj = to_regex(regex)
return [group for group in all if regex_obj.fullmatch(group.name)]
def get_blocks(s, regex=None):
"""
Returns all blocks in this dataset
Possibly filters the labels by <regex>, either a string to match exactly, or a compiled regex
"""
all = s.blocks.as_list()
if regex is None:
return all
else:
regex_obj = to_regex(regex)
return [block for block in all if regex_obj.fullmatch(block.name)]
def remove_blocks(s, delete_list):
"""
Removes all the references in the dataset (s.blocks) to elements in <delete_list>
"""
attribute_names = []
for name, block in s.blocks.__dict__.items():
if block in delete_list:
attribute_names.append(name)
for name in attribute_names:
delattr(s.blocks, name)
# Preparation methods
def normalize_observations(s, reference_blocks=None, norm_power=1):
"""
Centers and normalizes every set of blocks with the same variables group
<norm_power> is the power used for normalization (1 is UV, 0.5 is Pareto)
<reference_blocks> are used to define the center and scale, and
can either be None (all blocks are used), an explicit list of Block objects,
or a regex string
"""
variables = s.get_groups(observations=False)
blocks = s.get_blocks()
blocks_by_variables = {group: [] for group in variables}
for block in blocks:
block.data = block.data * block.scale + block.center # restore raw data so normalization can be re-applied
blocks_by_variables[block.variables].append(block)
all_reference_blocks = blocks
if isinstance(reference_blocks, list):
all_reference_blocks = reference_blocks
elif isinstance(reference_blocks, str):
all_reference_blocks = s.get_blocks(regex=re.compile(reference_blocks))
for variables, to_normalize in blocks_by_variables.items():
references = [b for b in to_normalize if b in all_reference_blocks]
if len(references) == 0:
raise InvalidBlockException('Provided reference_blocks did not produce any blocks for variable group {}'.format(variables.key))
sums = sum([np.sum(block.data, axis=0, keepdims=True) for block in references])
sums_sq = sum([np.sum(np.power(block.data, 2), axis=0, keepdims=True) for block in references])
sums_n = sum([block.data.shape[0] for block in references])
mean = sums / sums_n
mean_sq = sums_sq / sums_n
std = np.sqrt(mean_sq - np.power(mean, 2))
for block in to_normalize:
block.center = mean
block.scale = np.power(std, norm_power)
block.data = (block.data - block.center) / block.scale
block.validate()
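# Note on the statistics above: reference blocks sharing a variables group are
# pooled, and per variable mean = E[x] and std = sqrt(E[x^2] - E[x]^2) are
# computed over all pooled observations; scale = std**norm_power, so
# norm_power=1 divides by the standard deviation (UV) and 0.5 by its square
# root (Pareto). Example call (hypothetical key pattern):
#   net.normalize_observations(reference_blocks=r'qc_.*', norm_power=0.5)
# centers and scales every block using only blocks whose keys match 'qc_.*'.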
def build_charges(s, homogeneous_observations=True, homogeneous_variables=True, data_function=None):
"""
Sets the fit charges for all the blocks
If <homogeneous_observations>, blocks are weighted for their size so all observation groups have the same influence
Similarly, if <homogeneous_variables>, blocks are weighted so all variable groups have the same influence
The optional <data_function> takes three arguments, (<data>, <obs_metadata>, <var_metadata>), and should
produce a float giving further weight corrections
"""
for b in s.get_blocks():
charge = b.relative_charge
if homogeneous_observations:
charge /= np.power(b.observations.size(), 0.25)
if homogeneous_variables:
charge /= np.power(b.variables.size(), 0.25)
if data_function is not None:
charge *= data_function(b.data * b.scale + b.center, b.observations.metadata, b.variables.metadata)
b.charge = charge
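# Worked example (hypothetical sizes): a block with relative_charge=1.0,
# 16 observations and 81 variables gets, with both homogeneity flags True,
#   charge = 1.0 / 16**0.25 / 81**0.25 = 1.0 / (2 * 3) = 1/6
# before any data_function correction is applied.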
# Model methods
def model(s, n_components=2, **solver_args):
"""
Builds and returns a Model object with <n_components> for the current dataset
See solve.find_component for more information on <solver_args>
"""
return Model(s, find_components(s, n_components, **solver_args))
def variance(s, unweighted=False, per_variable=False):
"""
Returns the sum of all variances in the blocks of this dataset
"""
blocks = s.get_blocks()
if per_variable:
variable_groups = s.get_groups(observations=False, variables=True)
result = {group: 0 for group in variable_groups}
for block in blocks:
result[block.variables] = block.variance(unweighted, True) + result[block.variables]
return result
else:
result = 0
for block in blocks:
result += block.variance(unweighted, False)
return result
class DatasetContainer:
"""
Used to hold references as member variables
"""
def as_list(s):
"""
Returns all members of this object as a list
"""
return list(s.__dict__.values())
def clear(s):
s.__dict__.clear()
def member_names(s):
return list(s.__dict__.keys())
def put(s, key, value):
s.__dict__[key] = value
def __getitem__(s, key):
return s.__dict__[key]
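# Example: members can be reached as attributes or by key --
#   container = DatasetContainer()
#   container.put('plasma', obj)      # equivalent to: container.plasma = obj
#   container['plasma'] is container.plasma   # True
#   container.member_names()                  # ['plasma']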
class Group:
"""
Contains a group of observation types (individuals, features, time points...)
Takes the <label> of the group and a list of <ids> for each observation, which also determines the length of the group
and an <order> of precedence where higher values come first
"""
def __init__(s, key, label, ids, metadata, filter_property, filter_function, order=0):
s.label = label # string
s.key = key # string
s.ids = ids # list of strings
s.order = order # int describing the order of the group
s.metadata = {key: values for key, values in metadata.items()}
s.filter_property = filter_property
s.filter_function = filter_function
def size(s):
"""
Size of group
"""
return len(s.ids)
def __deepcopy__(s, memo):
"""
Group objects should not be cloned, so that they can be used universally as references
"""
return s
class Block:
"""
Contains a data block as a numpy matrix
Takes the <key> of the block, the <observations_group> and <variables_group>
for each dimension of the matrix, and the actual <data> matrix
The <plot_set> object is used for plotting attributes, if not None
The dimensions of the matrix must match (observations, variables)
An optional <relative_block_charge> can be specified, used as the block's relative weight during model fitting
"""
def __init__(s, key, observations_group, variables_group, plot_set, data, relative_block_charge):
s.key = key # string
s.observations = observations_group # Group object reference
s.variables = variables_group # Group object reference
s.data = data # np matrix
s.relative_charge = relative_block_charge # float, block weight on models
s.plot_set = plot_set # PlotSet object for visualization
s.validate()
s.center = np.zeros((1, variables_group.size())) # internal: relationship to "raw" data
s.scale = np.ones((1, variables_group.size())) # internal: relationship to "raw" data
s.charge = relative_block_charge # internal: actual block weight, modifiable by dataset normalization
def validate(s):
"""
Check if the dimensions of the data matrix match the sizes of the related groups,
Should be called whenever the member variables are changed
"""
if len(s.data.shape) != 2:
raise DataFormatException("Block '{}' is not a matrix (shape {})".format(s.key, s.data.shape))
if s.data.shape[0] != s.observations.size():
raise DataFormatException("Block '{}' has inconsistent data size ({}) for observations group {} ({})".format(
s.key, s.data.shape[0], s.observations.label, s.observations.size()
))
if s.data.shape[1] != s.variables.size():
raise DataFormatException("Block '{}' has inconsistent data size ({}) for variables group {} ({})".format(
s.key, s.data.shape[1], s.variables.label, s.variables.size()
))
def variance(s, unweighted, per_variable):
charge = 1 if unweighted else s.charge
if per_variable:
return np.sum(np.power(charge * s.data, 2), axis=0)
return np.sum(np.power(charge * s.data, 2))
class PlotSet:
"""
Contains plotting information for blocks
"""
def __init__(s, key, label, color):
s.key = key
s.label = label
s.color = color
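# End-to-end sketch (hypothetical names; `table` stands for any data table
# object exposing data, variable_metadata, observation_metadata and the
# filter_mask/observation_mask/variable_mask methods used above):
#   net = DataNet()
#   net.add_plot_set('default', 'Default', color=(60, 120, 180))
#   net.add_group('obs', 'Observations', table, 'id', 'kind',
#                 lambda v: v == 'sample', variables=False)
#   net.add_group('vars', 'Variables', table, 'id', 'kind',
#                 lambda v: v == 'metabolite', variables=True)
#   net.add_block(None, net.groups['obs'], net.groups['vars'],
#                 net.plot_sets['default'], table)
#   net.normalize_observations()
#   net.build_charges()
#   model = net.model(n_components=2)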

# npca/model.py
import numpy as np
from .iplot import report
class Model:
"""
Container for a dataset and a decomposition
"""
def __init__(s, dataset, components):
s.dataset = dataset
s.components = components
def scores(s, index, block):
"""
Computes observation scores
<index> of the component
<block> on which they are computed
"""
return np.dot(block.data, s.components[index].loadings[block.variables]) / np.power(block.variables.size(), 0.5)
def loadings(s, index, group):
"""
Same as scores, for model loadings
"""
return s.components[index].loadings[group]
def weight(s, index, block):
"""
Same as scores, for model weights
"""
return s.components[index].weights[block]
def explained_variances_per_variable(s, component_index, block):
"""
For the <component_index>-th component in this model
computes an array of variance explained per variable
(change of variance when suppressing the variable in the block)
on the given <block>
"""
comp = s.components[component_index]
proj_contribution = comp.weights[block] * comp.loadings[block.variables] * np.dot(comp.loadings[block.observations], block.data)
overlap_contribution = 0
for overlap_comp in s.components[:component_index]:
obs_dot = np.dot(comp.loadings[block.observations], overlap_comp.loadings[block.observations])
weights = comp.weights[block] * overlap_comp.weights[block]
variables_dot = comp.loadings[block.variables] * overlap_comp.loadings[block.variables]
overlap_contribution += obs_dot * weights * variables_dot
return proj_contribution - overlap_contribution
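# In symbols, with t the observation loadings, p the variable loadings and w
# the block weight, the returned array is, per variable j:
#   w * p_j * (t^T X)_j - sum over previous components of
#   (t^T t_prev) * (w * w_prev) * (p_j * p_prev_j)
# i.e. the component's projected contribution minus its overlap with all
# earlier components.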
def loading_orthogonality_matrices(s):
return {
group: np.array(
[[np.dot(c1.loadings[group], c2.loadings[group]) for c1 in s.components] for c2 in s.components]
)
for group in s.dataset.get_groups()
}
def variable_influences(s, component_index, unweighted=False, per_variable=False):
"""
For the <component_index>-th component in this model
returns a dict of all the blocks of the underlying dataset
to their relative influences (explained variances per variable vs total dataset variance)
given as arrays
If <per_variable> is True, the influences are normalized to the total variance of each variable,
otherwise to the total of the dataset
If <unweighted> is True, block charges are ignored when computing the variance
"""
blocks = s.dataset.get_blocks()
total_var = s.dataset.variance(unweighted, per_variable)
result = {}
for block in blocks:
weighted_loading = s.components[component_index].weighted_variable_loading(block)
loading_terms = weighted_loading
projection_terms = weighted_loading - np.dot(s.components[component_index].loadings[block.observations], block.data)
orthogonal_terms = np.sum([
s.components[previous_component].weighted_variable_loading(block) * np.dot(
s.components[previous_component].loadings[block.observations],
s.components[component_index].loadings[block.observations]
)
for previous_component in range(component_index)
], axis=0)
charge_term = 1 if unweighted else block.charge**2
normalization = total_var
if per_variable:
normalization = total_var[block.variables]
result[block] = charge_term * weighted_loading * (loading_terms - 2 * projection_terms - 2 * orthogonal_terms) / normalization
return result
def block_influences(s, component_index, unweighted=False, per_variable=False):
variable_influences = s.variable_influences(component_index, unweighted, per_variable)
return {block: np.sum(influences) for block, influences in variable_influences.items()}
def report(s, out_file, **report_args):
"""
Generates a report of this model into <out_file> in HTML format
For details on <report_args> see iplot.report.report
"""
report.report(s, out_file, **report_args)
class Component:
"""
Contains a single component of a model, with
a dict of <loadings> for each group and
a dict of <weights> for each block
"""
def __init__(s, loadings, weights):
s.loadings = loadings
s.weights = weights
def block_reconstruct(s, block):
"""
Gives the contribution of this component to the reconstruction of <block>
"""
return s.weights[block] * np.multiply.outer(s.loadings[block.observations], s.loadings[block.variables])
def weighted_variable_loading(s, block):
"""
Gives the best fit for this block of its projection on observations
"""
return s.weights[block] * s.loadings[block.variables]
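# Shape note: for a block of shape (n_obs, n_var), block_reconstruct returns
# weights[block] * np.multiply.outer(t, p), a rank-one (n_obs, n_var) matrix,
# where t = loadings[block.observations] has shape (n_obs,) and
# p = loadings[block.variables] has shape (n_var,).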

# npca/solve.py
import time as tm
import numpy as np
from .model import Component
from .utils import normalize
from ..io.display import NullMonitor
def find_components(dataset, n_components, monitor=None, **solver_args):
"""
Returns a list holding <n_components> Component objects
For information on <solver_args>, see find_component
"""
if monitor is None:
monitor = NullMonitor()
blocks = dataset.get_blocks()
for block in blocks:
block.validate()
deflated_blocks = {block: np.copy(block.data) for block in blocks}
components = []
for component_id in range(0, n_components):
comp = find_component(component_id, dataset, deflated_blocks, components, monitor, **solver_args)
for block in blocks:
deflated_blocks[block] = deflated_blocks[block] - comp.block_reconstruct(block)
components.append(comp)
return components
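# Deflation sketch: after each component is fitted, its rank-one
# reconstruction is subtracted from every block's working copy, so component
# k+1 is fitted on the residuals X - sum_{i<=k} w_i * outer(t_i, p_i) rather
# than on the original data (block.data itself is left untouched).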
def find_component(i_component, dataset, data_matrices, previous_components, monitor,
dependants=None, normalize_dependants=False,
min_inner_iterations=5, max_inner_iterations=1000, min_outer_steps=5, max_outer_steps=200,
tolerance=1e-6, orthogonalize=False, consensus_signs=False):
"""
Find a single Component object for the given <dataset> and <data_matrices>
-The <data_matrices> must be in one-to-one correspondence with the data blocks of the <dataset>
-Performs several steps of minimization (at least <min_outer_steps> and at most <max_outer_steps>),
until the maximal change of any loading through the step is lower than <tolerance>
-Each step solves a generalized eigenvalue problem by iterating (at least <min_inner_iterations> and at most <max_inner_iterations>),
until the maximal change of any loading through the step is lower than <tolerance>
-A list of <previous_components> Component objects can be specified for orthogonalization, applied if <orthogonalize> is True
-A dict of <dependants> can be specified for supervised fitting, mapping component indices to dicts
from Block or PlotSet objects to dependant variable weights; these weights can be normalized
across blocks if <normalize_dependants> is True
-The <monitor> object is supplied human-readable iteration updates through its state() method
"""
start_time = tm.time()
blocks = dataset.get_blocks()
observations = dataset.get_groups(variables=False)
variables = dataset.get_groups(observations=False)
blocks_by_observations = {o: [b for b in blocks if b.observations == o] for o in observations}
blocks_by_variables = {v: [b for b in blocks if b.variables == v] for v in variables}
blocks_by_groups = {(o, v): [b for b in blocks if (b.variables == v and b.observations == o)] for o in observations for v in variables}
orthogonal_components = []
if orthogonalize:
orthogonal_components = previous_components
if previous_components is None:
previous_components = []
if dependants is None:
dependants = {}
observation_loadings = {group: normalize(np.ones(group.size())) for group in observations}
variable_loadings = {group: normalize(np.ones(group.size())) for group in variables}
charged_data_matrices = {b: b.charge * data_matrices[b] for b in data_matrices}
def broadcast_dependants(block_or_plot_set, dependant_value):
if block_or_plot_set in blocks:
return {block_or_plot_set: dependant_value * np.ones(block_or_plot_set.observations.size())}
else:
return {block: dependant_value * np.ones(block.observations.size()) for block in blocks if block.plot_set == block_or_plot_set}
def normalize_dependant_dict(dependants_by_block):
norm = np.sqrt(sum([np.sum(np.power(dependants_by_block[b], 2)) for b in dependants_by_block]))
return {b: dependants_by_block[b] / norm for b in dependants_by_block}
normalized_dependants = {}
if i_component in dependants:
normalized_dependants = {
block: broadcasted_dependants
for block_or_plot_set, dependant_value in dependants[i_component].items()
for block, broadcasted_dependants in broadcast_dependants(block_or_plot_set, dependant_value).items()
}
if normalize_dependants:
normalized_dependants = normalize_dependant_dict(normalized_dependants)
def build_observations_matrix(variable_loadings):
def fill_function(h, g, submatrix):
if h == g:
for block in blocks_by_observations[h]:
submatrix.add(ProjectorMatrix(block, charged_data_matrices, variable_loadings, project_observations=False))
return IterationMatrix(observations, fill_function, orthogonal_components)
def build_variables_matrix(observation_loadings):
def fill_function(h, g, submatrix):
if h == g:
for block in blocks_by_variables[h]:
submatrix.add(ProjectorMatrix(block, charged_data_matrices, observation_loadings, project_observations=True))
for block_a in normalized_dependants:
for block_b in normalized_dependants:
if block_a.variables == h and block_b.variables == g:
submatrix.add(DependantMatrix(block_a, block_b, charged_data_matrices, normalized_dependants, 1))
return IterationMatrix(variables, fill_function, orthogonal_components)
def iterate(input_loadings, iterate_matrix, callback_text):
current_loadings = {g: input_loadings[g] for g in input_loadings}
iterations = 0
iteration_change = float('inf')
max_change = float('-inf')
while iterations < min_inner_iterations or (iterations < max_inner_iterations and iteration_change > tolerance):
iterations += 1
current_loadings, _, _, iteration_change = iterate_matrix.apply(current_loadings)
if iteration_change > max_change:
max_change = iteration_change
monitor.state('\t{}: {} iterations, max change {:.1f} tols'.format(callback_text, iterations, max_change / tolerance))
return current_loadings, max_change
def fix_variable_signs(loadings):
for g in variables:
sign_contributions = 0
for block in blocks:
if block.variables == g:
scores = np.dot(block.data, loadings[g])
means = np.mean(block.data, axis=1)
sign_contributions += np.dot(scores, means)
if sign_contributions < 0:
loadings[g] = -loadings[g]
if consensus_signs:
def variable_group_order(g):
total_score_variance = 0
for b in blocks_by_variables[g]:
total_score_variance += np.linalg.norm(np.dot(b.data, loadings[g]))
return -total_score_variance
sorted_variables = sorted(variables, key=variable_group_order)
for g_i, _ in enumerate(sorted_variables):
g = sorted_variables[g_i]
sign_contributions = 0
for h_i in range(g_i):
h = sorted_variables[h_i]
for o in observations:
for b in blocks_by_groups[(o, g)]:
for c in blocks_by_groups[(o, h)]:
scores_g = np.dot(b.data, loadings[g])
scores_h = np.dot(c.data, loadings[h])
sign_contributions += np.dot(scores_g, scores_h)
if sign_contributions < 0:
loadings[g] = -loadings[g]
def estimate_weight(loadings, block):
result = data_matrices[block]
result = np.dot(result, loadings[block.variables])
result = np.dot(result, loadings[block.observations])
return result
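# estimate_weight computes the bilinear form w = t^T X p on the block's
# deflated matrix X; assuming the loadings are kept unit-norm (as the
# normalize() initialization suggests), this is the least-squares weight of
# the rank-one model w * outer(t, p).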
steps = 0
step_change = float('inf')
while steps < min_outer_steps or (steps < max_outer_steps and step_change > tolerance):
steps += 1