why2025-polygen-stats/foo.py
2025-08-27 22:04:01 +02:00

199 lines
7.2 KiB
Python

import typing
import pandas
from dataclasses import dataclass
from math import floor
from datetime import datetime
def timestamp_range_seconds(start: pandas.Timestamp, end: pandas.Timestamp) -> typing.Iterator[pandas.Timestamp]:
assert end >= start
start = int(floor(start.timestamp()))
end = int(floor(end.timestamp()))
for second in range(start, end):
yield pandas.Timestamp(second, unit='s')
def _add_accumulated_score(df: pandas.DataFrame):
acc_col = pandas.Series([0.0]).repeat(len(df)).reset_index(drop=True)
acc = 0.0
for i, row in enumerate(df.itertuples()):
acc += row.score
acc_col[i] = acc
df['accumulated_score'] = acc_col
def load_score_log(path: str) -> pandas.DataFrame:
return pandas.read_csv(path, sep=',',
dtype={'score': int, 'sourcename': str, 'name': str, 'mapx': int, 'mapy': int},
parse_dates=['when'], date_format='%d/%m/%Y %H:%M')
def get_score_per(sourcename: str) -> float | None:
if sourcename == 'Capture':
return 1.0
elif sourcename == 'Output Boost':
return 0.1
else:
return None
def extend_score_log(scores: pandas.DataFrame):
scores.sort_values('when', inplace=True)
_add_accumulated_score(scores)
def _calc_duration(row) -> int:
score_per = get_score_per(row['sourcename'])
if score_per is None:
return 1
return int(floor(row['score'] / score_per))
def _calc_event_start(row) -> pandas.Timestamp:
return pandas.Timestamp(row['when'].timestamp() - row['seconds'], unit='s')
scores['seconds'] = scores.apply(_calc_duration, axis=1)
scores['when_start'] = scores.apply(_calc_event_start, axis=1)
def generate_station_stats(score_log: pandas.DataFrame) -> pandas.DataFrame:
station_count = len(score_log['name'].unique())
# every station in the score log should have a first visit, so create summary based on that
summary = score_log[score_log['sourcename'] == 'First Visit'][['name', 'mapx', 'mapy', 'when']]
summary.rename(columns={'when': 'first_visit'}, inplace=True)
assert len(summary) == station_count
common_join_args = {'on': 'name', 'how': 'left', 'validate': '1:1'}
# add total score
summary = pandas.merge(summary, score_log[['name', 'score']].groupby('name').sum(), **common_join_args)
summary.rename(columns={'score': 'total_score'}, inplace=True)
assert len(summary) == station_count
boosts = score_log[score_log['sourcename'] == 'Output Boost'][['name', 'score']].groupby('name')
# add total boosts
total_boosts = boosts.sum()
total_boosts['totalboostduration'] = total_boosts['score'].apply(lambda x: 10 * x)
total_boosts.rename(columns={'score': 'totalboostscore'}, inplace=True)
summary = pandas.merge(summary, total_boosts, **common_join_args)
# add max boosts
max_boosts = boosts.max()
max_boosts['maxboostduration'] = max_boosts['score'].apply(lambda x: 10 * x)
max_boosts.rename(columns={'score': 'maxboostscore'}, inplace=True)
summary = pandas.merge(summary, max_boosts, **common_join_args)
visits = score_log[(score_log['sourcename'] == 'Visit') | (score_log['sourcename'] == 'First Visit')][
['name', 'score']].groupby('name')
# add total visits (count)
summary = pandas.merge(summary, visits.count(), **common_join_args)
summary.rename(columns={'score': 'totalvisits'}, inplace=True)
captures = score_log[score_log['sourcename'] == 'Capture'][['name', 'score']].groupby('name')
# add captures (count)
summary = pandas.merge(summary, captures.count(), **common_join_args)
summary.rename(columns={'score': 'captures'}, inplace=True)
# add max held duration (max capture score)
summary = pandas.merge(summary, captures.max(), **common_join_args)
summary.rename(columns={'score': 'maxheldduration'}, inplace=True)
# add total held duration (sum capture score)
summary = pandas.merge(summary, captures.sum(), **common_join_args)
summary.rename(columns={'score': 'totalheldduration'}, inplace=True)
assert len(summary) == station_count
return summary
def generate_score_per_second(score_log: pandas.DataFrame) -> pandas.DataFrame:
@dataclass
class ScoreSecond:
name: str
sourcename: str
when: datetime
score: float
once: bool
event_start: bool
mapx: int
mapy: int
def gen_scoreseconds() -> typing.Iterator[ScoreSecond]:
for row in score_log.itertuples():
# TODO: the code below should work with 0s now
if row.seconds < 2: # one-off
yield ScoreSecond(name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy, when=row.when,
score=row.score, once=True, event_start=True)
continue
once = row.seconds == 1
score_per = get_score_per(row.sourcename)
event_start = True
for when in timestamp_range_seconds(row.when_start, row.when):
yield ScoreSecond(when=when, once=once, event_start=event_start,
name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy,
score=score_per, )
event_start = False
scoreseconds = pandas.DataFrame(gen_scoreseconds())
scoreseconds.sort_values(by=['when'], inplace=True)
scoreseconds.reset_index(drop=True, inplace=True)
_add_accumulated_score(scoreseconds)
return scoreseconds
def get_known_player_locations(score_log):
locations = score_log[score_log['mapx'] != 0][['name', 'when_start', 'mapx', 'mapy']].copy()
locations.rename(columns={'when_start': 'when'}, inplace=True)
locations.sort_values(by=['when'], inplace=True)
locations.reset_index(drop=True, inplace=True)
return locations
def interpolate_player_locations(locations: pandas.DataFrame) -> pandas.DataFrame:
from dataclasses import dataclass
IGNORED_GAP_SECONDS = 60 * 60 * 1
@dataclass
class LocationSecond:
when: pandas.Timestamp
mapx: int
mapy: int
def interpolate_locations():
skipped = False
for pair in locations.rolling(window=2, closed='right'):
if not skipped:
skipped = True
continue
left = pair.iloc[0]
right = pair.iloc[1]
start = left['when']
end = right['when']
seconds = end.timestamp() - start.timestamp()
if seconds > IGNORED_GAP_SECONDS:
end = start
seconds = 1
x = left['mapx']
y = left['mapy']
if seconds < 0.1:
x_increment = 0
y_increment = 0
else:
x_increment = (right['mapx'] - left['mapx']) / seconds
y_increment = (right['mapy'] - left['mapy']) / seconds
for elapsed, timestamp in enumerate(timestamp_range_seconds(start, end)):
yield LocationSecond(when=timestamp, mapx=x, mapy=y)
x += x_increment
y += y_increment
return pandas.DataFrame(interpolate_locations())