new SSE system (downgraded, git add .git add .go with redis if productiongit add .git add .); implimented empty lobby deletion

This commit is contained in:
shrt 2025-03-31 13:54:54 +02:00
parent abc698998e
commit f0672b9bc1
12 changed files with 277 additions and 213 deletions

View File

@ -10,12 +10,11 @@ async function Page({
id: string;
}>;
}) {
const sessionPlayer = await api.player.getBySession();
const { id } = await params;
const lobby = await api.lobby.get({ id });
if (!lobby) return notFound();
if (!lobby) return <div>Lobby not found</div>;
return <LobbyPage initialLobby={lobby} sessionPlayer={sessionPlayer} />;
return <LobbyPage initialLobby={lobby} />;
}
export default Page;

View File

@ -5,11 +5,12 @@ import { Button } from "@/components/ui/button";
import LobbyPage from "@/app/_components/lobby/lobby-page";
import { redirect } from "next/navigation";
import { appRoutes } from "@/config/app.routes";
import { auth } from "@/server/auth";
async function Page() {
const sessionPlayer = await api.player.getBySession();
if (!sessionPlayer) return redirect(appRoutes.signIn);
const lobby = sessionPlayer ? await api.lobby.getCurrentLobby() : null;
const session = await auth();
if (!session?.user) return redirect(appRoutes.signIn);
const lobby = await api.lobby.getCurrentLobby();
if (!lobby)
return (
<div className="flex w-full gap-4">
@ -23,7 +24,7 @@ async function Page() {
</Button>
</div>
);
return <LobbyPage initialLobby={lobby} sessionPlayer={sessionPlayer} />;
return <LobbyPage initialLobby={lobby} />;
}
export default Page;

View File

@ -2,10 +2,10 @@ import { appRoutes } from "@/config/app.routes";
import { useSessionPlayerStore } from "@/lib/store/current-player-store";
import { useLobbyStore } from "@/lib/store/lobby-store";
import type { Lobby, LobbyMember } from "@/server/db/schema";
import type { LobbyMemberLeaveEventData } from "@/server/redis/events";
import { api } from "@/trpc/react";
import { useRouter } from "next/navigation";
import React from "react";
import { toast } from "sonner";
function LobbyProvider({
children,
@ -23,29 +23,42 @@ function LobbyProvider({
const removeMember = useLobbyStore((state) => state.removeMember);
const router = useRouter();
api.lobby.onMemberUpdate.useSubscription(undefined, {
onData({ data: _data }) {
const joined = _data.joined;
if (joined) {
const data = _data.membership as LobbyMember;
addMember(data);
} else {
const data = _data.membership as LobbyMemberLeaveEventData;
removeMember(data.playerId);
if (data?.kicked && data?.playerId === sessionPlayer?.id) {
router.push(appRoutes.lobby(lobby.id));
router.refresh();
}
}
},
});
api.lobby.onUpdate.useSubscription(undefined, {
onData({ data }) {
if (!data?.lobby) return;
updateLobby(data.lobby);
const lobbyId = lobby.id ?? "";
api.lobby.onMemberUpdate.useSubscription(
{ lobbyId },
{
onData({ data }) {
if (data.joined) {
const member = data.member as LobbyMember;
addMember(member);
} else {
const memberId = data.member as string;
removeMember(memberId);
if (data?.kicked && memberId === sessionPlayer?.id) {
router.push(appRoutes.lobby(lobby.id));
router.refresh();
}
}
},
},
});
);
api.lobby.onUpdate.useSubscription(
{ lobbyId },
{
onData({ data }) {
if (data.deleted) {
resetLobby(undefined);
router.push(appRoutes.home);
router.refresh();
toast.error("Lobby got deleted");
return;
}
if (data.lobby) updateLobby(data.lobby);
},
},
);
React.useEffect(() => {
resetLobby(initialLobby);
setMembers(initialLobby?.members ?? []);

View File

@ -1,32 +0,0 @@
"use client";
import React from "react";
import { Button } from "./ui/button";
import Link from "next/link";
import { usePathname } from "next/navigation";
import { cn } from "@/lib/utils";
function NavLink({ label, path, className }: NavLink & { className?: string }) {
const active = usePathname() === path;
return (
<Button
asChild
variant={"ghost"}
className={cn(
"group hover:bg-border/20 h-10 rounded-none font-bold text-white/50 hover:text-white/50",
active && "bg-border/20 text-white hover:text-white",
className,
)}
>
<Link href={path}>
<span
className={"transition-transform duration-150 group-hover:scale-105"}
>
{label}
</span>
</Link>
</Button>
);
}
export default NavLink;

View File

@ -1,16 +1,22 @@
"use client";
import React from "react";
import { Button } from "./ui/button";
import Link from "next/link";
import UserPopover from "@/app/_components/profile-popover";
import { appConfig } from "@/config/app.config";
import NavLink from "./nav-link";
import { api } from "@/trpc/server";
import { Icons } from "./icons";
import { appRoutes } from "@/config/app.routes";
import { useSessionPlayerStore } from "@/lib/store/current-player-store";
import { useLobbyStore } from "@/lib/store/lobby-store";
import { usePathname } from "next/navigation";
import { cn } from "@/lib/utils";
async function Navbar() {
const sessionPlayer = await api.player.getBySession();
const currentLobby = sessionPlayer ? await api.lobby.getCurrentLobby() : null;
function Navbar() {
const sessionPlayer = useSessionPlayerStore((state) => state.sessionPlayer);
const currentLobby = useLobbyStore((state) => state.lobby);
const pathname = usePathname();
return (
<nav className="flex w-full items-center justify-center p-4">
@ -18,12 +24,17 @@ async function Navbar() {
<menu className="flex items-center">
{appConfig.navigation.map((navLink, idx) => (
<li key={idx}>
<NavLink {...navLink} />
<NavLink {...navLink} active={pathname === navLink.path} />
</li>
))}
</menu>
<div className="bg-border/10 border-border/20 flex items-center rounded-full border-l-2">
<div
className={cn(
"bg-border/10 border-border/20 hover:bg-background/20 flex items-center rounded-full border-l-2",
pathname === appRoutes.currentlobby && "bg-background/20",
)}
>
{currentLobby ? (
<Link href={appRoutes.currentlobby}>
<div className="border-border/20 flex h-10 items-center justify-center gap-2 pr-2 pl-4 font-semibold">
@ -45,4 +56,31 @@ async function Navbar() {
);
}
const NavLink = ({
label,
path,
active,
className,
}: NavLink & { className?: string; active?: boolean }) => {
return (
<Button
asChild
variant={"ghost"}
className={cn(
"group hover:bg-background/20 h-10 rounded-none font-bold text-white/50 hover:text-white/50",
active && "bg-background/20 text-white hover:text-white",
className,
)}
>
<Link href={path}>
<span
className={"transition-transform duration-150 group-hover:scale-105"}
>
{label}
</span>
</Link>
</Button>
);
};
export default Navbar;

View File

@ -20,7 +20,7 @@ export const useLobbyStore = create<LobbyStore>((set, get) => ({
selectedGame: 0,
setSelectedGame: (gameId) => set({ selectedGame: gameId }),
updateLobby: (lobby) =>
set((state) => ({ ...state, ...lobby, members: state.members })),
set((state) => ({ lobby: { ...state, ...lobby, members: state.members } })),
setMembers: (members) => set({ members }),
resetLobby: (lobby) => set({ lobby: lobby ?? ({} as Lobby) }),
findMember: (playerId) => get().members.find((m) => m.playerId === playerId),

View File

@ -11,14 +11,26 @@ import {
lobbyPatchSchema,
} from "@/lib/validations/lobby";
import { and, eq } from "drizzle-orm";
import { tracked } from "@trpc/server";
import type { EventArgs, SSE_EVENTS } from "@/server/events";
import EventEmitter, { on } from "node:events";
import {
combineRedisIterators,
redisAsyncIterator,
redisPublish,
} from "@/server/redis/sse-redis";
import { tracked, TRPCError } from "@trpc/server";
import { trcpSubscriptionInput } from "@/lib/validations/trcp";
import type { LobbyMemberLeaveEventData } from "@/server/redis/events";
decreaseLobbyMemberCount,
increaseLobbyMemberCount,
handleLobbyAfterLeave,
} from "@/server/redis/utils/lobby-utils";
type EventMap<T> = Record<keyof T, any[]>;
class IterableEventEmitter<T extends EventMap<T>> extends EventEmitter<T> {
toIterable<TEventName extends keyof T & string>(
eventName: TEventName,
opts?: NonNullable<Parameters<typeof on>[2]>,
): AsyncIterable<T[TEventName]> {
return on(this as any, eventName, opts) as any;
}
}
export const ee = new IterableEventEmitter<SSE_EVENTS>();
export const lobbyRouter = createTRPCRouter({
// queries
@ -76,14 +88,18 @@ export const lobbyRouter = createTRPCRouter({
})
.returning({ id: lobbies.id });
if (!lobby) throw new Error("Error creating lobby");
await ctx.db.insert(lobbyMembers).values({
lobbyId: lobby.id,
playerId: ctx.session.user.id,
isReady: false,
role: "admin",
});
const [member] = await ctx.db
.insert(lobbyMembers)
.values({
lobbyId: lobby.id,
playerId: ctx.session.user.id,
isReady: false,
role: "admin",
})
.returning({
id: lobbyMembers.playerId,
});
if (member) increaseLobbyMemberCount(lobby.id);
return lobby;
}),
update: protectedProcedure
@ -107,7 +123,7 @@ export const lobbyRouter = createTRPCRouter({
)
.returning();
if (lobby) redisPublish("lobby:update", lobby);
if (lobby) ee.emit("lobby:update", lobby);
return lobby;
}),
delete: protectedProcedure
@ -155,8 +171,14 @@ export const lobbyRouter = createTRPCRouter({
where: eq(players.id, member.playerId),
})
: undefined;
if (member) redisPublish("lobby:member:join", { ...member, player });
return { success: true, member };
if (member) {
ee.emit("lobby:member:membership", input.lobbyId, true, {
...member,
player,
});
increaseLobbyMemberCount(input.lobbyId);
return { success: true, member };
}
} catch (e: unknown) {
if (e instanceof Error) {
if (
@ -187,9 +209,17 @@ export const lobbyRouter = createTRPCRouter({
),
)
.returning();
if (member)
redisPublish("lobby:member:leave", { playerId: member.playerId });
return { success: true, member };
if (member) {
ee.emit(
"lobby:member:membership",
input.lobbyId,
false,
member.playerId,
);
decreaseLobbyMemberCount(input.lobbyId);
handleLobbyAfterLeave(input.lobbyId);
return { success: true, member };
}
}
}),
// admin mutaions
@ -214,7 +244,7 @@ export const lobbyRouter = createTRPCRouter({
),
)
.returning();
if (member) redisPublish("lobby:member:update", member);
if (member) ee.emit("lobby:member:update", member);
return member;
}),
@ -238,59 +268,58 @@ export const lobbyRouter = createTRPCRouter({
)
.returning();
if (member)
redisPublish("lobby:member:leave", {
playerId: member.playerId,
kicked: true,
});
ee.emit(
"lobby:member:membership",
input.lobbyId,
false,
member.playerId,
true,
);
decreaseLobbyMemberCount(input.lobbyId);
return member;
}),
// subscriptions
onUpdate: publicProcedure
.input(trcpSubscriptionInput)
.input(z.object({ lobbyId: z.string() }))
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// fetch posts from a database that were missed.
const iterable = ee.toIterable("lobby:update", {
signal: opts.signal,
});
function* maybeYield([lobby, deleted]: EventArgs["lobby:update"]) {
if (lobby.id !== opts.input.lobbyId) {
return;
}
yield tracked(lobby.id, { lobby, deleted });
}
for await (const lobby of redisAsyncIterator("lobby:update")) {
yield tracked(lobby?.updatedAt?.toString(), {
lobby,
});
for await (const args of iterable) {
yield* maybeYield(args);
}
}),
onMemberUpdate: publicProcedure
.input(trcpSubscriptionInput)
.input(z.object({ lobbyId: z.string() }))
.subscription(async function* (opts) {
if (opts.input?.lastEventId) {
// fetch posts from a database that were missed.
const iterable = ee.toIterable("lobby:member:membership", {
signal: opts.signal,
});
function* maybeYield([
lobbyId,
joined,
member,
kicked,
]: EventArgs["lobby:member:membership"]) {
if (lobbyId !== opts.input.lobbyId) {
return;
}
yield tracked(lobbyId, { member, joined, kicked });
}
for await (const { event, data: membership } of combineRedisIterators([
"lobby:member:join",
"lobby:member:leave",
"lobby:member:update",
])) {
switch (event) {
case "lobby:member:join":
yield tracked(String(membership.playerId), {
joined: true,
membership,
});
break;
case "lobby:member:leave":
const data = membership as LobbyMemberLeaveEventData;
yield tracked(String(membership.playerId), {
joined: false,
membership: data,
});
break;
// case "lobby:member:update":
// yield tracked(String(membership.playerId), {
// membership,
// });
// break;
}
for await (const args of iterable) {
yield* maybeYield(args);
}
}),
});

28
src/server/events.d.ts vendored Normal file
View File

@ -0,0 +1,28 @@
import type { LobbyMember, Post } from "../db/schema";
export type LobbyMemberLeaveEventData = { playerId: string; kicked?: boolean };
export type EventArgs = {
"lobby:member:membership": [
// lobbyId
string,
// joined
boolean,
// member
LobbyMember | string,
// kicked
boolean?,
];
"lobby:update": [
//updated Lobby
Lobby,
// deleted
boolean?,
];
};
export type SSE_EVENTS = {
"lobby:update": EventArgs["lobby:update"];
"lobby:member:update": [LobbyMember];
"lobby:member:membership": EventArgs["lobby:member:membership"];
};

View File

@ -1,10 +0,0 @@
import type { LobbyMember, Post } from "../db/schema";
export type LobbyMemberLeaveEventData = { playerId: string; kicked?: boolean };
export type PubSubEvents = {
"lobby:update": Lobby;
"lobby:member:update": LobbyMember;
"lobby:member:join": LobbyMember;
"lobby:member:leave": LobbyMemberLeaveEventData;
};

11
src/server/redis/index.ts Normal file
View File

@ -0,0 +1,11 @@
import { env } from "@/env";
import Redis, { type Redis as RedisType } from "ioredis";
const globalForRedis = globalThis as unknown as {
redis: RedisType | undefined;
};
const redisInstance = globalForRedis.redis ?? new Redis();
if (env.NODE_ENV !== "production") globalForRedis.redis = redisInstance;
export const redis = redisInstance;

View File

@ -1,73 +0,0 @@
import Redis from "ioredis";
import type { PubSubEvents } from "./events";
const redisPub = new Redis();
const redisSub = new Redis();
export const redisPublish = <T extends keyof PubSubEvents>(
event: T,
data: PubSubEvents[T],
) => {
redisPub.publish(event, JSON.stringify(data));
};
const redisSubscribe = <T extends keyof PubSubEvents>(
event: T,
callback: (data: PubSubEvents[T]) => void,
) => {
redisSub.subscribe(event, (err) => {
if (err) console.error(`Redis subscription error for ${event}:`, err);
});
redisSub.on("message", (channel, message) => {
if (channel === event) {
callback(JSON.parse(message) as PubSubEvents[T]);
}
});
};
/**
* Creates an async iterator for a Redis subscription channel.
* @param event - The Redis event to subscribe to.
* @returns An async iterator that yields tracked events.
*/
export function redisAsyncIterator<K extends keyof PubSubEvents>(
event: K,
): AsyncIterableIterator<PubSubEvents[K]> {
return {
[Symbol.asyncIterator]() {
return this;
},
async next(): Promise<IteratorResult<PubSubEvents[K]>> {
return new Promise((resolve) => {
redisSubscribe(event, (data: PubSubEvents[K]) => {
resolve({ value: data, done: false });
});
});
},
};
}
export async function* combineRedisIterators<T extends keyof PubSubEvents>(
events: T[],
) {
const iterators = events.map((event) =>
redisAsyncIterator(event)[Symbol.asyncIterator](),
);
while (true) {
// Wait for the next event from any iterator
const result = await Promise.race(
iterators.map((iterator, index) =>
iterator.next().then((untypedRes) => {
const res = untypedRes as IteratorResult<PubSubEvents[T]>;
return { res, event: events[index] };
}),
),
);
if (result.res.done) break;
yield { event: result.event, data: result.res.value };
}
}

View File

@ -0,0 +1,60 @@
import { db } from "@/server/db";
import { redis } from "..";
import { lobbies, lobbyMembers } from "@/server/db/schema";
import { count, eq } from "drizzle-orm";
import { ee } from "@/server/api/routers/lobby";
export async function getLobbyMemberCount(lobbyId: string) {
// Check if the count is in the cache
const cachedCount = await redis.get(`lobby:${lobbyId}:memberCount`);
if (cachedCount !== null) {
return parseInt(cachedCount, 10);
}
// If not in cache, query the database
const [rawMemberCount] = await db
.select({ count: count() })
.from(lobbyMembers)
.where(eq(lobbyMembers.lobbyId, lobbyId));
console.log("rawMemberCount", rawMemberCount);
const memberCount = rawMemberCount?.count ?? 0;
// Cache the result
await redis.set(`lobby:${lobbyId}:memberCount`, memberCount);
return memberCount;
}
export async function deleteLobbyIfEmpty(lobbyId: string) {
const activeMemberCount = await getLobbyMemberCount(lobbyId);
if (activeMemberCount > 0) return;
const [realMemberCount] = await db
.select({ count: count() })
.from(lobbyMembers)
.where(eq(lobbyMembers.lobbyId, lobbyId));
if (realMemberCount?.count === 0) {
await db.delete(lobbies).where(eq(lobbies.id, lobbyId));
console.log(`Lobby with ID ${lobbyId} has been deleted. (EMPTY_LOBBY)`);
// Optionally, remove the cached count
await redis.del(`lobby:${lobbyId}:memberCount`);
ee.emit("lobby:update", { id: lobbyId }, true);
} else {
console.error(
"!!!!== Redis Cache is out of sync with the database. ==!!!!",
);
}
}
export async function addAdminToLobby(lobbyId: string, playerId: string) {}
export async function handleLobbyAfterLeave(lobbyId: string) {
await deleteLobbyIfEmpty(lobbyId);
}
export async function decreaseLobbyMemberCount(lobbyId: string) {
await redis.decr(`lobby:${lobbyId}:memberCount`);
}
export async function increaseLobbyMemberCount(lobbyId: string) {
await redis.incr(`lobby:${lobbyId}:memberCount`);
}