Source code for transitionMatrix.utils.preprocessing

# encoding: utf-8

# (c) 2017-2026 Open Risk, all rights reserved
#
# TransitionMatrix is licensed under the Apache 2.0 license a copy of which is included
# in the source distribution of TransitionMatrix. This is notwithstanding any licenses of
# third-party software included in this distribution. You may not use this file except in
# compliance with the License.
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language governing permissions and
# limitations under the License.

'''
module transitionMatrix.utils - helper classes and functions

'''

from __future__ import print_function, division

import numpy as np
import pandas as pd
import pprint as pp
from transitionMatrix.utils.converters import frame_to_array


[docs] def validate_absorbing_state(dataframe, state): """ Validate whether a given state is actually absorbing (there should be no transitions to another state) :param dataframe: an input data frame :param state: the state to validate :type state: int :return: a list of exceptions """ exceptions = [] entity_id, event_time, entity_state = frame_to_array(dataframe) for i in range(len(entity_id) - 1): if entity_id[i + 1] == entity_id[i] and entity_state[i] == state and entity_state[i + 1] != state: exceptions.append((entity_id[i], entity_state[i], entity_state[i + 1])) return exceptions
[docs] def transitions_summary(dataframe): """ Calculate some summary statistics about transitions :param dataframe: input dataframe :return: dict """ statistics = {} try: statistics['unique_entities'] = len(list(unique_entities(dataframe))) except Exception as e: statistics['unique_entities'] = 'Could not parse entities' try: statistics['unique_states'] = len(list(unique_states(dataframe))) except Exception as e: statistics['unique_states'] = 'Could not parse states' try: statistics['unique_timestamps'] = len(list(unique_timestamps(dataframe))) except: statistics['unique_timestamps'] = 'Could not parse timestamps' try: statistics['total_timestamps'] = total_timestamps(dataframe) except: statistics['total_timestamps'] = 'Could not parse timestamps' return statistics
[docs] def unique_entities(data): """ Identify unique entities in a dataframe :param data: dataframe. The 'ID' column is used by default :returns: returns a numpy array """ unique_entities = data['ID'].unique() return unique_entities
[docs] def unique_states(data): """ Identify unique states in a dataframe :param data: dataframe. The 'State' column is used by default for Compact formats, 'From' column as fallback for Canonical format :returns: returns a numpy array """ try: unique_states = data['State'].unique() except Exception as e: unique_states = data['From'].unique() return unique_states
[docs] def total_timestamps(data): """ Count total number of timestamps in a dataframe :param data: dataframe. The 'Time' column is used by default :returns: returns an integer """ total_timestamps = data['Time'].count() return total_timestamps
[docs] def unique_timestamps(data): """ Identify unique timestamps in a dataframe :param data: dataframe. The 'Time' column is used by default :returns: returns a sorted numpy array """ unique_timestamps = sorted(data['Time'].unique()) return unique_timestamps
[docs] def generate_cohort_bounds(data, cohorts): """Generate cohort intervals given an input transition dataframe and the desired number of cohorts. The function finds the range of timestamps and divides it equally :param data: a pandas dataframe :param cohorts: the number of cohorts :type cohorts: int :return: cohort_bounds :return: dt .. warning:: the Time column must be in float format """ # Find the temporal range of observed event times t_min = data['Time'].min() t_max = data['Time'].max() # Divide the temporal range into equal intervals (dt) dt = (t_max - t_min) / cohorts # Capture the degenerate case that t_max = t_min if not dt > 0: dt = 1.0 cohort_bounds = [t_min + dt * i for i in range(0, cohorts + 1)] return dt, cohort_bounds
[docs] def generate_event_dict(data, dt, cohort_bounds): """ Loop over all events and construct a dictionary in the following format: .. code:: event_dict = { (entity_id, cohort interval) : [(time, state), ..., (time, state)] (entity_id, cohort interval) : (time, state), ..., (time, state)] } * Create a unique key as per (entity, interval) * Find the interval of each event (the cohort it belongs it) * Add (time, state) pairs as variable length list This data structure allows applying arbitrary state assignment to each cohort interval :param data: a pandas dataframe :param dt: the cohort interval :param cohort_bounds: the boundaries of the cohort intervals :return: dict """ initial_time = cohort_bounds[0] event_dict = {} for row in data.itertuples(): entity_id = row.ID event_time = row.Time entity_state = row.State # insert initial state observation events # ATTN equality check if event_time == initial_time: event_key = (entity_id, 0) event_dict[event_key] = [(event_time, entity_state)] else: # Find the interval of the event (the cohort it belongs it) c = 0 for i in range(len(cohort_bounds)): if event_time > cohort_bounds[i]: c = i event_key = (entity_id, c + 1) if event_key in event_dict.keys(): # append observation if (i, c) key exists event_dict[event_key].append((event_time, entity_state)) else: # create new key if (i, c) key does not exist event_dict[event_key] = [(event_time, entity_state)] return event_dict
[docs] def remove_stale_events(data): """ Parse an event dictionary and remove transitions to the same state: .. code:: event_dict = { (entity_id, cohort interval) : [(time, state), ..., (time, state)] (entity_id, cohort interval) : (time, state), ..., (time, state)] } :param data: a pandas dataframe :return: dict """ event_dict = {} for event_key in data.keys(): # Pull the list of all events for this entity in this time event_list = data[event_key] new_event_list = [] # Iterate over all events and only keep those leading to changed state for i in range(len(event_list) - 1): if event_list[i][1] != event_list[i + 1][1]: new_event_list.append(event_list[i]) # Last event added by default i = len(event_list) - 1 new_event_list.append(event_list[i]) event_dict[event_key] = new_event_list return event_dict
[docs] def bin_timestamps(sorted_data, cohorts, output_format=0, remove_stale=False): """ Bin timestamped data in a dataframe so as to have ingoing and outgoing states per cohort interval :param data: the dataframe to cohort :param cohorts: the number of cohorts :param output_format: how to structure the outputs (0=cohorts, 1=event_list) :param remove_stale: whether to remove successive observations with identical state :type data: pandas dataframe :type dimension: int :type output_format: int :type remove_stale: bool :returns: returns dataframe with cohorted data and cohort intervals .. note:: The 'ID' and 'Time' column labels are used by default. .. warning:: Cohorting is a 'lossy' operation: Timestamps are discretised (binned) and any intermediate state transitions are lost. .. warning:: The data must be sorted already """ # STEP 1 # Construct regular intervals on the basis of minimum / maximum observation times # dt, cohort_bounds = generate_cohort_bounds(sorted_data, cohorts) # print(80 * '=') # print(cohort_bounds) # print(80 * '=') # TODO Optionally construct intervals by hand # dt = 1.0 # cohort_bounds = [0.0, 1.0] # Identify unique entities in the frame unique_ids = sorted_data['ID'].unique() # Array storage for processed data cohort_assigned_state = np.empty((len(unique_ids), len(cohort_bounds)), str) cohort_assigned_state.fill(np.nan) cohort_event = np.empty((len(unique_ids), len(cohort_bounds))) cohort_event.fill(np.nan) cohort_count = np.empty((len(unique_ids), len(cohort_bounds))) cohort_count.fill(np.nan) # STEP 2 # Generate the full event dictionary # event_dict = generate_event_dict(sorted_data, dt, cohort_bounds) # pp.pprint(event_dict) if remove_stale: event_dict = remove_stale_events(event_dict) # STEP 3 # Loop over all possible entity / cohort interval pairs # assign a state to the cohort interval # compute counts of events within cohort inteval # ATTN we loop over all possibilities, not all actual realizations. # We use integer indexes for entities / cohort intervals for id in unique_ids: for time in range(len(cohort_bounds)): # Construct the event key entity_id = list(unique_ids).index(id) event_key = (id, time) # Intervals are associated with bounds starting at 0 # 0 Interval between 0 and 1 Timestep # 1 Interval between 1 and 2 Timestep etc. interval = time # Case A: We have events for the entity in this time # (If the key exists there should be at least one event for this entity in this time) if event_key in event_dict.keys(): # Pull the list of all events for this entity in this time event_list = event_dict[(id, interval)] # Assign state to cohort # Here we are using the LAST observation in interval # TODO Generalize to user specified function (first observation, average state in interval etc) if time == 0: # The first time point is a special (initial state) cohort_assigned_state[entity_id, time] = event_list[0][1] # the initial state cohort_event[entity_id, time] = event_list[0][0] # the actual time of the initial state cohort_count[entity_id, time] = 1 # by default only one count at initial state # # # Assign to the first cohort interval the last observed state # cohort_assigned_state[entity_id, time + 1] = event_list[len(event_list) - 1][1] # # Pick time of last event (informative) # cohort_event[entity_id, time + 1] = event_list[len(event_list) - 1][0] # # Add the count of events for that entity in the cohort (informative) # cohort_count[entity_id, time + 1] = int(len(event_list)) # print('A00', event_key, time, cohort_event[entity_id, time]) # print('A01', event_key, time, cohort_event[entity_id, time + 1]) else: # Assign to the cohort interval the last observed state cohort_assigned_state[entity_id, time] = event_list[len(event_list) - 1][1] # Pick time of last event (informative) cohort_event[entity_id, time] = event_list[len(event_list) - 1][0] # Add the count of events for that entity in the cohort (informative) cohort_count[entity_id, time] = int(len(event_list)) # print('AXX', event_key, time, cohort_assigned_state[entity_id, time]) # Case B: We don't have events for the entity in this interval and it is the first interval # If we don't have observation for an entity in the first interval we assign NaN state elif event_key not in event_dict.keys() and time == 0: cohort_assigned_state[entity_id, time] = np.nan cohort_event[entity_id, time] = np.nan cohort_count[entity_id, time] = np.nan # print('BXX', event_key, time, cohort_count[entity_id, time]) # Case C: We don't have events for the entity in this interval but maybe we have in the previous one # if there are no events and the previous state is available assign the last known state, else NaN elif event_key not in event_dict.keys() and time > 0: if cohort_assigned_state[entity_id, time - 1]: cohort_assigned_state[entity_id, time] = cohort_assigned_state[entity_id, time - 1] cohort_event[entity_id, time] = cohort_event[entity_id, time - 1] cohort_count[entity_id, time] = cohort_count[entity_id, time - 1] else: cohort_assigned_state[entity_id, time] = np.nan cohort_event[entity_id, time] = np.nan cohort_count[entity_id, time] = np.nan # print('CXX', event_key, time, cohort_count[entity_id, time]) # Convert to pandas dataframe cohort_data = [] for i in range(len(unique_ids)): for c in range(len(cohort_bounds)): cohort_data.append((unique_ids[i], c, cohort_assigned_state[i][c], cohort_event[i][c], cohort_count[i][c])) # The time index spans the cohort intervals (bounds - 1) # The measurement time point is the cohort_data = pd.DataFrame(cohort_data, columns=['ID', 'Time', 'State', 'EventTime', 'Count']) if output_format == 0: return cohort_data, cohort_bounds elif output_format == 1: return event_dict, cohort_bounds