import { Observable } from 'rxjs';
import { resolvePKFields } from '../../utils/resolvePKFields.mjs';
import { findIndexByFields } from '../../utils/findIndexByFields.mjs';

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
/**
 * Builds the `observeQuery` operation for a single model.
 *
 * The returned function creates an Observable that emits `{ items, isSynced }`
 * snapshots: first while paging through `models[name].list(...)`, then on
 * every create/update/delete message received from the model's
 * `onCreate`/`onUpdate`/`onDelete` observables.
 *
 * @param {object} models - Map of model name to its client operations
 *   (`list`, `onCreate`, `onUpdate`, `onDelete` are used here).
 * @param {object} model - Model definition; `model.name` selects the entry in
 *   `models` and the whole object is passed to `resolvePKFields`.
 * @returns {Function} `observeQuery(arg)`, returning an Observable of
 *   `{ items, isSynced }` snapshots. `arg` is forwarded to every operation.
 */
function observeQueryFactory(models, model) {
  const {
    name
  } = model;
  const observeQuery = arg => new Observable(subscriber => {
    // Fields used to match an incoming message to an item we already hold.
    // Resolved up front so nothing below can run before it is available.
    const pkFields = resolvePKFields(model);
    // What we send to our subscribers. The same array instance is emitted in
    // every snapshot and is mutated in place.
    const items = [];
    // Subscription messages that arrive while the initial result set is
    // still being collected.
    const messageQueue = [];
    // Operation to take when message(s) arrive. Swapped out once the initial
    // "sync" is complete so messages are ingested immediately.
    let receiveMessages = (...messages) => {
      messageQueue.push(...messages);
    };

    // Applies a list of messages to `items` and sends a synced snapshot.
    function ingestMessages(messages) {
      for (const message of messages) {
        const idx = findIndexByFields(message.item, items, pkFields);
        switch (message.type) {
          case 'create':
            // Skip creates for items we already hold (e.g. from a list page).
            if (idx < 0) items.push(message.item);
            break;
          case 'update':
            if (idx >= 0) items[idx] = message.item;
            break;
          case 'delete':
            if (idx >= 0) items.splice(idx, 1);
            break;
          default:
            console.error('Unrecognized message in observeQuery.', message);
        }
      }
      subscriber.next({
        items,
        isSynced: true
      });
    }

    // Subscribes to one model operation and tags its messages with `type`.
    // Errors are forwarded as `{ type: <operation name>, error }`.
    const subscribeTo = (operation, type) => models[name][operation](arg).subscribe({
      next(item) {
        // Looked up at call time so the post-sync swap takes effect.
        receiveMessages({
          item,
          type
        });
      },
      error(error) {
        subscriber.error({
          type: operation,
          error
        });
      }
    });

    // Start subscriptions before listing so nothing is missed while paging.
    const subscriptions = [
      subscribeTo('onCreate', 'create'),
      subscribeTo('onUpdate', 'update'),
      subscribeTo('onDelete', 'delete')
    ];

    // Initial results.
    (async () => {
      let firstPage = true;
      let nextToken = null;
      while (!subscriber.closed && (firstPage || nextToken)) {
        firstPage = false;
        let page;
        let errors;
        try {
          ({
            data: page,
            errors,
            nextToken
          } = await models[name].list({
            ...arg,
            nextToken
          }));
        } catch (error) {
          // A rejected (or malformed) list call is reported to the subscriber
          // instead of surfacing as an unhandled promise rejection.
          subscriber.error(error);
          return;
        }
        // The subscriber may have gone away while the page was in flight.
        if (subscriber.closed) return;
        // `data` can be absent when the response only carries errors.
        if (Array.isArray(page)) items.push(...page);
        // If there are no more pages and no queued messages that still need
        // to be merged in, we're "synced".
        const isSynced = messageQueue.length === 0 && (nextToken === null || nextToken === undefined);
        subscriber.next({
          items,
          isSynced
        });
        if (Array.isArray(errors) && errors.length > 0) {
          // Only one error notification can be delivered, so report the
          // first and stop paging.
          subscriber.error(errors[0]);
          return;
        }
      }
      if (subscriber.closed) return;
      // Play through the queue, then release the queued messages.
      if (messageQueue.length > 0) {
        ingestMessages(messageQueue);
        messageQueue.length = 0;
      }
      // Switch to writing directly to the items collection.
      receiveMessages = (...messages) => {
        ingestMessages(messages);
      };
    })();

    // When the subscriber unsubscribes, tear down the internal subscriptions.
    // Paging needs no explicit stop: `subscriber.closed` is checked before
    // each page is fetched and again after it arrives.
    return () => {
      for (const subscription of subscriptions) {
        subscription.unsubscribe();
      }
    };
  });
  return observeQuery;
}
export { observeQueryFactory };
