import typing import pandas from dataclasses import dataclass from math import floor from datetime import datetime def timestamp_range_seconds(start: pandas.Timestamp, end: pandas.Timestamp) -> typing.Iterator[pandas.Timestamp]: assert end >= start start = int(floor(start.timestamp())) end = int(floor(end.timestamp())) for second in range(start, end): yield pandas.Timestamp(second, unit='s') def _add_accumulated_score(df: pandas.DataFrame): acc_col = pandas.Series([0.0]).repeat(len(df)).reset_index(drop=True) acc = 0.0 for i, row in enumerate(df.itertuples()): acc += row.score acc_col[i] = acc df['accumulated_score'] = acc_col def load_score_log(path: str) -> pandas.DataFrame: return pandas.read_csv(path, sep=',', dtype={'score': int, 'sourcename': str, 'name': str, 'mapx': int, 'mapy': int}, parse_dates=['when'], date_format='%d/%m/%Y %H:%M') def get_score_per(sourcename: str) -> float | None: if sourcename == 'Capture': return 1.0 elif sourcename == 'Output Boost': return 0.1 else: return None def extend_score_log(scores: pandas.DataFrame): scores.sort_values('when', inplace=True) _add_accumulated_score(scores) def _calc_duration(row) -> int: score_per = get_score_per(row['sourcename']) if score_per is None: return 1 return int(floor(row['score'] / score_per)) def _calc_event_start(row) -> pandas.Timestamp: return pandas.Timestamp(row['when'].timestamp() - row['seconds'], unit='s') scores['seconds'] = scores.apply(_calc_duration, axis=1) scores['when_start'] = scores.apply(_calc_event_start, axis=1) def generate_station_stats(score_log: pandas.DataFrame) -> pandas.DataFrame: station_count = len(score_log['name'].unique()) # every station in the score log should have a first visit, so create summary based on that summary = score_log[score_log['sourcename'] == 'First Visit'][['name', 'mapx', 'mapy', 'when']] summary.rename(columns={'when': 'first_visit'}, inplace=True) assert len(summary) == station_count common_join_args = {'on': 'name', 'how': 'left', 'validate': '1:1'} # add total score summary = pandas.merge(summary, score_log[['name', 'score']].groupby('name').sum(), **common_join_args) summary.rename(columns={'score': 'total_score'}, inplace=True) assert len(summary) == station_count boosts = score_log[score_log['sourcename'] == 'Output Boost'][['name', 'score']].groupby('name') # add total boosts total_boosts = boosts.sum() total_boosts['totalboostduration'] = total_boosts['score'].apply(lambda x: 10 * x) total_boosts.rename(columns={'score': 'totalboostscore'}, inplace=True) summary = pandas.merge(summary, total_boosts, **common_join_args) # add max boosts max_boosts = boosts.max() max_boosts['maxboostduration'] = max_boosts['score'].apply(lambda x: 10 * x) max_boosts.rename(columns={'score': 'maxboostscore'}, inplace=True) summary = pandas.merge(summary, max_boosts, **common_join_args) visits = score_log[(score_log['sourcename'] == 'Visit') | (score_log['sourcename'] == 'First Visit')][ ['name', 'score']].groupby('name') # add total visits (count) summary = pandas.merge(summary, visits.count(), **common_join_args) summary.rename(columns={'score': 'totalvisits'}, inplace=True) captures = score_log[score_log['sourcename'] == 'Capture'][['name', 'score']].groupby('name') # add captures (count) summary = pandas.merge(summary, captures.count(), **common_join_args) summary.rename(columns={'score': 'captures'}, inplace=True) # add max held duration (max capture score) summary = pandas.merge(summary, captures.max(), **common_join_args) summary.rename(columns={'score': 'maxheldduration'}, inplace=True) # add total held duration (sum capture score) summary = pandas.merge(summary, captures.sum(), **common_join_args) summary.rename(columns={'score': 'totalheldduration'}, inplace=True) assert len(summary) == station_count return summary def generate_score_per_second(score_log: pandas.DataFrame) -> pandas.DataFrame: @dataclass class ScoreSecond: name: str sourcename: str when: datetime score: float once: bool event_start: bool mapx: int mapy: int def gen_scoreseconds() -> typing.Iterator[ScoreSecond]: for row in score_log.itertuples(): # TODO: the code below should work with 0s now if row.seconds < 2: # one-off yield ScoreSecond(name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy, when=row.when, score=row.score, once=True, event_start=True) continue once = row.seconds == 1 score_per = get_score_per(row.sourcename) event_start = True for when in timestamp_range_seconds(row.when_start, row.when): yield ScoreSecond(when=when, once=once, event_start=event_start, name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy, score=score_per, ) event_start = False scoreseconds = pandas.DataFrame(gen_scoreseconds()) scoreseconds.sort_values(by=['when'], inplace=True) scoreseconds.reset_index(drop=True, inplace=True) _add_accumulated_score(scoreseconds) return scoreseconds def get_known_player_locations(score_log): locations = score_log[score_log['mapx'] != 0][['name', 'when_start', 'mapx', 'mapy']].copy() locations.rename(columns={'when_start': 'when'}, inplace=True) locations.sort_values(by=['when'], inplace=True) locations.reset_index(drop=True, inplace=True) return locations def interpolate_player_locations(locations: pandas.DataFrame) -> pandas.DataFrame: from dataclasses import dataclass IGNORED_GAP_SECONDS = 60 * 60 * 1 @dataclass class LocationSecond: when: pandas.Timestamp mapx: int mapy: int def interpolate_locations(): skipped = False for pair in locations.rolling(window=2, closed='right'): if not skipped: skipped = True continue left = pair.iloc[0] right = pair.iloc[1] start = left['when'] end = right['when'] seconds = end.timestamp() - start.timestamp() if seconds > IGNORED_GAP_SECONDS: end = start seconds = 1 x = left['mapx'] y = left['mapy'] if seconds < 0.1: x_increment = 0 y_increment = 0 else: x_increment = (right['mapx'] - left['mapx']) / seconds y_increment = (right['mapy'] - left['mapy']) / seconds for elapsed, timestamp in enumerate(timestamp_range_seconds(start, end)): yield LocationSecond(when=timestamp, mapx=x, mapy=y) x += x_increment y += y_increment return pandas.DataFrame(interpolate_locations())