Differential Dataflow

In this book we will work through the motivation and technical details behind differential dataflow, a computational framework built on top of timely dataflow, intended to efficiently perform computations on large amounts of data and to maintain those computations as the data change.

Differential dataflow programs look like many standard "big data" computations, borrowing idioms from frameworks like MapReduce and SQL. However, once you write and run your program, you can change the data inputs to the computation, and differential dataflow will promptly show you the corresponding changes in its output. "Promptly" here means in as little as milliseconds.

This relatively simple set-up (write programs, then change their inputs) leads to a surprisingly broad class of exciting new scalable computations. We will explore them in this document!


Differential dataflow arose from work at Microsoft Research, where we aimed to build a high-level framework that could both compute and incrementally maintain non-trivial algorithms.

Motivation

Differential dataflow programs are structured as two easy steps:

  1. Write a program.
  2. Change its input.

We will work through an example program, and then interact with it by changing its inputs. Our goal is foremost to show you what a program looks like, and to give you a sense for what interactions look like.

Once we've done this, in the next chapter we will jazz things up a bit with an increased scale of data, computation, and interaction!

Getting started

The first thing you will need to do, if you want to follow along with the examples, is to acquire a copy of Rust. This is the programming language that differential dataflow uses, and it is in charge of building our projects.

With Rust in hand, crack open a shell and make a new project using Rust's build manager, cargo.

cargo new my_project

This should create a new folder called my_project, and you can wander in there and type

cargo run

This will do something reassuring but pointless, like print Hello, world!, because we haven't gotten differential dataflow involved yet. I mean, it's Rust and you could learn that, but you probably want to read a different web page in that case.

Instead, edit your Cargo.toml file, which tells Rust about your dependencies, to look like this:

[package]
name = "my_project"
version = "0.1.0"
authors = ["Your Name <your_name@you.ch>"]

[dependencies]
timely = "0.11.1"
differential-dataflow = "0.11.0"

You should only need to add those last two lines there, which bring in dependencies on both timely dataflow and differential dataflow. We will be using both of those.

If you would like to point at the most current code, hosted on GitHub, you can replace the dependencies with:

[dependencies]
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" }

You should now be ready to go. Code examples should mostly work, and you should complain (or file an issue) if they do not!

Step 1: Write a program.

You write differential dataflow programs against apparently static input collections, with operations that look a bit like database (SQL) or big data (MapReduce) idioms. This is actually a bit of a trick, because you will have the ability to change the input data, but we'll pretend we don't know that yet.

Let's write a program with one input: a collection manages of pairs (manager, person) describing people and their direct reports. Our program will determine for each person their manager's manager (where the boss is treated as their own manager). If you are familiar with SQL, this is an "equijoin", and we will write exactly that in differential dataflow.

If you are following along at home, put this in your src/main.rs file.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::InputSession;
use differential_dataflow::operators::Join;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(std::env::args(), move |worker| {

        // create an input collection of data.
        let mut input = InputSession::new();

        // define a new computation.
        worker.dataflow(|scope| {

            // create a new collection from our input.
            let manages = input.to_collection(scope);

            // if (m2, m1) and (m1, p), then output (m1, (m2, p))
            manages
                .map(|(m2, m1)| (m1, m2))
                .join(&manages)
                .inspect(|x| println!("{:?}", x));
        });

        // Read the size of our organization from the first argument
        // (defaulting to 100), so that `cargo run -- 10` works as shown.
        let size: usize = std::env::args().nth(1)
            .and_then(|arg| arg.parse().ok())
            .unwrap_or(100);

        // Load input (a binary tree).
        input.advance_to(0);
        for person in 0 .. size {
            input.insert((person/2, person));
        }

    }).expect("Computation terminated abnormally");
}

This program has a bit of boilerplate, but at its heart it defines a new input manages and then joins it with itself, once the fields have been re-ordered. The intent is as stated in the comment:

    // if (m2, m1) and (m1, p), then output (m1, (m2, p))

We want to report each pair (m2, p), and we happen to also produce as evidence the m1 connecting them.

When we execute this program we get to see the skip-level reports for the small binary tree we loaded as input:

Echidnatron% cargo run -- 10
     Running `target/debug/my_project`
    ((0, (0, 0)), 0, 1)
    ((0, (0, 1)), 0, 1)
    ((1, (0, 2)), 0, 1)
    ((1, (0, 3)), 0, 1)
    ((2, (1, 4)), 0, 1)
    ((2, (1, 5)), 0, 1)
    ((3, (1, 6)), 0, 1)
    ((3, (1, 7)), 0, 1)
    ((4, (2, 8)), 0, 1)
    ((4, (2, 9)), 0, 1)
Echidnatron%

This is a bit crazy, but what we are seeing is many triples of the form

(data, time, diff)

describing how the data have changed. That's right; our input is actually a change from the initially empty input. The output is showing us that at time 0 several tuples have had their frequency incremented by one, which is a fancy way of saying they are now part of the output.
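To make the (data, time, diff) triples concrete, here is a plain-Rust sketch (a model of the semantics, not differential dataflow's implementation) that accumulates such updates into the collection's contents as of a chosen time:

```rust
use std::collections::HashMap;

/// Accumulate `(data, time, diff)` updates into the collection's
/// contents as of time `query`: sum the diffs of all updates whose
/// time is at most `query`, keeping elements with a non-zero count.
fn collection_at(updates: &[((u64, (u64, u64)), u64, i64)], query: u64)
    -> HashMap<(u64, (u64, u64)), i64>
{
    let mut counts = HashMap::new();
    for &(data, time, diff) in updates {
        if time <= query {
            *counts.entry(data).or_insert(0) += diff;
        }
    }
    counts.retain(|_, &mut count| count != 0);
    counts
}

fn main() {
    // A few updates drawn from the example program's output.
    let updates = vec![
        ((1, (0, 2)), 0, 1),   // inserted when loading the data
        ((1, (0, 2)), 2, -1),  // retracted when person 2 changes manager
        ((0, (0, 2)), 2, 1),   // and replaced by the new skip-level pair
    ];
    assert_eq!(collection_at(&updates, 0).get(&(1, (0, 2))), Some(&1));
    assert_eq!(collection_at(&updates, 2).get(&(1, (0, 2))), None);
    assert_eq!(collection_at(&updates, 2).get(&(0, (0, 2))), Some(&1));
    println!("ok");
}
```

At time 0 the pair is present; once the time-2 retraction applies, it vanishes and its replacement appears.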

This may make more sense in just a moment, when we want to change the input.

Step 2: Change its input.

We've written a program, one that produces skip-level reports from some manages relation. Let's see how we can change its input, and what the corresponding output changes will be.

Our organization has gone from one where each manager has at most two reports to one where each manager has three. This doesn't happen overnight; each day one of the employees switches from their old manager to their new manager. The boss stays the boss, of course, because that is what boss means.

The only change we'll make is to add the following just after we load up our initial org chart:

    for person in 1 .. size {
        input.advance_to(person);
        input.remove((person/2, person));
        input.insert((person/3, person));
    }

This moves us through new times, indicated by the line

        input.advance_to(person);

which advances the state of the input collection up to the timestamp person. These timestamps are just integers, conveniently larger than the time 0 we used to load the data.

Once we've advanced the time, we make some changes.

        input.remove((person/2, person));
        input.insert((person/3, person));

This removes the prior management relation, and introduces a new one where the person reports to their newer, more over-worked manager.

We do this for each of the non-boss employees and get to see a bunch of outputs.

        Echidnatron% cargo run -- 10
             Running `target/debug/my_project`
            ((0, (0, 0)), 0, 1)
            ((0, (0, 1)), 0, 1)
            ((0, (0, 2)), 2, 1)
            ((1, (0, 2)), 0, 1)
            ((1, (0, 2)), 2, -1)
            ((1, (0, 3)), 0, 1)
            ((1, (0, 4)), 4, 1)
            ((1, (0, 5)), 5, 1)
            ((2, (0, 4)), 2, 1)
            ((2, (0, 4)), 4, -1)
            ((2, (0, 5)), 2, 1)
            ((2, (0, 5)), 5, -1)
            ((2, (0, 6)), 6, 1)
            ((2, (0, 7)), 7, 1)
            ((2, (0, 8)), 8, 1)
            ((2, (1, 4)), 0, 1)
            ((2, (1, 4)), 2, -1)
            ((2, (1, 5)), 0, 1)
            ((2, (1, 5)), 2, -1)
            ((3, (1, 6)), 0, 1)
            ((3, (1, 6)), 6, -1)
            ((3, (1, 7)), 0, 1)
            ((3, (1, 7)), 7, -1)
            ((3, (1, 9)), 9, 1)
            ((4, (1, 8)), 4, 1)
            ((4, (1, 8)), 8, -1)
            ((4, (1, 9)), 4, 1)
            ((4, (1, 9)), 9, -1)
            ((4, (2, 8)), 0, 1)
            ((4, (2, 8)), 4, -1)
            ((4, (2, 9)), 0, 1)
            ((4, (2, 9)), 4, -1)
        Echidnatron%

Gaaaaaaah! What in the !#$!?

It turns out our input changes result in output changes. Let's break this down and make some sense of it. If we group the entries by time, the second element of each triple, we see a bit more structure.

  1. The entries with time 0 are exactly the same as for our prior computation, where we just loaded the data.

  2. There aren't any entries at time 1 (go check). That is because the input didn't change in our first step, because 1/2 == 1/3 == 0. Since the input didn't change, the output doesn't change.

  3. The other times are more complicated.

Let's look at the entries for time 4.

        ((1, (0, 4)), 4, 1)
        ((2, (0, 4)), 4, -1)
        ((4, (1, 8)), 4, 1)
        ((4, (1, 9)), 4, 1)
        ((4, (2, 8)), 4, -1)
        ((4, (2, 9)), 4, -1)

There is a fair bit going on here. Four's manager changed from two to one, and while their skip-level manager remained zero, the explanation for that changed. The first two lines record this change. The next four lines record the change in the skip-level manager of four's reports, eight and nine.

At the end, time 9, things are a bit simpler because we have reached the employees with no reports, and so the only changes are their skip-level manager, without any implications for other people.

        ((3, (1, 9)), 9, 1)
        ((4, (1, 9)), 9, -1)

Oof. Well, we probably could have figured these things out by hand, right?

Let's check out some ways this gets more interesting.

Increase all the things

Differential dataflow is meant to scale, and in many different directions. You can increase the amount of data it operates on, the number of threads and computers you use, and the rate at which you interact with it.

Let's explore these three scaling dimensions, starting from our example program of the previous chapter!

Increase the scale.

Ten people was a pretty small organization. Let's do ten million instead.

We are going to have to turn off the output printing here (comment out the inspect, but save the semicolon). We'll also need to add the --release flag to our command line, to avoid waiting forever.

We'll break down our computation in two ways: first just loading the initial organization, and second doing that plus all of the changes to the reporting structure. We haven't yet learned how to interactively load input and await results (that comes in just a moment), so we will only see elapsed times measuring throughput, not latency.

First, we produce the skip-level management (with the "change its input" code commented out).

        Echidnatron% time cargo run --release -- 10000000
            Finished release [optimized] target(s) in 0.24s
             Running `target/release/my_project 10000000`
        cargo run --release my_project 10000000 -w1  2.74s user 1.00s system 98% cpu 3.786 total
        Echidnatron%

About four seconds. We have no clue whether this is a good or bad time.

Second, we produce the skip-level management and then modify it 10 million times (as in "change its input").

        Echidnatron% time cargo run --release -- 10000000
            Finished release [optimized] target(s) in 0.24s
             Running `target/release/my_project 10000000`
        cargo run --release my_project 10000000  10.64s user 2.22s system 99% cpu 12.939 total
        Echidnatron%

About thirteen seconds now.

That's less than a microsecond per modification (subtracting the loading time). Importantly, these are throughput measurements rather than latency numbers; we aren't actually doing the 10 million updates one after the other. But, if you compare this to a sequence of 10 million updates to a database, we would be pretty pleased with a microsecond per operation.
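The arithmetic behind that claim, using the wall-clock totals from the two runs above:

```rust
fn main() {
    // Wall-clock totals reported above (assumed representative runs).
    let load_only = 3.786_f64;        // seconds: load 10M records
    let load_and_change = 12.939_f64; // seconds: load, then 10M changes

    let changes = 10_000_000_f64;
    let seconds_per_change = (load_and_change - load_only) / changes;
    let micros_per_change = seconds_per_change * 1_000_000.0;

    // Just under a microsecond per modification.
    assert!(micros_per_change < 1.0);
    println!("{:.3} microseconds per change", micros_per_change);
}
```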

Increase the parallelism.

Differential dataflow works great using multiple threads and computers. It even produces the same output and everything.

For this to work out, we'll want to ask each worker to load up a fraction of the input. If we just run the same code with multiple workers, then each of the workers will run

    for person in 0 .. size {
        input.insert((person/2, person));
    }

and each will insert the entire input collection. We don't want that!

Instead, each timely dataflow worker has methods index() and peers(), which report the worker's own index and the total number of workers. We can change the code so that each worker loads only its fraction of the input, like so:

    let mut person = worker.index();
    while person < size {
        input.insert((person/2, person));
        person += worker.peers();
    }

We can make the same change to the code that supplies the modifications, where each worker is responsible for those people whose number equals worker.index() modulo worker.peers().

    let mut person = worker.index();
    while person < size {
        input.remove((person/2, person));
        input.insert((person/3, person));
        input.advance_to(person);
        person += worker.peers();
    }
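To see why this pattern works, here is a small std-only sketch (with a hypothetical helper people_for mirroring the loops above) checking that every person is claimed by exactly one worker:

```rust
use std::collections::HashSet;

/// The people a worker with index `index`, out of `peers` total
/// workers, is responsible for: its residue class modulo `peers`.
fn people_for(index: usize, peers: usize, size: usize) -> Vec<usize> {
    let mut person = index;
    let mut mine = Vec::new();
    while person < size {
        mine.push(person);
        person += peers;
    }
    mine
}

fn main() {
    let (peers, size) = (3, 10);
    let mut seen = HashSet::new();
    for index in 0..peers {
        for person in people_for(index, peers, size) {
            // Each person is claimed by exactly one worker.
            assert!(seen.insert(person));
        }
    }
    // Together the workers cover every person.
    assert_eq!(seen.len(), size);
    println!("ok");
}
```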

I'm on a laptop with two cores. Let's load the data again, without modifying it, but let's use two worker threads (with the -w2 argument)

        Echidnatron% time cargo run --release -- 10000000 -w2
            Finished release [optimized] target(s) in 0.24s
             Running `target/release/my_project 10000000 -w2`
        cargo run --release -- 10000000 -w2  3.34s user 1.27s system 191% cpu 2.402 total
        Echidnatron%

Now let's try loading and doing ten million modifications, but with two worker threads.

        Echidnatron% time cargo run --release -- 10000000 -w2
            Finished release [optimized] target(s) in 0.24s
             Running `target/release/my_project 10000000 -w2`
        cargo run --release -- 10000000 -w2  13.06s user 3.14s system 196% cpu 8.261 total
        Echidnatron%

Each of these improves on the single-threaded execution, even though the workers do a bit more total work to coordinate with one another. Perhaps amazingly, they even improve the case where we need to do ten million sequential modifications. We get exactly the same answer, too.

Increase the interaction.

Instead of loading all of our changes and only waiting for the result, we can load each change and await its results before supplying the next change. This requires a bit of timely dataflow magic, where we add a probe to the end of our dataflow:

    // create a manager
    let probe = worker.dataflow(|scope| {

        // create a new collection from an input session.
        let manages = input.to_collection(scope);

        // if (m2, m1) and (m1, p), then output (m1, (m2, p))
        manages
            .map(|(m2, m1)| (m1, m2))
            .join(&manages)
            // .inspect(|x| println!("{:?}", x))
            .probe()
    });

We can then use this probe to limit the introduction of new data, by waiting for it to catch up with our input before we insert new data. For example, after we insert our initial data, we should wait until everyone has caught up.

    let mut person = worker.index();
    while person < size {
        input.insert((person/2, person));
        person += worker.peers();
    }

    // wait for data loading.
    input.advance_to(1);
    input.flush();
    while probe.less_than(&input.time()) { worker.step(); }
    println!("{:?}\tdata loaded", worker.timer().elapsed());

These four new lines are each important, especially the one that prints things out. The other three do a bit of magic that get timely dataflow to work for us until we are certain that inputs have been completely processed.

We can make the same changes for the interactive loading, but we'll synchronize the workers for each person they load.

    // make changes, but await completion.
    let mut person = 1 + worker.index();
    while person < size {
        input.remove((person/2, person));
        input.insert((person/3, person));
        input.advance_to(person);
        input.flush();
        while probe.less_than(&input.time()) { worker.step(); }
        println!("{:?}\tstep {} complete", worker.timer().elapsed(), person);
        person += worker.peers();
    }

This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes.

        Echidnatron% cargo run --release -- 10000000
            Finished release [optimized] target(s) in 0.24s
             Running `target/release/my_project 10000000`
        4.092895186s    data loaded
        4.092975626s    step 1 complete
        4.093021676s    step 2 complete
        4.093041130s    step 3 complete
        4.093110803s    step 4 complete
        4.093144075s    step 5 complete
        4.093187645s    step 6 complete
        4.093208245s    step 7 complete
        4.093236460s    step 8 complete
        4.093281793s    step 9 complete

which continues for quite a while.

        21.689493445s   step 397525 complete
        21.689522815s   step 397526 complete
        21.689553410s   step 397527 complete
        21.689593500s   step 397528 complete
        21.689643055s   step 397529 complete

You can see that this is pretty prompt; the latencies are in the tens of microseconds.

You can also see that the whole computation is clearly going to take a bit longer. This is because we've forced some work to finish before we start the next work, which we haven't done before. We will explore later on how to trade off latency and throughput when we come to "open-loop" interaction.

Differential Dataflow operators

Differential dataflow programs are fundamentally built out of operators applied to collections. These operators produce new collections, to which we apply still more operators. Ideally, the operators we assemble compute something exciting and meaningful.

This chapter surveys the most common differential dataflow operators, and shows how to use each of them.

The Map Operator

The map operator applies a supplied function to each element of a collection, the results of which are accumulated into a new collection. The map operator preserves the counts of elements, and any elements that are made equal by the map operator will have their counts accumulate.

As an example, our example program used map to reverse the pairs of identifiers in the manages collection, to place the second element first.

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Join;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| (m1, m2))
        .join(&manages)
        .inspect(|x| println!("{:?}", x));
}

If instead we had just written

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| m2);
}

we would have a collection containing each manager with a multiplicity equal to the number of individuals they manage.
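The accumulation of counts can be modeled on a multiset represented as (record, count) pairs. This is a sketch of the semantics, not the actual operator implementation:

```rust
use std::collections::HashMap;

/// Apply `f` to each record of a multiset, adding together the
/// counts of records that become equal under `f`.
fn map_multiset<A, B, F>(input: &HashMap<A, i64>, f: F) -> HashMap<B, i64>
where
    A: Clone + Eq + std::hash::Hash,
    B: Eq + std::hash::Hash,
    F: Fn(A) -> B,
{
    let mut output = HashMap::new();
    for (record, &count) in input {
        *output.entry(f(record.clone())).or_insert(0) += count;
    }
    output
}

fn main() {
    // The manages pairs for a five-person binary tree.
    let mut manages = HashMap::new();
    for person in 0u64..5 {
        manages.insert((person / 2, person), 1i64);
    }
    // Keep only the manager: counts accumulate per manager.
    let managers = map_multiset(&manages, |(m, _p)| m);
    assert_eq!(managers.get(&0), Some(&2)); // manages 0 and 1
    assert_eq!(managers.get(&1), Some(&2)); // manages 2 and 3
    assert_eq!(managers.get(&2), Some(&1)); // manages 4
    println!("ok");
}
```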

The Filter Operator

The filter operator applies a supplied predicate to each element of a collection, and retains only those for which the predicate returns true.

As an example, we might select out those management relations where the manager has a greater employee id than the managee, by writing

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .filter(|&(m2, m1)| m2 > m1);
}

Rust makes it very clear whether a method is provided with the data itself, or only with the ability to look at the data. The filter operator is only allowed to look at the data, which is where the & glyph comes from. This allows the execution to be more efficient, but it is a subtle concept that further Rust reading may illuminate.

The Concat Operator

The concat operator takes two collections whose elements have the same type, and produces the collection in which the counts of each element are added together.

For example, we might form the symmetric "management relation" by concatenating the manages collection with the same collection with its fields flipped:

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| (m1, m2))
        .concat(&manages);
}

This collection likely has at most one copy of each record, except for any manager who manages themself. In fact, zero manages itself, and so the element (0, 0) has count two.
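The count-addition behavior of concat can be sketched with plain multisets (again a model of the semantics, not the implementation):

```rust
use std::collections::HashMap;

/// Concatenate two multisets by adding counts element-wise.
fn concat(a: &HashMap<(u64, u64), i64>, b: &HashMap<(u64, u64), i64>)
    -> HashMap<(u64, u64), i64>
{
    let mut out = a.clone();
    for (record, &count) in b {
        *out.entry(*record).or_insert(0) += count;
    }
    out
}

fn main() {
    // manages and its field-flipped copy, for three people.
    let mut manages = HashMap::new();
    let mut flipped = HashMap::new();
    for person in 0u64..3 {
        manages.insert((person / 2, person), 1i64);
        flipped.insert((person, person / 2), 1i64);
    }
    let symmetric = concat(&flipped, &manages);
    // (0, 0) appears in both inputs, so its count is two.
    assert_eq!(symmetric.get(&(0, 0)), Some(&2));
    assert_eq!(symmetric.get(&(0, 1)), Some(&1));
    assert_eq!(symmetric.get(&(1, 0)), Some(&1));
    println!("ok");
}
```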

Importantly, concat doesn't do the hard work of ensuring that there is only one physical copy of each element. If we inspect the output of the concat above, we might see

        ((0, 0), 0, 1)
        ((0, 0), 0, 1)

Although these are two updates to the same element at the same time, concat is a bit lazy (read: efficient) and doesn't do the hard work until we ask it. For that, we'll need the consolidate operator.

The Consolidate Operator

The consolidate operator takes an input collection, and does nothing other than possibly changing its physical representation. It leaves the same sets of elements at the same times with the same logical counts.

What consolidate does do is ensure that each element at each time has at most one physical tuple. Generally, we might have multiple updates to the same element at the same time, expressed as independent updates. The consolidate operator adds all of these updates together before moving the update along.
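This accumulation can be sketched over a batch of (data, time, diff) updates; a model of what consolidate guarantees, not its actual implementation:

```rust
use std::collections::HashMap;

/// Add together updates with the same data and time, dropping any
/// whose net count is zero.
fn consolidate(updates: &mut Vec<((u64, u64), u64, i64)>) {
    let mut totals = HashMap::new();
    for &(data, time, diff) in updates.iter() {
        *totals.entry((data, time)).or_insert(0) += diff;
    }
    updates.clear();
    updates.extend(
        totals.into_iter()
              .filter(|&(_, diff)| diff != 0)
              .map(|((data, time), diff)| (data, time, diff)),
    );
}

fn main() {
    let mut updates = vec![
        ((0, 0), 0, 1),
        ((0, 0), 0, 1),  // same data, same time: merges to count two
        ((0, 1), 0, 1),
        ((0, 1), 0, -1), // cancels entirely, and is dropped
    ];
    consolidate(&mut updates);
    assert!(updates.contains(&((0, 0), 0, 2)));
    assert_eq!(updates.len(), 1);
    println!("ok");
}
```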

As an example, if we were to inspect

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| (m1, m2))
        .concat(&manages)
        .inspect(|x| println!("{:?}", x));
}

we might see two copies of the same element:

((0, 0), 0, 1)
((0, 0), 0, 1)

However, by introducing consolidate

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| (m1, m2))
        .concat(&manages)
        .consolidate()
        .inspect(|x| println!("{:?}", x));
}

we are guaranteed to see at most one (0,0) update at each time:

((0, 0), 0, 2)

The consolidate function is mostly useful before inspecting data, but it can also be important for efficiency; knowing when to spend the additional computation to consolidate the representation of your data is an advanced topic!

The Join Operator

The join operator takes two input collections, each of which must have records with a (key, value) structure, and must have the same type of key. For each pair of elements with matching key, one from each input, the join operator produces the output (key, (value1, value2)).

Our example from earlier uses a join to match up pairs (m2, m1) and (m1, p) when the m1 is in common. To do this, we first have to switch the records in the first collection around, so that they are keyed by m1 instead of m2.

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::operators::Join;
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages
        .map(|(m2, m1)| (m1, m2))
        .join(&manages)
        .inspect(|x| println!("{:?}", x));
}

The join operator multiplies frequencies, so if a record (key, val1) has multiplicity five, and a matching record (key, val2) has multiplicity three, the output result will be (key, (val1, val2)) with multiplicity fifteen.
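This multiplication of frequencies can be sketched with keyed multisets; a nested-loop model of the semantics, not the indexed implementation the real operator uses:

```rust
use std::collections::HashMap;

/// Join two keyed multisets: matching keys pair their values, and
/// the counts of the matched records multiply.
fn join(
    left: &HashMap<(u64, u64), i64>,
    right: &HashMap<(u64, u64), i64>,
) -> HashMap<(u64, (u64, u64)), i64> {
    let mut out = HashMap::new();
    for (&(k1, v1), &c1) in left {
        for (&(k2, v2), &c2) in right {
            if k1 == k2 {
                *out.entry((k1, (v1, v2))).or_insert(0) += c1 * c2;
            }
        }
    }
    out
}

fn main() {
    let mut left = HashMap::new();
    let mut right = HashMap::new();
    left.insert((7, 1), 5i64);  // (key, val1) with multiplicity five
    right.insert((7, 2), 3i64); // (key, val2) with multiplicity three
    right.insert((8, 9), 1i64); // no matching key on the left
    let out = join(&left, &right);
    assert_eq!(out.get(&(7, (1, 2))), Some(&15)); // five times three
    assert_eq!(out.len(), 1);
    println!("ok");
}
```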

The Reduce Operator

The reduce operator takes an input collection whose records have a (key, value) structure, and it applies a user-supplied reduction closure to each group of values with the same key.

For example, to produce for each manager their managee with the lowest identifier, we might write

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Reduce;

fn example<G: Scope>(manages: &Collection<G, (u64, u64), i64>)
where G::Timestamp: Lattice
{
    manages
        .reduce(|_key, input, output| {
            let mut min_index = 0;

            // Each element of input is a `(&Value, Count)`
            for index in 1 .. input.len() {
                if input[min_index].0 > input[index].0 {
                    min_index = index;
                }
            }

            // Must produce outputs as `(Value, Count)`.
            output.push((*input[min_index].0, 1));
        });
}

The reduce operator has some tricky Rust details about how it is expressed. The type of closure you must provide is technically

    Fn(&Key, &[(&Val, Cnt)], &mut Vec<(Val2, Cnt2)>)

which means a function of three arguments:

  1. A reference to the common key (_key above).
  2. A slice (list) of pairs of value references and counts.
  3. A mutable vector into which one can put pairs of values and counts.

The method is structured this way so that you can efficiently observe and manipulate records with large multiplicities without actually walking through that many records. For example, a count operator can be written much more efficiently by reading the count directly than by traversing as many copies of each record as the count indicates.

Speaking of which ...

Count

The convenience method count wraps the reduce operator, and performs the common operation much more easily. The count operator takes arbitrary input collections, and produces a collection as output whose records are pairs of input records and their accumulated count.
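As a model of the semantics (not the operator itself): each input record with a non-zero count yields the pair (record, count), itself with count one.

```rust
use std::collections::HashMap;

/// For each record of a multiset, produce the pair (record, count),
/// each such pair itself carrying count one.
fn count(input: &HashMap<u64, i64>) -> HashMap<(u64, i64), i64> {
    input.iter()
         .filter(|&(_, &c)| c != 0)
         .map(|(&record, &c)| ((record, c), 1))
         .collect()
}

fn main() {
    // Managers with their number of reports, as a multiset.
    let mut managers = HashMap::new();
    managers.insert(0u64, 2i64); // manager 0 appears twice
    managers.insert(2u64, 1i64);
    let counted = count(&managers);
    assert_eq!(counted.get(&(0, 2)), Some(&1));
    assert_eq!(counted.get(&(2, 1)), Some(&1));
    println!("ok");
}
```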

Distinct

The distinct operator is another convenience operator, and it takes any input collection to one in which each input record occurs at most once. The distinct operator is a great way to recover set semantics despite differential dataflow's native multiset semantics.

Threshold

More general than distinct, the threshold operator takes any function from one count to another count and yields the collection with counts correspondingly updated. This is used to implement the distinct operator, but also operators like "records with count at least three".
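A sketch of the semantics: threshold applies a user-supplied function from input count to output count, and distinct is the special case that maps every positive count to one.

```rust
use std::collections::HashMap;

/// Apply a function from input count to output count to each record,
/// dropping records whose output count is zero.
fn threshold<F: Fn(i64) -> i64>(
    input: &HashMap<u64, i64>,
    f: F,
) -> HashMap<u64, i64> {
    input.iter()
         .map(|(&record, &count)| (record, f(count)))
         .filter(|&(_, count)| count != 0)
         .collect()
}

fn main() {
    let mut input = HashMap::new();
    input.insert(10u64, 1i64);
    input.insert(20u64, 4i64);
    // distinct: every present record gets count one.
    let distinct = threshold(&input, |_| 1);
    assert_eq!(distinct.get(&20), Some(&1));
    // "records with count at least three".
    let popular = threshold(&input, |c| if c >= 3 { c } else { 0 });
    assert_eq!(popular.get(&10), None);
    assert_eq!(popular.get(&20), Some(&4));
    println!("ok");
}
```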

The Iterate Operator

The iterate operator takes a starting input collection and a closure to repeatedly apply to this input collection. The output of the iterate operator is the collection that results from an unbounded number of applications of this closure to the input. Ideally this process converges, as otherwise the computation will run forever!

As an example, we can take our manages relation and determine, for each employee, all of the managers above them in the organizational chart. To do this, we start from the manages relation and write a closure that extends any transitive management pair by "one hop" along the management relation, using a join operation.

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::operators::{Join, Iterate, Threshold};
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages   // transitive contains (manager, person) for many hops.
        .iterate(|transitive| {
            transitive
                .map(|(mk, m1)| (m1, mk))
                .join(&transitive)
                .map(|(m1, (mk, p))| (mk, p))
                .concat(&transitive)
                .distinct()
        });
}

Although the first three lines of the closure may look like our skip-level management example, we have three more steps that are very important.

  1. We apply a map to remove m1 from the tuple. This was the middle manager, but to have the same type as the input collection we need to produce only pairs.

  2. We concatenate in transitive, which ensures that we don't "lose" any shorter management relations. Otherwise the loop body would insist that we take two steps along transitive.

  3. We apply distinct() at the end. This is important to ensure convergence. Otherwise, the multiplicities of facts would increase indefinitely. The distinct operator makes sure that we wind down as we stop discovering new transitive management relations.
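The fixed-point behavior of the loop can be modeled in plain Rust with sets: repeatedly extend the relation by one hop and stop when nothing new appears, which is the role distinct plays in keeping the computation bounded. A sketch of the semantics, not how differential dataflow executes the loop:

```rust
use std::collections::HashSet;

/// Repeatedly extend (manager, person) pairs by one hop along
/// `manages` until no new pairs appear: the fixed point is the
/// transitive management relation.
fn transitive(manages: &HashSet<(u64, u64)>) -> HashSet<(u64, u64)> {
    let mut closure = manages.clone();
    loop {
        let mut next = closure.clone();
        for &(mk, m1) in &closure {
            for &(m, p) in manages {
                if m == m1 {
                    next.insert((mk, p)); // one more hop
                }
            }
        }
        if next.len() == closure.len() {
            return closure; // converged: no new pairs discovered
        }
        closure = next;
    }
}

fn main() {
    // A five-person binary tree: person p reports to p / 2.
    let manages: HashSet<_> = (0u64..5).map(|p| (p / 2, p)).collect();
    let all = transitive(&manages);
    assert!(all.contains(&(0, 4))); // 0 manages 2, who manages 4
    assert!(all.contains(&(0, 3))); // 0 manages 1, who manages 3
    println!("ok");
}
```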

Enter

The enter operator is a helpful method that brings collections outside a loop into the loop, unchanging as the iterations proceed.

In the example above, we could rewrite

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::operators::{Join, Threshold, Iterate};
use differential_dataflow::lattice::Lattice;

fn example<G: Scope>(manages: &Collection<G, (u64, u64)>)
where G::Timestamp: Lattice
{
    manages   // transitive contains (manager, person) for many hops.
        .iterate(|transitive| {

            let manages = manages.enter(&transitive.scope());

            transitive
                .map(|(mk, m1)| (m1, mk))
                .join(&manages)
                .map(|(m1, (mk, p))| (mk, p))
                .concat(&manages)
                .distinct()
        });
}

This modified version extends transitive by one step along manages, rather than by a step along transitive, and it concatenates in manages rather than transitive. It can perform better: each iteration takes shorter steps, but those steps are against the smaller, unchanging manages collection rather than the growing transitive collection.

Leave

The leave operator allows you to extract a collection from an iterative context. It isn't exactly clear how you do this yet, but it will be in just a moment. When you call leave on a collection, it returns a collection in the enclosing scope (outside the iteration) equal to the final value of the collection.

Variables

You can manually construct iterative contexts, if you like, using differential dataflow's Variable type. This is a collection that can be used before its contents are defined, establishing a recursive definition. Its contents will then develop iteratively, much as they do for the iterate operator.

Manual construction can be important when you have mutual recursion, perhaps among multiple collections (rather than the one collection iterate supports), or if you want to return something other than the result of the closure (perhaps intermediate collections).

As an example, the implementation of the iterate operator looks something like this:

#![allow(unused)]
fn main() {
extern crate timely;
extern crate differential_dataflow;
use timely::dataflow::Scope;
use timely::dataflow::scopes::Child;
use timely::progress::Antichain;
use differential_dataflow::Collection;
use differential_dataflow::operators::{Iterate, iterate::Variable};
use differential_dataflow::lattice::Lattice;
fn logic<'a, G: Scope>(variable: &Variable<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)>
where G::Timestamp: Lattice
{
    (*variable).clone()
}
fn example<'a, G: Scope<Timestamp=u64>>(collection: &Collection<G, (u64, u64)>) //, logic: impl Fn(&Variable<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)>)
   where G::Timestamp: Lattice
{
    collection.scope().scoped("inner", |subgraph| {
        let variable = Variable::new_from(collection.enter(subgraph), 1);
        let result = logic(&variable);
        variable.set(&result);
        result.leave()
    });
}
}

The Arrange Operator

The arrange operator is a massively important operator that we will discuss in more detail in the chapter on Arrangements. Arrange controls and coordinates how data are stored, indexed, and maintained, and forms the basis of efficient data sharing.

Operators like consolidate, join, and reduce all use arrange internally, and there can be substantial benefit to exposing this use. We will discuss this further.

Differential Interactions

Once a computation is written, we have only to interact with it. At its heart, this reduces to changing the inputs to the computation and observing changes in the outputs. This can be very easy to do, but there is also, intentionally, great latitude in how you do so, which allows the system to perform more efficiently.

Our goal is to go in-order through each of the elements of the code from our interactive example.

    // make changes, but await completion.
    let mut person = index;
    while person < people {
        input.remove((person/2, person));
        input.insert((person/3, person));
        input.advance_to(person);
        input.flush();
        while probe.less_than(&input.time()) {
            worker.step();
        }
        person += peers;
    }

Each of these parts, more or less, does something interesting and important. There is also some flexibility in how they are used, which we will try to highlight.

Creating Inputs

We've seen already one example of creating a differential dataflow input in our management example.

    // create an input collection of data.
    let mut input = InputSession::new();

    // define a new computation.
    worker.dataflow(|scope| {

        // create a new collection from our input.
        let manages = input.to_collection(scope);

        // ...

Most commonly, a differential dataflow input is managed by an InputSession instance. We've created one above and then turned it into a differential dataflow collection. Changes that we make to the input session will make their way into the differential dataflow computation; the input session acts as a bridge from our imperative code to the differential dataflow computation.

New Collections

You can also create input sessions from the new_collection and new_collection_from methods defined on timely dataflow scopes, by way of differential dataflow's Input trait. These methods allow you to define a collection in-line, and optionally supply initial data for the collection.

For example, we could have written the above as:

    // define a new computation.
    let mut input = worker.dataflow(|scope| {

        // create a new collection from our input.
        let (input, manages) = scope.new_collection();

        // ...

        input
    });

Notice that we need to return the input from the closure, and bind it as the result of our call to dataflow().
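The new_collection_from variant additionally seeds the collection with initial data. As a sketch (the specific pairs here are made up for illustration):

    // define a new computation, with initial data.
    let mut input = worker.dataflow(|scope| {

        // create a new collection, seeded with two (manager, person) pairs.
        let (input, manages) = scope.new_collection_from(vec![(0u64, 1u64), (1, 2)]);

        // ...

        input
    });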

As Collections

Any timely dataflow stream of the correct record type, specifically (data, time, diff), can be re-interpreted as a differential dataflow collection using the AsCollection trait, which provides a method as_collection().

This operator is helpful in the implementation of differential dataflow operators, when you need to dive into timely dataflow specializations, and when you need to interoperate with timely dataflow computations. Perhaps you bring your data in from Kafka using timely dataflow; you must then change it from a timely dataflow stream to a differential dataflow collection.
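As a sketch, assuming the ToStream and AsCollection traits are in scope and that scope names a timely dataflow scope with usize timestamps:

    // a timely stream of (data, time, diff) update triples ...
    let stream = vec![(("alice", "bob"), 0usize, 1isize)].to_stream(scope);

    // ... reinterpreted as a differential dataflow collection.
    let collection = stream.as_collection();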

Making Changes

An InputSession instance supports several methods allowing us to change the underlying collection.

The simplest of these methods are insert(item) and remove(item), which respectively increment and decrement the count of the supplied item, at the current time of the input session.

The update(item, diff) method allows you to specify an arbitrary change to an item, positive or negative and of arbitrary magnitude. This method can be useful when your input data come as differences, or if you need to deal with large magnitude changes.

The update_at(item, time, diff) method allows you to specify an arbitrary change to an item, at an arbitrary time in the present or future of the current time of the input session. This method can be useful if your data may arrive out of order with respect to time, and you would nonetheless like to introduce the data to the computation. You are not likely to see the effects of these changes until the input passes their associated times, but you may nonetheless want to introduce the data rather than buffer it yourself.
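In summary, for a hypothetical item and a present-or-future time, the three flavors of change look like:

    input.insert(item.clone());         // adds one copy of `item` at the current time.
    input.remove(item.clone());         // subtracts one copy of `item`.
    input.update(item.clone(), -5);     // changes the count of `item` by -5.
    input.update_at(item, time, 3);     // changes the count of `item` by 3 at `time`.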

Timely streams

As indicated in the "Creating Inputs" section, any timely dataflow stream can be recast as a differential dataflow collection, and so any other source of changes that can be turned into a timely dataflow stream can also be used as input changes for a differential dataflow computation. Doing this requires some care, which is what the InputSession type tries to provide for you.

Advancing Time

Differential dataflow will perform relatively little work until it believes it has enough information to produce the correct answer. This involves you supplying input changes, but just as importantly it involves you promising to stop changing the collection.

The InputSession type provides a method advance_to(time), which moves the internal time of the session forward to time, and prevents you from supplying input changes at times that are not greater than or equal to time. This is a very strong statement made to the differential dataflow infrastructure: you have stopped changing this input at all times not greater than or equal to time, and the system can now start to make progress determining the corresponding output changes.

Crucially, the calls to advance_to will be buffered until you call input.flush(), which exposes this information to the underlying timely dataflow system. This allows you to call advance_to as frequently as once per record, without overwhelming the underlying system.

IMPORTANT: Before expecting differential and timely dataflow to actually make forward progress, make sure you have advanced and flushed your inputs.

This is a classic source of error, still made by yours truly, that causes a computation to appear to have hung. In fact, the computation is almost always correct, and simply cannot make progress if you have held back the information that an input has stopped changing at some input times.

Temporal Concurrency

You do not need to call flush() each time you call advance_to(time).

Calls to advance_to change the logical time at which a change occurs, but you are welcome to buffer up many of these changes and then call flush only once, allowing differential dataflow to process the sequence of changes concurrently. This can greatly improve throughput, often with only a nominal effect on latency.

We will see this more clearly when we investigate the example application of real-time streaming input.
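In the meantime, here is a sketch of the pattern: many buffered rounds of changes, followed by a single flush.

    // introduce many logically distinct rounds of changes ...
    for round in 0 .. 100 {
        input.insert((round, round + 1));
        input.advance_to(round + 1);    // buffered until the flush below.
    }

    // ... then expose them all at once, allowing differential
    // dataflow to process the rounds concurrently.
    input.flush();
    while probe.less_than(&input.time()) {
        worker.step();
    }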

Observing Probes

Probes are an important concept in timely dataflow, and they play the same role in differential dataflow.

Dataflow computations differ from imperative computations in that you do not force computation to happen; instead, you must wait until it has happened. The probe() operator on collections returns a probe that can tell you at which times a collection may still experience changes.

For example, recall our example of interacting with our management computation, where we wrote

    // create a manager
    let probe = worker.dataflow(|scope| {

        // create a new collection from an input session.
        let manages = input.to_collection(scope);

        // if (m2, m1) and (m1, p), then output (m1, (m2, p))
        manages
            .map(|(m2, m1)| (m1, m2))
            .join(&manages)
            .probe()
    });

The returned probe allows us to ask whether the computation has stabilized to the point that there will be no more changes at certain query timestamps. We used the probe later on, when we wrote

    while probe.less_than(&input.time()) { worker.step(); }

This causes the dataflow worker to continue to run until there can be no more changes strictly less than the current input time (the time we are about to introduce). At this point all changes introduced at strictly prior times must be fully resolved, as the probe tells us that no further changes at their times can appear in the output.

Performing Work

All of the differential dataflow computation happens in what seems like a fairly small and unobtrusive operation:

    worker.step();

This call schedules each differential dataflow operator, and performs some of the work each has outstanding. Repeated calls to this method advance all of the work in the system, and should eventually bring output probes in line with the times of the inputs on which they depend.

At the end of your differential dataflow computation, when we exit the closure supplied to timely dataflow, the system will call worker.step() until the computation completes. For this reason, it is safe to let workers that otherwise have nothing further to contribute simply exit, as they will continue to participate until all other workers have completed as well.

For example, our first example computations didn't call worker.step() explicitly, but just exited once they had supplied the input changes. Exiting causes all of the work to happen (and complete, as the inputs are automatically closed when they are dropped).

Explicit calls to worker.step() are important when we are maintaining interactive access to probes, and do not want to simply complete the computation.

Example Applications

In this chapter we use the tools we have begun to develop, writing computations and changing their inputs, to effect somewhat more interesting computational patterns. Our goal is to build up to an interactive graph computation that supports both interactive queries and real-time updates to the graph structure.

These aren't the limits of differential dataflow, and we expect that you can also come up with new idioms for using the tools differential dataflow provides!

Graph Computation

Graph computation covers a lot of ground, and we will pick just one example here: graph connectivity.

Imagine you have a collection containing pairs (source, target) of graph edges, and you would like to determine which nodes can reach which other nodes along graph edges (using either direction).

One algorithm for this graph connectivity is "label propagation", in which each graph node maintains a label (initially its own name) and all nodes repeatedly exchange labels and maintain the smallest label they have yet seen. This process converges to a limit where each node has the smallest label in its connected component.

Let's write this computation starting from a collection edges, using differential dataflow.

    // create initial labels from sources.
    let labels = edges.map(|(src,dst)| (src,src))
                      .distinct();

    labels
        .iterate(|inner| {
            let labels = labels.enter(&inner.scope());
            let edges = edges.enter(&inner.scope());
            inner.join(&edges)
                 .map(|(_src,(lbl,dst))| (dst,lbl))
                 .concat(&labels)
                 .reduce(|_dst, lbls, out| {
                     let min_lbl =
                         lbls.iter()
                             .map(|x| *x.0)
                             .min()
                             .unwrap();
                     out.push((min_lbl, 1));
                 })
        });

This computation first determines some initial labels, taking as the set of nodes those identifiers from which edges emerge. We are assuming that the input graph is symmetric, so every node should be the source of some edge.

The computation then iteratively develops the label collection, by joining whatever labels we have at any point with the set of edges, effectively "proposing" each node's label to each of the node's neighbors. All nodes fold in these proposals with their initial labels, and each retains one copy of the smallest label they are provided as input.

The resulting collection contains pairs (node, label) from which we can determine if two nodes are in the same connected component: do they have the same label? Let's see how to use this interactively next!

Interactive Queries

We have just described an algorithm for determining a collection containing (node, label) pairs describing the connected component structure of an undirected graph. But, this collection is a bit unwieldy. Do we print it to the screen? Flip through the pages of results to find a specific pair of query nodes?

Instead, let's describe an extended computation that lets us query the results, interactively!

Imagine labels contains the results of the iterative computation from before. Let's create a new input, queries, which will simply contain node identifiers.

    labels.semijoin(&queries)
          .inspect(|x| println!("{:?}", x));

This is a fairly brief looking computation; what does it do?

As we update the queries relation, we change which elements of labels are passed through to the inspect. As we add new elements, the corresponding label from labels is fished out and presented for inspection. In effect, our computation is now an interactive service that returns the connected component id of nodes we ask for, at sub-millisecond interactive timescales.

Even more importantly, our computation maintains the results of these queries as labels changes, perhaps due to changes in its input edges collection. When we add an element to queries we are installing a standing query that will monitor the label of the query, and report all changes to it, until such a point as we uninstall the query by removing it from queries.
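Installing and uninstalling a standing query is then just a matter of changing the queries input. As a sketch, for a hypothetical node identifier:

    // install a standing query for node 17; its label, and all
    // subsequent changes to its label, will appear in the output.
    queries.insert(17);

    // ... some time later, retract our interest.
    queries.remove(17);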

Contrast this with an approach where the connectivity results are stashed in a key-value store, one that you probe for node labels. While you may read back two labels that are the same, were they actually the same at the same moment? If the labels are not the same, does that mean they were not the same or are they perhaps changing in tandem as the graph changes?

Differential dataflow prevents these ambiguities through its commitment to producing the correct answer, and clearly explaining a consistent history of changes through logical timestamps.

Real-time streaming input

Our examples so far have involved careful manipulation of the input, making changes and advancing time. What happens when we want to connect all of this to an external source that can change at arbitrary rates, and which might not wait for us to complete some work before issuing new changes?

Imagine an external data source that we can poll for changes, and when polled responds with all outstanding changes and the logical times at which each occurred. There is a fairly natural pattern we can write that exposes these changes to differential dataflow and asks it to resolve all changes concurrently, while retaining the logical times of each of the input changes.

    while !source.done() {
        // fetch a bounded amount of input changes.
        for (data, time, diff) in source.fetch() {
            input.update_at(data, time, diff);
        }
        // advance time to the source guarantee.
        let time = source.low_watermark();
        input.advance_to(time);
        input.flush();
        worker.step();
    }

This pattern repeatedly extracts changes from some source of change, perhaps a streaming source like Kafka, or perhaps a socket connection that you have set up to play back a timely stream. It introduces all of the changes, and then advances the input's time to match whatever the source guarantees it will no longer change.

Importantly, we don't know ahead of time what these data will be, nor even which times we will advance_to. These are determined by the volume of streamed input changes and by the amount of time spent in worker.step().

Under heavy input load differential dataflow may take more time to do its work, and we would expect larger batches of input to result. All components of differential dataflow have been designed so that the throughput improves as batch size increases, which means that the system should adapt its batch size to match the offered load. As load increases both batch size and latency will increase; as the load reduces the batch size and latency will decrease again.

Arrangements

Differential dataflow acts on collections of data, each of which we think of as a growing set of update triples (data, time, diff). Arrangements are a new way to represent a set of updates, which can be substantially more efficient than the approaches we have used so far (and than other stream processors to date).

Thus far, we have implemented differential collections as streams of update triples, which we connect as inputs to our various differential operators. At any moment an operator might receive a few more update triples and need to react to them. Or an operator might receive the signal that some timestamp time is now complete and we should expect no more updates bearing that timestamp.

Streams of updates are a fine representation of collections, but they leave a great amount of performance on the table. In fact, many operators do exactly the same thing with their input update streams: they build and maintain an index of the updates so that they can randomly access them in the future. With that in mind, why not build and maintain that index once, sharing the required resources across the operators that use the indexed data instead of asking each to perform redundant work?

Arrangements are an indexed representation of streamed data. An arrangement indexes batches of update tuples and streams these indexed batches in place of individual update tuples. At the same time, it maintains the sequence of these indexed batches in a compact representation, merging batches as appropriate so that all users have access to an efficient index of all updates so far.

An arrangement example

Imagine you have a collection that describes a relation among people, perhaps "x knows y", and you would like to query the "friends of friends" relation: for a given person x, who are the people known by friends of x?

Let's first build this naively, starting from two inputs: knows containing the pairs of the relation and query containing pairs (source, query_id) that allow us to interactively interrogate the data.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::Join;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Hop from x to y, then from y to z.
            query.join_map(&knows, |x,q,y| (*y,*q))
                 .join_map(&knows, |y,q,z| (*q,*z))
                 .inspect(|result| println!("result {:?}", result));

        });

      // to help with type inference ...
      knows.update_at((0,0), 0usize, 1isize);
      query.update_at((0,0), 0usize, 1isize);
    });
}

As it happens, differential dataflow's join operations all do the same thing with their input collections: they convert the stream of updates into an indexed representation (which they then use to respond to changes in their other inputs). This makes join an excellent candidate to use arrangements as inputs.

To arrange a collection, we just call one of several arrange methods. In this case, we will arrange "by key", because we want to take our (x, y) pairs and arrange them by x. Once we have done this, there are just a few additional cosmetic changes to make to our program to use this arranged data in each join:

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Same logic as before, with a new method name.
            query.join_core(&knows, |x,q,y| Some((*y,*q)))
                 .join_core(&knows, |y,q,z| Some((*q,*z)))
                 .inspect(|result| println!("result {:?}", result));

        });

      // to help with type inference ...
      knows.update_at((0,0), 0usize, 1isize);
      query.update_at((0,0), 0usize, 1isize);
    });
}

Our computation now contains only one copy of the potentially large and fast-changing knows collection. This not only saves on memory for the collection, but it also saves on the computation and communication required to maintain the indexed representation as the collection changes.

Using arrangements

The same collection can be arranged multiple different ways. Although the contents of the collection are the same, the different arrangements are useful in different contexts.

Arrangement by key

We saw before an example where we used one type of arrangement, arrange_by_key(), to re-use the same arrangement for multiple join operators. The "by key" arrangement is great for operators that require (key, val) input data grouped by key, and join is one operator that does require that.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Same logic as before, with a new method name.
            query.join_core(&knows, |x, q, y| Some((*y, (*x, *q))))
                .join_core(&knows, |y, (x, q), z| Some((*q, (*x, *y, *z))))
                .inspect(|result| println!("result {:?}", result));
        });

        // to help with type inference ...
        knows.update_at((0, 0), 0usize, 1isize);
        query.update_at((0, 0), 0usize, 1isize);
    });
}

Arrangement by self

Another form of arrangement is "by self", where the elements of the collection themselves are taken as keys with no associated values. Arrangement by self is important for certain operators like distinct, count, and semijoin, each of which just need access to indexed records but without associated values.

We can show off arrangement by self in our "friends of friends" example by adding some requirements to our (x, y, z) output triples. Let's imagine that we also want each of the other four "knows" relationships, in addition to the two we start with ("x knows y" and "y knows z").

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::arrange::ArrangeBySelf;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key and self).
            let knows_by_key = knows.arrange_by_key();
            let knows_by_self = knows.arrange_by_self();

            // The same outputs as in the previous example.
            let candidates =
            query.join_core(&knows_by_key, |x,q,y| Some((*y,(*x,*q))))
                 .join_core(&knows_by_key, |y,(x,q),z| Some((*q,(*x,*y,*z))));

            // Repeatedly put pairs of nodes as keys, and semijoin with knows.
            candidates
                .map(|(q,(x,y,z))| ((x,z),(q,y)))
                .join_core(&knows_by_self, |&(x,z),&(q,y),&()| Some(((y,z),(q,x))))
                .join_core(&knows_by_self, |&(y,z),&(q,x),&()| Some(((z,x),(q,y))))
                .join_core(&knows_by_self, |&(z,x),&(q,y),&()| Some(((y,x),(q,z))))
                .join_core(&knows_by_self, |&(y,x),&(q,z),&()| Some((q,(x,y,z))))
                .inspect(|result| println!("result {:?}", result));

        });

      // to help with type inference ...
      knows.update_at((0,0), 0usize, 1isize);
      query.update_at((0,0), 0usize, 1isize);
    });
}

We now have two arranged forms of the knows collection, which is more than before, but we have used the collection six times in our computation: a factor of three reduction in required resources relative to the corresponding naive implementation!

Returning to collections

You may need to return from an arrangement to a collection (a stream of updates). An arrangement's as_collection() method allows this.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::{Join, JoinCore};
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Return to collection representation.
            let knows = knows.as_collection(|k,v| (*k,*v));

            // Same logic as before, with a new method name.
            query.join_map(&knows, |x,q,y| (*y,(*x,*q)))
                 .join_map(&knows, |y,(x,q),z| (*q,(*x,*y,*z)))
                 .inspect(|result| println!("result {:?}", result));

        });

        // to help with type inference ...
        knows.update_at((0,0), 0usize, 1isize);
        query.update_at((0,0), 0usize, 1isize);
    });
}

Sharing across dataflows

Arrangements have the additional appealing property that they can be shared not only within a dataflow, but across dataflows.

Imagine we want to build and maintain a relatively large and continually changing collection. But we want to do this in a way that allows an arbitrary number of subsequent queries to access the collection at almost no additional cost.

The following example demonstrates going from an interactive input session (input) to an arrangement (trace) returned from the dataflow and available for use by others.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();

        // Input and arrange a dynamic collection.
        let mut trace = worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let knows = knows.arrange_by_key();

            // Return the `.trace` field of the arrangement.
            knows.trace

        });

        // to help with type inference ...
        knows.update_at((0,0), 0usize, 1isize);
    });
}

This example arranges the knows collection as before, but rather than use the arrangement it returns the .trace field of the arrangement. We can use this field to re-introduce the arrangement into other dataflows.

For example, let's imagine we want to construct many new dataflows each of which create a query set, which they then use to read out part of the knows collection.

    for round in 1 .. 1_000 {

        worker.dataflow(|scope| {

            // Round-specific query set.
            let query =
            (round .. round + 3)
                .to_stream(scope)
                .map(move |x| (x, round, 1))
                .as_collection();

            // Import arrangement, extract keys from `query`.
            trace
                .import(scope)
                .semijoin(&query)
                .consolidate()
                .inspect(move |x| println!("{:?}\t{:?}", timer.elapsed(), x))
                .probe_with(&mut probe);

        });

        // Change the collection a bit.
        knows.remove((round, round));
        knows.advance_to(round + 1);
        knows.flush();

        // Run until all computations are current.
        while probe.less_than(knows.time()) {
            worker.step();
        }

    }

The crucial line above is the line

            trace
                .import(scope)

which takes the trace and brings it in to the dataflow as an arrangement. It is now ready to be used in operations like semijoin that can exploit pre-arranged data.

Where in other stream processing systems such a computation might maintain one thousand independent indices each containing independent (but identical) copies of knows, here we are able to support all of these uses with a single arrangement.

Great responsibility

When we extract a trace from an arrangement, we acquire the ability to replay the arrangement in any new scope. With that great power comes great responsibility. As long as we simply hold the trace, we prevent the system from compacting and efficiently managing its representation.

A TraceHandle (the type of trace) has two important methods. Their names are not great, and subject to change in the future. Their idioms may also change as more information flows in about users and use cases.

  1. set_logical_compaction(frontier). This method informs trace that it will no longer be called upon to handle queries for times not in advance of frontier, a set of timestamps. This gives the arrangement permission to coalesce otherwise indistinguishable timestamps, which it will start to do once all handles have advanced.

  2. set_physical_compaction(frontier). This method unblocks the merging of physical batches. It is very rare that a user wants to do anything with this other than call trace.set_physical_compaction(&[]), which unblocks all merging. Certain operators, namely join, do need to carefully manipulate this method.
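As a sketch, for a trace handle and some frontier of timestamps (the names here mirror the methods above; exact argument types depend on the library version):

    // promise that we will not query times earlier than `frontier`,
    // permitting logical compaction of indistinguishable timestamps.
    trace.set_logical_compaction(frontier);

    // unblock all physical merging of batches.
    trace.set_physical_compaction(&[]);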

Trace wrappers

There are many cases where we make small manipulations of a collection, and we might hope to retain the arrangement structure rather than re-build and maintain new arrangements. In some cases this is possible, using what we call trace wrappers.

The set of trace wrappers grows as more idioms are discovered and implemented, but the intent is that we can often avoid reforming new collections, and instead just push logic into a layer around the arrangement.

Filter

Like a collection, an arrangement supports the filter(predicate) operator that reduces the data down to those elements satisfying predicate. Unlike a collection, which produces a new collection when filtered, a filtered arrangement is just a wrapper around the existing arrangement.

The following example uses two different collections in its two joins, but one is a filtered version of the other and can re-use the same arrangement.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows1 = knows.arrange_by_key();

            // Filter to equal pairs (for some reason).
            let knows2 = knows1.filter(|k,v| k == v);

            // Same logic as before, with a new method name.
            query.join_core(&knows1, |x,q,y| Some((*y,(*x,*q))))
                 .join_core(&knows2, |y,(x,q),z| Some((*q,(*x,*y,*z))))
                 .inspect(|result| println!("result {:?}", result));

        });

        // To help with type inference ...
        knows.update_at((0,0), 0usize, 1isize);
        query.update_at((0,0), 0usize, 1isize);
    });
}

Filtered arrangements are not always a win. If the input arrangement is large and the filtered arrangement is small, it may make more sense to build and maintain a second arrangement than to continually search through the large arrangement for the few records satisfying the predicate. If you would like to form a second arrangement, you can use as_collection() to return to a collection, filter the result, and then arrange it again.
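That second-arrangement pattern might look roughly like the following sketch, re-using the knows1 arrangement from the example above (the closure argument to as_collection reconstructs the (key, value) records):

```rust
// Sketch: build an independent, smaller arrangement of the filtered data,
// rather than wrapping the large arrangement with a filter.
let knows2 = knows1
    .as_collection(|k, v| (*k, *v))  // back to a collection of pairs
    .filter(|(k, v)| k == v)         // keep only the equal pairs
    .arrange_by_key();               // a second, smaller arrangement
```

This trades additional memory (a second index) for cheaper lookups, which pays off when the predicate discards most of the records.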

Entering scopes

Differential dataflow programs often contain nested scopes, used for loops and iteration. Collections in a nested scope have different timestamps than collections outside the scope, which means we cannot immediately re-use arrangements from outside the scope inside the scope.

Like collections, arrangements support an enter(scope) method for entering a scope, which wraps the arrangement so that its timestamps are automatically enriched on access, as if the collection had entered the scope.

The following example demonstrates arranging the knows relation outside an iterative scope, and then bringing it into the scope (along with the collection query). Unlike query, which is a collection, knows is an arrangement and will simply be wrapped with timestamp-extending logic.

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::operators::Join;
use differential_dataflow::operators::Threshold;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn main() {

    // define a new timely dataflow computation.
    timely::execute_from_args(::std::env::args(), move |worker| {

        let mut knows = differential_dataflow::input::InputSession::new();
        let mut query = differential_dataflow::input::InputSession::new();

        worker.dataflow(|scope| {

            let knows = knows.to_collection(scope);
            let query = query.to_collection(scope);

            // Arrange the data first! (by key).
            let knows = knows.arrange_by_key();

            // Reachability queries.
            query.iterate(|reach| {

                let knows = knows.enter(&reach.scope());
                let query = query.enter(&reach.scope());

                knows.join_map(reach, |x,y,q| (*y,*q))
                     .concat(&query)
                     .distinct()
            });

        });

        // To help with type inference ...
        knows.update_at((0,0), 0usize, 1isize);
        query.update_at((0,0), 0usize, 1isize);
    });
}

Other wrappers

Other wrappers exist, but are still in development and testing. Generally, if the same physical layout of the index would support a collection transformation, a wrapper may be appropriate. The intent is that we can often avoid forming new collections, and instead just push logic into a layer around the arrangement. If you think you have such an operation, the src/trace/wrappers/ directory is where the current examples reside.

Windows Enough and Time