import es from 'elasticsearch-browser';
import { Observable, OperatorFunction } from 'rxjs';
import * as _ from 'lodash';
import { CollapseInput } from './_types/CollapseInput';
import { SortInput } from './_types/SortInput';
import { SizeInput } from './_types/SizeInput';
import { FromInput } from './_types/FromInput';
import { AggInput } from './_types/AggInput';
import { SourceInput } from './_types/SourceInput';

export function executeSearch(
  client: es.Client
): OperatorFunction<es.Client.SearchParams, es.Client.SearchResponse<any>> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          client
            .search(searchParams)
            .then(response => {
              observer.next(response);
            })
            .catch(error => observer.error(error));
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function getMapping(client: es.Client): OperatorFunction<es.Client.SearchParams, es.Client.SearchResponse<any>> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          client.indices
            .getMapping(searchParams)
            .then(response => {
              observer.next(response);
            })
            .catch(error => observer.error(error));
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function executeCount(
  client: es.Client
): OperatorFunction<es.Client.SearchParams, es.Client.SearchResponse<any>> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          client
            .count(searchParams)
            .then(response => {
              observer.next(response);
            })
            .catch(error => observer.error(error));
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function index$(index: string | string[]): Observable<es.Client.SearchParams> {
  return new Observable(observer => {
    observer.next({
      index,
      ignoreUnavailable: true
    });
  });
}

export function getIndexes(datesAsStrings: string[] | null | undefined): string[] {
  if (!datesAsStrings) {
    return ['messages-*'];
  }
  const indexes: string[] = datesAsStrings ? datesAsStrings.map(date => `messages-${date}`) : ['messages-*'];
  return indexes;
}

export function setTerm(key: string, value: string): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          const subField = _.get(searchParams, 'body.query.term');
          if (!subField) {
            _.set(searchParams, 'body.query.term', {});
          }
          searchParams.body.query.term[key] = {
            value
          };
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setFilter(filter: object): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          if (!filter) {
            observer.next(searchParams);
            return;
          }
          const subField = _.get(searchParams, 'body.query.bool');
          if (!subField) {
            _.set(searchParams, 'body.query.bool', {});
          }
          searchParams.body.query.bool.filter = filter;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

function createBody(searchParams: es.Client.SearchParams) {
  const subField = _.get(searchParams, 'body');
  if (!subField) {
    _.set(searchParams, 'body', {});
  }
}
function createBodyAggs(searchParams: es.Client.SearchParams) {
  const subField = _.get(searchParams, 'body.aggs');
  if (!subField) {
    _.set(searchParams, 'body.aggs', {});
  }
}

export function setAggBody(
  searchParams: es.Client.SearchParams,
  aggName: string,
  aggData: any,
  parentAggregations?: string[]
) {
  createBodyAggs(searchParams);
  let aggsField = searchParams.body.aggs;
  if (parentAggregations) {
    parentAggregations.forEach(crtAggName => {
      const subField = _.get(aggsField, `${crtAggName}.aggs`);
      if (!subField) {
        _.set(aggsField, `${crtAggName}.aggs`, {});
      }
      aggsField = aggsField[crtAggName].aggs;
    });
  }
  aggsField[aggName] = aggData;
}

export function setAgg(
  name: string,
  input: AggInput,
  parentAggregations?: string[]
): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          setAggBody(searchParams, name, input, parentAggregations);
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setCollapse(input: CollapseInput): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBodyAggs(searchParams);
          searchParams.body.collapse = input;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setSort(input: SortInput): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBodyAggs(searchParams);
          searchParams.body.sort = input;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setSearchAfter(timestamp: number): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBodyAggs(searchParams);
          searchParams.body.search_after = [timestamp];
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setSize(input: SizeInput): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBodyAggs(searchParams);
          searchParams.body.size = input;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

export function setFrom(input: FromInput): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBodyAggs(searchParams);
          searchParams.body.from = input;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}

/**
 * Define the source fields that will be shown
 */
export function setSource(input: SourceInput): OperatorFunction<es.Client.SearchParams, es.Client.SearchParams> {
  return (source: Observable<es.Client.SearchParams>) => {
    return new Observable(observer => {
      const sourceSubscription = source.subscribe({
        next(searchParams) {
          createBody(searchParams);
          searchParams.body._source = input._source;
          observer.next(searchParams);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return () => {
        sourceSubscription.unsubscribe();
      };
    });
  };
}
