const get = require('lodash/get')

/* ChangesFeed
 *
 * Reads a database's changes feed in batches, tracking the last sequence
 * seen so that it can be stored in a checkpoint afterwards.
 *
 * Usage example:
 * ```
 * const checkpoint = new Checkpoint(db, 'some-id')
 * const since = await checkpoint.read()
 *
 * const feed = new ChangesFeed(db, { since })
 * const changes = await feed.readAll()
 * const docs = changes.map(c => c.doc)
 *
 * await checkpoint.write(feed.lastSeq)
 * ```
 */
class ChangesFeed {
  /* Create a feed reader.
   *
   * `db` must expose `changes(options)` resolving to `{ results: [...] }`,
   * where every result carries a `seq` (and `deleted` for deleted docs).
   *
   * Supported `options` (all optional):
   * - batchSize (default 100): amount of changes to fetch at once
   * - maxChanges (default max(batchSize, 100)): maximum number of changes to
   *   read over the lifetime of this feed
   * - includeDocs (default true): whether to include the documents
   * - includeDeleted (default false): whether to return changes of deleted docs
   * - since (default 0): sequence id after which to start reading
   * - readUntilEnd (default false): keep fetching until no more changes are
   *   found; ignores maxChanges
   * - descending: read in descending order; only works for a single batch,
   *   so it forces maxChanges = batchSize and readUntilEnd = false
   */
  constructor (db, options) {
    this.db = db
    this.batchSize = get(options, 'batchSize', 100)
    this.maxChanges = get(options, 'maxChanges', Math.max(this.batchSize, 100))
    this.includeDocs = get(options, 'includeDocs', true)
    this.includeDeleted = get(options, 'includeDeleted', false)

    // `get` only falls back to its default for `undefined`. A checkpoint that
    // was reset stores `null`, which must also mean "start from the beginning".
    const since = get(options, 'since', 0)
    this.since = since === null ? 0 : since

    this.readUntilEnd = get(options, 'readUntilEnd', false)
    if (this.readUntilEnd) {
      this.maxChanges = 0
    }

    // Warning: Descending will only work for a single batch.
    this.descending = get(options, 'descending')
    if (this.descending) {
      this.maxChanges = this.batchSize
      this.readUntilEnd = false
    }

    // Reading state
    this.lastSeq = this.since
    this.numChangesRead = 0
    this.done = false
  }

  /* Fetch the next batch of changes.
   *
   * Updates `lastSeq`, `numChangesRead` and `done`. Resolves to the changes
   * of this batch, without deleted docs unless `includeDeleted` is set.
   * Deleted changes still count towards `numChangesRead` and `lastSeq`.
   *
   * NOTE(review): `lastSeq` is taken from the final entry of the batch as
   * returned by the db; with `descending` that is presumably the oldest
   * sequence of the batch. Confirm before using it as a checkpoint.
   */
  async readBatch () {
    const limit = this.readUntilEnd
      ? this.batchSize
      : Math.min(this.batchSize, this.maxChanges - this.numChangesRead)

    // Nothing left to read within maxChanges. Do not query at all: the
    // changes API treats `limit: 0` like `limit: 1`, which would read past
    // the configured maximum.
    if (limit <= 0) {
      this.done = true
      return []
    }

    const changes = await this.db.changes({
      since: this.lastSeq,
      limit,
      include_docs: this.includeDocs,
      descending: this.descending
    })
    const results = changes.results
    if (results.length === 0) {
      this.done = true
      return []
    }

    this.numChangesRead += results.length
    this.lastSeq = results[results.length - 1].seq

    // A short batch means the feed is exhausted; otherwise stop once the
    // maximum is reached (unless reading until the end).
    this.done = (results.length < limit) ||
      (!this.readUntilEnd && this.numChangesRead >= this.maxChanges)

    return this.includeDeleted
      ? results.slice()
      : results.filter(result => !result.deleted)
  }

  /* Read batches until the feed reports it is done.
   *
   * Resolves to all changes read by this call, concatenated in read order.
   * Calling it again continues after `lastSeq` and never exceeds maxChanges.
   */
  async readAll () {
    this.done = false
    const allChanges = []
    while (!this.done) {
      const changesBatch = await this.readBatch()
      allChanges.push(...changesBatch)
    }
    return allChanges
  }
}

/* Checkpoint: persists how far a changes feed has been processed.
 *
 * The value is kept in a `_local/` document of the given db, so every
 * checkpoint name maps to exactly one document id.
 *
 * Usage:
 * ```
 * const checkpoint = new Checkpoint(db, checkpointId)
 * const lastSeq = await checkpoint.read()
 * // read some changes...
 * await checkpoint.write(newLastSeq)
 * ```
 */
