diff --git a/src/lib/server/endpoint.ts b/src/lib/server/endpoint.ts index 9b97176..aa73391 100644 --- a/src/lib/server/endpoint.ts +++ b/src/lib/server/endpoint.ts @@ -1,34 +1,77 @@ export async function generateEndpoint( - startFunction?: (enqueue: (data: any) => void) => void | Promise void)> + startFunction?: (enqueue: (data: any) => void) => void | Promise void)>, + request?: Request ) { let streamController: ReadableStreamDefaultController | null = null; let cleanupFunction: (() => void) | void = undefined; + let heartbeatInterval: ReturnType | null = null; + let cleanedUp = false; + + const safeCleanup = () => { + if (cleanedUp) return; + cleanedUp = true; + + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + + if (typeof cleanupFunction === 'function') { + cleanupFunction(); + cleanupFunction = undefined; + } + }; const enqueue = (data: any) => { + if (cleanedUp) return; let transferdata = JSON.stringify(data); - // stringify data and add to controller queue if (streamController) { - streamController.enqueue(`data: ${transferdata}\n\n`); - } else { - console.log('no controller'); + try { + streamController.enqueue(`data: ${transferdata}\n\n`); + } catch { + safeCleanup(); + } } }; const stream = new ReadableStream({ async start(controller) { streamController = controller; + + heartbeatInterval = setInterval(() => { + try { + streamController!.enqueue(': keepalive\n\n'); + } catch { + safeCleanup(); + } + }, 30000); + heartbeatInterval.unref(); + if (startFunction) { - const result = await startFunction(enqueue); - if (typeof result === 'function') { - cleanupFunction = result; + try { + const result = await startFunction(enqueue); + if (typeof result === 'function') { + cleanupFunction = result; + } + } catch (err) { + safeCleanup(); + throw err; } } }, async cancel() { - if (cleanupFunction) cleanupFunction(); + safeCleanup(); } }); + if (request?.signal) { + if (request.signal.aborted) { + safeCleanup(); + } else { + request.signal.addEventListener('abort', () => safeCleanup()); + } + } + return { response: new Response(stream, { headers: { diff --git a/src/lib/server/globalEmitter.ts b/src/lib/server/globalEmitter.ts index 02dc635..8ce6ca7 100644 --- a/src/lib/server/globalEmitter.ts +++ b/src/lib/server/globalEmitter.ts @@ -1,5 +1,4 @@ import { EventEmitter } from 'node:events'; -// Main emitter for everything - export const globalEmitter = new EventEmitter(); +globalEmitter.setMaxListeners(1000); diff --git a/src/routes/api/registeredEvents/+server.ts b/src/routes/api/registeredEvents/+server.ts index 82a5912..f130d3d 100644 --- a/src/routes/api/registeredEvents/+server.ts +++ b/src/routes/api/registeredEvents/+server.ts @@ -2,10 +2,8 @@ import { getRegisteredEventsWithPlayers } from '$lib/server/databaseManager'; import { globalEmitter } from '$lib/server/globalEmitter'; import { generateEndpoint } from '$lib/server/endpoint'; -export async function GET() { - // Generate stream endpoint +export async function GET({ request }) { const endpoint = generateEndpoint(async (enqueue) => { - // Get the all the events with the players seperated into brackets let eventList = async () => { // Get eventList with structure from database let newEventList = await getRegisteredEventsWithPlayers(); @@ -23,7 +21,7 @@ export async function GET() { return () => { globalEmitter.off('eventUpdate', eventList); }; - }); + }, request); return (await endpoint).response; } diff --git a/src/routes/api/teams/+server.ts b/src/routes/api/teams/+server.ts index df1f227..ca12721 100644 --- a/src/routes/api/teams/+server.ts +++ b/src/routes/api/teams/+server.ts @@ -10,7 +10,7 @@ export async function POST({ request }: any) { return new Response('ok'); } -export async function GET() { +export async function GET({ request }) { const endpoint = generateEndpoint(async (enqueue) => { // Function to grab score from database and add it to message queue let newScore = async () => { @@ -26,6 +26,6 @@ export async function GET() { return () => { globalEmitter.off('scoreUpdate', newScore); }; - }); + }, request); return (await endpoint).response; }