Splitting approach analysis

As previously noted, the main output of the TUG test (i.e., completioin time) can be decomposed into several components (i.e., subphases). In the literature, there seem to be different approaches to this decomposition: some authors break down the subphases of the test into standing_up, first_walk, first_turn, second_walk, second_turn and sitting_down (Zakaria et al. 2015; Madhushri et al. 2016; Beyea et al. 2017; Coelln et al. 2019). Others consider the last two subphases into a single one combining the last turn and the sitting down activities into the turn_to_sit subphase (Salarian et al. 2010; Adame et al. 2012; Milosevic, Jovanov, and Milenković 2013; Ansai et al. 2019).

From the two splitting approaches used in the literature, we train several models with four different datasets obtained from the dataset described in Smartphone and smartwatch HAR dataset:

The ts datasets were generated in the previous chapter, while the tts datasets were generated using the 01_relabel.py script:

Code
"""Data relabelling script.

Relabels the windowed data by replacing the TURNING and SITTING_DOWN labels by the TURN_TO_SIT label. Note that
only the TURNING activities inmediately before the SITTING_DOWN activity are replaced by TURN_TO_SIT.

**Example**:

    $ python 01_relabel.py --input_data_path <PATH_OF_WINDOWED_DATA> --output_data_path <PATH_TO_STORE_RELABELLED_DATA>
"""


import argparse
import numpy as np
import os
import pandas as pd
import sys

sys.path.append("../../..")

from alive_progress import alive_bar
from libs.common.data_loading import load_subjects_data


def relabel(gt):
    relabelled_gt = {}
    for subject, data in gt.items():
        data_copy = np.copy(data)
        changes = np.where(np.roll(data,1) != data)[0]

        for i, change in enumerate(changes):
            if change == 0:
                continue
            if data_copy[change] == 'SITTING_DOWN':
                if i+1 != len(changes):
                    data_copy[changes[i-1]:changes[i+1]] = 'TURN_TO_SIT'
                else:
                    data_copy[changes[i-1]:] = 'TURN_TO_SIT'
        relabelled_gt[subject] = data_copy
    return relabelled_gt


def count_data(data_collection):        
    recount = {
        'sp': {
            'SEATED': 0,
            'STANDING_UP': 0,
            'WALKING': 0,
            'TURNING': 0,
            'TURN_TO_SIT': 0
        },
        'sw': {  
            'SEATED': 0,
            'STANDING_UP': 0,
            'WALKING': 0,
            'TURNING': 0,
            'TURN_TO_SIT': 0
        }
    }

    for source, data in data_collection.items(): 
        for subject, subject_data in data.items():    
            unique, counts = np.unique(subject_data, return_counts=True)
            value_counts = dict(zip(unique, counts))
            recount[source]['SEATED'] += value_counts['SEATED']
            recount[source]['STANDING_UP'] += value_counts['STANDING_UP']
            recount[source]['WALKING'] += value_counts['WALKING']
            recount[source]['TURNING'] += value_counts['TURNING']
            recount[source]['TURN_TO_SIT'] += value_counts['TURN_TO_SIT']
        
    df = pd.DataFrame(recount).transpose()
    df['TOTAL'] = df.sum(axis=1)
    return df.to_markdown()



