@@ -45,4 +56,31 @@ async function Navbar() {
);
}
+const NavLink = ({
+ label,
+ path,
+ active,
+ className,
+}: NavLink & { className?: string; active?: boolean }) => {
+ return (
+
+ );
+};
+
export default Navbar;
diff --git a/src/lib/store/lobby-store.ts b/src/lib/store/lobby-store.ts
index 4e14d61..867e7ba 100644
--- a/src/lib/store/lobby-store.ts
+++ b/src/lib/store/lobby-store.ts
@@ -20,7 +20,7 @@ export const useLobbyStore = create
((set, get) => ({
selectedGame: 0,
setSelectedGame: (gameId) => set({ selectedGame: gameId }),
updateLobby: (lobby) =>
- set((state) => ({ ...state, ...lobby, members: state.members })),
+ set((state) => ({ lobby: { ...state, ...lobby, members: state.members } })),
setMembers: (members) => set({ members }),
resetLobby: (lobby) => set({ lobby: lobby ?? ({} as Lobby) }),
findMember: (playerId) => get().members.find((m) => m.playerId === playerId),
diff --git a/src/server/api/routers/lobby.ts b/src/server/api/routers/lobby.ts
index 2494313..8c645d4 100644
--- a/src/server/api/routers/lobby.ts
+++ b/src/server/api/routers/lobby.ts
@@ -11,14 +11,26 @@ import {
lobbyPatchSchema,
} from "@/lib/validations/lobby";
import { and, eq } from "drizzle-orm";
+import { tracked } from "@trpc/server";
+import type { EventArgs, SSE_EVENTS } from "@/server/events";
+import EventEmitter, { on } from "node:events";
import {
- combineRedisIterators,
- redisAsyncIterator,
- redisPublish,
-} from "@/server/redis/sse-redis";
-import { tracked, TRPCError } from "@trpc/server";
-import { trcpSubscriptionInput } from "@/lib/validations/trcp";
-import type { LobbyMemberLeaveEventData } from "@/server/redis/events";
+ decreaseLobbyMemberCount,
+ increaseLobbyMemberCount,
+ handleLobbyAfterLeave,
+} from "@/server/redis/utils/lobby-utils";
+
+type EventMap = Record;
+class IterableEventEmitter> extends EventEmitter {
+ toIterable(
+ eventName: TEventName,
+ opts?: NonNullable[2]>,
+ ): AsyncIterable {
+ return on(this as any, eventName, opts) as any;
+ }
+}
+
+export const ee = new IterableEventEmitter();
export const lobbyRouter = createTRPCRouter({
// queries
@@ -76,14 +88,18 @@ export const lobbyRouter = createTRPCRouter({
})
.returning({ id: lobbies.id });
if (!lobby) throw new Error("Error creating lobby");
- await ctx.db.insert(lobbyMembers).values({
- lobbyId: lobby.id,
- playerId: ctx.session.user.id,
- isReady: false,
-
- role: "admin",
- });
-
+ const [member] = await ctx.db
+ .insert(lobbyMembers)
+ .values({
+ lobbyId: lobby.id,
+ playerId: ctx.session.user.id,
+ isReady: false,
+ role: "admin",
+ })
+ .returning({
+ id: lobbyMembers.playerId,
+ });
+ if (member) increaseLobbyMemberCount(lobby.id);
return lobby;
}),
update: protectedProcedure
@@ -107,7 +123,7 @@ export const lobbyRouter = createTRPCRouter({
)
.returning();
- if (lobby) redisPublish("lobby:update", lobby);
+ if (lobby) ee.emit("lobby:update", lobby);
return lobby;
}),
delete: protectedProcedure
@@ -155,8 +171,14 @@ export const lobbyRouter = createTRPCRouter({
where: eq(players.id, member.playerId),
})
: undefined;
- if (member) redisPublish("lobby:member:join", { ...member, player });
- return { success: true, member };
+ if (member) {
+ ee.emit("lobby:member:membership", input.lobbyId, true, {
+ ...member,
+ player,
+ });
+ increaseLobbyMemberCount(input.lobbyId);
+ return { success: true, member };
+ }
} catch (e: unknown) {
if (e instanceof Error) {
if (
@@ -187,9 +209,17 @@ export const lobbyRouter = createTRPCRouter({
),
)
.returning();
- if (member)
- redisPublish("lobby:member:leave", { playerId: member.playerId });
- return { success: true, member };
+ if (member) {
+ ee.emit(
+ "lobby:member:membership",
+ input.lobbyId,
+ false,
+ member.playerId,
+ );
+ decreaseLobbyMemberCount(input.lobbyId);
+ handleLobbyAfterLeave(input.lobbyId);
+ return { success: true, member };
+ }
}
}),
// admin mutaions
@@ -214,7 +244,7 @@ export const lobbyRouter = createTRPCRouter({
),
)
.returning();
- if (member) redisPublish("lobby:member:update", member);
+ if (member) ee.emit("lobby:member:update", member);
return member;
}),
@@ -238,59 +268,58 @@ export const lobbyRouter = createTRPCRouter({
)
.returning();
if (member)
- redisPublish("lobby:member:leave", {
- playerId: member.playerId,
- kicked: true,
- });
+ ee.emit(
+ "lobby:member:membership",
+ input.lobbyId,
+ false,
+ member.playerId,
+ true,
+ );
+ decreaseLobbyMemberCount(input.lobbyId);
return member;
}),
// subscriptions
onUpdate: publicProcedure
- .input(trcpSubscriptionInput)
+ .input(z.object({ lobbyId: z.string() }))
.subscription(async function* (opts) {
- if (opts.input?.lastEventId) {
- // fetch posts from a database that were missed.
+ const iterable = ee.toIterable("lobby:update", {
+ signal: opts.signal,
+ });
+
+ function* maybeYield([lobby, deleted]: EventArgs["lobby:update"]) {
+ if (lobby.id !== opts.input.lobbyId) {
+ return;
+ }
+ yield tracked(lobby.id, { lobby, deleted });
}
- for await (const lobby of redisAsyncIterator("lobby:update")) {
- yield tracked(lobby?.updatedAt?.toString(), {
- lobby,
- });
+
+ for await (const args of iterable) {
+ yield* maybeYield(args);
}
}),
onMemberUpdate: publicProcedure
- .input(trcpSubscriptionInput)
+ .input(z.object({ lobbyId: z.string() }))
.subscription(async function* (opts) {
- if (opts.input?.lastEventId) {
- // fetch posts from a database that were missed.
+ const iterable = ee.toIterable("lobby:member:membership", {
+ signal: opts.signal,
+ });
+
+ function* maybeYield([
+ lobbyId,
+ joined,
+ member,
+ kicked,
+ ]: EventArgs["lobby:member:membership"]) {
+ if (lobbyId !== opts.input.lobbyId) {
+ return;
+ }
+ yield tracked(lobbyId, { member, joined, kicked });
}
- for await (const { event, data: membership } of combineRedisIterators([
- "lobby:member:join",
- "lobby:member:leave",
- "lobby:member:update",
- ])) {
- switch (event) {
- case "lobby:member:join":
- yield tracked(String(membership.playerId), {
- joined: true,
- membership,
- });
- break;
- case "lobby:member:leave":
- const data = membership as LobbyMemberLeaveEventData;
- yield tracked(String(membership.playerId), {
- joined: false,
- membership: data,
- });
- break;
- // case "lobby:member:update":
- // yield tracked(String(membership.playerId), {
- // membership,
- // });
- // break;
- }
+ for await (const args of iterable) {
+ yield* maybeYield(args);
}
}),
});
diff --git a/src/server/events.d.ts b/src/server/events.d.ts
new file mode 100644
index 0000000..57e3135
--- /dev/null
+++ b/src/server/events.d.ts
@@ -0,0 +1,28 @@
+import type { LobbyMember, Post } from "../db/schema";
+
+export type LobbyMemberLeaveEventData = { playerId: string; kicked?: boolean };
+
+export type EventArgs = {
+ "lobby:member:membership": [
+ // lobbyId
+ string,
+ // joined
+ boolean,
+ // member
+ LobbyMember | string,
+ // kicked
+ boolean?,
+ ];
+ "lobby:update": [
+ //updated Lobby
+ Lobby,
+ // deleted
+ boolean?,
+ ];
+};
+
+export type SSE_EVENTS = {
+ "lobby:update": EventArgs["lobby:update"];
+ "lobby:member:update": [LobbyMember];
+ "lobby:member:membership": EventArgs["lobby:member:membership"];
+};
diff --git a/src/server/redis/events.d.ts b/src/server/redis/events.d.ts
deleted file mode 100644
index db49b1e..0000000
--- a/src/server/redis/events.d.ts
+++ /dev/null
@@ -1,10 +0,0 @@
-import type { LobbyMember, Post } from "../db/schema";
-
-export type LobbyMemberLeaveEventData = { playerId: string; kicked?: boolean };
-
-export type PubSubEvents = {
- "lobby:update": Lobby;
- "lobby:member:update": LobbyMember;
- "lobby:member:join": LobbyMember;
- "lobby:member:leave": LobbyMemberLeaveEventData;
-};
diff --git a/src/server/redis/index.ts b/src/server/redis/index.ts
new file mode 100644
index 0000000..ee71186
--- /dev/null
+++ b/src/server/redis/index.ts
@@ -0,0 +1,11 @@
+import { env } from "@/env";
+import Redis, { type Redis as RedisType } from "ioredis";
+
+const globalForRedis = globalThis as unknown as {
+ redis: RedisType | undefined;
+};
+
+const redisInstance = globalForRedis.redis ?? new Redis();
+if (env.NODE_ENV !== "production") globalForRedis.redis = redisInstance;
+
+export const redis = redisInstance;
diff --git a/src/server/redis/sse-redis.ts b/src/server/redis/sse-redis.ts
deleted file mode 100644
index d8ab121..0000000
--- a/src/server/redis/sse-redis.ts
+++ /dev/null
@@ -1,73 +0,0 @@
-import Redis from "ioredis";
-import type { PubSubEvents } from "./events";
-
-const redisPub = new Redis();
-const redisSub = new Redis();
-
-export const redisPublish = (
- event: T,
- data: PubSubEvents[T],
-) => {
- redisPub.publish(event, JSON.stringify(data));
-};
-
-const redisSubscribe = (
- event: T,
- callback: (data: PubSubEvents[T]) => void,
-) => {
- redisSub.subscribe(event, (err) => {
- if (err) console.error(`Redis subscription error for ${event}:`, err);
- });
-
- redisSub.on("message", (channel, message) => {
- if (channel === event) {
- callback(JSON.parse(message) as PubSubEvents[T]);
- }
- });
-};
-
-/**
- * Creates an async iterator for a Redis subscription channel.
- * @param event - The Redis event to subscribe to.
- * @returns An async iterator that yields tracked events.
- */
-export function redisAsyncIterator(
- event: K,
-): AsyncIterableIterator {
- return {
- [Symbol.asyncIterator]() {
- return this;
- },
- async next(): Promise> {
- return new Promise((resolve) => {
- redisSubscribe(event, (data: PubSubEvents[K]) => {
- resolve({ value: data, done: false });
- });
- });
- },
- };
-}
-
-export async function* combineRedisIterators(
- events: T[],
-) {
- const iterators = events.map((event) =>
- redisAsyncIterator(event)[Symbol.asyncIterator](),
- );
-
- while (true) {
- // Wait for the next event from any iterator
- const result = await Promise.race(
- iterators.map((iterator, index) =>
- iterator.next().then((untypedRes) => {
- const res = untypedRes as IteratorResult;
- return { res, event: events[index] };
- }),
- ),
- );
-
- if (result.res.done) break;
-
- yield { event: result.event, data: result.res.value };
- }
-}
diff --git a/src/server/redis/utils/lobby-utils.ts b/src/server/redis/utils/lobby-utils.ts
new file mode 100644
index 0000000..c948db9
--- /dev/null
+++ b/src/server/redis/utils/lobby-utils.ts
@@ -0,0 +1,60 @@
+import { db } from "@/server/db";
+import { redis } from "..";
+import { lobbies, lobbyMembers } from "@/server/db/schema";
+import { count, eq } from "drizzle-orm";
+import { ee } from "@/server/api/routers/lobby";
+
+export async function getLobbyMemberCount(lobbyId: string) {
+ // Check if the count is in the cache
+ const cachedCount = await redis.get(`lobby:${lobbyId}:memberCount`);
+ if (cachedCount !== null) {
+ return parseInt(cachedCount, 10);
+ }
+
+ // If not in cache, query the database
+ const [rawMemberCount] = await db
+ .select({ count: count() })
+ .from(lobbyMembers)
+ .where(eq(lobbyMembers.lobbyId, lobbyId));
+ console.log("rawMemberCount", rawMemberCount);
+
+ const memberCount = rawMemberCount?.count ?? 0;
+ // Cache the result
+ await redis.set(`lobby:${lobbyId}:memberCount`, memberCount);
+ return memberCount;
+}
+
+export async function deleteLobbyIfEmpty(lobbyId: string) {
+ const activeMemberCount = await getLobbyMemberCount(lobbyId);
+ if (activeMemberCount > 0) return;
+
+ const [realMemberCount] = await db
+ .select({ count: count() })
+ .from(lobbyMembers)
+ .where(eq(lobbyMembers.lobbyId, lobbyId));
+ if (realMemberCount?.count === 0) {
+ await db.delete(lobbies).where(eq(lobbies.id, lobbyId));
+ console.log(`Lobby with ID ${lobbyId} has been deleted. (EMPTY_LOBBY)`);
+ // Optionally, remove the cached count
+ await redis.del(`lobby:${lobbyId}:memberCount`);
+ ee.emit("lobby:update", { id: lobbyId }, true);
+ } else {
+ console.error(
+ "!!!!== Redis Cache is out of sync with the database. ==!!!!",
+ );
+ }
+}
+
+export async function addAdminToLobby(lobbyId: string, playerId: string) {}
+
+export async function handleLobbyAfterLeave(lobbyId: string) {
+ await deleteLobbyIfEmpty(lobbyId);
+}
+
+export async function decreaseLobbyMemberCount(lobbyId: string) {
+ await redis.decr(`lobby:${lobbyId}:memberCount`);
+}
+
+export async function increaseLobbyMemberCount(lobbyId: string) {
+ await redis.incr(`lobby:${lobbyId}:memberCount`);
+}