ProgramStatistics.java

package handist.collections.glb.util;

import static handist.collections.glb.GlobalLoadBalancer.*;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import handist.collections.dist.DistLog.LogItem;
import handist.collections.glb.GlobalLoadBalancer;
import handist.collections.util.SavedLog;

/**
 * Program which loads a {@link SavedLog} of a GLB execution from a file and
 * produces various data exports from it, as selected by command-line options.
 * <p>
 * Exports are written as space-separated columns; lines starting with
 * {@code #} are headers.
 */
public class ProgramStatistics {

    /** Option whose argument is the file into which the worker activity is written */
    private static final String OPT_WORKER_OVER_TIME = "w";

    /** Option allowing existing output files to be overwritten */
    private static final String OPT_FORCE_OVERWRITE = "f";

    /** Option printing the whole log on the standard output */
    private static final String OPT_VERBOSE = "v";

    /**
     * Builds the command-line options understood by {@link #main(String[])}.
     *
     * @return a fresh set of options
     */
    private static Options commandOptions() {
        final Options opts = new Options();
        opts.addOption(OPT_WORKER_OVER_TIME, "worker", true,
                "produces a CSV showing the proportion of workers active over time");
        opts.addOption(OPT_FORCE_OVERWRITE, false,
                "if the generation of files would result in some being overwritten, this program "
                        + "normally skips these files; use this option to force the overwriting of existing files");
        opts.addOption(OPT_VERBOSE, false,
                "verbose output; if activated, will print the contents of the log on the standard output");
        return opts;
    }

    /**
     * Program which obtains a GlbLog from a file and produces various CSV exports
     * on demand.
     * <p>
     * The last argument is the input log file; all preceding arguments are
     * options. Invalid options are reported before the log file is loaded. The
     * JVM exits with code -1 if the input file cannot be read and with code -2 if
     * it cannot be parsed.
     *
     * @param args (option)+ &lt;log file&gt;
     */
    public static void main(String[] args) {
        final Options programOptions = commandOptions();

        // The last argument is the input log file. At least one option must
        // precede it, otherwise there would be nothing to produce.
        if (args.length <= 1) {
            new HelpFormatter().printHelp("(options)+ <log input file>", programOptions);
            return;
        }

        // Parse the options first so that mistakes are reported without first
        // paying for the parsing of the (potentially large) input log file
        final String[] optionArray = Arrays.copyOf(args, args.length - 1); // remove the input file from the options
        final CommandLineParser parser = new DefaultParser();
        final CommandLine cmd;
        try {
            cmd = parser.parse(programOptions, optionArray);
        } catch (final ParseException e) {
            System.err.println(e.getLocalizedMessage());
            new HelpFormatter().printHelp(
                    "java [...] " + ProgramStatistics.class.getCanonicalName() + " [options]* <input log file>",
                    programOptions);
            return;
        }

        final SavedLog log = loadLog(args[args.length - 1]);
        final ProgramStatistics statFactory = new ProgramStatistics(log);

        if (cmd.hasOption(OPT_VERBOSE)) {
            log.printAll(System.out);
        }

        // Check if the "force overwrite" option is set
        final boolean overwriteFiles = cmd.hasOption(OPT_FORCE_OVERWRITE);

        // Produce each requested output in the specified file
        makeOutputToFile(cmd, OPT_WORKER_OVER_TIME, statFactory::workerActivity, overwriteFiles);
    }

    /**
     * Loads the log saved in the specified file, terminating the JVM if this is
     * not possible (exit code -1 if the file cannot be read, -2 if it cannot be
     * parsed).
     *
     * @param inputFileName path to the saved log
     * @return the parsed log; never {@code null} since failures exit the JVM
     */
    private static SavedLog loadLog(String inputFileName) {
        final File f = new File(inputFileName);
        // exists() alone would accept directories and unreadable files
        if (!f.isFile() || !f.canRead()) {
            System.err.println("Could not read file " + inputFileName);
            System.err.println("Exiting with code -1");
            System.exit(-1);
        }
        try {
            return new SavedLog(f);
        } catch (final Exception e) {
            System.err.println("A problem occurred while parsing input file " + inputFileName);
            e.printStackTrace();
            System.exit(-2);
            return null; // not reached: System.exit does not return
        }
    }

    /**
     * Writes the data produced by {@code outputMethod} into the file given as
     * argument of {@code option}, provided that option was set on the command
     * line. An existing file is skipped unless {@code overwriteFiles} is set.
     *
     * @param cmd            the parsed command line
     * @param option         name of the option whose argument is the output file
     * @param outputMethod   method printing the desired data onto the stream it
     *                       receives
     * @param overwriteFiles whether existing files may be overwritten
     */
    private static void makeOutputToFile(CommandLine cmd, String option, Consumer<PrintStream> outputMethod,
            boolean overwriteFiles) {
        if (!cmd.hasOption(option)) {
            return;
        }
        final File output = new File(cmd.getOptionValue(option));
        if (output.exists() && !overwriteFiles) {
            System.err.println("File " + output.getAbsolutePath() + " already exists, skipping");
            return;
        }
        // try-with-resources closes the stream even if outputMethod throws
        try (PrintStream ps = new PrintStream(output)) {
            outputMethod.accept(ps);
            // PrintStream never throws IOException; checkError() flushes and
            // reports whether any write failed
            if (ps.checkError()) {
                System.err.println("A problem occurred while writing to " + output.getAbsolutePath());
                return;
            }
        } catch (final FileNotFoundException e) {
            System.err.println("A problem occurred while writing to " + output.getAbsolutePath());
            e.printStackTrace();
            return;
        }
        // Only reached when the file was actually written
        System.out.println("Wrote file " + output.getAbsolutePath());
    }