def store_windowed_data(windowed_data, ground_truth, path):
    def store_as_npy(path, data):
        with open(path, 'wb') as f:
            np.save(f, np.array(data)) 
            
    with alive_bar(len(windowed_data), title=f'Storing windowed data in {path}', force_tty=True, monitor='[{percent:.0%}]') as progress_bar:
        for source, subjects_data in windowed_data.items():
            for subject, data in subjects_data.items():
                subject_path = os.path.join(path, subject)
                if not os.path.exists(subject_path):
                    os.makedirs(subject_path)

                store_as_npy(os.path.join(subject_path, f'{subject}_{source}.npy'), data)
                store_as_npy(os.path.join(subject_path, f'{subject}_{source}_gt.npy'), ground_truth[source][subject])
            progress_bar()  


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_data_path', help='Path of input windowed data', type=str, required=True)
    parser.add_argument('--output_data_path', help='Path to store the relabelled windowed data', type=str, required=True)
    args = parser.parse_args()
    
    sp_windowed_data, sp_gt = load_subjects_data(args.input_data_path, 'sp', True)
    sw_windowed_data, sw_gt = load_subjects_data(args.input_data_path, 'sw', True)

    sp_gt_relabelled = relabel(sp_gt)
    sw_gt_relabelled = relabel(sw_gt)

    windowed_data = {
        'sp': sp_windowed_data,
        'sw': sw_windowed_data
    }     

    gt = {
        'sp': sp_gt_relabelled,
        'sw': sw_gt_relabelled,
    }

    print(count_data(gt))

    store_windowed_data(windowed_data, gt, args.output_data_path)

Then, for each dataset, \(100\) models were trained using \(80\%\) of the subjects as training subjects and the remaining \(20\%\) as testing subjects. This proces was executed using the 02_splitting-evaluation.py script.

Code
"""Splitting approach evaluation script

This script trains 100 models for each data source (smartphone, smartwatch) and splitting approach. For the training process,
a 80/20 train/test split is employed with a batch size of 20 windows during 50 epochs.

**Example**:

    $ python 02_splitting-evaluation.py 
        --ts_data_path <PATH_OF_TURNING_SITTING_DATA> 
        --tts_data_path <PATH_OF_TURN_TO_SIT_DATA>
        --reports_output_path <PATH_TO_STORE_REPORTS>
"""


import argparse
import sys
sys.path.append("../../..")

from alive_progress import alive_bar
from libs.chapter4.pipeline.training import create_trainer
from libs.common.data_loading import load_data
from libs.common.data_grouping import generate_training_and_test_sets
from libs.common.ml import generate_report
from libs.common.utils import save_json, set_seed
from sklearn.model_selection import train_test_split


TURNING_AND_SITTING_MAPPING = {"SEATED": 0, "STANDING_UP": 1, "WALKING": 2, "TURNING": 3, "SITTING_DOWN": 4}
TURN_TO_SIT_MAPPING = {"SEATED": 0, "STANDING_UP": 1, "WALKING": 2, "TURNING": 3, "TURN_TO_SIT": 4}

BATCH_SIZE = 20
EPOCHS = 50

def training_report_from_datasets(datasets, models_per_dataset=100):
    set_seed()
    trainer = create_trainer(BATCH_SIZE, EPOCHS)
    reports = {}

    for dataset_id, (x, y) in datasets.items():
        reports[dataset_id] = []
        activity_names = TURNING_AND_SITTING_MAPPING.keys() if 'turning_and_sitting' in dataset_id else TURN_TO_SIT_MAPPING.keys()
        with alive_bar(models_per_dataset, title=f'Training models for dataset {dataset_id}', force_tty=True) as progress:
            for i in range(models_per_dataset):
                train_subjects, test_subjects = train_test_split(list(x.keys()), test_size=0.2)

                x_train, y_train, x_test, y_test = generate_training_and_test_sets(x, y, train_subjects, test_subjects)

                model = trainer(x_train, y_train, verbose=0)
                y_pred = model.predict(x_test)

                reports[dataset_id].append(generate_report(y_test, y_pred, activity_names))
                progress()

    return reports


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--ts_data_path', help='Path of data labelled with TURNING and SITTING_DOWN activities', type=str, required=True)
    parser.add_argument('--tts_data_path', help='Path of data labelled with TURN_TO_SIT activity', type=str, required=True)
    parser.add_argument('--reports_output_path', help='Path to store the generated reports', type=str, required=True)
    args = parser.parse_args()
    
    x_sp_ts, y_sp_ts = load_data(args.ts_data_path, 'sp', True, TURNING_AND_SITTING_MAPPING)
    x_sw_ts, y_sw_ts = load_data(args.ts_data_path, 'sw', True, TURNING_AND_SITTING_MAPPING)

    x_sp_tts, y_sp_tts = load_data(args.tts_data_path, 'sp', True, TURN_TO_SIT_MAPPING)
    x_sw_tts, y_sw_tts = load_data(args.tts_data_path, 'sw', True, TURN_TO_SIT_MAPPING)

    datasets = {
        'sw_turning_and_sitting': [x_sw_ts, y_sw_ts],
        'sp_turning_and_sitting': [x_sp_ts, y_sp_ts],
        'sw_turn_to_sit': [x_sw_tts, y_sw_tts],
        'sp_turn_to_sit': [x_sp_tts, y_sp_tts],
    }

    reports = training_report_from_datasets(datasets)
    save_json(reports, args.reports_output_path, 'reports.json')
    

