import {combineEpics} from 'redux-observable';
import {makeAsyncEpic} from 'common/utils/simplifiedAsync';
import * as actions from '../actions';
import * as api from '../../services/api';

import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/filter';
import 'rxjs/add/observable/of';
import * as selectors from '../selectors';
import {getUniqueId} from '../../../common/utils/guid';

// Epic for the Parquet analysis request, built by the shared makeAsyncEpic helper
// from the fetchParquetAnalysis action and the api call of the same name.
// NOTE(review): makeAsyncEpic is defined in common/utils/simplifiedAsync; presumably
// it runs the api call and dispatches the action's success/failure variants — confirm there.
const fetchParquetAnalysis = makeAsyncEpic(actions.fetchParquetAnalysis, api.fetchParquetAnalysis);

/**
 * Recursively collects the distinct source column ids an item depends on.
 *
 * An item that carries `sourceColumn` contributes that id directly; otherwise
 * every entry of `item.transform.input` is visited in order. Ids are appended
 * to `sourcesIdArr` in first-seen order and duplicates are skipped.
 *
 * @param {Array} sourcesIdArr - accumulator; mutated in place.
 * @param {Object} item - schema column or transform input node.
 * @returns {Array} the same `sourcesIdArr` instance that was passed in.
 */
const getSourceIdsArr = (sourcesIdArr, item) => {
  const directSource = item.sourceColumn;

  // Leaf node: it names its source column itself, nested inputs are ignored.
  if (directSource !== undefined) {
    if (!sourcesIdArr.includes(directSource)) {
      sourcesIdArr.push(directSource);
    }
    return sourcesIdArr;
  }

  const nestedInputs = item.transform && item.transform.input;
  if (!nestedInputs) {
    return sourcesIdArr;
  }

  // Every recursive call returns the accumulator it was given, so the fold
  // keeps threading the caller's array through all inputs.
  return nestedInputs.reduce((collected, child) => getSourceIdsArr(collected, child), sourcesIdArr);
};

/**
 * Epic: for every successful Parquet analysis fetch, emits a
 * `setParquetDiametricsStreamAnalysisSchema` action carrying the payload of
 * the success action.
 *
 * @param {Object} action$ - redux-observable action stream.
 * @returns {Object} stream of the resulting actions (one per success action).
 */
const setParquetDiametricsStreamAnalysisSchema = (action$) => {
  const analysisFetched$ = action$.ofType(actions.fetchParquetAnalysis.success.TYPE);

  return analysisFetched$.flatMap(({payload}) => {
    const schemaAction = actions.setParquetDiametricsStreamAnalysisSchema(payload);
    return [schemaAction];
  });
};

/**
 * Epic: rebuilds the selected data stream's schema whenever the stream's
 * metric/dimension assignment changes or an assignment is removed.
 *
 * For each trigger it reads the selected stream through
 * `selectors.getSelectedDataStream(getState())` and:
 *  1. re-types every column that maps to exactly one source column so it
 *     matches `stream.metrics` / `stream.dimensions`, or removes the column
 *     (and its `sourceColumns` entry) when the source is in neither list;
 *  2. removes multi-input dimension columns that reference a source which is
 *     no longer in `stream.dimensions`;
 *  3. adds a column and a `sourceColumns` entry for every analysed table
 *     column that is listed in metrics/dimensions but has no `sourceColumns`
 *     entry yet.
 *
 * The stream object is obtained from store state, so it is treated as
 * read-only: columns are edited on shallow copies and arrays are replaced,
 * never pushed to or edited in place.
 *
 * @param {Object} action$ - redux-observable action stream.
 * @param {{getState: Function}} store - store API; only `getState` is used.
 * @returns {Object} stream emitting one `setSelectedStreamKeyVal({schema})`
 *   action per trigger.
 */
const setParquetDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setParquetStreamDiametricsChange.TYPE, actions.removeParquetStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const tableMeta = stream.uiState.analysisResult.columns;

      // Per-column shallow copies: the re-typing below assigns to and deletes
      // properties of `item`, which must not touch the objects held in state.
      const workingColumns = stream.schema.columns.map((column) => ({...column}));
      const schemaModifications = {
        schema: {...stream.schema, columns: workingColumns},
      };
      const {schema} = schemaModifications;

      // update existing schema items
      /* eslint no-param-reassign: "off" */
      // eslint-disable-next-line complexity
      workingColumns.forEach((item) => {
        const hasSingleSource =
          item.sourceColumn !== undefined ||
          (item.transform &&
            item.transform.input &&
            item.transform.input.length === 1 &&
            getSourceIdsArr([], item).length === 1);

        if (!hasSingleSource) {
          // user created columns: check that every source still exists
          const isMultiInputDimension =
            item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension';
          if (isMultiInputDimension) {
            const hasMissingSource = getSourceIdsArr([], item).some((name) => !stream.dimensions.includes(name));
            if (hasMissingSource) {
              schema.columns = schema.columns.filter((a) => a.name !== item.name);
            }
          }
          return;
        }

        const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];

        if (stream.metrics.includes(sourceColumn)) {
          if (item.type === 'dimension') {
            // delete any copy of this item: other columns bound to the same
            // source are dropped, the item itself (matched by name) is kept
            schema.columns = schema.columns.filter((a) => sourceColumn !== a.sourceColumn || item.name === a.name);
          }
          if (item.type !== 'metric') {
            item.type = 'metric';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              delete item.transform;
            }
          }
        } else if (stream.dimensions.includes(sourceColumn)) {
          if (item.type !== 'dimension') {
            item.type = 'dimension';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              delete item.transform;
            }
            if (item.targetType) {
              delete item.targetType;
            }
            if (item.metricTags) {
              delete item.metricTags;
            }
          }
        } else {
          // Source is neither a metric nor a dimension any more: drop every
          // column whose (first) source is this one, and the source entry itself.
          schema.columns = schema.columns.filter(
            (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
          );
          schema.sourceColumns = schema.sourceColumns.filter((a) => a.name !== sourceColumn);
        }
      });

      // check for unAssigned items added to metrics or dimensions
      tableMeta.forEach((item) => {
        const isMetric = stream.metrics.includes(item.name);
        if (!isMetric && !stream.dimensions.includes(item.name)) {
          return;
        }
        if (schema.sourceColumns.some((a) => item.name === a.name)) {
          return;
        }

        // `schema.sourceColumns` may still be the array held in state, so
        // append by building new arrays instead of pushing.
        schema.columns = [
          ...schema.columns,
          {
            id: getUniqueId(),
            sourceColumn: item.name,
            name: item.name,
            type: isMetric ? 'metric' : 'dimension',
          },
        ];
        schema.sourceColumns = [
          ...schema.sourceColumns,
          {
            id: item.name,
            name: item.name,
          },
        ];
      });

      return [actions.setSelectedStreamKeyVal(schemaModifications)];
    });

// Module's default export: the three Parquet epics defined in this file,
// combined into a single epic with redux-observable's combineEpics.
const parquetEpic = combineEpics(
  fetchParquetAnalysis,
  setParquetDiametricsStreamAnalysisSchema,
  setParquetDataStreamSchema,
);
export default parquetEpic;