    /**
     * Logger of a GLB execution. Is used by this class to produce various data
     * tables from which plots about the GLB execution can be generated.
     */
    private final SavedLog log;

    /**
     * Constructor
     *
     * @param logger log of the GLB execution to analyze. The original
     *               documentation refers to the log retrieved through
     *               {@link GlobalLoadBalancer#getPreviousLog()}; NOTE(review):
     *               confirm how that log is converted into a {@link SavedLog}
     */
    public ProgramStatistics(SavedLog logger) {
        log = logger;
    }

    /**
     * Produces a data output of the number of workers running, yielding, and
     * inactive on each host over time.
     * <p>
     * After a header line, one section is printed per place. Each section is
     * introduced by a {@code # place(n)} line and followed by two empty lines.
     * Every data line holds a timestamp in seconds followed by the number of
     * inactive, yielding and running workers.
     *
     * @param ps the printstream onto which the data needs to be printed
     */
    public void workerActivity(PrintStream ps) {
        ps.println("# TimeStamp(s) InactiveWorker YieldingWorker RunningWorker");
        // First obtain the entire program computation time
        final long totalComputationTime = programDuration();

        // For each host in the computation
        for (int place = 0; place < log.placeCount(); place++) {
            printPlaceActivity(ps, place, totalComputationTime);
        }
    }

    /**
     * Computes the program duration from the start and end entries logged on
     * place 0 under {@code LOGKEY_UNDER_GLB}.
     *
     * @return the logged end value minus the logged start value (the caller
     *         divides it by 1e9 to print seconds). The start defaults to 0 and
     *         the end to -1 when the corresponding entry is missing.
     */
    private long programDuration() {
        long programStart = 0L;
        long programEnd = -1L;
        final Collection<LogItem> underGlbItems = log.getLog(0, LOGKEY_UNDER_GLB, 0);
        if (underGlbItems != null) {
            for (final LogItem item : underGlbItems) {
                switch (item.msg) {
                case LOG_PROGRAM_STARTED:
                    programStart = Long.parseLong(item.appendix);
                    break;
                case LOG_PROGRAM_ENDED:
                    programEnd = Long.parseLong(item.appendix);
                    break;
                default:
                    // Other potential items ignored
                }
            }
        }
        return programEnd - programStart;
    }

    /**
     * Prints the worker-activity section of a single place.
     *
     * @param ps                   stream onto which the section is printed
     * @param place                index of the place considered
     * @param totalComputationTime program duration, printed in seconds on the
     *                             last line of the section
     */
    private void printPlaceActivity(PrintStream ps, int place, long totalComputationTime) {
        ps.println("# place(" + place + ")");

        // Retrieve the number of workers and the reference time of this place
        int workerCount = -1;
        long referenceNanoTime = -1L;
        final Collection<LogItem> glbItems = log.getLog(place, LOGKEY_GLB, 0);
        if (glbItems != null) {
            for (final LogItem item : glbItems) {
                switch (item.msg) {
                case LOG_INITIALIZED_AT_NANOTIME:
                    referenceNanoTime = Long.parseLong(item.appendix);
                    break;
                case LOG_INITIALIZED_WORKERS:
                    workerCount = Integer.parseInt(item.appendix);
                    break;
                default:
                    // Other messages are ignored
                }
            }
        }

        // At first, all workers are inactive
        int inactive = workerCount;
        int yielding = 0;
        int running = 0;

        // First line output
        ps.println(String.format("%s %s %s %s", 0L, inactive, yielding, running));

        // For each "worker" event, update and print the new worker status
        final Collection<LogItem> workerEvents = log.getLog(place, LOGKEY_WORKER, 0);
        if (workerEvents != null) {
            for (final LogItem item : workerEvents) {
                // Event time in seconds, relative to this place's initialization time
                final double stamp = (Long.parseLong(item.appendix) - referenceNanoTime) / 1e9;
                switch (item.msg) {
                case LOG_WORKER_STARTED:
                    inactive--;
                    running++;
                    break;
                case LOG_WORKER_YIELDING:
                    yielding++;
                    running--;
                    break;
                case LOG_WORKER_RESUMED:
                    yielding--;
                    running++;
                    break;
                case LOG_WORKER_STOPPED:
                    running--;
                    inactive++;
                    break;
                default:
                    // in other cases, ignore the logged entry
                }
                // Even if there were no changes, this has no adverse effect on the plot
                ps.println(String.format("%s %s %s %s", stamp, inactive, yielding, running));
            }
        }

        // Last line with "end of computation".
        // NOTE(review): the event timestamps above are relative to this place's
        // LOG_INITIALIZED_AT_NANOTIME, whereas this value is the program duration
        // derived from place 0's LOGKEY_UNDER_GLB entries; confirm both share the
        // same time origin.
        ps.println(String.format("%s %s %s %s", totalComputationTime / 1e9, inactive, yielding, running));

        // Add two empty lines to separate the data from each host
        ps.println();
        ps.println();
    }
}