199 lines
7.2 KiB
Python
199 lines
7.2 KiB
Python
import typing
|
|
|
|
import pandas
|
|
from dataclasses import dataclass
|
|
from math import floor
|
|
from datetime import datetime
|
|
|
|
|
|
def timestamp_range_seconds(start: pandas.Timestamp, end: pandas.Timestamp) -> typing.Iterator[pandas.Timestamp]:
|
|
assert end >= start
|
|
start = int(floor(start.timestamp()))
|
|
end = int(floor(end.timestamp()))
|
|
for second in range(start, end):
|
|
yield pandas.Timestamp(second, unit='s')
|
|
|
|
def _add_accumulated_score(df: pandas.DataFrame):
|
|
acc_col = pandas.Series([0.0]).repeat(len(df)).reset_index(drop=True)
|
|
|
|
acc = 0.0
|
|
for i, row in enumerate(df.itertuples()):
|
|
acc += row.score
|
|
acc_col[i] = acc
|
|
|
|
df['accumulated_score'] = acc_col
|
|
|
|
def load_score_log(path: str) -> pandas.DataFrame:
|
|
return pandas.read_csv(path, sep=',',
|
|
dtype={'score': int, 'sourcename': str, 'name': str, 'mapx': int, 'mapy': int},
|
|
parse_dates=['when'], date_format='%d/%m/%Y %H:%M')
|
|
|
|
def get_score_per(sourcename: str) -> float | None:
|
|
if sourcename == 'Capture':
|
|
return 1.0
|
|
elif sourcename == 'Output Boost':
|
|
return 0.1
|
|
else:
|
|
return None
|
|
|
|
def extend_score_log(scores: pandas.DataFrame):
|
|
scores.sort_values('when', inplace=True)
|
|
_add_accumulated_score(scores)
|
|
|
|
def _calc_duration(row) -> int:
|
|
score_per = get_score_per(row['sourcename'])
|
|
if score_per is None:
|
|
return 1
|
|
return int(floor(row['score'] / score_per))
|
|
|
|
def _calc_event_start(row) -> pandas.Timestamp:
|
|
return pandas.Timestamp(row['when'].timestamp() - row['seconds'], unit='s')
|
|
|
|
scores['seconds'] = scores.apply(_calc_duration, axis=1)
|
|
scores['when_start'] = scores.apply(_calc_event_start, axis=1)
|
|
|
|
|
|
def generate_station_stats(score_log: pandas.DataFrame) -> pandas.DataFrame:
|
|
station_count = len(score_log['name'].unique())
|
|
|
|
# every station in the score log should have a first visit, so create summary based on that
|
|
summary = score_log[score_log['sourcename'] == 'First Visit'][['name', 'mapx', 'mapy', 'when']]
|
|
summary.rename(columns={'when': 'first_visit'}, inplace=True)
|
|
|
|
assert len(summary) == station_count
|
|
|
|
common_join_args = {'on': 'name', 'how': 'left', 'validate': '1:1'}
|
|
|
|
# add total score
|
|
summary = pandas.merge(summary, score_log[['name', 'score']].groupby('name').sum(), **common_join_args)
|
|
summary.rename(columns={'score': 'total_score'}, inplace=True)
|
|
assert len(summary) == station_count
|
|
|
|
boosts = score_log[score_log['sourcename'] == 'Output Boost'][['name', 'score']].groupby('name')
|
|
|
|
# add total boosts
|
|
total_boosts = boosts.sum()
|
|
total_boosts['totalboostduration'] = total_boosts['score'].apply(lambda x: 10 * x)
|
|
total_boosts.rename(columns={'score': 'totalboostscore'}, inplace=True)
|
|
summary = pandas.merge(summary, total_boosts, **common_join_args)
|
|
|
|
# add max boosts
|
|
max_boosts = boosts.max()
|
|
max_boosts['maxboostduration'] = max_boosts['score'].apply(lambda x: 10 * x)
|
|
max_boosts.rename(columns={'score': 'maxboostscore'}, inplace=True)
|
|
summary = pandas.merge(summary, max_boosts, **common_join_args)
|
|
|
|
visits = score_log[(score_log['sourcename'] == 'Visit') | (score_log['sourcename'] == 'First Visit')][
|
|
['name', 'score']].groupby('name')
|
|
|
|
# add total visits (count)
|
|
summary = pandas.merge(summary, visits.count(), **common_join_args)
|
|
summary.rename(columns={'score': 'totalvisits'}, inplace=True)
|
|
|
|
captures = score_log[score_log['sourcename'] == 'Capture'][['name', 'score']].groupby('name')
|
|
|
|
# add captures (count)
|
|
summary = pandas.merge(summary, captures.count(), **common_join_args)
|
|
summary.rename(columns={'score': 'captures'}, inplace=True)
|
|
|
|
# add max held duration (max capture score)
|
|
summary = pandas.merge(summary, captures.max(), **common_join_args)
|
|
summary.rename(columns={'score': 'maxheldduration'}, inplace=True)
|
|
|
|
# add total held duration (sum capture score)
|
|
summary = pandas.merge(summary, captures.sum(), **common_join_args)
|
|
summary.rename(columns={'score': 'totalheldduration'}, inplace=True)
|
|
|
|
assert len(summary) == station_count
|
|
return summary
|
|
|
|
|
|
def generate_score_per_second(score_log: pandas.DataFrame) -> pandas.DataFrame:
|
|
@dataclass
|
|
class ScoreSecond:
|
|
name: str
|
|
sourcename: str
|
|
when: datetime
|
|
score: float
|
|
once: bool
|
|
event_start: bool
|
|
mapx: int
|
|
mapy: int
|
|
|
|
def gen_scoreseconds() -> typing.Iterator[ScoreSecond]:
|
|
for row in score_log.itertuples():
|
|
# TODO: the code below should work with 0s now
|
|
if row.seconds < 2: # one-off
|
|
yield ScoreSecond(name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy, when=row.when,
|
|
score=row.score, once=True, event_start=True)
|
|
continue
|
|
|
|
once = row.seconds == 1
|
|
score_per = get_score_per(row.sourcename)
|
|
|
|
event_start = True
|
|
for when in timestamp_range_seconds(row.when_start, row.when):
|
|
yield ScoreSecond(when=when, once=once, event_start=event_start,
|
|
name=row.name, sourcename=row.sourcename, mapx=row.mapx, mapy=row.mapy,
|
|
score=score_per, )
|
|
event_start = False
|
|
|
|
scoreseconds = pandas.DataFrame(gen_scoreseconds())
|
|
scoreseconds.sort_values(by=['when'], inplace=True)
|
|
scoreseconds.reset_index(drop=True, inplace=True)
|
|
|
|
_add_accumulated_score(scoreseconds)
|
|
return scoreseconds
|
|
|
|
def get_known_player_locations(score_log):
|
|
locations = score_log[score_log['mapx'] != 0][['name', 'when_start', 'mapx', 'mapy']].copy()
|
|
locations.rename(columns={'when_start': 'when'}, inplace=True)
|
|
locations.sort_values(by=['when'], inplace=True)
|
|
locations.reset_index(drop=True, inplace=True)
|
|
return locations
|
|
|
|
def interpolate_player_locations(locations: pandas.DataFrame) -> pandas.DataFrame:
|
|
from dataclasses import dataclass
|
|
|
|
IGNORED_GAP_SECONDS = 60 * 60 * 1
|
|
|
|
@dataclass
|
|
class LocationSecond:
|
|
when: pandas.Timestamp
|
|
mapx: int
|
|
mapy: int
|
|
|
|
def interpolate_locations():
|
|
skipped = False
|
|
for pair in locations.rolling(window=2, closed='right'):
|
|
if not skipped:
|
|
skipped = True
|
|
continue
|
|
|
|
left = pair.iloc[0]
|
|
right = pair.iloc[1]
|
|
|
|
start = left['when']
|
|
end = right['when']
|
|
|
|
seconds = end.timestamp() - start.timestamp()
|
|
if seconds > IGNORED_GAP_SECONDS:
|
|
end = start
|
|
seconds = 1
|
|
|
|
x = left['mapx']
|
|
y = left['mapy']
|
|
|
|
if seconds < 0.1:
|
|
x_increment = 0
|
|
y_increment = 0
|
|
else:
|
|
x_increment = (right['mapx'] - left['mapx']) / seconds
|
|
y_increment = (right['mapy'] - left['mapy']) / seconds
|
|
|
|
for elapsed, timestamp in enumerate(timestamp_range_seconds(start, end)):
|
|
yield LocationSecond(when=timestamp, mapx=x, mapy=y)
|
|
x += x_increment
|
|
y += y_increment
|
|
|
|
return pandas.DataFrame(interpolate_locations())
|