import { ASRChunkJSON, TranscriptJSON } from 'src/models/Transcript'

import { StreamQueue } from './StreamQueue'

/**
 * Stream message tagged with the `'revision_update'` action.
 *
 * The `action` literal is the discriminant used to tell members of
 * `StreamMessage` apart.
 */
export interface RevisionUpdateStreamMessage {
    /** Discriminant identifying this message variant. */
    action: 'revision_update'
    /** Transcript payload carried by the message. */
    data: TranscriptJSON
}

/**
 * Stream message tagged with the `'asr_chunk'` action.
 *
 * Besides the chunk payload it carries identifiers and a timestamp.
 * NOTE(review): the producers of these fields are not visible in this file;
 * confirm their exact meaning against the emitting side.
 */
export interface ASRUpdateStreamMessage {
    /** Discriminant identifying this message variant. */
    action: 'asr_chunk'
    /** ASR chunk payload carried by the message. */
    data: ASRChunkJSON
    /** Presumably identifies the session the chunk belongs to — TODO confirm. */
    session_id: string
    /** Presumably identifies the request that produced the chunk — TODO confirm. */
    request_id: string
    /** Numeric timestamp; unit and epoch are not established here — TODO confirm. */
    timestamp: number
}

/** Union of every message variant a stream can deliver, discriminated by `action`. */
export type StreamMessage = RevisionUpdateStreamMessage | ASRUpdateStreamMessage

/**
 * Callback receiving the `StreamMessage` variant whose `action` matches `T`.
 *
 * `T` is unconstrained; the return type is `unknown`, so callers cannot rely
 * on anything a handler returns.
 */
export type StreamMessageCallbackFn<T> = (message: Extract<StreamMessage, { action: T }>) => unknown

/**
 * Options passed to a `history` request.
 *
 * NOTE(review): the semantics of both fields are defined by the stream source
 * implementations, which are not visible in this file.
 */
export interface StreamHistoryOptions {
    /** Presumably the position/offset at which the history window starts — TODO confirm. */
    start: number
    /** Presumably the maximum number of messages to return — TODO confirm. */
    limit: number
}

/**
 * Contract a stream source must fulfil so that `StreamClient` can drive it.
 *
 * Return values of `subscribe`, `unsubscribe`, `onMessage` and `close` are typed
 * `unknown`; this contract does not specify what they produce.
 */
export interface StreamSourceInterface {
    /** Subscribes to any number of targets, given as strings. */
    subscribe: (...targets: string[]) => unknown
    /** Unsubscribes from any number of targets, given as strings. */
    unsubscribe: (...targets: string[]) => unknown
    /** Registers a handler that the source invokes with stream messages. */
    onMessage: (handler: StreamMessageCallbackFn<any>) => unknown
    /**
     * Requests past messages for a single target.
     *
     * Resolves to an array of messages typed as the variant whose `action` is `T`.
     * `T` appears only in the return type, so it is chosen by the caller and is
     * not checked against what the source actually returns.
     */
    history: <T extends StreamMessage['action']>(
        target: string,
        options: StreamHistoryOptions,
    ) => Promise<Extract<StreamMessage, { action: T }>[]>
    /** Closes the source. */
    close: () => unknown
}

/**
 * Minimal base for stream sources: keeps the list of registered message
 * handlers and lets subclasses fan a message out to all of them.
 */
export class StreamSourceBase implements Pick<StreamSourceInterface, 'onMessage'> {
    /** Handlers in registration order. */
    private handlers: StreamMessageCallbackFn<any>[] = []

    /**
     * Delivers `message` to every handler registered at the time of the call,
     * in registration order.
     *
     * @param message - The message to deliver.
     * @returns The same message object that was passed in.
     */
    protected send(message: StreamMessage): StreamMessage {
        // Iterate over a snapshot so handlers registered during dispatch are
        // not invoked for the current message.
        const registered = [...this.handlers]
        for (const notify of registered) {
            notify(message)
        }

        return message
    }

    /**
     * Appends `handler` to the list of handlers invoked by `send`.
     *
     * @param handler - Callback to invoke for each message sent.
     */
    onMessage(handler: StreamMessageCallbackFn<any>): void {
        this.handlers = [...this.handlers, handler]
    }
}

