import { DestroyRef, inject, Injectable, signal } from "@angular/core";
import { Processor } from "./processor";
import { Event, EventLogService, LoggedEvent } from "../event";
import { debounce, map, of, tap, timer } from "rxjs";
import { SplunkHttpEventCollectorService } from "../splunk/splunk-http-event-collector.service";
import { environment } from "@cq/environments/environment";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";

@Injectable()
export class SplunkProcessorService extends Processor {
  batchSize = signal(100);
  idlePeriod = signal(5000);
  isEnabled = signal(environment.isSplunkEventCollectionEnabled);

  #destroyed = inject(DestroyRef);
  #hec = inject(SplunkHttpEventCollectorService);
  #log = inject(EventLogService);
  #pending = signal<LoggedEvent[]>([]);

  start(): void {
    this.#log.stream
      .pipe(
        // unsubscribe on destruction
        takeUntilDestroyed(this.#destroyed),
        // collect pending events
        tap((logged) =>
          this.#pending.update((pending) => [...pending, logged]),
        ),
        // batch events for collection
        debounce(
          (logged) =>
            this.shouldFlushImmediately(logged.event)
              ? of(logged) // immediate
              : timer(this.idlePeriod()), // delayed
        ),
        // clear pending events
        map(() => {
          const pending = this.#pending();
          this.#pending.set([]);
          return pending;
        }),
      )
      .subscribe((events) => {
        // transmit events to splunk
        this.#hec.event(this.format(events)).subscribe();
      });
  }

  private shouldFlushImmediately(event: Event) {
    return (
      event.priority === "critical" ||
      this.#pending().length >= this.batchSize()
    );
  }

  private format(events: LoggedEvent[]) {
    return events.map((logged) => {
      const event = logged.event.serialize();
      return {
        time: logged.timestamp,
        host: window.location.hostname,
        source: "clearquote",
        sourcetype: "diagnostics",
        index: "cls",
        event: { event }, // namespace event data to aid with field and index collisions
        fields: logged.context,
      };
    });
  }
}
