import { Stage } from './Stage';
import { Pipeline } from '../Pipeline';
import {
  action,
  IReactionDisposer,
  observable,
  onBecomeObserved,
  onBecomeUnobserved,
  reaction,
  runInAction,
  toJS,
} from 'mobx';
import { EndOfStreamSignal } from '../Signal/EndOfStreamSignal';
import { notUndefined } from '../../../Utils/notUndefined';
import { StartSignal } from '../Signal/StartSignal';
import { PipelineContext, ParamValues } from '../PipelineContext';

export type PipelineExecutionMode = 'STREAM' | 'ONCE';

export interface PipelineStageConfig {
  pipeline?: Pipeline;
  pipelineId?: string;
  executionMode?: PipelineExecutionMode;
  tags?: string[];
  params?: ParamValues;
}

export class PipelineStage extends Stage<PipelineStageConfig> {
  @observable
  innerPipeline?: Pipeline = undefined;
  pipelineFetch?: IReactionDisposer;

  constructor(pipeline: Pipeline, json?: any) {
    super(pipeline, json);
    onBecomeObserved(this, 'innerPipeline', this.startFetchPipeline);
    onBecomeUnobserved(this, 'innerPipeline', this.stopFetchPipeline);
  }

  startFetchPipeline = () => {
    if (!this.pipelineFetch) {
      this.pipelineFetch = reaction(
        () => this.config.pipelineId,
        (pipelineId?: string) => {
          if (pipelineId) {
            Pipeline.get(pipelineId).then((p) => runInAction(() => (this.innerPipeline = p)));
          } else {
            this.innerPipeline = undefined;
          }
        },
        { fireImmediately: true },
      );
    }
  };

  stopFetchPipeline = () => {
    this.pipelineFetch && this.pipelineFetch();
    this.pipelineFetch = undefined;
  };

  process() {
    if (this.config.pipeline) {
      console.log('pipeline::process', this.config.executionMode, toJS(this.config.pipeline));
      if (this.config.executionMode === 'ONCE') {
        return this.executeOnce();
      } else {
        return this.executeStream();
      }
    }
    return undefined;
  }

  executeStream() {
    const signals = this.takeAll();
    console.log('pipeline::stream', toJS(signals));
    return this.config.pipeline
      ?.executePromise(this.context, signals.concat([new EndOfStreamSignal()]))
      .then((result) => {
        console.log('signal remote pipeline stream', this.config.pipeline?.name, toJS(result));
        result.forEach((signal) => this.processNext(signal));
      });
  }

  executeOnce() {
    const { dataSignals } = this;
    console.log('pipeline::once', toJS(dataSignals));
    if (dataSignals.length > 0) {
      return Promise.all(
        dataSignals.map((signal) =>
          this.config.pipeline
            ?.compile()
            .then((pipeline) =>
              pipeline.executePromise(this.context, [signal].concat([new EndOfStreamSignal()] as any)),
            ),
        ),
      ).then((signals) =>
        signals
          .flatMap((s) => s)
          .filter(notUndefined)
          .forEach((signal) => {
            console.log('signal remote pipeline once', this.config.pipeline?.name, toJS(signals));
            return this.processNext(signal);
          }),
      );
    } else if (this.previousStages.length === 0) {
      return this.config.pipeline
        ?.compile()
        .then((pipeline) => pipeline.executePromise(this.context, [new StartSignal(), new EndOfStreamSignal()]))
        .then((signals) => {
          console.log('signal remote pipeline once', this.config.pipeline?.name, toJS(signals));
          signals.forEach((signal) => this.processNext(signal));
        });
    } else {
      console.warn(
        'Skipping pipeline execution since we did not get any dataSignals and there are incoming ports',
        toJS(this.previousStages),
      );
    }
    return undefined;
  }

  @action
  setParam(paramName: string, value: any) {
    this.config.params = this.config.params ?? {};
    this.config.params[paramName] = value;
  }

  @action
  removeParam(paramName: string) {
    this.config.params = this.config.params ?? {};
    delete this.config.params[paramName];
  }

  get context(): PipelineContext {
    const context = this.pipeline.context.copy();
    const params = this.config.params ?? {};
    context.tags = context.tags.concat(this.config.tags ?? []);
    Object.keys(params).map((key) => {
      const value = params[key];
      if (typeof value === 'string' && value.startsWith('$')) {
        const param = value.replace('$', '');
        context.paramValues[key] = context.getValue(this.pipeline, param);
      } else {
        context.paramValues[key] = value;
      }
    });
    console.log('getContext', toJS(context.paramValues), toJS(this.config));
    return context;
  }

  toJS(): any {
    return Object.assign(super.toJS(), {
      config: {
        pipelineId: this.config.pipelineId,
        executionMode: this.config.executionMode,
        tags: toJS(this.config.tags),
        params: toJS(this.config.params),
      },
    });
  }

  get type(): string {
    return 'pipeline';
  }
}
