why2025-polygen-stats/foo.py
Vinzenz Schroeter 79fed14337 extend score log
2025-08-26 19:28:18 +02:00

142 lines
5.2 KiB
Python

import typing
import pandas
from dataclasses import dataclass
from math import floor
from datetime import datetime
def _add_accumulated_score(df: pandas.DataFrame):
    """Add an ``accumulated_score`` column with the running total of ``score``.

    The total is accumulated in the current row order of ``df`` and stored as
    float. ``df`` is modified in place and nothing is returned.

    The running total is computed directly on ``df['score']`` so it carries
    the frame's own index. Building the totals in a separate ``0..n-1``
    indexed Series and assigning it would align by index label, which puts
    the totals on the wrong rows whenever the frame's index is not exactly
    ``0..n-1`` in row order (e.g. after ``sort_values`` without
    ``reset_index``).
    """
    # skipna=False: once a score is missing, every later total is unknown too
    df['accumulated_score'] = df['score'].astype(float).cumsum(skipna=False)
def load_score_log(path: str) -> pandas.DataFrame:
    """Read a comma-separated score log into a DataFrame.

    ``score``, ``mapx`` and ``mapy`` are read as integers, ``sourcename`` and
    ``name`` as strings, and ``when`` is parsed as a date using the
    day/month/year hour:minute format.
    """
    column_types = {
        'score': int,
        'sourcename': str,
        'name': str,
        'mapx': int,
        'mapy': int,
    }
    return pandas.read_csv(
        path,
        sep=',',
        dtype=column_types,
        parse_dates=['when'],
        date_format='%d/%m/%Y %H:%M',
    )
def _calc_duration(row) -> int:
    """Return the duration covered by one score-log row.

    The duration is the row's ``score`` divided by the score earned per unit
    of time for its ``sourcename``, rounded down. Sources that are not listed
    below have no duration and yield 0.
    """
    # score earned per unit of duration, keyed by source name
    score_rates = {
        'Capture': 1.0,
        'Output Boost': 0.1,
    }
    rate = score_rates.get(row['sourcename'])
    if rate is None:
        return 0
    return int(floor(row['score'] / rate))
def _calc_event_start(row) -> pandas.Timestamp:
    """Return the moment the row's event began.

    The start is the row's ``when`` moved back by ``seconds`` seconds. The
    arithmetic is done on epoch seconds and converted back to a Timestamp.
    """
    end_epoch = row['when'].timestamp()
    start_epoch = end_epoch - row['seconds']
    return pandas.Timestamp(start_epoch, unit='s')
def extend_score_log(scores: pandas.DataFrame):
    """Sort a score log by time and add derived columns, all in place.

    After the call ``scores`` is ordered by ``when`` and has three additional
    columns: ``accumulated_score``, ``seconds`` and ``when_start``. The index
    is not reset after sorting. Nothing is returned.
    """
    scores.sort_values(by='when', inplace=True)
    _add_accumulated_score(scores)

    # order matters: 'when_start' is derived from the 'seconds' column
    row_wise_columns = (
        ('seconds', _calc_duration),
        ('when_start', _calc_event_start),
    )
    for column, compute in row_wise_columns:
        scores[column] = scores.apply(compute, axis=1)
