/// HELPERS ///

/**
 * Returns the topic sets from `bTopics` that have no counterpart in `orTopics`.
 *
 * Sets are compared regardless of element order, and topics are compared by
 * their string form.
 *
 * @param {Array<Array>} orTopics - Entries shaped `[ref, ...andTopics]`; the
 *   leading ref is ignored for the comparison.
 * @param {Array<Array>} bTopics - Candidate sets shaped `[...andTopics]`
 *   (no leading ref).
 * @returns {Array<Array>} The entries of `bTopics` (same array instances, in
 *   their original order) that are not present in `orTopics`.
 */
const diff = (orTopics, bTopics) => {
  // JSON-encode the sorted names: Array#toString joins with "," and would make
  // the single topic "a,b" indistinguishable from the pair ["a", "b"].
  // `map` copies, so neither input array is reordered.
  const keyOf = and => JSON.stringify(and.map(String).sort());
  const known = new Set(orTopics.map(([, ...and]) => keyOf(and)));
  return bTopics.filter(and => !known.has(keyOf(and)));
};

/**
 * Returns copies of the entries in `orTopics` whose topic set is not listed
 * in `rmTopics`.
 *
 * Sets are compared regardless of element order, and topics are compared by
 * their string form.
 *
 * @param {Array<Array>} orTopics - Entries shaped `[ref, ...andTopics]`; the
 *   leading ref is ignored for the comparison.
 * @param {Array<Array>} rmTopics - Sets to drop, shaped `[...andTopics]`
 *   (no leading ref).
 * @returns {Array<Array>} Shallow copies of the surviving entries, in their
 *   original order.
 */
const remove = (orTopics, rmTopics) => {
  // JSON-encode the sorted names: Array#toString joins with "," and would make
  // the single topic "a,b" indistinguishable from the pair ["a", "b"].
  // `map` copies, so neither input array is reordered.
  const keyOf = and => JSON.stringify(and.map(String).sort());
  const doomed = new Set(rmTopics.map(and => keyOf(and)));
  return orTopics
    .filter(([, ...and]) => !doomed.has(keyOf(and)))
    .map(entry => [...entry]);
};

/**
 * Merges `addTopics` into `orTopics`: any existing entry with the same topic
 * set as an added entry is dropped first, then copies of the added entries
 * are placed at the end.
 *
 * @param {Array<Array>} orTopics - Current entries, `[ref, ...andTopics]`.
 * @param {Array<Array>} addTopics - Entries to add, `[ref, ...andTopics]`.
 * @returns {Array<Array>} A new list made of copied entries.
 */
const append = (orTopics, addTopics) => {
  // Strip the leading ref so `remove` receives bare topic sets.
  const incomingSets = addTopics.map(entry => entry.slice(1));
  const survivors = remove(orTopics, incomingSets);
  const incomingCopies = addTopics.map(entry => entry.slice());
  return survivors.concat(incomingCopies);
};