class Checkpoint {
  constructor (db, name) {
    this.db = db
    this.id = `_local/van:indicators:${name}`
  }

  /* Store `seq` as the checkpoint value.
   * An already stored checkpoint document is updated in place (keeping its
   * other fields); a missing one (404) is created. Any other lookup error
   * is rethrown. Resolves to whatever `db.put` resolves to.
   */
  async write (seq) {
    let stored
    try {
      stored = await this.db.get(this.id)
    } catch (err) {
      const isMissing = err.status === 404
      if (!isMissing) {
        throw err
      }
    }

    const patch = {
      _id: this.id,
      last_seq: seq,
      last_updated: new Date().toJSON()
    }
    const target = stored || {}
    return this.db.put(Object.assign(target, patch))
  }

  /* Resolve to the stored sequence, or `undefined` when no checkpoint
   * document exists yet (404). Any other error is rethrown.
   */
  async read () {
    let stored
    try {
      stored = await this.db.get(this.id)
    } catch (err) {
      if (err.status === 404) {
        return undefined
      }
      throw err
    }
    return stored.last_seq
  }

  /* Overwrite the stored sequence with `null`. */
  async reset () {
    const outcome = await this.write(null)
    return outcome
  }
}

/* Read changes and apply handler with changes.
 * The checkpoint will only be set if we see no error.
 * The way this works is:
 * 1. Read n checkpoints
 * 2. Read n changes feeds
 * 3. Apply handler with n lists of changes
 * 4. Write n checkpoints
 * If any step before 4 fails this will not write checkpoints.
 * Every failure is logged and then rethrown to the caller.
 *
 * This can read from multiple dbs.
 * Usage:
 * ```
 * const changesConfigs = [
 * {db: PouchDB, checkpointId: string, feedOptions: ChangesFeed.options}
 * ...
 * {db, checkpointId, feedOptions}
 * ]
 * // changesPerFeed[i] is the list of changes read for changesConfigs[i]
 * // stats[i] is {dbName, since, lastSeq} for changesConfigs[i]
 * const handler = async function (changesPerFeed, stats) { ... }
 * const stats = await withChanges(changesConfigs, handler)
 * ```
 *
 * Resolves to the stats list, or to undefined when changesConfigs is empty.
 * Throws when the handler is missing or a config lacks db / checkpointId.
 * A `since` given in feedOptions takes precedence over the stored checkpoint.
 */
async function withChanges (changesConfigs = [], handler) {
  if (changesConfigs.length === 0) {
    return
  }
  if (!handler) {
    throw new Error('handler is required')
  }

  // validate the configs and construct one checkpoint per config
  const checkpoints = []
  for (const c of changesConfigs) {
    if (!c.db) {
      throw new Error('options.db is required')
    }
    if (!c.checkpointId) {
      throw new Error('options.checkpointId is required')
    }
    checkpoints.push(new Checkpoint(c.db, c.checkpointId))
  }

  // read checkpoints
  const startSeqs = await Promise.all(checkpoints.map(async checkpoint => {
    try {
      return await checkpoint.read()
    } catch (e) {
      console.log('error reading checkpoint with id', checkpoint.id)
      throw e
    }
  }))

  // construct feeds, each starting after its checkpoint
  const feeds = changesConfigs.map((config, i) => {
    return new ChangesFeed(config.db, Object.assign({ since: startSeqs[i] }, config.feedOptions))
  })

  // read all the changes feeds
  // The callback must be async and await the read: a rejected promise that
  // is merely returned from a try block never reaches the catch.
  const changesPerFeed = await Promise.all(feeds.map(async feed => {
    try {
      return await feed.readAll()
    } catch (error) {
      console.log('Error reading changes feed', feed.db.name)
      throw error
    }
  }))

  const stats = feeds.map(({ db, since, lastSeq }) => ({
    dbName: db.name,
    since,
    lastSeq
  }))

  // call the handler with the changes
  try {
    await handler(changesPerFeed, stats)
  } catch (e) {
    console.log('error applying changes handler')
    throw e
  }

  // write the checkpoints
  try {
    await Promise.all(feeds.map(async (feed, i) => {
      try {
        return await checkpoints[i].write(feed.lastSeq)
      } catch (e) {
        console.log('error setting checkpoint for db', changesConfigs[i].db.name)
        // rethrow so a failed checkpoint is never mistaken for success
        throw e
      }
    }))
  } catch (e) {
    console.log('error setting checkpoints. this is a grave error')
    throw e
  }

  return stats
}

module.exports = {
  ChangesFeed,
  Checkpoint,
  withChanges
}
