Source code for transitionMatrix.estimators.cohort_estimator

# -*- coding: utf-8 -*-

# (c) 2017-2026 Open Risk, all rights reserved
#
# TransitionMatrix is licensed under the Apache 2.0 license a copy of which is included
# in the source distribution of TransitionMatrix. This is notwithstanding any licenses of
# third-party software included in this distribution. You may not use this file except in
# compliance with the License.
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import numpy as np
import statsmodels.stats.proportion as st

import transitionMatrix
from transitionMatrix.estimators import BaseEstimator


[docs] class CohortEstimator(BaseEstimator): """ Class for implementing a Cohort Estimator for the transition matrix Documentation: `Cohort Estimator <https://www.openriskmanual.org/wiki/Cohort_Estimator>`_ """ def __init__(self, cohort_bounds=None, states=None, ci=None): BaseEstimator.__init__(self) # if not (0 < alpha <= 1.): # raise ValueError('alpha parameter must be between 0 and 1.') self.cohort_bounds = cohort_bounds if states is not None: self.states = states if ci is not None: assert (ci['method'] in ['goodman', 'sison-glaz', 'binomial']) assert (0 < ci['alpha'] <= 1.0) self.ci_method = ci['method'] self.ci_alpha = ci['alpha']
[docs] def get_average(self): return self.average_matrix
[docs] def fit(self, data, labels=None): """ Parameters ---------- data : dataframe - The data to use for the estimation (in sorted by ID in compact format) labels: an optional dictionary for relabeling column names Returns ------- matrix_set : An estimated transition matrix set Notes ------ * loop over data rows (id, timepoint, state) * at least two distinct timepoints are required (initial and final) * calculate population count N^i_k per state i per timepoint k * calculate migrations count N^{ij}_{kl} from i to j from timepoint k to timepoint l * calculate transition matrix as ratio T^{ij}_{kl} = N^{ij}_{kl} / N^i_k * calculate also count-averaged matrix References ---------- """ # Allow for flexible labelling for dataframe columns if labels is not None: state_label = labels['State'] timestep_label = labels['Time'] id_label = labels['ID'] else: state_label = 'State' id_label = 'ID' timestep_label = 'Time' # Old way of enumerating cohort intervals was using labels # cohort_labels = data[timestep_label].unique() # cohort_dim = len(cohort_labels) - 1 # The size of the state space state_dim = self.states.cardinality # The number of cohorts is the number of intervals # Minimally two (initial and final) cohort_dim = len(self.cohort_bounds) - 1 event_count = data[id_label].count() # store data in 1d arrays for faster processing # capture nan events for missing observations event_exists = np.empty(event_count, int) entity_id = np.empty(event_count, int) entity_state = np.empty(event_count, int) event_time = np.empty(event_count, int) nan_count = 0 i = 0 for index, row in data.iterrows(): entity_id[i] = row[id_label] try: entity_state[i] = row[state_label] event_time[i] = row[timestep_label] event_exists[i] = 1 # indicates a valid (complete) data row except ValueError: entity_state[i] = -99999 event_time[i] = -99999 event_exists[i] = 0 nan_count += 1 i += 1 self.nans = nan_count # store number of entities observed in given state per time step tm_count = np.ndarray((state_dim, cohort_dim + 1), int) # store number of entities observed to transition from state (From) to state (To) per period tmn_count = np.ndarray((state_dim, state_dim, cohort_dim), int) # store normalized frequencies tmn_values = np.ndarray((state_dim, state_dim, cohort_dim), float) # matrix to store average transitions tmn_average = np.ndarray((state_dim, state_dim), float) # initialize to zero (TODO ?) tm_count.fill(0) tmn_count.fill(0) tmn_values.fill(0) tmn_average.fill(0) # TODO Capture case if entity with only one observation (hence no transition count) # TODO Capture case with stale observations (no transitions) for i in range(0, event_count - 1): # the last point handled separately if event_exists[i] == 1: # while processing valid event data from same entity # increment state count tm_count[(entity_state[i], event_time[i])] += 1 if entity_id[i + 1] == entity_id[i]: # increment migration count if there is subsequent observation # NB: It does not have to be different tmn_count[(entity_state[i], entity_state[i + 1], event_time[i])] += 1 # handle boundary cases # the last event must be evaluated in comparison with its previous one i = event_count - 1 if event_exists[i] == 1: # ATTN we must shift the time index of the tm_count, tmn_count tm_count[(entity_state[i], event_time[i] - 1)] += 1 if entity_id[i] == entity_id[i - 1]: tmn_count[(entity_state[i - 1], entity_state[i], event_time[i] - 1)] += 1 # print(tm_count) # print(tm_count.sum()) # print(tmn_count[:, :, 0]) self.counts = int(tm_count.sum()) # Normalization of counts to produce a family of probability matrices for s1 in range(state_dim): for s2 in range(state_dim): for k in range(cohort_dim): if tm_count[(s1, k)] > 0: tmn_values[(s1, s2, k)] = tmn_count[(s1, s2, k)] / tm_count[(s1, k)] # for k in range(cohort_dim): # m = transitionMatrix.TransitionMatrix(tmn_values[:, :, k]) # m.print_matrix(accuracy=3) # Average transition matrix (assuming temporal homogeneity) for s1 in range(state_dim): for s2 in range(state_dim): tm_total_count = 0 for k in range(cohort_dim): tmn_average[(s1, s2)] += tmn_count[(s1, s2, k)] tm_total_count += tm_count[(s1, k)] if tm_total_count > 0: tmn_average[(s1, s2)] /= tm_total_count self.average_matrix = tmn_average # Confidence Interval Estimation (Based on Counts) confint_lower = np.ndarray((state_dim, state_dim, cohort_dim)) confint_upper = np.ndarray((state_dim, state_dim, cohort_dim)) for k in range(cohort_dim - 1): for s1 in range(state_dim): intervals = st.multinomial_proportions_confint(tmn_count[s1, :, k], alpha=self.ci_alpha, method=self.ci_method) for s2 in range(state_dim): confint_lower[s1, s2, k] = intervals[s2][0] confint_upper[s1, s2, k] = intervals[s2][1] self.confint_lower = confint_lower self.confint_upper = confint_upper # Return a list of transition matrices # Both absolute (frequency) and relative (probability) format for k in range(cohort_dim): self.matrix_set.append(tmn_values[:, :, k]) self.count_set.append(tmn_count[:, :, k]) # Return absolute counts at time points for k in range(cohort_dim + 1): self.count_normalization.append(tm_count[:, k]) # print(self.count_normalization) return self.matrix_set