export default (prefix, namespace, socket, params) => {
  /// STATE ///

  // Create a fresh per-prefix object, store it on the caller's `params` under
  // `prefix`, and keep working with that new object from here on.
  params = params[prefix] = {};

  let ns = namespace;
  // Next sequence number handed out by emitReliable.
  let number = 1;
  // Sequence number sent along with each "<prefix>/sub" request.
  let subNbr = 1;
  // Active subscriptions: [[prevIdRef, "topic1", ...andTopicN], ...orTopics]
  let topics = [];
  // Reliable emits awaiting acknowledgement: { [seqNbr]: emitArgs }
  let sendBuf = {};
  // [nextSeqNbr, ...[msgSeqNbr, msgObject]] - messages received ahead of a gap
  let recvBuf = [1];
  // [highestContiguousAck, ...acks received ahead of a gap]
  let acknBuf = [0];
  // Messages waiting to be de-duplicated and filtered by drainMessages.
  let msgsBuf = [];
  // Message batches queued by dispatch for delivery to listeners.
  let dispBuf = [];
  // Callbacks registered through onMessage.
  let listeners = [];
  // `pending` is the promise shared by in-flight restart calls (null when
  // idle); `done` is its resolver.
  let done,
    pending = null;

  /// SENDING ///

  // Exposed on the shared params object; onAcknowledge moves it to one past
  // the last sequence number acknowledged without gaps.
  params.nbr = 1;

  //if (prefix == "sdk") {
  //  setInterval(() => console.log(JSON.stringify({ params, acknBuf, sendBuf: Object.keys(sendBuf), recvBuf: Object.keys(recvBuf) }, null, 2)), 3000);
  //}

  /**
   * Records that reliable emit `nbr` has been acknowledged.
   *
   * `acknBuf[0]` is the highest sequence number acknowledged without gaps; the
   * remaining entries are acknowledgements that arrived ahead of a gap. When
   * the gap-free run grows, the newly covered entries are deleted from
   * `sendBuf` and `params.nbr` is moved to one past the end of the run.
   * Acknowledgements already covered by the run, and repeats, are ignored.
   *
   * @param {number} nbr - Sequence number being acknowledged.
   */
  const onAcknowledge = nbr => {
    if (!(acknBuf[0] < nbr) || acknBuf.includes(nbr)) {
      return;
    }

    const sorted = [...acknBuf, nbr].sort((a, b) => a - b);
    // First position that breaks the run sorted[0], sorted[0] + 1, ...; the -1
    // sentinel guarantees a hit when every real entry is contiguous.
    const gapAt = [...sorted, -1].findIndex((n, i) => n !== sorted[0] + i);
    const confirmed = sorted.slice(1, gapAt);
    // Keep the end of the run as the new head, followed by the stragglers.
    acknBuf = sorted.slice(gapAt - 1);

    for (const ackedNbr of confirmed) {
      delete sendBuf[ackedNbr];
    }

    // An ack that lands beyond a gap confirms nothing yet. Leave params.nbr
    // alone in that case instead of computing `undefined + 1` (NaN).
    if (confirmed.length > 0) {
      params.nbr = confirmed[confirmed.length - 1] + 1;
    }
  };

  /**
   * Emits `evt` tagged with a fresh sequence number and resolves with the
   * first argument of the peer's acknowledgement.
   *
   * The full argument list is stored in `sendBuf` under the sequence number
   * before anything is sent, and is only emitted right away when the socket
   * is currently connected.
   *
   * @param {string} evt - Event name.
   * @param {...*} args - Payload placed after the sequence number.
   * @returns {Promise<*>} Resolves when the acknowledgement callback fires.
   */
  const emitReliable = async (evt, ...args) => {
    const seq = number++;

    let settle;
    const acknowledged = new Promise(resolve => {
      settle = resolve;
    });

    // Acknowledgement callback handed to the peer as the last emit argument.
    const onAck = (...reply) => {
      onAcknowledge(seq);
      settle(...reply);
    };

    const payload = [evt, seq, ...args, onAck];
    sendBuf[seq] = payload;

    if (socket.connected) {
      socket.emit(...payload);
    }

    const reply = await acknowledged;
    return reply;
  };

  /// RECEIVING ///

  /**
   * Inserts message `nbr` into `recvBuf` in sequence order and returns every
   * message that is now deliverable without a gap.
   *
   * `recvBuf[0]` is the next expected sequence number; the remaining entries
   * are `[nbr, msg]` pairs sorted by number. Messages below the expected
   * number, or already buffered under the same number, are dropped.
   *
   * @param {number} nbr - Sequence number of the incoming message.
   * @param {*} msg - The message itself.
   * @returns {Array} The continuous run of messages starting at the expected
   *   number (possibly empty), in sequence order.
   */
  const enforceOrder = (nbr, msg) => {
    const alreadyDelivered = nbr < recvBuf[0];
    const alreadyBuffered = () =>
      recvBuf.slice(1).some(([bufferedNbr]) => bufferedNbr == nbr);
    if (alreadyDelivered || alreadyBuffered()) {
      return [];
    }

    // Insert before the first buffered entry with a larger number, or at the
    // end when there is none. Slot 0 holds the counter, so it is skipped.
    const firstLarger = recvBuf.findIndex(
      (entry, idx) => idx >= 1 && nbr < entry[0]
    );
    const insertAt = firstLarger === -1 ? recvBuf.length : firstLarger;
    recvBuf.splice(insertAt, 0, [nbr, msg]);

    // Count how many leading entries continue the expected sequence.
    let ready = 0;
    while (
      ready < recvBuf.length - 1 &&
      recvBuf[ready + 1][0] == recvBuf[0] + ready
    ) {
      ready++;
    }

    recvBuf[0] += ready;
    return recvBuf.splice(1, ready).map(entry => entry[1]);
  };

  /**
   * Recomputes the subscription list, re-subscribes on the server, fetches
   * stored messages for subscriptions that ask for them, and delivers the
   * resulting messages to the listeners.
   *
   * Overlapping calls share one `pending` promise. A call that notices a newer
   * call has replaced `topics` stops doing work of its own and waits for that
   * shared promise instead.
   *
   * @param {Array<Array>|null} myTopics - Base list of `[ref, ...andTopics]`
   *   entries, or null to start from the current `topics`.
   * @param {Array<Array>} addTopics - Entries to add, `[ref, ...andTopics]`.
   * @param {Array<Array>} rmTopics - Topic sets to remove, `[...andTopics]`.
   * @returns {Promise<Object>} `{ end: id }` when one of the delivered messages
   *   matches an entry of `addTopics` and is newer than that entry's ref (the
   *   newest such message wins), otherwise `{}`.
   *   NOTE(review): the two early `return await myPending` paths resolve with
   *   the message array passed to `done`, not with `{}` / `{ end }` - confirm
   *   callers expect that.
   */
  const restart = async (myTopics = null, addTopics = [], rmTopics = []) => {
    // `append` returns a fresh array, so the identity checks on `topics` below
    // reveal whether a later restart call has taken over.
    topics = myTopics = append(remove(myTopics || topics, rmTopics), addTopics);
    // Join the in-flight round if there is one, otherwise open a new one.
    const myPending = (pending = pending || new Promise(f => (done = f)));
    // The server receives bare topic sets, without the leading ref.
    const subTopics = myTopics.map(([, ...topics]) => topics);

    // Offline: nothing can be sent now; wait until a later restart call
    // (onConnect issues one) completes the round.
    if (!socket.connected) {
      return await myPending;
    }

    // Subscribe and wait for the server's acknowledgement.
    await new Promise(cb =>
      socket.emit(prefix + "/sub", subNbr++, ns, subTopics, cb)
    );

    // Superseded while waiting for the acknowledgement.
    if (topics != myTopics) {
      return await myPending;
    }

    let buffer = [],
      page = 0;

    // Page through "<prefix>/get" until the server reports nothing more.
    while (true) {
      // A negative ref (-1) marks a realtime-only subscription; only entries
      // with ref >= 0 are sent to "<prefix>/get".
      const getTopics = topics.filter(([ref]) => ref >= 0);
      if (!getTopics.length) {
        break;
      }
      const { result, more } = await new Promise(cb =>
        socket.emit(prefix + "/get", ns, getTopics, { page }, cb)
      );
      // Superseded while fetching: drop this page and stop.
      if (topics != myTopics) {
        break;
      }
      buffer.push(...result);
      if (!more) {
        break;
      }
      // `more` doubles as the next page token.
      page = more;
    }

    let msgs;

    if (topics != myTopics) {
      // A newer call owns the round; reuse whatever it delivers.
      msgs = await myPending;
    } else {
      // Merge the fetched messages with anything onReceive queued meanwhile,
      // then settle the round and resume normal dispatching.
      msgsBuf.push(...buffer);
      msgs = drainMessages();
      done(msgs);
      dispatch(msgs);
      pending = null;
    }

    if (addTopics.length) {
      // Walk from the newest message backwards so the first hit is the newest.
      for (let i = msgs.length - 1; i >= 0; i--) {
        const msg = msgs[i];
        for (const arr of addTopics) {
          const [ref, ...topics] = arr;
          if (topics.every(topic => msg.topics.includes(topic))) {
            if (ref < msg.id) {
              return { end: msg.id };
            }
          }
        }
      }
    }

    return {};
  };

  /**
   * Empties `msgsBuf` and returns the messages that should be delivered.
   *
   * Steps: keep only the last queued copy of each id, order by ascending id,
   * then keep a message only if at least one subscription lists nothing but
   * topics the message carries and has a ref lower than the message id. Every
   * such subscription has its ref advanced to the message id.
   *
   * @returns {Array<Object>} Accepted messages in ascending id order.
   */
  const drainMessages = () => {
    const drained = msgsBuf.splice(0, msgsBuf.length);

    // Walk backwards so the last copy of an id is the one that survives;
    // unshift restores the original relative order.
    const seenIds = {};
    const unique = drained.reduceRight((kept, msg) => {
      if (!(msg.id in seenIds)) {
        seenIds[msg.id] = 1;
        kept.unshift(msg);
      }
      return kept;
    }, []);

    unique.sort((left, right) => left.id - right.id);

    return unique.filter(msg => {
      let accepted = false;
      for (const subscription of topics) {
        const [ref, ...required] = subscription;
        const matches = required.every(name => msg.topics.includes(name));
        if (matches && ref < msg.id) {
          // Advance the ref so this message is not accepted again later.
          subscription[0] = msg.id;
          accepted = true;
        }
      }
      return accepted;
    });
  };

  /**
   * Queues a batch of messages and schedules their delivery on a zero-delay
   * timer.
   *
   * When the timer fires, every batch queued so far is removed from `dispBuf`
   * and each message is handed to each listener as
   * `(entry, topics, { id, at })`.
   *
   * @param {Array<Object>} msgs - Messages with `id`, `at`, `topics`, `entry`.
   */
  const dispatch = msgs => {
    dispBuf.push(msgs);

    const flush = () => {
      // Take everything queued so far; a later timer then finds nothing left.
      const batches = dispBuf.splice(0, dispBuf.length);
      for (const batch of batches) {
        for (const { id, at, topics, entry } of batch) {
          for (const listener of listeners) {
            listener(entry, topics, { id, at });
          }
        }
      }
    };

    setTimeout(flush, 0);
  };

  /// WIRING ///

  /**
   * Connect handler: resets the receive buffer so the expected sequence number
   * is 1 again, starts a restart round, then re-sends every emit still held in
   * `sendBuf`.
   */
  const onConnect = () => {
    recvBuf = [1];
    // Not awaited: the restart round proceeds in the background.
    restart();
    const backlog = Object.values(sendBuf);
    backlog.forEach(emitArgs => {
      socket.emit(...emitArgs);
    });
  };

  //setInterval(() => {
  //  console.log("inteval", Object.values(sendBuf))
  //  for (const emitArgs of Object.values(sendBuf)) {
  //    console.log("remitting!");
  //    socket.emit(...emitArgs);
  //  }
  //}, 5000);

  /**
   * Handles one pushed message: runs it through the ordering buffer, queues
   * whatever became deliverable, and dispatches straight away unless a restart
   * round is pending (the queued messages then stay in `msgsBuf`).
   *
   * @param {number} nbr - Sequence number of the message.
   * @param {Object} msg - The message.
   */
  const onReceive = (nbr, msg) => {
    msgsBuf.push(...enforceOrder(nbr, msg));
    if (pending) {
      return;
    }
    dispatch(drainMessages());
  };

  // Each "connect" event resets receive ordering, starts a restart round and
  // re-sends the emits still held in sendBuf (see onConnect).
  socket.on("connect", () => onConnect());
  // Pushed messages arrive as (sequence number, message) on "<prefix>/msg".
  socket.on(prefix + "/msg", (nbr, msg) => onReceive(nbr, msg));
  //socket.on(prefix + "/syn", (nbr) => {
  //  if (nbr == 1) {
  //    recvBuf = [1];
  //  }
  //  gotServerNumber();
  //});

  /// API ///

  /**
   * Reliably emits "<prefix>/pub" for the current namespace.
   *
   * @param {Array} topics - Topics to publish under.
   * @param {*} entry - Payload to publish.
   * @param {*} [blk] - Forwarded to the server unchanged.
   * @returns {Promise<*>} The server's acknowledgement value.
   */
  const publish = async (topics, entry, blk = undefined) => {
    const evt = prefix + "/pub";
    const ack = await emitReliable(evt, ns, topics, entry, blk);
    return ack;
  };

  /**
   * Reliably emits "<prefix>/xnd" for the current namespace.
   *
   * @param {*} query - Forwarded to the server unchanged.
   * @param {*} [blk] - Forwarded to the server unchanged.
   * @returns {Promise<*>} The server's acknowledgement value.
   */
  const extend = async (query, blk = undefined) => {
    const evt = prefix + "/xnd";
    const ack = await emitReliable(evt, ns, query, blk);
    return ack;
  };

  /**
   * Reliably emits "<prefix>/cmt" for the current namespace.
   *
   * @param {*} cmt - Forwarded to the server unchanged.
   * @param {*} [csd] - Forwarded to the server unchanged.
   * @returns {Promise<*>} The server's acknowledgement value.
   */
  const commit = async (cmt, csd = undefined) => {
    const evt = prefix + "/cmt";
    const ack = await emitReliable(evt, ns, cmt, csd);
    return ack;
  };

  /**
   * Fetches stored messages for `topics` with a one-off "<prefix>/get"
   * request that times out after 5 seconds.
   *
   * @param {Array} topics - Bare topics or `[ref, ...andTopics]` arrays; a
   *   bare topic `x` is sent as `[0, x]`.
   * @param {Object} [options]
   * @param {*} [options.max] - Forwarded to the server unchanged.
   * @param {*} [options.back] - Forwarded to the server unchanged.
   * @param {*} [options.page] - Forwarded to the server unchanged.
   * @returns {Promise<{err: *, result: *, more: *}>} `err` is the first
   *   callback argument; `result` and `more` come from the response and are
   *   undefined when no response was delivered.
   */
  const retrieve = async (topics, { max, back, page } = {}) => {
    const query = topics.map(x => (Array.isArray(x) ? x : [0, x]));
    const [err, response] = await new Promise(cb =>
      socket
        .timeout(5000)
        .emit(prefix + "/get", ns, query, { max, back, page }, (...x) => cb(x))
    );
    // On timeout the callback receives only the error, so there is no
    // response to destructure; report `err` instead of throwing a TypeError.
    const { result, more } = response || {};
    return { err, result, more };
  };

  // Argument normalisers: a bare topic becomes `[ref, topic]` (subscription
  // entry) or `[topic]` (plain topic set); arrays pass through untouched.
  const withRef = ref => x => (Array.isArray(x) ? x : [ref, x]);
  const asTopicSet = x => (Array.isArray(x) ? x : [x]);

  // Add and/or remove subscriptions on top of the current list.
  const sub = (add, rm) => {
    return restart(null, add, rm);
  };
  // Subscribe with ref 0 for bare topics.
  const pull = (...or) => {
    return sub(or.map(withRef(0)));
  };
  // Subscribe with ref -1 for bare topics (-1 means realtime only).
  const on = (...or) => {
    return sub(or.map(withRef(-1)));
  };
  // Unsubscribe the given topic sets.
  const off = (...rm) => {
    return sub([], rm.map(asTopicSet));
  };
  // Topic sets from the arguments that are not currently subscribed.
  const d = (...t) => {
    return diff(topics, t.map(asTopicSet));
  };
  // True when every given topic set is currently subscribed.
  const has = (...or) => {
    const missing = d(...or);
    return !missing.length;
  };
  // Register a message listener; returns the new listener count.
  const onMessage = cb => {
    return listeners.push(cb);
  };
  // Read / overwrite the next reliable-emit sequence number.
  const getSeq = () => {
    return number;
  };
  const setSeq = nbr => {
    number = nbr;
    return number;
  };

  // Public API of this client. `subscribe` is an alias of `on`, and `diff` is
  // the closure-bound helper `d` (compares against the current subscriptions),
  // not the module-level `diff`.
  return {
    restart,
    sub,
    subscribe: on,
    pull,
    on,
    off,
    has,
    diff: d,
    onMessage,
    publish,
    extend,
    commit,
    retrieve,
    getSeq,
    setSeq
  };
};
