remove a bunch of locks

This commit is contained in:
Vinzenz Schroeter 2024-05-03 14:45:41 +02:00 committed by RobbersDaughter
parent b3bf62b391
commit d7b8664062
13 changed files with 126 additions and 143 deletions

View file

@ -18,7 +18,7 @@ internal sealed class Endpoints(
{
app.MapPost("/player", PostPlayer);
app.MapGet("/player", GetPlayerAsync);
app.MapGet("/scores", () => playerService.GetAll() as IEnumerable<Player>);
app.MapGet("/scores", () => playerService.Players);
app.Map("/screen", ConnectScreenAsync);
app.Map("/controls", ConnectControlsAsync);
app.MapGet("/map", () => mapService.MapNames);

View file

@ -9,8 +9,7 @@ internal sealed class DrawMapStep(MapService map) : IDrawStep
for (ushort y = 0; y < MapService.PixelsPerColumn; y++)
for (ushort x = 0; x < MapService.PixelsPerRow; x++)
{
var pixel = new PixelPosition(x, y);
if (!map.Current.IsWall(pixel))
if (!map.Current.IsWall(x, y))
continue;
pixels[x, y].EntityType = GamePixelEntityType.Wall;

View file

@ -1,5 +1,6 @@
using DisplayCommands;
using TanksServer.GameLogic;
using TanksServer.Interactivity;
namespace TanksServer.Graphics;
@ -22,7 +23,8 @@ internal sealed class GeneratePixelsTickStep(
if (_observerPixelGrid.Data.Span.SequenceEqual(_lastObserverPixelGrid.Data.Span))
return;
await Task.WhenAll(_consumers.Select(c => c.OnFrameDoneAsync(_gamePixelGrid, _observerPixelGrid)));
await _consumers.Select(c => c.OnFrameDoneAsync(_gamePixelGrid, _observerPixelGrid))
.WhenAll();
(_lastGamePixelGrid, _gamePixelGrid) = (_gamePixelGrid, _lastGamePixelGrid);
(_lastObserverPixelGrid, _observerPixelGrid) = (_observerPixelGrid, _lastObserverPixelGrid);

View file

@ -0,0 +1,25 @@
using System.Buffers;
namespace TanksServer.Interactivity;
internal sealed class BufferPool: MemoryPool<byte>
{
private readonly MemoryPool<byte> _actualPool = Shared;
public override int MaxBufferSize => int.MaxValue;
protected override void Dispose(bool disposing) {}
public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
{
ArgumentOutOfRangeException.ThrowIfLessThan(minBufferSize, 1);
return new BufferPoolMemoryOwner(_actualPool.Rent(minBufferSize), minBufferSize);
}
private sealed class BufferPoolMemoryOwner(IMemoryOwner<byte> actualOwner, int wantedSize): IMemoryOwner<byte>
{
public Memory<byte> Memory { get; } = actualOwner.Memory[..wantedSize];
public void Dispose() => actualOwner.Dispose();
}
}

View file

@ -6,17 +6,23 @@ namespace TanksServer.Interactivity;
internal sealed class ClientScreenServer(
ILogger<ClientScreenServer> logger,
ILoggerFactory loggerFactory
ILoggerFactory loggerFactory,
BufferPool bufferPool
) : WebsocketServer<ClientScreenServerConnection>(logger),
IFrameConsumer
{
public Task HandleClientAsync(WebSocket socket, Player? player)
=> base.HandleClientAsync(new ClientScreenServerConnection(
{
var connection = new ClientScreenServerConnection(
socket,
loggerFactory.CreateLogger<ClientScreenServerConnection>(),
player
));
public async Task OnFrameDoneAsync(GamePixelGrid gamePixelGrid, PixelGrid observerPixels)
=> await ParallelForEachConnectionAsync(c => c.OnGameTickAsync(observerPixels, gamePixelGrid).AsTask());
player,
bufferPool
);
return base.HandleClientAsync(connection);
}
public Task OnFrameDoneAsync(GamePixelGrid gamePixelGrid, PixelGrid observerPixels)
=> Connections.Select(c => c.OnGameTickAsync(observerPixels, gamePixelGrid))
.WhenAll();
}

View file

@ -7,24 +7,23 @@ namespace TanksServer.Interactivity;
internal sealed class ClientScreenServerConnection : WebsocketServerConnection
{
private sealed record class Package(
IMemoryOwner<byte> PixelsOwner,
Memory<byte> Pixels,
IMemoryOwner<byte>? PlayerDataOwner,
Memory<byte>? PlayerData
);
private sealed record class Package(IMemoryOwner<byte> Pixels, IMemoryOwner<byte>? PlayerData);
private readonly MemoryPool<byte> _memoryPool = MemoryPool<byte>.Shared;
private readonly BufferPool _bufferPool;
private readonly PlayerScreenData? _playerDataBuilder;
private readonly Player? _player;
private int _wantsFrameOnTick = 1;
private Package? _next;
public ClientScreenServerConnection(WebSocket webSocket,
public ClientScreenServerConnection(
WebSocket webSocket,
ILogger<ClientScreenServerConnection> logger,
Player? player) : base(logger, new ByteChannelWebSocket(webSocket, logger, 0))
Player? player,
BufferPool bufferPool
) : base(logger, new ByteChannelWebSocket(webSocket, logger, 0))
{
_player = player;
_bufferPool = bufferPool;
_player?.IncrementConnectionCount();
_playerDataBuilder = player == null
? null
@ -46,25 +45,22 @@ internal sealed class ClientScreenServerConnection : WebsocketServerConnection
return ValueTask.CompletedTask;
}
public async ValueTask OnGameTickAsync(PixelGrid pixels, GamePixelGrid gamePixelGrid)
public async Task OnGameTickAsync(PixelGrid pixels, GamePixelGrid gamePixelGrid)
{
await Task.Yield();
var nextPixelsOwner = _memoryPool.Rent(pixels.Data.Length);
var nextPixels = nextPixelsOwner.Memory[..pixels.Data.Length];
pixels.Data.CopyTo(nextPixels);
var nextPixels = _bufferPool.Rent(pixels.Data.Length);
pixels.Data.CopyTo(nextPixels.Memory);
IMemoryOwner<byte>? nextPlayerDataOwner = null;
Memory<byte>? nextPlayerData = null;
IMemoryOwner<byte>? nextPlayerData = null;
if (_playerDataBuilder != null)
{
var data = _playerDataBuilder.Build(gamePixelGrid);
nextPlayerDataOwner = _memoryPool.Rent(data.Length);
nextPlayerData = nextPlayerDataOwner.Memory[..data.Length];
data.CopyTo(nextPlayerData.Value);
nextPlayerData = _bufferPool.Rent(data.Length);
data.CopyTo(nextPlayerData.Memory);
}
var next = new Package(nextPixelsOwner, nextPixels, nextPlayerDataOwner, nextPlayerData);
var next = new Package(nextPixels, nextPlayerData);
if (Interlocked.Exchange(ref _wantsFrameOnTick, 0) != 0)
{
await SendAndDisposeAsync(next);
@ -72,8 +68,8 @@ internal sealed class ClientScreenServerConnection : WebsocketServerConnection
}
var oldNext = Interlocked.Exchange(ref _next, next);
oldNext?.PixelsOwner.Dispose();
oldNext?.PlayerDataOwner?.Dispose();
oldNext?.Pixels.Dispose();
oldNext?.PlayerData?.Dispose();
}
public override ValueTask RemovedAsync()
@ -86,9 +82,9 @@ internal sealed class ClientScreenServerConnection : WebsocketServerConnection
{
try
{
await Socket.SendBinaryAsync(package.Pixels, package.PlayerData == null);
await Socket.SendBinaryAsync(package.Pixels.Memory, package.PlayerData == null);
if (package.PlayerData != null)
await Socket.SendBinaryAsync(package.PlayerData.Value);
await Socket.SendBinaryAsync(package.PlayerData.Memory);
}
catch (WebSocketException ex)
{
@ -96,8 +92,8 @@ internal sealed class ClientScreenServerConnection : WebsocketServerConnection
}
finally
{
package.PixelsOwner.Dispose();
package.PlayerDataOwner?.Dispose();
package.Pixels.Dispose();
package.PlayerData?.Dispose();
}
}
}

View file

@ -6,25 +6,30 @@ using TanksServer.GameLogic;
namespace TanksServer.Interactivity;
// MemoryStream is IDisposable but does not need to be disposed
#pragma warning disable CA1001
internal sealed class PlayerInfoConnection : WebsocketServerConnection
#pragma warning restore CA1001
{
private readonly Player _player;
private readonly MapEntityManager _entityManager;
private readonly BufferPool _bufferPool;
private readonly MemoryStream _tempStream = new();
private readonly MemoryPool<byte> _memoryPool = MemoryPool<byte>.Shared;
private int _wantsInfoOnTick = 1;
private Package? _lastMessage = null;
private Package? _nextMessage = null;
private IMemoryOwner<byte>? _lastMessage = null;
private IMemoryOwner<byte>? _nextMessage = null;
private sealed record class Package(IMemoryOwner<byte> Owner, Memory<byte> Memory);
public PlayerInfoConnection(Player player,
public PlayerInfoConnection(
Player player,
ILogger logger,
WebSocket rawSocket,
MapEntityManager entityManager) : base(logger, new ByteChannelWebSocket(rawSocket, logger, 0))
MapEntityManager entityManager,
BufferPool bufferPool
) : base(logger, new ByteChannelWebSocket(rawSocket, logger, 0))
{
_player = player;
_entityManager = entityManager;
_bufferPool = bufferPool;
_player.IncrementConnectionCount();
}
@ -38,7 +43,7 @@ internal sealed class PlayerInfoConnection : WebsocketServerConnection
return ValueTask.CompletedTask;
}
public async ValueTask OnGameTickAsync()
public async Task OnGameTickAsync()
{
await Task.Yield();
@ -60,7 +65,7 @@ internal sealed class PlayerInfoConnection : WebsocketServerConnection
return ValueTask.CompletedTask;
}
private async ValueTask<Package> GenerateMessageAsync()
private async ValueTask<IMemoryOwner<byte>> GenerateMessageAsync()
{
var tank = _entityManager.GetCurrentTankOfPlayer(_player);
@ -82,17 +87,16 @@ internal sealed class PlayerInfoConnection : WebsocketServerConnection
await JsonSerializer.SerializeAsync(_tempStream, info, AppSerializerContext.Default.PlayerInfo);
var messageLength = (int)_tempStream.Position;
var owner = _memoryPool.Rent(messageLength);
var package = new Package(owner, owner.Memory[..messageLength]);
var owner = _bufferPool.Rent(messageLength);
_tempStream.Position = 0;
await _tempStream.ReadExactlyAsync(package.Memory);
return package;
await _tempStream.ReadExactlyAsync(owner.Memory);
return owner;
}
private async ValueTask SendAndDisposeAsync(Package data)
private async ValueTask SendAndDisposeAsync(IMemoryOwner<byte> data)
{
await Socket.SendTextAsync(data.Memory);
Interlocked.Exchange(ref _lastMessage, data)?.Owner.Dispose();
Interlocked.Exchange(ref _lastMessage, data)?.Dispose();
}
}

View file

@ -8,65 +8,34 @@ internal sealed class PlayerServer(
ILogger<PlayerServer> logger,
ILogger<PlayerInfoConnection> connectionLogger,
TankSpawnQueue tankSpawnQueue,
MapEntityManager entityManager
MapEntityManager entityManager,
BufferPool bufferPool
) : WebsocketServer<PlayerInfoConnection>(logger), ITickStep
{
private readonly Dictionary<string, Player> _players = [];
private readonly SemaphoreSlim _mutex = new(1, 1);
private readonly ConcurrentDictionary<string, Player> _players = [];
public Player GetOrAdd(string name)
{
_mutex.Wait();
try
{
if (_players.TryGetValue(name, out var existingPlayer))
{
logger.LogInformation("player {} rejoined", existingPlayer.Name);
return existingPlayer;
}
public Player GetOrAdd(string name) => _players.GetOrAdd(name, Add);
public bool TryGet(string name, [MaybeNullWhen(false)] out Player foundPlayer)
=> _players.TryGetValue(name, out foundPlayer);
public IEnumerable<Player> Players => _players.Values;
private Player Add(string name)
{
var newPlayer = new Player { Name = name };
logger.LogInformation("player {} joined", newPlayer.Name);
_players.Add(name, newPlayer);
tankSpawnQueue.EnqueueForImmediateSpawn(newPlayer);
return newPlayer;
}
finally
{
_mutex.Release();
}
}
public bool TryGet(string name, [MaybeNullWhen(false)] out Player foundPlayer)
{
_mutex.Wait();
try
{
foundPlayer = _players.Values.FirstOrDefault(player => player.Name == name);
return foundPlayer != null;
}
finally
{
_mutex.Release();
}
}
public List<Player> GetAll()
{
_mutex.Wait();
try
{
return _players.Values.ToList();
}
finally
{
_mutex.Release();
}
}
public Task HandleClientAsync(WebSocket webSocket, Player player)
=> HandleClientAsync(new PlayerInfoConnection(player, connectionLogger, webSocket, entityManager));
public ValueTask TickAsync(TimeSpan delta)
=> ParallelForEachConnectionAsync(connection => connection.OnGameTickAsync().AsTask());
{
var connection = new PlayerInfoConnection(player, connectionLogger, webSocket, entityManager, bufferPool);
return HandleClientAsync(connection);
}
public async ValueTask TickAsync(TimeSpan delta)
=> await Connections.Select(connection => connection.OnGameTickAsync())
.WhenAll();
}

View file

@ -75,7 +75,7 @@ internal sealed class SendToServicePointDisplay : IFrameConsumer
private void RefreshScores()
{
var playersToDisplay = _players.GetAll()
var playersToDisplay = _players.Players
.OrderByDescending(p => p.Scores.Kills)
.Take(ScoresPlayerRows);

View file

@ -0,0 +1,9 @@
using System.Runtime.CompilerServices;
namespace TanksServer.Interactivity;
public static class TaskExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static Task WhenAll(this IEnumerable<Task> tasks) => Task.WhenAll(tasks);
}

View file

@ -1,70 +1,45 @@
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
namespace TanksServer.Interactivity;
internal abstract class WebsocketServer<T>(
ILogger logger
) : IHostedLifecycleService, IDisposable
) : IHostedLifecycleService
where T : WebsocketServerConnection
{
private readonly SemaphoreSlim _mutex = new(1, 1);
private bool _closing;
private readonly HashSet<T> _connections = [];
private readonly ConcurrentDictionary<T, byte> _connections = [];
public async Task StoppingAsync(CancellationToken cancellationToken)
{
logger.LogInformation("closing connections");
await LockedAsync(async () =>
{
_closing = true;
await Task.WhenAll(_connections.Select(c => c.CloseAsync()));
}, cancellationToken);
logger.LogInformation("closing connections");
await _connections.Keys.Select(c => c.CloseAsync())
.WhenAll();
logger.LogInformation("closed connections");
}
protected ValueTask ParallelForEachConnectionAsync(Func<T, Task> body) =>
LockedAsync(async () => await Task.WhenAll(_connections.Select(body)), CancellationToken.None);
protected IEnumerable<T> Connections => _connections.Keys;
private ValueTask AddConnectionAsync(T connection) => LockedAsync(async () =>
protected async Task HandleClientAsync(T connection)
{
if (_closing)
{
logger.LogWarning("refusing connection because server is shutting down");
await connection.CloseAsync();
return;
}
_connections.Add(connection);
}, CancellationToken.None);
var added = _connections.TryAdd(connection, 0);
Debug.Assert(added);
private ValueTask RemoveConnectionAsync(T connection) => LockedAsync(() =>
{
_connections.Remove(connection);
return ValueTask.CompletedTask;
}, CancellationToken.None);
protected async Task HandleClientAsync(T connection)
{
await AddConnectionAsync(connection);
await connection.ReceiveAsync();
await RemoveConnectionAsync(connection);
_ = _connections.TryRemove(connection, out _);
await connection.RemovedAsync();
}
private async ValueTask LockedAsync(Func<ValueTask> action, CancellationToken cancellationToken)
{
await _mutex.WaitAsync(cancellationToken);
try
{
await action();
}
finally
{
_mutex.Release();
}
}
public virtual void Dispose() => _mutex.Dispose();
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

View file

@ -3,9 +3,8 @@ namespace TanksServer.Interactivity;
internal abstract class WebsocketServerConnection(
ILogger logger,
ByteChannelWebSocket socket
) : IDisposable
)
{
private readonly SemaphoreSlim _mutex = new(1);
protected readonly ByteChannelWebSocket Socket = socket;
protected readonly ILogger Logger = logger;
@ -25,6 +24,4 @@ internal abstract class WebsocketServerConnection(
public abstract ValueTask RemovedAsync();
protected abstract ValueTask HandleMessageAsync(Memory<byte> buffer);
public virtual void Dispose() => _mutex.Dispose();
}

View file

@ -63,6 +63,7 @@ public static class Program
builder.Services.AddSingleton<ClientScreenServer>();
builder.Services.AddSingleton<TankSpawnQueue>();
builder.Services.AddSingleton<Endpoints>();
builder.Services.AddSingleton<BufferPool>();
builder.Services.AddHostedService<GameTickWorker>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<ControlsServer>());