import { Stage } from './Stage';
import { Signal } from '../Signal/Signal';
import { Pipeline } from '../Pipeline';
import { EndOfStreamSignal } from '../Signal/EndOfStreamSignal';

/**
 * Stage that hands every signal it takes to an optional subscriber callback.
 *
 * Each `process()` run delivers the signals returned by `takeAll()`, then
 * delivers a fresh `EndOfStreamSignal`, and finally invokes the optional
 * `complete` callback.
 */
export class SinkStage extends Stage<any> {
  /**
   * @param pipeline - Forwarded unchanged to the `Stage` constructor.
   * @param json - Optional value forwarded unchanged to the `Stage` constructor.
   * @param subscriber - Optional callback invoked once per taken signal and
   *   once more with an `EndOfStreamSignal`.
   * @param complete - Optional callback invoked after the end-of-stream
   *   signal has been delivered.
   */
  constructor(
    pipeline: Pipeline,
    json?: any,
    public subscriber?: (signal: Signal) => any,
    public complete?: () => any,
  ) {
    super(pipeline, json);
  }

  /**
   * Delivers all taken signals, then an end-of-stream marker, then fires the
   * completion callback. Missing callbacks are silently skipped.
   */
  process(): void {
    console.log('sink::process');

    // `this.subscriber` is looked up again on every delivery, so a callback
    // that is assigned or cleared mid-run takes effect for the next signal.
    const deliver = (signal: Signal): void => {
      if (this.subscriber) {
        this.subscriber(signal);
      }
    };

    this.takeAll().forEach((signal) => deliver(signal));
    deliver(new EndOfStreamSignal());

    if (this.complete) {
      this.complete();
    }
  }

  /** Identifier string for this kind of stage. */
  get type(): string {
    return 'sink';
  }
}