def generate_station_stats(score_log: pandas.DataFrame) -> pandas.DataFrame:
    """Summarise a score log into one row per station.

    The summary starts from the 'First Visit' rows (``name``, ``mapx``,
    ``mapy`` and ``when`` renamed to ``first_visit``) and is then left-joined
    on ``name`` with per-station aggregates of ``score``:

    * ``total_score`` - sum over all rows
    * ``totalboostscore`` / ``totalboostduration`` - sum of 'Output Boost'
      scores, and ten times that sum
    * ``maxboostscore`` / ``maxboostduration`` - largest 'Output Boost'
      score, and ten times that value
    * ``totalvisits`` - number of 'Visit' and 'First Visit' rows
    * ``captures`` - number of 'Capture' rows
    * ``maxheldduration`` / ``totalheldduration`` - largest and summed
      'Capture' score

    Stations without rows of a given kind get missing values in the
    corresponding columns. After every step the row count is asserted to
    equal the number of distinct station names.
    """
    station_count = len(score_log['name'].unique())

    def scores_of(row_mask):
        # 'score' of the selected rows, grouped per station
        return score_log[row_mask][['name', 'score']].groupby('name')

    def attach(table, extra, score_as=None):
        # left-join extra onto table; optionally rename the joined 'score' column
        joined = pandas.merge(table, extra, on='name', how='left', validate='1:1')
        if score_as is not None:
            joined = joined.rename(columns={'score': score_as})
        assert len(joined) == station_count
        return joined

    def with_duration(aggregated, score_as, duration_as):
        # add the duration column (ten times the score), then rename 'score'
        aggregated[duration_as] = aggregated['score'].apply(lambda points: 10 * points)
        return aggregated.rename(columns={'score': score_as})

    # every station in the score log should have a first visit, so the
    # summary is seeded from those rows
    is_first_visit = score_log['sourcename'] == 'First Visit'
    first_visits = score_log[is_first_visit][['name', 'mapx', 'mapy', 'when']]
    summary = first_visits.rename(columns={'when': 'first_visit'})
    assert len(summary) == station_count

    # total score over all sources
    overall = score_log[['name', 'score']].groupby('name').sum()
    summary = attach(summary, overall, 'total_score')

    # boosts: total, then maximum
    boosts = scores_of(score_log['sourcename'] == 'Output Boost')
    summary = attach(
        summary,
        with_duration(boosts.sum(), 'totalboostscore', 'totalboostduration'),
    )
    summary = attach(
        summary,
        with_duration(boosts.max(), 'maxboostscore', 'maxboostduration'),
    )

    # visits (count), first visits included
    visits = scores_of(
        (score_log['sourcename'] == 'Visit') | (score_log['sourcename'] == 'First Visit')
    )
    summary = attach(summary, visits.count(), 'totalvisits')

    # captures: count, longest hold, total hold
    captures = scores_of(score_log['sourcename'] == 'Capture')
    summary = attach(summary, captures.count(), 'captures')
    summary = attach(summary, captures.max(), 'maxheldduration')
    summary = attach(summary, captures.sum(), 'totalheldduration')

    return summary
def generate_score_per_second(score_log: pandas.DataFrame) -> pandas.DataFrame:
    """Expand a score log into one row per second of every timed event.

    Rows of ``score_log`` whose ``seconds`` is 0 are copied as a single
    one-off entry (``once=True``) at their ``when``. Every other row is
    spread over ``seconds`` entries, one per second starting at
    ``when_start``, each carrying ``score / seconds``; only the first entry
    of such a run has ``event_start=True``.

    The input is read through the columns ``name``, ``sourcename``, ``mapx``,
    ``mapy``, ``when``, ``score``, ``seconds`` and ``when_start``. The result
    is sorted by ``when``, re-indexed from 0 and given an
    ``accumulated_score`` column.
    """

    @dataclass
    class ScoreSecond:
        # field order determines the column order of the resulting frame
        name: str
        sourcename: str
        when: datetime
        score: float
        once: bool
        event_start: bool
        mapx: int
        mapy: int

    def expand(entry) -> typing.Iterator[ScoreSecond]:
        # values copied unchanged onto every generated entry
        shared = dict(name=entry.name, sourcename=entry.sourcename,
                      mapx=entry.mapx, mapy=entry.mapy)

        if entry.seconds == 0:  # one-off
            yield ScoreSecond(when=entry.when, score=entry.score,
                              once=True, event_start=True, **shared)
            return

        rate = entry.score / entry.seconds
        for offset in range(0, entry.seconds):
            moment = pandas.Timestamp(entry.when_start.timestamp() + offset, unit='s')
            yield ScoreSecond(when=moment, score=rate,
                              once=False, event_start=(offset == 0), **shared)

    per_second = pandas.DataFrame(
        second
        for entry in score_log.itertuples()
        for second in expand(entry)
    )
    per_second = per_second.sort_values(by=['when']).reset_index(drop=True)
    _add_accumulated_score(per_second)
    return per_second