/**
 * Dispatches messages coming from an attached stream source to handlers
 * registered per message action.
 *
 * After `close()` has been called, `subscribe`, `unsubscribe`, `on`, `off` and
 * incoming-message dispatch all become no-ops. `history` and `queue` are not
 * gated by the closed flag.
 */
export class StreamClient {
    /** Currently attached source, or `null` when none has been attached. */
    private source: StreamSourceInterface | null = null
    /** Registered handlers, keyed by message action. */
    private handlers: Record<string, StreamMessageCallbackFn<any>[]> = {}
    /** Set once `close()` has run. */
    private isClosed: boolean = false
    /** Lazily created queues, keyed by message action. */
    private queues: Record<string, StreamQueue<any>> = {}

    /**
     * Makes `source` the active source and registers this client's dispatcher
     * with it.
     *
     * A previously attached source is only replaced as the active reference;
     * it is neither closed nor detached here.
     *
     * @param source - The stream source to receive messages from.
     */
    attachSource(source: StreamSourceInterface) {
        this.source = source
        source.onMessage(this.onMessage.bind(this))
    }

    /** Closes the attached source (if any) and marks this client as closed. */
    close() {
        this.source?.close()
        this.isClosed = true
    }

    /**
     * Forwards a subscribe request to the attached source.
     *
     * @param targets - Targets to subscribe to.
     * @returns Whatever the source returns, or `undefined` when the client is
     *          closed or has no source.
     */
    subscribe(...targets: string[]) {
        if (this.isClosed) return

        return this.source?.subscribe(...targets)
    }

    /**
     * Forwards an unsubscribe request to the attached source.
     *
     * @param targets - Targets to unsubscribe from.
     * @returns Whatever the source returns, or `undefined` when the client is
     *          closed or has no source.
     */
    unsubscribe(...targets: string[]) {
        if (this.isClosed) return

        return this.source?.unsubscribe(...targets)
    }

    /**
     * Requests past messages for `target` from the attached source.
     *
     * @param target - Target whose history is requested.
     * @param options - History options forwarded unchanged to the source.
     * @returns The source's result, or a promise of an empty array when no
     *          source is attached.
     */
    history<T extends StreamMessage['action']>(
        target: string,
        options: StreamHistoryOptions,
    ): Promise<Extract<StreamMessage, { action: T }>[]> {
        return this.source?.history(target, options) ?? Promise.resolve([])
    }

    /** Invokes every handler registered for `message.action`, unless closed. */
    private onMessage(message: StreamMessage) {
        if (this.isClosed) return
        this.handlers[message.action]?.forEach((handler) => handler(message))
    }

    /**
     * Registers `handler` for messages with the given `action`.
     *
     * @param action - Message action to listen for.
     * @param handler - Callback invoked with each matching message.
     */
    on<T extends StreamMessage['action']>(action: T, handler: StreamMessageCallbackFn<T>) {
        if (this.isClosed) return

        if (!this.handlers[action]) {
            this.handlers[action] = []
        }

        this.handlers[action].push(handler)
    }

    /**
     * Removes `handler` from the handlers registered for `action`.
     *
     * Does nothing when no handler was ever registered for `action`.
     *
     * @param action - Message action the handler was registered for.
     * @param handler - The exact callback reference passed to `on`.
     */
    off<T extends StreamMessage['action']>(action: T, handler: StreamMessageCallbackFn<T>) {
        if (this.isClosed) return

        // The entry is absent until `on` is first called for this action;
        // filtering `undefined` would throw a TypeError.
        const registered = this.handlers[action]
        if (!registered) return

        // Replace rather than mutate so a dispatch currently iterating the
        // old array is unaffected.
        this.handlers[action] = registered.filter((h) => h !== handler)
    }

    /**
     * Returns the `StreamQueue` for `action`, creating and caching it on first
     * request.
     *
     * @param action - Message action the queue is associated with.
     * @returns The cached queue instance for `action`.
     */
    queue<T extends StreamMessage['action']>(action: T) {
        if (!this.queues[action]) {
            this.queues[action] = new StreamQueue(this, action)
        }

        return this.queues[action] as StreamQueue<T>
    }
}
