The disArray-ed File Problem: Building Incremental Data Pipelines for Out-of-Order Scientific Data
A queue architecture for billion-row scientific time-series systems where late-arriving files, global metrics, and deterministic reprocessing make ingestion far more challenging than simple ETL.

Introduction
We at bisArray hate to admit it, but the real world can be rather disArray-ed.
The general use case we would like to discuss in this article is telemetry: something in the lab or on the factory floor produces data as a function of time, and we need to capture it and provide insights. Maybe this is a lab sensor, a glove box, a battery cycler, a production tool, or the readout from an antibody fermenter. Straightforward enough, but what if there is a lot of data from a lot of channels, say 1,000 channels and 100M rows a day? What if we need to derive global values from this data, such as the full equivalent cycles our battery under test has accumulated? And the real kicker: what if that data arrives out of order, i.e. it is disArrayed? Say we are integrating the running total of parts in and out of spec for a production tool that produces a summary file every 4 hr; the tool was offline for two days in the middle of the month, and a worker uploads the files to our pipeline weeks out of sequence. We need to be able to handle that.
We see this problem in many of the instrument-heavy systems we help build and modernize: out-of-order arrival, at-least-once delivery, and scale that makes "just reprocess everything" impossible. On one acquisition system, roughly 38% of files arrived out of event-time order under normal operation, with a long tail of stragglers landing up to 40 minutes after the data they contained was measured. At a few thousand files an hour and billions of resulting rows per month, the pipeline cannot re-derive those global metrics from scratch; it has to repair itself incrementally.
The architecture at a glance
For today's discussion let's make some concrete assumptions about what we are doing and describe the architecture in those terms. We think of the architecture in two steps:
Step 1: files from local computers are centrally enqueued; and Step 2: files are pulled from the queue and processed.
To put this in highly specific terms, assume:
- The use case is a factory floor with twenty process tools each of which makes a disposable surgical tool for cataract surgery. The tool is automated, PLC driven but has an online HMI for doing manual things like entering work orders, pulling configuration from a central database and pushing pass/fail criteria into several industrial vision cameras. (We wrote exactly such an HMI that has been in use for years!) Let's call the tools Tool01 ... Tool20;
- The HMI for ToolXX produces a report on the work order, that summarizes the run, parts made, parts failed, mispicks, that kind of thing. The report is a file continuously appended and is submitted daily and at the end of the work order run; and
- The files are placed into a central location on the factory's internal network, but temporal order is not guaranteed! A data agent (a locally running python script) watches for file changes and, when a file changes, pushes information about that file into an air-gapped RabbitMQ queue running locally on the network. Crucially, along with the file location (source_uri) there needs to be a mechanism for deducing which ToolXX generated the file. We have done this many ways. Using the folder structure, adding the tool number to the file name, or parsing it out of the file are three such approaches. This is the subject_key we will see later in our database discussion. The data agent might add other information as well: perhaps its version, a timestamp or similar. This is all wrapped up into a metadata object which is likewise part of the message placed on the queue.
That's the first part. Files are placed in a semi-automated way into a filesystem which is monitored by a data agent. The data agent sees the files as arrived or changed then pushes a message with relevant metadata to a queue. Now for the second part of our architecture:
- We have a TimescaleDB database running to capture the telemetry data in hypertables we will extract from the files. On a different schema in that database, we will place the tables necessary for our pipeline to function efficiently. Not hypertables, just normal postgres tables (TimescaleDB is PostgreSQL after all);
- We have two worker instances Worker01 & Worker02 each running in its own VM, polling the queue, let's say, and receiving messages. Worker01 is configured to receive and process messages from tools 01–12 and Worker02 the balance of the messages, because these tools are in different buildings;
- Each of the workers is a python program running in the VM and configured to use all available threads. The program has three main threads and the same number of workers as threads in the VM each with their own dedicated process (python's multiprocessing library);
- The first thread runs a tight poll of the RabbitMQ message queue. If it finds messages for itself it places row(s) in the database then clears the message;
- The second thread polls a table in postgres to find what work needs to be done. If work is found it then passes it to the running internal processes where the work is performed (in practice with the multiprocessing.Queue object from the Python standard library);
- A worker process then processes each file and removes it from the postgres work table; and
- The third and final thread is configured as a REST server (starlette with uvicorn is our favorite approach) that allows in-process communication with the Worker.
Every file flows through the same two stages. The two ideas that make it resilient are idempotency (upon re-ingestion a file is hashed and if not changed it's ignored) and a dirty-range reconciler (the system tracks which time spans it still needs to recompute, rather than trusting that data arrived in order).
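As a rough illustration of the first stage, the message the data agent places on the queue might be assembled as below. This is a minimal sketch, not the production agent: the folder layout, the TOOL_PATTERN regex, the agent_version and notified_at fields, and the helper names deduce_subject_key and build_message are all assumptions made for the example, and the actual publish to RabbitMQ is left out.

```python
import json
import re
from datetime import datetime, timezone
from pathlib import PurePosixPath

# Hypothetical convention: the tool number appears somewhere in the path,
# either as a folder name or inside the file name (e.g. Tool07).
TOOL_PATTERN = re.compile(r"Tool(\d{2})", re.IGNORECASE)


def deduce_subject_key(source_uri: str) -> str:
    """Derive the subject_key (ToolXX) from the folder structure or file name."""
    path = PurePosixPath(source_uri)
    # Search from the file name outward so the most specific match wins.
    for part in reversed(path.parts):
        match = TOOL_PATTERN.search(part)
        if match:
            return f"Tool{match.group(1)}"
    raise ValueError(f"cannot deduce a tool from {source_uri!r}")


def build_message(source_uri: str, agent_version: str = "0.0.0") -> str:
    """Build the JSON body the agent would place on the queue."""
    body = {
        "source_uri": source_uri,
        "subject_key": deduce_subject_key(source_uri),
        "metadata": {
            "agent_version": agent_version,  # assumed field
            "notified_at": datetime.now(timezone.utc).isoformat(),  # assumed field
        },
    }
    return json.dumps(body)
```

Failing loudly when no tool can be deduced is a deliberate choice in this sketch: a message without a subject_key cannot be routed to a worker or locked per subject later on.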
The database
For this application we choose PostgreSQL because its Timescale extension allows us to create hypertables which are simply excellent at even several hundred billion row scale. But of course we can still do everything we ordinarily do with postgres. For managing the pipeline we create a separate schema (here called disarray) and put the data in a schema called telemetry (not shown).
A word of foreshadowing while we are here, because it shapes everything that follows: we lean on Timescale's hypertables constantly, but we deliberately do not lean on its continuous aggregates. We are dealing with use cases that present large amounts of data, and to plot them on a UI, aggregates are the idiomatic approach. We also believe that Timescale's continuous aggregates are a genuinely good feature — for data that arrives in order. The moment files can show up disArrayed, keeping a continuous aggregate correct turns into exactly the fight this whole article is about, and we ended up computing our rollups ourselves instead. We earn that conclusion the hard way in Approach to aggregates; for now, just note that "use a Timescale continuous aggregate" is the obvious answer we are quietly setting aside.
The schema labels ingest_queue_reason, ingest_queue_status, file_info_status and ingest_event_event_type as 'enums'. In postgres they could be either 'types' or implemented with a CHECK ... IN approach, but we call them enums here for clarity. We find strong typing ages better, so often prefer this approach.
The first table is ingest_queue, the list of work that needs to be done. The approach here is to add a row when work needs to be done, and then remove it when that work is finished. There are a few columns worth explaining. metadata is that metadata that our data agent attached to the file which, trust us, will change over time, so we just keep it as a postgres jsonb. Notice the requested_by_instance_name and claimed_by_instance_name. This is our Worker01 or Worker02 instance so we can monitor or enforce work assignments between the workers. Just briefly lease_expires_at is used to catch stalled processes. A healthy process sets this at process start and bumps this time on a regular cadence and expired leases can be detected and used to recover from hangs.
The next table file_info is meant to persist after ingestion. It is the information about the file together with some information about how it was processed. It has the subject_key, TOOLXX for our use case. Again, this is the crucial bit of information we need when applying 'global statistics' as we will only apply global statistics to a single tool at a time so as to minimize locking in our telemetry tables. When files are used in append mode, they are typically ingested more than once. We keep track of the logical_start and logical_end (of course keeping track of timezone information) of the data that are ingested, together with the process_count, which is how many times that file in fact has been ingested.
The file_blacklist table is a way to allow the system to simply ignore certain files. Perhaps R&D came to use a production tool which generated data files and the data agent placed them into the file system, but R&D did not want that data to be part of the production record. Blacklisting allows that flexibility. We usually start a blacklist process with the REST interface.
The subject_lock table is our mechanism to ensure that we are only operating on a single subject's files at a time. Again, we are designing a system that will calculate global parameters from these files. If we need to integrate across telemetry data from multiple files, we are best served by ensuring this integration occurs only across a single subject_key at a time. This table also has a lease_expires_at column to deal with potential hanging as described above.
Next we have the ingest_event table, which is our log, keyed to a file (which, again, may be ingested multiple times). Because the details worth logging vary with the event type, its metadata column stores event-specific information, such as the rows ingested during that event.
Finally there is the dirty_range table. It records the time spans a subject still needs recomputed after a late or changed file lands — the pipeline's own list of what to repair, rather than a blind trust that data arrived in order. It is keyed by subject_key and stands on its own (no foreign key) — we come back to it in The dirty-range reconciler below.
A functional access pattern
We use functions to orchestrate the pipeline — we often enforce a function-only access pattern, so the workers never touch these tables directly. When a worker pulls a message off RabbitMQ it calls enqueue_file, which (like everything here) is marked SECURITY DEFINER:
    CREATE OR REPLACE FUNCTION disarray.enqueue_file(
        p_source_uri    TEXT,
        p_subject_key   TEXT,
        p_reason        TEXT  DEFAULT 'file_notification',
        p_instance_name TEXT  DEFAULT NULL,
        p_metadata      JSONB DEFAULT '{}'::jsonb
    ) RETURNS BIGINT
    LANGUAGE plpgsql SECURITY DEFINER
    SET search_path = disarray, pg_temp
    AS $$ ... $$;
The body is deliberately boring, and that is the point. In a single transaction it does three things:
- Short-circuits blacklisted files. If source_uri is in file_blacklist, it upserts a file_info row with status blacklisted, writes a blacklisted ingest_event, and returns NULL; no queue row is ever created.
- Upserts file_info. An ON CONFLICT (source_uri) DO UPDATE keeps exactly one durable row per file across every re-ingest. The only line worth pausing on is how we fold in metadata rather than clobber it: metadata = file_info.metadata || EXCLUDED.metadata. The jsonb || operator right-biases the incoming keys while preserving whatever the agent sent on earlier notifications. It costs nothing and it saves you the day the agent's payload shape quietly drifts, which, over months, it will.
- Enqueues the work. It inserts one ingest_queue row plus a matching enqueued event, then returns the new queue_id.
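The three steps can be modeled in a few lines of Python. This is a sketch under stated assumptions, not the body of the SQL function: plain dicts and lists stand in for the disarray tables, the pending status is invented for the example, and the dict merge only mimics the shallow, right-biased behavior of jsonb || on top-level keys.

```python
from itertools import count

# In-memory stand-ins for the disarray tables (assumptions for illustration).
file_blacklist: set[str] = set()
file_info: dict[str, dict] = {}      # keyed by source_uri
ingest_queue: dict[int, dict] = {}   # keyed by queue_id
ingest_event: list[dict] = []
_queue_ids = count(1)


def enqueue_file(source_uri, subject_key, reason="file_notification",
                 instance_name=None, metadata=None):
    """Model of the three steps; returns the new queue_id or None."""
    metadata = metadata or {}
    # One durable file_info row per source_uri ("pending" is an assumed status).
    row = file_info.setdefault(
        source_uri, {"subject_key": subject_key, "status": "pending", "metadata": {}}
    )

    # 1. Short-circuit blacklisted files: no queue row is created.
    if source_uri in file_blacklist:
        row["status"] = "blacklisted"
        ingest_event.append({"source_uri": source_uri, "event_type": "blacklisted"})
        return None

    # 2. Fold metadata in like jsonb ||: a shallow merge where incoming
    #    top-level keys win and older keys survive.
    row["metadata"] = {**row["metadata"], **metadata}

    # 3. Enqueue the work and log a matching event.
    queue_id = next(_queue_ids)
    ingest_queue[queue_id] = {
        "source_uri": source_uri,
        "subject_key": subject_key,
        "reason": reason,
        "requested_by_instance_name": instance_name,
        "status": "available",
        "metadata": metadata,
    }
    ingest_event.append({"source_uri": source_uri, "event_type": "enqueued"})
    return queue_id
```

Calling it twice for the same source_uri with different metadata leaves a single file_info entry whose metadata holds the union of both payloads, with the later values winning on shared keys.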
enqueue_file is one of a small family of SECURITY DEFINER functions that are the only way a worker touches these tables. Two more carry the rest of the loop: fetch_items claims work, and complete_item / fail_item retire it.
Claiming work — fetch_items
The RabbitMQ thread fills the queue; a second thread drains it by calling fetch_items. This is where the "one subject at a time" rule is actually enforced:
    CREATE OR REPLACE FUNCTION disarray.fetch_items(
        p_instance_name TEXT,
        p_limit         INTEGER DEFAULT 10,
        p_lease_seconds INTEGER DEFAULT 300
    ) RETURNS TABLE (
        queue_id         BIGINT,
        source_uri       TEXT,
        subject_key      TEXT,
        reason           TEXT,
        metadata         JSONB,
        retry_count      INTEGER,
        lease_expires_at TIMESTAMPTZ
    )
    LANGUAGE plpgsql SECURITY DEFINER
    SET search_path = disarray, pg_temp
    AS $$ ... $$;
It runs four moves in a single transaction:
- Reap dead locks. DELETE FROM subject_lock WHERE lease_expires_at < now() frees any subject whose owner crashed or hung past its lease before we go looking for work.
- Stay on your subject. If this instance already holds a live lock, it keeps draining that subject. Because global metrics are computed per subject, a worker finishes one subject before moving to the next rather than interleaving them.
- Otherwise, claim a new subject. It finds the oldest available subject nobody holds a live lock on and takes the lock with an atomic INSERT ... ON CONFLICT (subject_key) DO UPDATE ... WHERE subject_lock.lease_expires_at < now(), so a lock is granted only if it is free or expired. The candidate scan uses FOR UPDATE SKIP LOCKED, so two workers racing for the next subject never block each other; the loser just skips to another. Acquiring the lock logs a subject_locked event.
- Claim the files. With the subject owned, it claims up to p_limit of that subject's available rows (again FOR UPDATE SKIP LOCKED), flips them to claimed, and stamps a fresh lease. Every call also bumps the lease, so a healthy worker heartbeats simply by continuing to fetch.
The payoff is the invariant everything downstream leans on: every subject for which an in-flight file is being processed is only accessed by a single worker at a time. That is what makes it safe to integrate a global metric across a subject's files without cross-worker locking on the telemetry tables.
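The four moves can be traced with a small single-threaded model. It is only a sketch: a dict and a list stand in for subject_lock and ingest_queue, the now parameter is added so the lease logic can be exercised deterministically, and none of the row-level locking that FOR UPDATE SKIP LOCKED provides in PostgreSQL is reproduced here.

```python
from datetime import datetime, timedelta, timezone

subject_lock: dict[str, dict] = {}   # subject_key -> {"owner", "lease_expires_at"}
ingest_queue: list[dict] = []        # rows: queue_id, subject_key, status, available_at


def fetch_items(instance_name, limit=10, lease_seconds=300, now=None):
    """Model of the four moves; returns the rows claimed by this call."""
    now = now or datetime.now(timezone.utc)
    lease = now + timedelta(seconds=lease_seconds)

    # 1. Reap dead locks whose lease has expired.
    for key in [k for k, v in subject_lock.items() if v["lease_expires_at"] < now]:
        del subject_lock[key]

    def available(row):
        return row["status"] == "available" and row["available_at"] <= now

    # 2. Stay on your subject if this instance already holds a live lock.
    subject = next((k for k, v in subject_lock.items()
                    if v["owner"] == instance_name), None)

    # 3. Otherwise claim the oldest available subject nobody holds.
    if subject is None:
        candidates = [r for r in ingest_queue
                      if available(r) and r["subject_key"] not in subject_lock]
        if not candidates:
            return []
        subject = min(candidates, key=lambda r: r["available_at"])["subject_key"]

    # Taking the lock and bumping the lease are the same write in this model.
    subject_lock[subject] = {"owner": instance_name, "lease_expires_at": lease}

    # 4. Claim up to `limit` of that subject's available rows.
    claimed = [r for r in ingest_queue
               if available(r) and r["subject_key"] == subject][:limit]
    for row in claimed:
        row.update(status="claimed", claimed_by_instance_name=instance_name,
                   lease_expires_at=lease)
    return claimed
```

In this model a worker that has drained its subject keeps the lock until the lease expires; in the pipeline described here that is the job of release_subject.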
Retiring work — complete_item and fail_item
When a worker process finishes a file it retires the queue row. Success is complete_item:
    CREATE OR REPLACE FUNCTION disarray.complete_item(
        p_queue_id      BIGINT,
        p_instance_name TEXT,
        p_metadata      JSONB DEFAULT '{}'::jsonb
    ) RETURNS VOID
    LANGUAGE plpgsql SECURITY DEFINER
    SET search_path = disarray, pg_temp
    AS $$ ... $$;
It deletes the queue row — but only WHERE claimed_by_instance_name = p_instance_name, and raises if that matches nothing. That ownership guard is the point: if this worker's lease already expired and another instance stole the subject, the stale worker must not be able to mark the work done. On success it flips file_info to processed, bumps process_count, clears the last error, and logs a completed event.
Failure is fail_item, which decides retry-or-die in one place:
    CREATE OR REPLACE FUNCTION disarray.fail_item(
        p_queue_id            BIGINT,
        p_instance_name       TEXT,
        p_error_message       TEXT,
        p_retry_delay_seconds INTEGER DEFAULT 60,
        p_metadata            JSONB DEFAULT '{}'::jsonb
    ) RETURNS VOID
    LANGUAGE plpgsql SECURITY DEFINER
    SET search_path = disarray, pg_temp
    AS $$ ... $$;
If retry_count + 1 < max_retries it returns the row to available with a backoff (available_at = now() + p_retry_delay_seconds), clears the claim, and logs attempt_failed. Otherwise it deletes the row, marks file_info as failed, and logs failed. The queue is therefore self-cleaning: a row exists only while there is still work worth trying. (A sibling, release_subject, drops a subject lock and logs subject_released when a worker is done with a subject — the deliberate counterpart to the lease reaping in step 1.)
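The ownership guard and the retry-or-die decision are easy to model in isolation. The sketch below uses dicts in place of ingest_queue and file_info; MAX_RETRIES = 3, the NotOwner exception, the last_error key, and the choice to increment retry_count on each failed attempt are assumptions for the example rather than details taken from the SQL functions.

```python
from datetime import datetime, timedelta, timezone

MAX_RETRIES = 3  # assumed value; the article does not state one


class NotOwner(Exception):
    """Raised when an instance tries to retire work it no longer holds."""


def _owned_row(queue, queue_id, instance_name):
    # Ownership guard: a stale worker must not retire someone else's work.
    row = queue.get(queue_id)
    if row is None or row.get("claimed_by_instance_name") != instance_name:
        raise NotOwner(f"queue_id {queue_id} is not claimed by {instance_name}")
    return row


def complete_item(queue, file_info, queue_id, instance_name):
    """Success: delete the queue row and mark the file processed."""
    row = _owned_row(queue, queue_id, instance_name)
    del queue[queue_id]
    info = file_info[row["source_uri"]]
    info["status"] = "processed"
    info["process_count"] = info.get("process_count", 0) + 1
    info["last_error"] = None
    return "completed"


def fail_item(queue, file_info, queue_id, instance_name, error_message,
              retry_delay_seconds=60, now=None):
    """Failure: retry with a backoff, or give up once retries run out."""
    now = now or datetime.now(timezone.utc)
    row = _owned_row(queue, queue_id, instance_name)
    if row["retry_count"] + 1 < MAX_RETRIES:
        row.update(status="available", retry_count=row["retry_count"] + 1,
                   claimed_by_instance_name=None,
                   available_at=now + timedelta(seconds=retry_delay_seconds))
        return "attempt_failed"
    del queue[queue_id]
    file_info[row["source_uri"]].update(status="failed", last_error=error_message)
    return "failed"
```

The returned strings mirror the event types the text says are logged, which makes the two paths easy to assert on in a test.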
There are many more details — like how the REST controller triggers reingestion and blacklisting of already ingested files, or when to use a synchronous db call versus an asynchronous one, or how to manage crash recovery, to name just a few — but none of this is clever, and it shouldn't be — it is a queue, a claim, and a lease. The cleverness lives in two places: how we avoid re-ingesting data we already hold, and how we keep a global number correct when a file lands weeks late. Take them in turn.
Idempotent efficiency
Remember that these report files are appended, often over very long spans — a work order can stay open for weeks, and a file we first saw in January may still be growing in March. Re-ingesting a file like that from scratch on every notification would be enormous, redundant work. Two choices keep it cheap.
Notify at a cadence that matches the data. A file that is actively growing deserves prompt attention; a file that has not changed in a month does not. So we tune the data agent's notification policy to the use case — in practice, newer files trigger notifications far faster than older ones. The exact schedule is entirely use-case dependent (a battery cycler that logs for months looks nothing like a tool that emits a report every four hours), but the principle holds even when the numbers do not: spend the ingest budget where the data is actually moving.
Ingest only what is genuinely new. Every notification is checked against file_info, which remembers what we have already absorbed for that source_uri — the content_hash, the logical_start/logical_end, the process_count. Two fast paths fall out:
- Unchanged file → no data work at all. If the incoming content_hash matches what we stored, there is nothing to ingest. We touch only the bookkeeping (bump last_seen_at, write an ingest_event) and return. A re-notification of a stable file costs one hash comparison and two small row updates, never a single row of telemetry.
- Partially-new file → correct only the tail. When just the end of a file is new (the ordinary append case) we do not re-ingest the whole thing. We take a configurable back_correction_window (often only a few seconds), step back that far from where the previous ingest ended, DELETE the telemetry inside that window, and then INSERT everything from the left edge of the window forward. A delete-then-insert of a small tail is reliably faster than a row-by-row upsert, and the short backward overlap absorbs any boundary jitter (a half-written last line, a little clock skew) at the point where the earlier ingest stopped.
The net effect is that the steady-state cost of watching a months-long file is proportional to how much it grew, not how big it is — which is what makes a few thousand notifications an hour affordable at all. At least it usually is. There are cases where the whole file has to be crunched for the appended region to make sense, but even then we keep the database work as cheap as we can.
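The two fast paths amount to a small decision function. The following is a sketch with assumptions called out: SHA-256 is chosen only for illustration (the hash algorithm is not specified above), plan_ingest and rows_to_insert are hypothetical helper names, the 5-second window is an example value, and any hash change is treated as an ordinary append.

```python
import hashlib
from datetime import datetime, timedelta, timezone


def content_hash(data: bytes) -> str:
    # SHA-256 is an assumption; any stable content hash would do.
    return hashlib.sha256(data).hexdigest()


def plan_ingest(stored, new_hash, back_correction_window=timedelta(seconds=5)):
    """Decide how much data work a notification needs.

    `stored` is the file_info row (or None for a never-seen file).
    Returns (action, replace_from); replace_from is the left edge of the
    window to DELETE and re-INSERT from, or None when not applicable.
    """
    if stored is None:
        return ("full_ingest", None)
    if stored["content_hash"] == new_hash:
        # Unchanged file: bookkeeping only, no telemetry touched.
        return ("bookkeeping_only", None)
    # Appended file: step back a little from where the last ingest ended.
    return ("tail_correction", stored["logical_end"] - back_correction_window)


def rows_to_insert(rows, replace_from):
    """Keep only the parsed rows at or after the left edge of the window."""
    return [r for r in rows if r["ts"] >= replace_from]


# Usage: a file that grew since the last ingest ended at 12:00:00 UTC.
last_end = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
stored = {"content_hash": content_hash(b"old"), "logical_end": last_end}
action, edge = plan_ingest(stored, content_hash(b"old plus new"))
```

Here action comes back as tail_correction and edge sits five seconds before last_end, so only rows at or after that edge would be deleted and re-inserted.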
Keeping the raw telemetry correct and cheap is only half the job, though. The genuinely hard problem starts after those rows land — when a single late file means a global number computed across the whole subject is suddenly wrong.
Calculating global parameters
Up to now nothing has been especially hard. Watch a share, enqueue notifications, upsert a row, claim some work — you could build that against any queue and any database. The reason we go to all this trouble is the global parameter, and it is worth being precise about what that is.
A global parameter is a number you can only get by integrating across a subject's entire history, not from any single file. The full equivalent cycles on a battery under test. The running total of in-spec and out-of-spec parts for Tool07 since the work order opened. Cumulative charge throughput, total runtime, lifetime mispicks. These are all prefix sums: the value at time t depends on every measurement at or before t.
That one property is what turns out-of-order arrival from a nuisance into a real problem. If a file carrying data for [t0, t1] shows up late — the tool was offline for two days and someone uploads the backlog weeks later — it does not simply add rows at t0. It moves every cumulative value from t0 to the present. The damage from a late file is not a point; it is the entire data set after it.
[Figure: a late file at t0 doesn't just add a point; it shifts the running total for every instant after t0. Everything left of t0 is untouched, so the fix is to recompute only the post-insert data, seeded from the trusted value at t0.]
The naive fix is to recompute the subject from zero whenever anything lands. At a hundred million rows a day per tool that is a non-starter: you would spend the whole day re-integrating history to absorb a single 4-hour report. What we want instead is to recompute only the data after our insert window that actually moved, and to do it for one subject at a time so we hold a bounded, predictable lock on the telemetry tables. That second constraint is exactly why the subject_lock table exists.
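To make the prefix-sum argument concrete, here is a toy version using only the standard library. It is an illustration under simple assumptions (samples are (t, value) tuples held in memory, and running_totals and insert_late are hypothetical names), not the pipeline's implementation: a late sample leaves every total to its left untouched, and the totals to its right are rebuilt from the last trusted value rather than from zero.

```python
from bisect import insort
from itertools import accumulate


def running_totals(samples):
    """Full recompute: cumulative sum over (t, value) pairs sorted by t."""
    return list(accumulate(v for _, v in samples))


def insert_late(samples, totals, late):
    """Insert one late (t, value) sample and repair totals incrementally.

    Everything left of the insertion point is reused as-is; only the
    suffix is recomputed, seeded from the last trusted total.
    """
    insort(samples, late)                  # keeps samples ordered by t
    i = samples.index(late)                # insertion point (t0)
    seed = totals[i - 1] if i > 0 else 0   # trusted value just before t0
    suffix = list(accumulate((v for _, v in samples[i:]), initial=seed))[1:]
    return totals[:i] + suffix
```

The incremental repair and the full recompute agree on the result; the difference is that the repair touches only the samples from t0 onward.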
The dirty-range reconciler
The trick is to stop trusting that data arrived in order and instead write down what we know is stale. That is the whole job of the dirty_range table from the schema above: a running list of the time spans a subject still needs recomputed. Only the unresolved rows are ever scanned, so it carries a partial index on WHERE resolved_at IS NULL and nothing more.
When a worker writes a file's telemetry rows into the hypertable, it records the span it touched in the same transaction as the data write:
    -- Runs alongside the INSERT of the file's rows, so the fact that a span is
    -- dirty commits atomically with the rows that made it dirty.
    INSERT INTO disarray.dirty_range (subject_key, range_start, range_end, reason)
    VALUES (p_subject_key, p_logical_start, p_logical_end, 'late_file');
That atomicity is the whole game — and it is the reason dirty_range is a table at all rather than a set held in Python. You could just as easily track the affected ranges in the worker and recompute them right after each insert; the arithmetic is trivial. But an in-memory list evaporates the instant a worker dies mid-batch, leaving rows in the hypertable that nothing knows are stale. Writing the "this span is now stale" fact in the same transaction as the rows that made it stale buys the guarantee we actually need: the two commit or fail together, and there is never a window where the data exists but the system has forgotten it owes a recompute — precisely the window a crash would otherwise exploit.
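The commit-or-fail-together guarantee can be demonstrated with any transactional database. The sketch below uses Python's built-in sqlite3 purely as a stand-in for PostgreSQL, with two simplified tables and a hypothetical write_file helper; the NOT NULL constraint exists only to force a failure partway through the batch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE telemetry (subject_key TEXT, ts TEXT, value REAL NOT NULL);
    CREATE TABLE dirty_range (subject_key TEXT, range_start TEXT,
                              range_end TEXT, reason TEXT, resolved_at TEXT);
""")


def write_file(conn, subject_key, rows, reason="late_file"):
    """Insert a file's rows and its dirty span in one transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute(
            "INSERT INTO dirty_range (subject_key, range_start, range_end, reason) "
            "VALUES (?, ?, ?, ?)",
            (subject_key, rows[0][0], rows[-1][0], reason),
        )
        conn.executemany(
            "INSERT INTO telemetry (subject_key, ts, value) VALUES (?, ?, ?)",
            [(subject_key, ts, value) for ts, value in rows],
        )


def counts(conn):
    """Return (telemetry rows, dirty ranges) currently committed."""
    n_rows = conn.execute("SELECT count(*) FROM telemetry").fetchone()[0]
    n_dirty = conn.execute("SELECT count(*) FROM dirty_range").fetchone()[0]
    return n_rows, n_dirty
```

If a batch fails halfway (for example, a row with a NULL value), both the telemetry rows and the dirty_range row from that batch disappear together, so the two tables never disagree about what is stale.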
Reconciliation itself runs under the subject lock (acquired in fetch_items, so exactly one worker owns a subject at a time). It finds the earliest still-dirty point and recomputes cumulative metrics forward from the last trustworthy checkpoint, never from the beginning of time:
    CREATE OR REPLACE FUNCTION disarray.reconcile_subject(
        p_subject_key   TEXT,
        p_instance_name TEXT
    ) RETURNS TIMESTAMPTZ
    LANGUAGE plpgsql SECURITY DEFINER
    SET search_path = disarray, pg_temp
    AS $$
    DECLARE
        v_from TIMESTAMPTZ;
    BEGIN
        -- The caller already holds subject_lock for p_subject_key, so we are the
        -- only writer touching this subject's aggregates.

        -- The frontier: earliest point that still needs recomputation.
        SELECT min(range_start) INTO v_from
          FROM dirty_range
         WHERE subject_key = p_subject_key
           AND resolved_at IS NULL;

        IF v_from IS NULL THEN
            RETURN NULL;  -- nothing dirty; nothing to do
        END IF;

        -- Recompute forward from the last good checkpoint strictly before the
        -- frontier. That checkpoint is a running total we can trust, so we seed
        -- from it instead of re-integrating the whole subject.
        PERFORM telemetry.recompute_running_totals(p_subject_key, v_from);

        -- Everything from the frontier forward is now consistent again.
        UPDATE dirty_range
           SET resolved_at = now()
         WHERE subject_key = p_subject_key
           AND resolved_at IS NULL
           AND range_start >= v_from;

        RETURN v_from;
    END;
    $$;
The recompute_running_totals helper is where the prefix-sum insight pays off: it reads the last aggregate row strictly before v_from, takes its cumulative value as the base, and re-integrates forward only from there. A file that lands three weeks late costs you a replay from its timestamp to now — not from the birth of the tool.
Where the new data lands changes the work. That helper has three regimes, and the difference between them turned out to be the single most important performance lesson we took from this system.
- First data for the subject. There is nothing to reconcile against, so the worker computes the globals for the file in Python and inserts the result rows. No prior state, no query.
- Data that appends after everything stored — the common case. The worker reads the last global value already in the database, seeds from it, and integrates the new file forward, again entirely in Python. One small read, then arithmetic.
- Data that lands before existing rows — the genuinely disArrayed case. The file inserts into the middle of the subject's history, so every cumulative value to its right is now wrong. We insert the file's own rows (computed in Python, as always) and then run a single SQL pass that uses window functions to repair the running totals from the insertion point forward.
The hard-won lesson is the asymmetry between cases 2 and 3. It is tempting — and we did this first — to lean on SQL window functions for everything, computing each file's own contribution with the same sum() OVER (...) machinery that makes the case-3 repair so tidy. It reads beautifully and it is dramatically slower: window functions pull the rows back out of the hypertable, sort them, and materialize a frame, and doing that for data the worker already holds in memory is pure waste. The rule we settled on is do as much of the arithmetic as possible in Python, and reserve SQL window functions for the one thing they are uniquely good at here — cleaning up the data to the right of a late insert.
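The three regimes reduce to a small dispatch on where the file lands. The sketch below is one hypothetical way to express it: classify_regime, file_totals and process_file are invented names, timestamps are simplified to plain numbers, and last_total is assumed to be the stored running total strictly before the file's first row. The SQL repair pass for the late-insert case is only flagged, not performed.

```python
from itertools import accumulate


def classify_regime(stored_end, file_start):
    """Pick the regime from where the file lands relative to stored data.

    stored_end is the latest timestamp already held for the subject
    (None when the subject has no data yet).
    """
    if stored_end is None:
        return "first"
    if file_start > stored_end:
        return "append"
    return "insert"


def file_totals(values, seed=0.0):
    """Cumulative totals for one file's rows, seeded from a prior total."""
    return list(accumulate(values, initial=seed))[1:]


def process_file(stored_end, last_total, file_start, values):
    """Return (regime, totals, needs_sql_repair) for one file."""
    regime = classify_regime(stored_end, file_start)
    if regime == "first":
        # No prior state, no query: integrate from zero.
        return regime, file_totals(values), False
    if regime == "append":
        # One small read (last_total), then arithmetic in Python.
        return regime, file_totals(values, seed=last_total), False
    # Late insert: the file's own rows are still computed in Python, then a
    # SQL window-function pass must repair everything to the right.
    return regime, file_totals(values, seed=last_total), True
```

Only the third branch asks for database-side work, which matches the asymmetry described above: the file's own contribution is always computed in memory.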
Two properties fall out of this design for free, and they are the two we lead with when we describe it to clients:
- Idempotency. We covered the ingest mechanism earlier: an unchanged file is caught by its content_hash and does no work. What matters here is the downstream consequence: a re-ingest of data we already hold never marks a range dirty, so it can never trigger a spurious recompute or double-count a subject's totals. Replaying the same file a hundred times lands the same numbers as playing it once.
- Deterministic replay. Because staleness is data in a table rather than an assumption about arrival order, the same set of files always converges to the same aggregates regardless of the order they showed up in. Reprocessing is a repair, not a gamble.
Out-of-order arrival in practice
A few details separate the sketch above from something you can run for years.
Merge before you replay. A backlog upload can drop dozens of overlapping dirty ranges on one subject at once. Recomputing each independently would replay the same data chunk over and over. Before reconciling we coalesce the unresolved ranges per subject down to the single earliest frontier — one replay absorbs all of them.
Windowed vs. cumulative metrics behave differently. For a cumulative metric (running total, equivalent cycles) dirtiness propagates all the way to the frontier, because the prefix sum after t0 is all wrong. For a windowed aggregate (hourly in-spec count, per-shift mispicks) only the touched buckets are stale, so you recompute those buckets and stop. The dirty-range table carries both; the reconciler decides how far forward to walk based on the metric.
Bound how far back you will look. In principle a file from a year ago could arrive tomorrow. In practice you pick a watermark — a horizon beyond which late data is quarantined for human review rather than silently rewriting a year-old cumulative total. Choosing that horizon is a judgment call, and getting it wrong in the aggressive direction was one of our more instructive mistakes (see below).
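These three details can be sketched as three small helpers. Everything here is illustrative: dirty ranges are (start, end) tuples of timezone-aware datetimes, the function names are hypothetical, buckets are hourly, and the 30-day horizon is an arbitrary example value, since the right horizon is a judgment call.

```python
from datetime import datetime, timedelta, timezone


def frontier(ranges):
    """Coalesce unresolved dirty ranges to the single earliest start."""
    return min(start for start, _ in ranges)


def dirty_buckets(ranges, bucket=timedelta(hours=1)):
    """For a windowed metric: the bucket starts touched by any range."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    touched = set()
    for start, end in ranges:
        b = epoch + ((start - epoch) // bucket) * bucket  # floor to bucket
        while b <= end:
            touched.add(b)
            b += bucket
    return sorted(touched)


def split_by_horizon(ranges, now, horizon=timedelta(days=30)):
    """Separate ranges to reconcile from ranges quarantined for review."""
    cutoff = now - horizon
    reconcile = [r for r in ranges if r[0] >= cutoff]
    quarantine = [r for r in ranges if r[0] < cutoff]
    return reconcile, quarantine
```

A cumulative metric would replay once from frontier(...) forward, while a windowed metric would recompute only the buckets returned by dirty_buckets(...); ranges older than the horizon are set aside instead of being replayed.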
Approach to aggregates
A running total is one kind of derived number; the other is the aggregate — a windowed rollup of the raw telemetry. Hourly in-spec and out-of-spec counts per tool, per-shift mispicks, a downsampled trend for the dashboards. Nobody queries a billion-row hypertable directly to draw a chart; they query a rollup that is orders of magnitude smaller. So we need those rollups to stay correct under the same out-of-order arrival that complicates the globals.
Timescale ships a purpose-built feature for exactly this — continuous aggregates with refresh policies — and we tried the obvious approaches before landing where we did.
- Refresh the whole aggregate on a schedule. This is what we inherited: a continuous aggregate with a policy that re-ran across a broad window every few minutes. It is simple and it is correct, but it does a great deal of work to re-materialize buckets that never changed, and the cost grows with history rather than with how much actually arrived. At our row counts that was untenable.
- Dirty-range refreshes of the aggregate. The natural next idea: reuse the dirty-range table and call refresh_continuous_aggregate only over the spans we know changed. In principle you refresh nothing but the dirty buckets. In practice it was worse: a steady stream of small, overlapping refreshes each taking locks, contending with ongoing ingest and with each other. Under real load it became a lock storm and throughput collapsed.
- Compute the aggregates in Python. What actually worked was to stop treating the aggregate as something the database maintains for us and build it in the same worker pass that computes the globals. The worker already has the file's rows in memory and already holds the subject lock; rolling them into bucketed counts there and upserting the aggregate rows directly is cheap, deterministic, and uses no continuous-aggregate machinery, and therefore none of its locking.
This is the same lesson as the globals, reached from a different direction: the database is superb at storing and serving these rows, but the cheapest place to compute is the worker that already holds the data. Continuous aggregates are a fine tool — we still reach for them on low-churn, in-order datasets — but for high-volume, out-of-order, per-subject telemetry, doing the aggregation in Python under the subject lock beat both database-side approaches by a wide margin.
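A worker-side rollup of this kind might look like the following. It is a minimal sketch with invented names (hourly_counts, upsert_aggregates) and a dict standing in for the aggregate table, and it assumes the rows passed in are all of the subject's rows for the touched hours; under that assumption, replacing a bucket rather than incrementing it is what lets a replay land the same numbers.

```python
from collections import Counter
from datetime import datetime, timezone


def hourly_counts(rows):
    """Roll (timestamp, in_spec) rows into per-hour in/out-of-spec counts."""
    buckets = {}
    for ts, in_spec in rows:
        hour = ts.replace(minute=0, second=0, microsecond=0)
        counts = buckets.setdefault(hour, Counter())
        counts["in_spec" if in_spec else "out_of_spec"] += 1
    return buckets


def upsert_aggregates(table, subject_key, buckets):
    """Replace the touched buckets for one subject (dict as the table)."""
    for hour, counts in buckets.items():
        table[(subject_key, hour)] = dict(counts)


# Usage: three parts from one tool, rolled up and upserted twice.
utc = timezone.utc
rows = [
    (datetime(2024, 3, 10, 8, 5, tzinfo=utc), True),
    (datetime(2024, 3, 10, 8, 50, tzinfo=utc), False),
    (datetime(2024, 3, 10, 9, 1, tzinfo=utc), True),
]
table = {}
upsert_aggregates(table, "Tool07", hourly_counts(rows))
upsert_aggregates(table, "Tool07", hourly_counts(rows))  # replay is a no-op
```

After both calls the table holds exactly two buckets for Tool07, and the second call changes nothing.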
Why not just use a generic DAG?
Whenever we describe this, someone reasonably asks: couldn't Airflow (or Dagster, or Prefect) do all of this? It is a fair question and the honest answer is worth stating plainly, because the distinction is the entire point of the article.
A DAG orchestrator answers "when should this task run, and in what order?" It schedules, retries, backfills, and draws you a nice dependency graph. What it does not answer is "is my billion-row cumulative metric still correct now that a file measured three weeks ago just landed?" That is a data-correctness question, not a scheduling question, and no orchestrator answers it for you — you still have to build idempotent writes, dirty-range tracking, per-subject locking, and deterministic replay yourself. The DAG would just be the thing that calls the machinery we described; it is not a substitute for it.
There are also two practical mismatches for this specific problem:
- Granularity and rate. These tools are built for workflows on the order of minutes to hours. We are absorbing a few thousand files an hour, each one potentially dirtying a range and triggering a bounded recompute. Modeling every file as a DAG run drowns the scheduler in metadata for work that a single Postgres function does in milliseconds.
- Where correctness lives. We deliberately keep correctness inside the database: in transactions, upserts, and the subject_lock. The moment the "is it correct" logic lives in an external orchestrator, it is racing the database rather than being protected by it, and the atomic "rows-and-dirtiness commit together" guarantee we rely on is gone.
None of this makes orchestrators bad — they are excellent at what they are for, and we happily use one as the scheduler that kicks off reconciliation sweeps or nightly audits. The rule we give clients: use a DAG tool to decide when work runs; use the database to guarantee the work is correct. Reaching for Airflow to solve out-of-order correctness is solving a data problem with a scheduling tool.
Design tradeoffs
The resilient design is not free. It buys correctness under disorder at the cost of write amplification and bookkeeping. The table below is the honest comparison we walk clients through before committing.
| Concern | Naive append-on-arrival | Idempotent + dirty-range reconciler |
|---|---|---|
| Duplicate file | Double-counts | No-op |
| Late / out-of-order file | Wrong aggregates | Range marked dirty, recomputed |
| Cost of a correction | Reprocess everything | Reprocess only changed ranges |
| Write path complexity | Trivial | Moderate (upsert + state table) |
| Scales to billions of rows | Only if never wrong | Yes |
Rule of thumb: if your instruments can ever disconnect, retry, or be re-run — and at scale they always can — design for disordered, at-least-once arrival from day one. Retrofitting idempotency onto a pipeline that assumed order is far more expensive than building it in.
What we'd do differently next time
Little here will surprise you if you have read this far — these are the same lessons, restated as the rules we now carry into the next build.
Reach for SQL directly, not through an ORM. ORMs come and go; SQL is forever. Unless a client specifically wants one, we default to a thin data-access layer over a full object-relational mapper: Dapper rather than Entity Framework on .NET, psycopg rather than the SQLAlchemy ORM in Python, JDBI rather than Spring Data JPA on the JVM. This pipeline lives or dies on SECURITY DEFINER functions, FOR UPDATE SKIP LOCKED, and hand-tuned upserts; a heavy mapper mostly gets between you and the exact SQL you meant to write, and the schema will outlive whichever ORM was fashionable when the project started.
Do the arithmetic in Python, not in the database. We have said it twice already and we will say it once more, because we learned it the expensive way: we were a little too in love with SQL window functions. A vectorized Python pass (NumPy or pandas over the rows the worker already holds in memory) ate their lunch, and a Python FLOP was usually cheaper than a Timescale FLOP. Window functions still earn their keep for the one job nothing else does as cleanly, repairing cumulative values to the right of a late insert, but they are a scalpel, not a workhorse.
Continuous aggregates are wonderful — until the data can be disArrayed. We genuinely admire what the Timescale team built, and on low-churn, in-order datasets we still reach for continuous aggregates first. But for high-volume, out-of-order, per-subject telemetry we could not make them pay off — whole-table refreshes did far too much work, and dirty-range refreshes turned into lock storms. We tried hard before giving up and moving the aggregation into the worker, and we would make that call sooner next time.