Results

Code
from libs.chapter4.analysis.data_loading import load_reports
from libs.chapter4.analysis.statistical_tests import compare_splitting_approaches

reports = load_reports()

Table 9.1 compares the overall accuracy and F1-scores of TURNING and SITTING_DOWN from one side, and TURN_TO_SIT activities from the other side, obtained for each data source – smartwatch (sw) or smartphone (sp) – and splitting approach – turning and sitting down (ts) or turn_to_sit (tts) – from the trained models. The overall accuracy obtained with the models trained with the ts is statistically better than the ones trained with the tts datasets. Moreover, the F1-score of the TURNING activity is statistically worse in the tts datasets due to the reduced number of training samples for that activity compared with the ts datasets, caused by the fact that the TURN_TO_SIT activity includes the TURNING activity (which is one of the other activities to be individually detected). In addition, the F1-score of the TURN_TO_SIT activity is low compared with the scores of TURNING and SITTING_DOWN in the ts datasets.

Code
def get_accuracy_and_f1_scores(reports):
    results = {}
    for dataset_key, dataset_reports in reports.items():
        source = dataset_key.split('_')[0]
        if source not in results:
            results[source] = {}

        dataset_accuracies = []
        dataset_turning = []
        dataset_specific = []

        specific_act, specific_score, key = ('SITTING_DOWN', 'f1-sitting-down', 'ts') if 'turning_and_sitting' in dataset_key else ('TURN_TO_SIT', 'f1-turn_to_sit', 'tts')
            
        for dataset_report in dataset_reports:
            dataset_accuracies.append(dataset_report['accuracy'])
            dataset_turning.append(dataset_report['TURNING']['f1-score'])
            dataset_specific.append(dataset_report[specific_act]['f1-score'])

        results[source][key] = {
            'accuracy': dataset_accuracies,
            'f1-turning': dataset_turning,
            f'{specific_score}': dataset_specific
        }

    return results

results = get_accuracy_and_f1_scores(reports)
comparison = compare_splitting_approaches(results, ['accuracy', 'f1-turning', 'f1-sitting-down', 'f1-turn_to_sit'])
comparison
Table 9.1: Overall accuracy and F1-scores of TURNING, SITTING_DOWN and TURN_TO_SIT for each data source and splitting approach.
source metric turning_sitting turn_to_sit two-tailed test
0 sw accuracy 0.848 0.809 t(198)=13.459315696940894, p-val=0.0, power=1.0
1 sw f1-turning 0.795 0.565 t(198)=44.34041443328367, p-val=0.0, power=1.0
2 sw f1-sitting-down 0.804 - -
3 sw f1-turn_to_sit - 0.735 -
4 sp accuracy 0.857 0.789 U=9036.0, p-val=0.0, power=1
5 sp f1-turning 0.846 0.529 t(198)=57.06501597390656, p-val=0.0, power=1.0
6 sp f1-sitting-down 0.753 - -
7 sp f1-turn_to_sit - 0.655 -

Summary

From these results, we conclude that more accurate results are obtained when considering TURNING and SITTING_DOWN as separate activities, compared to combining them, since not only the overall accuracy of the prediction model is better, but also the predictability for the TURNING activity. Therefore, the first approach (separate activities) will be used in the implementation and evaluation of the system.

Code reference

Tip

The documentation of the Python functions employed in this section can be found in Chapter 4 reference: