
I am trying to write an unbounded ping pipeline that takes the output of a ping command, parses it to compute RTT statistics (avg/min/max), and, for now, just prints the results.

I have already written an unbounded ping source that outputs each line as it comes in. The lines are assigned to 5-second sliding windows, with a new window starting every second. The windowed data is fed to a Combine.globally call that aggregates the string outputs. The problem is that the accumulators are never merged and the output is never extracted, so the pipeline never gets past this point. What am I doing wrong here?
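Because the symptom is that no window ever produces output, it may help to spell out what `SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))` does. The plain-Java sketch below uses a hypothetical class, `SlidingWindowMath`, which is not part of Beam. It assumes a zero window offset and millisecond timestamps, lists the windows a timestamp falls into, and models the default trigger's rule that a window's result is emitted only once the watermark moves past the window's end. If the source's watermark never moves, none of these windows can fire in the default configuration, whatever the CombineFn does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring SlidingWindows.of(5 s).every(1 s), zero offset, times in ms. */
final class SlidingWindowMath {
    static final long SIZE_MS = 5_000;
    static final long PERIOD_MS = 1_000;

    private SlidingWindowMath() {}

    /** Returns the [start, end) bounds of every sliding window that contains timestampMs. */
    static List<long[]> windowsFor(long timestampMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, PERIOD_MS);
        // Walk back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= PERIOD_MS) {
            windows.add(new long[] {start, start + SIZE_MS});
        }
        return windows;
    }

    /** Simplified model of the default trigger: a window fires once the watermark reaches its end. */
    static boolean canFire(long[] window, long watermarkMs) {
        return watermarkMs >= window[1];
    }
}
```

For example, a ping stamped at 7.3 s lands in five windows, [3 s, 8 s) through [7 s, 12 s), and the earliest of them cannot emit anything until the watermark reaches 8 s.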

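import java.util.Iterator;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class TestPingIPs {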
   public static void main(String[] args)
   {
      PipelineOptions options = PipelineOptionsFactory.create();
      Pipeline pipeline = Pipeline.create(options);
      String destination = "8.8.8.8";
      PCollection<PingResult> res =
               /*
                Run the unbounded ping command. Only the lines containing per-ping results are returned;
                the startup line and the final statistics are not.
                */
               pipeline.apply("Ping command",
                       PingCmd.read()
                               .withPingArguments(PingCmd.PingArguments.create(destination, -1)))
               /*
                Window the ping output lines into 5-second sliding windows, starting a new window every second.
                */
               .apply("Window strings",
                       Window.into(SlidingWindows.of(Duration.standardSeconds(5))
                               .every(Duration.standardSeconds(1))))
               /*
                Parse and aggregate the lines of each window into a PingResult with a CombineFn.
                */
               .apply("Combine the pings",
                       Combine.globally(new ProcessPings()).withoutDefaults())
               /*
                Print the output to see what we get here.
                */
              .apply("Test output",
                      ParDo.of(new DoFn<PingResult, PingResult>() {
                 @ProcessElement
                 public void processElement(ProcessContext c)
                 {
                    System.out.println(c.element().getAvgRTT());
                    System.out.println(c.element().getPacketLoss());
                    c.output(c.element());
                 }
              }));

      pipeline.run().waitUntilFinish();
   }


   static class ProcessPings extends Combine.CombineFn<String, RttStats, PingResult> {
      private long getRTTFromLine(String line){
         long rtt = Long.parseLong(line.split("time=")[1].split("ms")[0]);
         return rtt;
      }

      @Override
      public RttStats createAccumulator()
      {
         return new RttStats();
      }

      @Override
      public RttStats addInput(RttStats mutableAccumulator, String input)
      {
         mutableAccumulator.incTotal();
         if (input.contains("unreachable")) {
            _unreachableCount.inc();
            mutableAccumulator.incPacketLoss();
         }
         else if (input.contains("General failure")) {
            _transmitFailureCount.inc();
            mutableAccumulator.incPacketLoss();
         }
         else if (input.contains("timed out")) {
            _timeoutCount.inc();
            mutableAccumulator.incPacketLoss();
         }
         else if (input.contains("could not find")) {
            _unknownHostCount.inc();
            mutableAccumulator.incPacketLoss();
         }
         else {
            _successfulCount.inc();
            mutableAccumulator.add(getRTTFromLine(input));
         }

         return mutableAccumulator;
      }

      @Override
      public RttStats mergeAccumulators(Iterable<RttStats> accumulators)
      {
         Iterator<RttStats> iter = accumulators.iterator();
         if (!iter.hasNext()){
            return createAccumulator();
         }
         RttStats running = iter.next();
         while (iter.hasNext()){
            RttStats next = iter.next();
            running.addAll(next.getVals());
            running.addLostPackets(next.getLostPackets());
         }
         return running;
      }

      @Override
      public PingResult extractOutput(RttStats stats)
      {
         stats.calculate();
         boolean connected = stats.getPacketLoss() != 1;
         return new PingResult(connected, stats.getAvg(), stats.getMin(), stats.getMax(), stats.getPacketLoss());
      }

      private final Counter _successfulCount = Metrics.counter(ProcessPings.class, "Successful pings");
      private final Counter _unknownHostCount = Metrics.counter(ProcessPings.class, "Unknown hosts");
      private final Counter _transmitFailureCount = Metrics.counter(ProcessPings.class, "Transmit failures");
      private final Counter _timeoutCount = Metrics.counter(ProcessPings.class, "Timeouts");
      private final Counter _unreachableCount = Metrics.counter(ProcessPings.class, "Unreachable host");
   }
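One thing worth checking separately from the stall: `getRTTFromLine` assumes every successful line contains `time=`. Windows ping prints sub-millisecond replies as `time<1ms`, and for such a line `split("time=")[1]` throws `ArrayIndexOutOfBoundsException`. The following is a regex-based sketch, using a hypothetical `RttParser` helper, that accepts both forms and returns an empty result for lines without an RTT:

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the question's code. */
final class RttParser {
    // Matches "time=14ms" as well as the sub-millisecond form "time<1ms".
    private static final Pattern RTT = Pattern.compile("time[=<](\\d+)ms");

    private RttParser() {}

    /** Returns the RTT in ms, or empty if the line carries no RTT (e.g. "Request timed out."). */
    static OptionalLong parseRtt(String line) {
        Matcher m = RTT.matcher(line);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }
}
```

Treating `time<1ms` as 1 ms is a design choice: it records an upper bound rather than the true value.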

I would guess that there are some issues with the CombineFn that I wrote, but I can't seem to figure out what's going wrong here! I tried following the example here, but there's still something I must be missing.
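A CombineFn has to produce the same result however the runner splits the input across accumulators. A runner is also not required to call `mergeAccumulators` at all, and `extractOutput` only runs when a window's trigger fires. `RttStats` isn't shown in the question, so the following is a plain-Java sketch of an accumulator using a hypothetical `RttAccumulator` class in which every field takes part in the merge. By comparison, the question's `mergeAccumulators` copies the values and the lost packets, but not whatever `incTotal()` counts; if `RttStats` keeps a separate total, that would need merging too.

```java
/** Hypothetical accumulator; stands in for the question's RttStats. */
final class RttAccumulator {
    long total;                   // pings seen, successful or not
    long lost;                    // pings counted as packet loss
    long sumMs;                   // sum of successful RTTs
    long minMs = Long.MAX_VALUE;  // smallest successful RTT
    long maxMs = Long.MIN_VALUE;  // largest successful RTT

    void addSuccess(long rttMs) {
        total++;
        sumMs += rttMs;
        minMs = Math.min(minMs, rttMs);
        maxMs = Math.max(maxMs, rttMs);
    }

    void addLoss() {
        total++;
        lost++;
    }

    /** Folds other into this accumulator; every field is combined, including total. */
    RttAccumulator merge(RttAccumulator other) {
        total += other.total;
        lost += other.lost;
        sumMs += other.sumMs;
        minMs = Math.min(minMs, other.minMs);
        maxMs = Math.max(maxMs, other.maxMs);
        return this;
    }

    /** Average RTT over successful pings, or NaN if there were none. */
    double avgMs() {
        long ok = total - lost;
        return ok == 0 ? Double.NaN : (double) sumMs / ok;
    }

    /** Fraction of pings lost, in [0, 1]. */
    double packetLoss() {
        return total == 0 ? 0.0 : (double) lost / total;
    }
}
```

Merging `{10 ms, 20 ms}` with `{lost, 30 ms}` gives 4 pings, 1 lost, min 10, max 30, average 20 ms and 25% loss, the same as feeding all four into a single accumulator.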

EDIT: I added the ping command implementation below. This is running on the DirectRunner while I test.

PingCmd.java:

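public class PingCmd {
   public static Read read()
   {
      if (System.getProperty("os.name").startsWith("Windows")) {
         return WindowsPingCmd.read();
      }
      else {
         // Only Windows ping output is parsed; fail fast instead of returning null
         throw new UnsupportedOperationException("ping parsing is only implemented for Windows");
      }
   }

   // ... (PingCmd.Read and PingCmd.PingArguments omitted)
}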

WindowsPingCmd.java:

public class WindowsPingCmd extends PingCmd {
   private WindowsPingCmd()
   {
   }

   public static PingCmd.Read read()
   {
      return new WindowsRead.Builder().build();
   }


   static class PingCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
      @VisibleForTesting
      Instant oldestMessageTimestamp = Instant.now();
      @VisibleForTesting
      transient List<String> outputs = new ArrayList<>();

      public PingCheckpointMark()
      {
      }

      public void add(String message, Instant timestamp)
      {
         if (timestamp.isBefore(oldestMessageTimestamp)) {
            oldestMessageTimestamp = timestamp;
         }
         outputs.add(message);
      }

      @Override
      public void finalizeCheckpoint()
      {
         oldestMessageTimestamp = Instant.now();
         outputs.clear();
      }

      // set an empty list to messages when deserialize
      private void readObject(java.io.ObjectInputStream stream)
              throws IOException, ClassNotFoundException
      {
         stream.defaultReadObject();
         outputs = new ArrayList<>();
      }

      @Override
      public boolean equals(@Nullable Object other)
      {
         if (other instanceof PingCheckpointMark) {
            PingCheckpointMark that = (PingCheckpointMark) other;
            return Objects.equals(this.oldestMessageTimestamp, that.oldestMessageTimestamp)
                    && Objects.deepEquals(this.outputs, that.outputs);
         }
         else {
            return false;
         }
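      }

      // equals() is overridden above, so hashCode() must be consistent with it
      @Override
      public int hashCode()
      {
         return Objects.hash(oldestMessageTimestamp, outputs);
      }
   }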


   @VisibleForTesting
   static class UnboundedPingSource extends UnboundedSource<String, PingCheckpointMark> {

      private final WindowsRead spec;

      public UnboundedPingSource(WindowsRead spec)
      {
         this.spec = spec;
      }

      @Override
      public UnboundedReader<String> createReader(
              PipelineOptions options, PingCheckpointMark checkpointMark)
      {
         return new UnboundedPingReader(this, checkpointMark);
      }

      @Override
      public List<UnboundedPingSource> split(int desiredNumSplits, PipelineOptions options)
      {
         // Don't really need to ever split the ping source, so we should just have one per destination
         return Collections.singletonList(new UnboundedPingSource(spec));
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder)
      {
         spec.populateDisplayData(builder);
      }

      @Override
      public Coder<PingCheckpointMark> getCheckpointMarkCoder()
      {
         return SerializableCoder.of(PingCheckpointMark.class);
      }

      @Override
      public Coder<String> getOutputCoder()
      {
         return StringUtf8Coder.of();
      }
   }


   @VisibleForTesting
   static class UnboundedPingReader extends UnboundedSource.UnboundedReader<String> {

      private final UnboundedPingSource source;

      private String current;
      private Instant currentTimestamp;
      private final PingCheckpointMark checkpointMark;
      private BufferedReader processOutput;
      private Process process;
      private boolean finishedPings;
      private int maxCount = 5;
      private static AtomicInteger currCount = new AtomicInteger(0);

      public UnboundedPingReader(UnboundedPingSource source, PingCheckpointMark checkpointMark)
      {
         this.finishedPings = false;
         this.source = source;
         this.current = null;
         if (checkpointMark != null) {
            this.checkpointMark = checkpointMark;
         }
         else {
            this.checkpointMark = new PingCheckpointMark();
         }
      }

      @Override
      public boolean start() throws IOException
      {
         WindowsRead spec = source.spec;
         String cmd = createCommand(spec.pingConfiguration().getPingCount(), spec.pingConfiguration().getDestination());
         try {
            ProcessBuilder builder = new ProcessBuilder(cmd.split(" "));
            builder.redirectErrorStream(true);
            process = builder.start();

            processOutput = new BufferedReader(new InputStreamReader(process.getInputStream()));
            return advance();
         } catch (Exception e) {
            throw new IOException(e);
         }
      }

      private String createCommand(int count, String dest){
         StringBuilder builder = new StringBuilder("ping");
         String countParam = "";
         if (count <= 0){
            countParam = "-t";
         }
         else{
            countParam += "-n " + count;
         }

         return builder.append(" ").append(countParam).append(" ").append(dest).toString();
      }

      @Override
      public boolean advance() throws IOException
      {
         // If the pings have finished, there is nothing more to read
         if (finishedPings) {
            return false;
         }
         String line = processOutput.readLine();
         // Skip empty lines and the 'Pinging <dest> with 32 bytes of data' line
         while (line != null
                 && (line.isEmpty()
                     || line.contains("Pinging " + source.spec.pingConfiguration().getDestination()))) {
            line = processOutput.readLine();
         }
         // The ping process has exited, so no more lines will arrive
         if (line == null) {
            finishedPings = true;
            return false;
         }
         // If this is the start of the statistics, the pings are done and we can just exit
         if (line.contains("statistics")) {
            finishedPings = true;
         }

         current = line;
         currentTimestamp = Instant.now();
         checkpointMark.add(current, currentTimestamp);
         if (currCount.incrementAndGet() == maxCount){
            currCount.set(0);
            return false;
         }
         return true;
      }

      @Override
      public void close() throws IOException
      {
         if (process != null) {
            process.destroy();
            if (process.isAlive()) {
               process.destroyForcibly();
            }
         }
      }

      @Override
      public Instant getWatermark()
      {
         return checkpointMark.oldestMessageTimestamp;
      }

      @Override
      public UnboundedSource.CheckpointMark getCheckpointMark()
      {
         return checkpointMark;
      }

      @Override
      public String getCurrent()
      {
         if (current == null) {
            throw new NoSuchElementException();
         }
         return current;
      }

      @Override
      public Instant getCurrentTimestamp()
      {
         if (current == null) {
            throw new NoSuchElementException();
         }
         return currentTimestamp;
      }

      @Override
      public UnboundedPingSource getCurrentSource()
      {
         return source;
      }
   }


   public static class WindowsRead extends PingCmd.Read {
      private final PingArguments pingConfig;

      private WindowsRead(PingArguments pingConfig)
      {
         this.pingConfig = pingConfig;
      }

      public Builder builder()
      {
         return new WindowsRead.Builder(this);
      }

      PingArguments pingConfiguration()
      {
         return pingConfig;
      }

      public WindowsRead withPingArguments(PingArguments configuration)
      {
         checkArgument(configuration != null, "configuration can not be null");
         return builder().setPingArguments(configuration).build();
      }

      @Override
      public PCollection<String> expand(PBegin input)
      {
         org.apache.beam.sdk.io.Read.Unbounded<String> unbounded =
                 org.apache.beam.sdk.io.Read.from(new UnboundedPingSource(this));

         return input.getPipeline().apply(unbounded);
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder)
      {
         super.populateDisplayData(builder);
         pingConfiguration().populateDisplayData(builder);
      }

      static class Builder {
         private PingArguments config;

         Builder()
         {
         }

         private Builder(WindowsRead source)
         {
            this.config = source.pingConfiguration();
         }

         WindowsRead.Builder setPingArguments(PingArguments config)
         {
            this.config = config;
            return this;
         }

         WindowsRead build()
         {
            return new WindowsRead(this.config);
         }
      }

      @Override
      public int hashCode()
      {
         return Objects.hash(pingConfig);
      }


   }
  • What is the implementation of PingCmd.read()? Is it advancing the watermark? Commented Mar 19, 2021 at 18:21
  • Yes. It updates the watermark every time a new line is output by the console's BufferedReader. Commented Mar 19, 2021 at 19:10
  • It does sound like a watermark issue. What runner is this on? Seeing the implementation of PingCmd.read() could help. Commented Mar 19, 2021 at 19:58
  • Edited above to include the classes Commented Mar 19, 2021 at 20:28
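On the watermark question raised in these comments: in the posted reader, `getWatermark()` returns `checkpointMark.oldestMessageTimestamp`, which starts at the time the checkpoint mark is created. `add()` only ever moves it earlier, and since every element is stamped with a later `Instant.now()`, `add()` never changes it; the value only moves when `finalizeCheckpoint()` resets it. For a source that stamps elements in arrival order, a common alternative is a watermark that follows the newest emitted timestamp. The following is a minimal sketch with a hypothetical `MonotonicWatermark` class. It uses `java.time.Instant` to stay self-contained, while the reader uses Joda-Time's `Instant`, which also has `isAfter`.

```java
import java.time.Instant;

/** Hypothetical watermark tracker: follows the newest timestamp seen and never moves backwards. */
final class MonotonicWatermark {
    private Instant watermark;

    MonotonicWatermark(Instant initial) {
        this.watermark = initial;
    }

    /** Called for each emitted element; only later timestamps move the watermark. */
    void observe(Instant timestamp) {
        if (timestamp.isAfter(watermark)) {
            watermark = timestamp;
        }
    }

    /** What getWatermark() would return. */
    Instant current() {
        return watermark;
    }
}
```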

1 Answer

One thing I notice in your code is that advance() always returns true. The watermark only advances on bundle completion, and I think it's runner-dependent whether a runner will ever complete a bundle if advance() never returns false. You could try returning false after a bounded amount of time or number of pings.
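The eager bundling described here (and in the comments below) can be modeled in a few lines of plain Java. This is a hypothetical simulation, not Beam runner code: `SimpleReader` stands in for `UnboundedReader`, and `maxRecords` stands in for whatever limit a particular runner may apply. With `maxRecords` set to `Integer.MAX_VALUE`, a reader whose `advance()` always returns `true` keeps the loop, and therefore the bundle, open indefinitely.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the reading side of UnboundedReader. */
interface SimpleReader {
    boolean advance();  // true if a new record is available via current()
    String current();
}

final class EagerBundler {
    private EagerBundler() {}

    /**
     * Reads records into one bundle until advance() returns false or maxRecords is reached.
     * Only the completed bundle is handed downstream (and, in a real runner, committed).
     */
    static List<String> readBundle(SimpleReader reader, int maxRecords) {
        List<String> bundle = new ArrayList<>();
        while (bundle.size() < maxRecords && reader.advance()) {
            bundle.add(reader.current());
        }
        return bundle;
    }

    /** Adapts an iterator into a reader that reports "no data" once the iterator is exhausted. */
    static SimpleReader fromIterator(Iterator<String> it) {
        return new SimpleReader() {
            private String cur;

            @Override
            public boolean advance() {
                if (!it.hasNext()) {
                    return false;
                }
                cur = it.next();
                return true;
            }

            @Override
            public String current() {
                return cur;
            }
        };
    }
}
```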

You could also consider rewriting this as a Splittable DoFn (SDF).
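For the suggestion of returning false after a bounded number of pings or amount of time, one option is a per-reader budget held in instance fields, rather than a static counter shared by every reader in the JVM. The following is a sketch with a hypothetical `BundleBudget` class and an injectable clock for testing. It is meant to be checked at the top of `advance()`, before a line is read. As written in the edited question, every fifth call reads a line, stores it in `current`, and then returns false; because false tells the runner that no record is available, that line is never emitted.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical per-reader budget: advance() would call tryConsume() first and return false
 * once maxElements elements or maxMillis milliseconds have been used since the last reset.
 */
final class BundleBudget {
    private final int maxElements;
    private final long maxMillis;
    private final LongSupplier clockMillis;
    private int used;
    private long budgetStart;

    BundleBudget(int maxElements, long maxMillis, LongSupplier clockMillis) {
        this.maxElements = maxElements;
        this.maxMillis = maxMillis;
        this.clockMillis = clockMillis;
        this.budgetStart = clockMillis.getAsLong();
    }

    /** Returns true if another element may be read; false means "let the bundle finish". */
    boolean tryConsume() {
        long now = clockMillis.getAsLong();
        if (used >= maxElements || now - budgetStart >= maxMillis) {
            used = 0;            // start a fresh budget for the next bundle
            budgetStart = now;
            return false;
        }
        used++;
        return true;
    }
}
```

In the reader this would be an instance field, for example `new BundleBudget(5, 1_000, System::currentTimeMillis)`, with `if (!budget.tryConsume()) { return false; }` as the first statement of `advance()`.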


4 Comments

I edited the class above. Is there a better way to go about this than adding a static variable shared between the readers? Also, is there any documentation behind the answer you gave? I'm a bit confused about why advance() has to return false for the bundle to move on to the next stage, even if it's windowed data. Thanks!
A distributed system needs to be resilient to bad machines, network breaks, etc., which means it will retry uncommitted work on failure. The "unit of commitment" is the bundle, which may include several calls to advance(). The eager implementation is to call advance() until it returns false, put everything it read into the same bundle, and only then send the bundle downstream. (Dataflow's SDF implementation, IIRC, waits until either there are no more records or a fixed amount of time, like 10 seconds, has passed.) See beam.apache.org/documentation/runtime/model for more details.
That makes sense! Now another question: it seems that the UnboundedPingReader is re-created every now and then, and start() is then called on the new instance. With my logic, this means another process starts a new ping command. Is there a way to have only one reader that never gets re-created, or a better practice than what I'm doing?
You can add some locking and only re-create the process if one isn't already running, so the readers share it. (You could also use a kind of lease mechanism, where you keep it in a static field, acquire it in start() and release it in close(), but that may be overkill.)
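The lease idea from the last comment can be sketched as a reference-counted holder, here a hypothetical `SharedLease` class, that a reader would acquire in `start()` and release in `close()`. The resource is created once and closed when the last reader lets go. This assumes a single JVM, as with the DirectRunner; it does nothing to coordinate readers running on different workers.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical reference-counted holder for one shared resource (e.g. the ping Process). */
final class SharedLease<T> {
    private final Supplier<T> factory;
    private final Consumer<T> closer;
    private T resource;
    private int holders;

    SharedLease(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Creates the resource on first use; later callers get the same instance. */
    synchronized T acquire() {
        if (resource == null) {
            resource = factory.get();
        }
        holders++;
        return resource;
    }

    /** Closes the resource once the last holder releases it. */
    synchronized void release() {
        if (holders == 0) {
            throw new IllegalStateException("release() called without acquire()");
        }
        if (--holders == 0) {
            closer.accept(resource);
            resource = null;
        }
    }

    synchronized int holders() {
        return holders;
    }
}
```

For the ping process, the factory would start the `ProcessBuilder` and wrap its `IOException` in an `UncheckedIOException` (a `Supplier` cannot throw checked exceptions), and the closer would call `destroy()`. Readers sharing one process also share its output stream, so each line would go to whichever reader happens to read it first.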
