import { Stage } from '../Stages/Stage';
import { observable, toJS } from 'mobx';
import { Signal } from './Signal';
import { EndOfStreamSignal } from './EndOfStreamSignal';
import { EmptySignal } from './EmptySignal';

/** The two states a {@link StagePort} can be in: `'OPEN'` or `'CLOSED'`. */
export type StagePortState = 'OPEN' | 'CLOSED';

/**
 * A port belonging to a stage.
 *
 * A port tracks whether it is open or closed, may be linked to another port,
 * and holds a queue of signals that have been delivered to it but not yet
 * taken. All three pieces of state are MobX observables.
 */
export class StagePort {
  /** Current state of the port; a new port starts out `'OPEN'`. */
  @observable
  portState: StagePortState = 'OPEN';
  /** The port that `forward` delivers to, if one has been assigned. */
  @observable
  otherPort?: StagePort = undefined;
  /** Signals delivered via `signal` that have not been taken yet, oldest first. */
  @observable
  signalQueue: Signal[] = [];

  /**
   * @param stage The stage this port belongs to.
   * @param type  A label for this port; it is only used in log output here.
   */
  constructor(public readonly stage: Stage<any>, public readonly type: string) {}

  /**
   * Delivers a signal to this port.
   *
   * - If the port is not `'OPEN'`, a warning is logged and the signal is dropped.
   * - An `EndOfStreamSignal` closes the port and is handed straight to the
   *   owning stage (it is never placed in the queue).
   * - Any other signal is appended to the queue, unless a signal with the same
   *   `id` is currently queued, in which case a warning is logged instead.
   *
   * @param signal The signal to deliver.
   */
  signal(signal: Signal) {
    if (this.portState !== 'OPEN') {
      console.warn(`Port (${this.type}) of ${this.stage.type} is not OPEN`, this.portState);
      return;
    }

    if (signal instanceof EndOfStreamSignal) {
      // Close first, then notify the stage.
      this.portState = 'CLOSED';
      this.stage.signal(signal, this);
      return;
    }

    // Only signals still sitting in the queue are considered for the id check.
    const alreadyQueued = this.signalQueue.some((queued) => queued.id === signal.id);
    if (alreadyQueued) {
      console.warn('signal already processed', this.stage.type, toJS(signal));
      return;
    }

    this.signalQueue.push(signal);
  }

  /**
   * Passes a signal on to the linked port, if there is one; otherwise does nothing.
   *
   * @param signal The signal to hand to `otherPort`.
   */
  forward(signal: Signal) {
    const peer = this.otherPort;
    if (peer) {
      peer.signal(signal);
    }
  }

  /**
   * Removes and returns the oldest queued signal.
   *
   * @returns The first signal in the queue, or `undefined` when the queue is empty.
   */
  take(): Signal | undefined {
    // An empty queue is left untouched rather than being mutated.
    if (this.signalQueue.length === 0) {
      return undefined;
    }
    return this.signalQueue.shift();
  }

  /**
   * Empties the queue and returns everything that was in it.
   *
   * @returns The array that was the queue up to this call; the port continues
   *          with a fresh empty queue.
   */
  takeAll(): Signal[] {
    const drained = this.signalQueue;
    this.signalQueue = [];
    return drained;
  }

  /** Marks the port as `'CLOSED'`. Nothing else is changed or notified. */
  close() {
    this.portState = 'CLOSED';
  }
}
