DistLog.java

package handist.collections.dist;

import static apgas.Constructs.*;
import static handist.collections.util.StringUtilities.*;

import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import apgas.Constructs;
import apgas.Place;
import apgas.util.GlobalID;
import handist.collections.dist.util.Pair;
import handist.collections.dist.util.SerializableBiFunction;

/**
 * DistLog is a distributed log manager. It collects log on places and gather it
 * to a place.
 *
 * Each DistLog instance refers to a distributed collection having global ID and
 * you can arbitrary copy it to other places.
 *
 * It offers a static method {@code DistLog.log(tag, msg)}. Please call
 * `DistLog.globalSetup(DistLog)` first, otherwise the method only print the
 * {@code tag} and {@code msg} to {@code System.out}.
 *
 */
public class DistLog extends DistCollectionSatellite<DistConcurrentMultiMap<DistLog.LogKey, DistLog.LogItem>, DistLog>
        implements Serializable {

    static class ListDiff {
        int index;
        LogItem first;
        LogItem second;

        public ListDiff(int index, LogItem first, LogItem second) {
            this.index = index;
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "Elements (No. " + index + ") differ: " + first + ", " + second;
        }
    }

    /**
     * Class used to represent information logged. Logged information takes the
     * shape of a message and an optional "appendix". Logged items can be compared
     * to one-another, but only the message value is considered. The appendix may
     * differ between two instances but the two logged items may still be equal.
     */
    public static class LogItem implements Serializable {
        private static final long serialVersionUID = -1365865614858381506L;
        public static Comparator<LogItem> cmp = Comparator.nullsFirst(Comparator.comparing(i0 -> i0.msg));
        public static boolean appendixPrint = true;
        public final String msg;
        public final String appendix;

        LogItem(String message, String appendix) {
            msg = message;
            this.appendix = appendix;
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof LogItem)) {
                return false;
            }
            return msg.equals(((LogItem) obj).msg);
        }

        @Override
        public int hashCode() {
            return msg.hashCode();
        }

        @Override
        public String toString() {

            if (appendixPrint) {
                return "LogItem:" + msg + ", " + appendix;
            } else {
                return "LogItem:" + msg;
            }
        }
    }

    /**
     * Class used as a key to record elements in a {@link DistLog}. A {@link LogKey}
     * is a combination of a {@link Place}, a {@link String} tag, and a {@code long}
     * phase tuple.
     */
    public static final class LogKey implements Serializable, Comparable<LogKey> {

        /** Serial Version UID */
        private static final long serialVersionUID = -7799219001690238705L;

        /** Place on which the record was made */
        public final Place place;
        /** Tag, or topic under which the record was made */
        public final String tag;

        /** Phase during which the record was made */
        public final long phase;

        /**
         * Constructor
         *
         * @param p     place on which the record is recorded
         * @param tag   the subject tag for the record
         * @param phase the phase during which the record is made
         */
        public LogKey(Place p, String tag, long phase) {
            place = p;
            this.tag = tag;
            this.phase = phase;
        }

        @Override
        public int compareTo(LogKey o) {
            int result = Long.compare(phase, o.phase);
            if (result == 0) {
                result = tag.compareTo(o.tag);
            }
            if (result == 0) {
                result = Integer.compare(place.id, o.place.id);
            }
            return result;
        }

        /**
         * Two {@link LogKey}s are equal iff their respective {@link #place},
         * {@link #tag} and {@link #phase} match.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof LogKey)) {
                return false;
            }
            final LogKey key2 = (LogKey) obj;
            return place.equals(key2.place) && nullSafeEquals(tag, key2.tag) && (phase == key2.phase);
        }

        @Override
        public int hashCode() {
            return place.id + (tag.hashCode() << 2) + (int) (phase << 4 + phase >> 16);
        }

        @Override
        public String toString() {
            return "Log@" + place + ", tag: " + tag + ", phase: " + phase;
        }
    }

    /**
     *
     */
    static class SetDiff {
        Collection<LogItem> first;
        Collection<LogItem> second;

        public SetDiff(Collection<LogItem> first, Collection<LogItem> second) {
            this.first = first;
            this.second = second;
        }

        public boolean isEmpty() {
            return (first == null || first.isEmpty()) && (second == null || second.isEmpty());
        }

        public void print(PrintStream out) {
            out.println("  1st:" + (first == null ? "" : first));
            out.println("  2nd:" + (second == null ? "" : second));
        }
    }

    /** Serial Version UID */
    private static final long serialVersionUID = 3453720633747873404L;

    /**
     * The default DistLog instance used by the static
     * {@link DistLog#log(String, String, String)} method.
     */
    public static DistLog defaultLog;

    /**
     * Map used to keep a record of {@link DistLog} instances in use
     */
    public static HashMap<GlobalID, DistLog> map = new HashMap<>();

    private static final Comparator<Pair<String, Long>> gcmp = Comparator.comparing(Pair<String, Long>::getSecond)
            .thenComparing(Pair<String, Long>::getFirst);

    public static String itemOffset = "  ";

    private static <E> Collection<E> concat(Collection<? extends Collection<E>> lists) {
        if (lists == null) {
            return null;
        }
        int size = 0;
        for (final Collection<E> list : lists) {
            size += list.size();
        }
        final ArrayList<E> result = new ArrayList<>(size);
        for (final Collection<E> list : lists) {
            result.addAll(list);
        }
        return result;
    }

    public static void defaultGlobalGather() {
        DistLog.defaultLog.globalGather();
    }

    /**
     * set the phase of DistLog.defaultLog
     *
     * @param phase the new phase value
     */
    public static void defaultGlobalSetPhase(long phase) {
        DistLog.defaultLog.globalSetPhase(phase);
    }

    private static ListDiff diffCheckList(List<LogItem> list0, List<LogItem> list1) {
        final int size = Math.min(list0.size(), list1.size());
        for (int i = 0; i < size; i++) {
            final LogItem item0 = list0.get(i);
            final LogItem item1 = list1.get(i);
            if (!item0.equals(item1)) {
                return new ListDiff(i, item0, item1);
            }
        }
        if (list0.size() > list1.size()) {
            return new ListDiff(size, list0.get(size), null);
        } else if (list0.size() < list1.size()) {
            return new ListDiff(size, null, list1.get(size));
        } else {
            return null;
        }
    }

    private static SetDiff diffCheckSet(Collection<? extends Collection<LogItem>> lists0,
            Collection<? extends Collection<LogItem>> lists1) {
        if (lists0 == null) {
            lists0 = Collections.emptySet();
        }
        if (lists1 == null) {
            lists1 = Collections.emptySet();
        }
        return diffCheckSet0(concat(lists0), concat(lists1));
    }

    private static SetDiff diffCheckSet0(Collection<LogItem> olist0, Collection<LogItem> olist1) {
        if (olist0.isEmpty() && olist1.isEmpty()) {
            return null;
        }
        if (olist0.isEmpty() || olist1.isEmpty()) {
            return new SetDiff(olist0, olist1);
        }
        final ArrayList<LogItem> list0 = new ArrayList<>(olist0);
        final ArrayList<LogItem> list1 = new ArrayList<>(olist1);
        list0.sort(LogItem.cmp);
        list1.sort(LogItem.cmp);
        final SetDiff result = new SetDiff(list0, list1);
        if (list0.size() != list1.size()) {
            return result;
        }
        for (int i = 0; i < list0.size(); i++) {
            if (!list0.get(i).equals(list1.get(i))) {
                return result;
            }
        }
        return null;
    }

    /**
     * create a DistLog instance and set it to {@code DistLog.defaultLog} on all the
     * places of the place group.
     *
     * @param pg         the place group
     * @param phase      the initial phase of the created DistLog
     * @param setDefault set the created DistLog as DistLog.defaultLog
     */
    public static DistLog globalSetup(TeamedPlaceGroup pg, long phase, boolean setDefault) {
        final DistLog dlog = new DistLog(pg, phase);
        dlog.globalSetup(setDefault);
        return dlog;
    }

    /**
     * put a log to {@code DistLog.defaultLog}. This method only calls
     * {@code DistLog.defaultLog.put(tag, msg, appendix)} if {@code defaultLog} is
     * set. Otherwise {@code System.out.println()} is called instead.
     *
     * @param tag      the topic tag about which a log is made
     * @param msg      object being logged, can be a {@link String} or another
     *                 object
     * @param appendix appendix of the log message
     */
    public static void log(String tag, String msg, String appendix) {
        if (defaultLog != null) {
            defaultLog.put(tag, msg, appendix);
        } else {
            System.out.println(tag + ":" + msg + (appendix == null ? "" : ":" + appendix));
        }
    }

    public final TeamedPlaceGroup pg;

    /** The current logging phase */
    AtomicLong phase;

    /**
     * Creates a {@link DistLog} instance for events that occur in the whole
     * {@link TeamedPlaceGroup#getWorld()} with an initial phse of {@value 0l}.l
     */
    public DistLog() {
        this(TeamedPlaceGroup.getWorld());
    }

    /**
     * Creates a {@link DistLog} instance for events to be logged on the provided
     * place group. The initial logging phase is arbitrarily set to {@value 0l};
     *
     * @param pg the group of places on which events are gathered
     */
    public DistLog(final TeamedPlaceGroup pg) {
        this(pg, 0l);
    }

    /**
     * Create a DistLog instance that records logs within a place group and gathers
     * them to a place
     *
     * @param pg    the place group
     * @param phase the initial phase
     */
    public DistLog(final TeamedPlaceGroup pg, final long phase) {
        this(pg, phase, new DistConcurrentMultiMap<>(pg));
    }

    private DistLog(TeamedPlaceGroup pg, long phase, DistConcurrentMultiMap<LogKey, LogItem> base) {
        super(base);
        this.pg = pg;
        this.phase = new AtomicLong(phase);
        assert (DistLog.map.get(base.id()) == null);
        assert (base != null);
        DistLog.map.put(base.id(), this);
    }

    /**
     * Determines whether the log set of this and the target DistLog isntances are
     * equal. It distinguish only tags and ignores the generated places. It return
     * true iff the sets of the logs having the same tag are the same. Diff will be
     * output to {@code out} if they differ.
     *
     * @param target the logger instance to compare to this
     * @param out    the output to which the difference is printed
     * @return {@code true} if this instance and the target are identical,
     *         {@code false otherwise}
     */

    public boolean distributionFreeEquals(DistLog target, PrintStream out) {
        boolean result = true;
        final TreeMap<Pair<String, Long>, List<Collection<LogItem>>> g0 = groupBy();
        final TreeMap<Pair<String, Long>, List<Collection<LogItem>>> g1 = target.groupBy();
        final TreeSet<Pair<String, Long>> keys = new TreeSet<>(gcmp);
        keys.addAll(g0.keySet());
        keys.addAll(g1.keySet());
        long phase = 0;
        for (final Pair<String, Long> key : keys) {
            if ((!result) && phase < key.second) {
                return false;
            }
            final List<Collection<LogItem>> entries0 = g0.get(key);
            final List<Collection<LogItem>> entries1 = g1.get(key);
            final SetDiff diff = diffCheckSet(entries0, entries1);
            if (diff != null) {
                if (result == true) {
                    out.println("Diff first found in phase " + key.second);
                }
                out.println("Diff with tag: " + key.first + ", phase: " + key.second);
                diff.print(out);
                result = false;
                phase = key.second;
            }
        }
        for (final Map.Entry<Pair<String, Long>, List<Collection<LogItem>>> entry : g1.entrySet()) {
            out.println("Diff @ [tag: " + entry.getKey().first + ", phase" + entry.getKey().second
                    + "]: target only has values:" + entry.getValue());
        }
        return result;
    }

    @Override
    public SerializableBiFunction<DistConcurrentMultiMap<LogKey, LogItem>, Place, DistLog> getBranchCreator() {
        final DistConcurrentMultiMap<LogKey, LogItem> base0 = base;
        final long phase0 = phase.get();
        return (DistConcurrentMultiMap<LogKey, LogItem> base, Place place) -> new DistLog(base0.placeGroup(), phase0,
                base0);
    }

    public DistConcurrentMultiMap<LogKey, LogItem> getDistMultiMap() {
        return getPlanet();
    }

    /**
     * Returns the {@link LogItem}s mapped to the specified key
     *
     * @param k the key which specify the place, tag, and phase of the targeted
     *          records
     * @return collection of {@link LogItem} mapped to the specified key,
     *         {@code null} if there are no such items
     */
    public Collection<LogItem> getLog(LogKey k) {
        return base.get(k);
    }

    /**
     * Returns the logged items of the specified place for the specified key and the
     * current phase. If the specified place is the local place, then calling this
     * method is equivalent to calling method {@link #getLog(String)}. If the
     * specified place is remote, then programmers should be careful to call method
     * {@link #globalGather()} so that events logged on remote hosts are relocated
     * and accessible on this host.
     *
     * @param p   the place on which the logged events occurred
     * @param key the key under which the events were recorded
     * @return the collection of items logged under the specified key at the
     *         specified host
     */
    public Collection<LogItem> getLog(Place p, String key) {
        return getLog(p, key, getPhase());
    }

    /**
     * Returns the logged items of the specified place, tag and phase.
     * <p>
     * Note that is the targeted log entries are of a remote host, method
     * {@link #globalGather()} should be called prior to attempting to retrieve such
     * logs.
     *
     * @param p     the place on which events were recorded
     * @param tag   the tag under which events were recorded
     * @param phase the phase during which the events were recorded
     * @return the collection of {@link LogItem} recorded under the specified key
     */
    public Collection<LogItem> getLog(Place p, String tag, long phase) {
        return getLog(new LogKey(p, tag, phase));
    }

    /**
     * Returns the logged items of this place for the specified key
     *
     * @param key the key whose logged events needs to be retrieved
     * @return the collection of events logged on this place with the specified key
     */
    public Collection<LogItem> getLog(String key) {
        return getLog(here(), key);
    }

    public long getPhase() {
        return phase.get();
    }

    /**
     * gather the log to the receiving place initially specified
     */
    public void globalGather() {
        // TODO
        // base.GLOBAL.gather(this.place);
        final DistConcurrentMultiMap<LogKey, LogItem> base0 = base;
        final Place dest = here();
        pg.broadcastFlat(() -> {
            final Function<LogKey, Place> func = (LogKey k) -> dest;
            base0.relocate(func);
        });
    }

    public void globalSetDefault() {
        final DistLog b = this;
        DistLog.defaultLog = b;
        final Place caller = here();
        pg.broadcastFlat(() -> {
            if (!here().equals(caller)) {
                DistLog.defaultLog = b;
            }
        });
    }

    /**
     * set the phase of DistLog on each place
     *
     * @param phase the logging phase to be used from now on
     */
    public void globalSetPhase(final long phase) {
        if (base == null) {
            throw new IllegalStateException(
                    "Note: `DistLog#globalSetPhase` can be used after `globalSetup()` called. ");
        }
        final DistLog b = this;
        b.setPhase(phase);
        final Place caller = Constructs.here();
        pg.broadcastFlat(() -> {
            if (Constructs.here().equals(caller)) {
                return;
            }
            b.setPhase(phase);
        });
    }

    /**
     * set this instance to {@code DistLog.defaultLog} on all the places of the
     * place group.
     */
    public void globalSetup(final boolean setDefault) {
        if (setDefault) {
            globalSetDefault();
        }
    }

    private TreeMap<Pair<String, Long>, List<Collection<LogItem>>> groupBy() {
        final TreeMap<Pair<String, Long>, List<Collection<LogItem>>> results = new TreeMap<>(gcmp);
        base.forEach((LogKey key, Collection<LogItem> items) -> {
            final Pair<String, Long> keyWOp = new Pair<>(key.tag, key.phase);
            final List<Collection<LogItem>> bag = results.computeIfAbsent(keyWOp, k -> new ArrayList<>());
            bag.add(items);
        });
        return results;
    }

    /**
     * Determines whether the log set of this and the target is equals. It
     * distinguish the log having the different tags or different generated places.
     * It only returns true only iff the two log lists have the same elements with
     * the same order for each generated place and tag. Diff will be output to
     * {@code out} if they differs.
     *
     * @param target the target to which this instance needs to be compared
     * @param out    the output stream to which the difference will be printed
     * @return {@code true} if the target and this logs are equal,
     *         {@code flase otherwise}
     */
    public boolean placeConsciousEquals(DistLog target, PrintStream out, boolean asList) {
        if (target == null) {
            if (out != null) {
                out.println("DistLog differs: target is null");
            }
            return false;
        }

        final TreeSet<LogKey> keys = new TreeSet<>();
        keys.addAll(base.keySet());
        keys.addAll(target.base.keySet());

        boolean result = true;
        long bugPhase = 0;

        for (final LogKey key : keys) {
            if ((!result) && key.phase > bugPhase) {
                return false;
            }
            final Collection<LogItem> elems1 = base.get(key);
            final Collection<LogItem> elems2 = target.base.get(key);
            final List<LogItem> lists1 = elems1 == null ? Collections.emptyList() : new ArrayList<>(elems1);
            final List<LogItem> lists2 = elems2 == null ? Collections.emptyList() : new ArrayList<>(elems2);
            if (asList) {
                final ListDiff diff = diffCheckList(lists1, lists2);
                if (diff != null) {
                    if (result) {
                        out.println("Diff first found in phase " + key.phase);
                        bugPhase = key.phase;
                        result = false;
                    }
                    out.println("Diff in " + key + "::" + diff);
                }
            } else {
                final SetDiff diff = diffCheckSet0(lists1, lists2);
                if (diff != null) {
                    if (result) {
                        out.println("Diff first found in phase " + key.phase);
                        bugPhase = key.phase;
                        result = false;
                    }
                    out.println("Diff in " + key);
                    diff.print(out);
                }
            }
        }
        return result;
    }

    public void printAll(PrintStream out) {
        final TreeMap<LogKey, Collection<LogItem>> sorted = new TreeMap<>((o1, o2) -> {
            int result = Integer.compareUnsigned(o1.place.id, o2.place.id);
            if (result == 0) {
                result = o1.tag.compareTo(o2.tag);
            }
            if (result == 0) {
                result = Long.compare(o1.phase, o2.phase);
            }
            return result;
        });
        base.forEach((LogKey key, Collection<LogItem> items) -> {
            sorted.put(key, items);
        });
        sorted.forEach((LogKey key, Collection<LogItem> items) -> {
            out.println("LogKey: " + key);
            for (final LogItem item : items) {
                out.println(itemOffset + item);
            }
        });
    }

    /**
     * Puts the specified message for the specified tag
     *
     * @param phaseVal the phase under which this log message will be kept
     * @param tag      the topic into which this message will be recorded
     * @param msg      the message to log
     * @param appendix may be null
     */
    public void put(long phaseVal, String tag, String msg, String appendix) {
        base.put1(new LogKey(Constructs.here(), tag, phaseVal), new LogItem(msg, appendix));
    }

    /**
     * put the msg with the specified tag.
     *
     * @param tag      tag for the message.
     * @param msg      the message to be stored. The value of {@code msg.toString()}
     *                 is stored into DistLog.
     * @param appendix the appendix information for the log. The value of appendix
     *                 is not used for equality check of {@code DistLog} instances
     *                 while {@code msg} is used.
     */
    public void put(String tag, String msg, String appendix) {
        put(phase.get(), tag, msg, appendix);
    }

    public Collection<LogItem> removeLog(String key) {
        return base.remove(new LogKey(here(), key, phase.get()));
    }

    public void setPhase(long phase) {
        this.phase.set(phase);
    }
}