Data Pipeline Scripts — Systemic Risk Channels in Digital Finance
Review date: 2026-03-25 • Reviewer: Automated hostile analysis • Scope: 3 scripts, 17 functions, ~750 LOC
| Severity | Count | Description |
|---|---|---|
| BUG | 5 | Confirmed bugs that produce wrong output or crash under reachable conditions |
| LOGIC | 7 | Logic issues, inconsistencies, or design problems that may produce surprising results |
| STYLE | 8 | Stylistic concerns, missing guards, or fragile patterns |
| INFO | 6 | Informational observations, notes on design tradeoffs |
| PASS | 5 | Functions with no issues found |
| Script | Function | Verdict |
|---|---|---|
| channel_mapper.py | compute_literature_volume | PASS |
| channel_mapper.py | compute_citation_impact | CONCERN |
| channel_mapper.py | compute_crisis_evidence | PASS |
| channel_mapper.py | assign_channels | CONCERN |
| channel_mapper.py | main | CONCERN |
| openalex_search.py | extract_paper | PASS |
| openalex_search.py | search_channel | CONCERN |
| openalex_search.py | main | PASS |
| openalex_client.py | __init__ | PASS |
| openalex_client.py | _rate_limit | CONCERN |
| openalex_client.py | _make_request | FAIL |
| openalex_client.py | search_works | CONCERN |
| openalex_client.py | get_entity | PASS |
| openalex_client.py | batch_lookup | CONCERN |
| openalex_client.py | paginate_all | FAIL |
| openalex_client.py | sample_works | FAIL |
| openalex_client.py | group_by | CONCERN |
The codebase is functional for the happy path but contains several confirmed bugs in the API client layer
(missing pagination in sample_works, caller-dict mutation in paginate_all, incomplete exception
handling in _make_request) and a numerical inconsistency in the ranking pipeline where
mean_top10 in main() uses a different divisor than compute_citation_impact().
Additionally, assign_channels mutates its input in-place without documentation.
The scripts would benefit from defensive hardening before any production or reproducibility-critical use.
409 lines • 5 functions • Maps papers to systemic-risk channels and computes composite rankings
The function correctly computes paper_count / max_paper_count per channel, normalizing to [0, 1].
The guard max_count = max(...) if counts else 1 at line 52 prevents both a max() crash on empty input
and division by zero.
If channel_papers is empty, max_count falls back to 1 and the function returns an empty dict
(the comprehension at line 54 iterates over nothing). The remaining case is a channel that maps
to an empty list: counts is then non-empty but every value can be 0, making max_count = 0 and
raising ZeroDivisionError at line 54.
In practice this path is unreachable, because channel_papers is built by assign_channels,
which only adds a channel key when there is at least one paper for it (line 187). So the guard is sufficient for the
actual call site, but the function is not self-contained against adversarial input.
Lines 51–54
Unlike compute_citation_impact (which explicitly guards max_mean == 0 at line 83),
this function has no such guard. If it were ever called with channels that map to empty lists,
it would raise ZeroDivisionError. Add an if max_count == 0: max_count = 1 guard for consistency.
# Current (line 52):
max_count = max(counts.values()) if counts else 1
# Suggested:
max_count = max(counts.values()) if counts else 1
if max_count == 0:
max_count = 1
Line 52
Line 77 always divides by 10, even when a channel has fewer than 10 papers. If a channel has 3 papers
with 100, 50, and 30 citations, the "mean of top 10" is (100+50+30)/10 = 18, not 60.
The docstring says "mean citation count of top-10 papers (sum / 10, even if fewer exist)", so this is
documented behavior — but it systematically penalizes channels with few papers, which may not be the intent.
# Line 77: always divides by 10
mean_cites = sum(p.get("cited_by_count", 0) for p in top_10) / 10
Line 77
This function divides by hardcoded 10. But in main() at line 289, a separate mean_top10 computation
divides by len(top_papers_cites) — the actual number of papers in the slice (capped at 10).
For a channel with 3 papers, this function computes sum/10 = 18 while main() computes
sum/3 = 60. The composite score uses this function, but the JSON output reports the other value.
A reader comparing citation_impact against mean_top10_citations will find they do not reconcile.
# Line 77 (compute_citation_impact): always /10
mean_cites = sum(...) / 10
# Line 289 (main): /len(top_papers_cites)
mean_top10 = sum(top_papers_cites) / len(top_papers_cites) if top_papers_cites else 0
Lines 77, 289
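One way to remove the inconsistency is a single shared helper that both compute_citation_impact and main() call, so the divisor is defined exactly once. A minimal sketch, assuming the len-based semantics of main() is the intended one (the helper name mean_top_k is hypothetical, not a function in the scripts):

```python
def mean_top_k(citation_counts, k=10):
    """Mean citations of the k most-cited papers, dividing by the actual
    slice size so channels with fewer than k papers are not penalized."""
    top = sorted(citation_counts, reverse=True)[:k]
    return sum(top) / len(top) if top else 0.0
```

With this helper, the 3-paper example from above yields 60.0 in both the scoring path and the JSON output.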
Lines 82–84 correctly handle the case where all channels have zero citations
by guarding max_mean == 0 with a fallback to 1.
The function correctly computes weighted crisis evidence using log10(losses_usd), handles missing/non-numeric
losses via median fallback, warns about unknown channel IDs, ensures all channels have an entry, and normalizes to [0, 1]
with a zero guard. This is the most defensively written function in the codebase.
Line 132 guards loss_val > 0 before calling log10, preventing a ValueError
on zero or negative values. However, a loss value of "0" will parse as 0.0 and get weight 0,
effectively ignoring the event for that channel. This is arguably correct (zero-loss event has zero weight),
but worth noting for data quality.
weight = math.log10(loss_val) if loss_val > 0 else 0
Line 132
The function iterates over crisis_events twice: once to accumulate weights (lines 125–137)
and once to collect unknown channel IDs (lines 141–145). These could be merged into a single pass.
Not a correctness issue (two linear passes are still O(n)), but the second pass repeats
.get() calls for no benefit.
Lines 125–147
Lines 180–184 add primary_channel and secondary_channels keys directly to the
input paper dicts. The function signature and docstring say "Returns: Tuple of (papers_with_assignments, ...)"
which implies it returns new data, but the returned papers list is the same list
with the same dict objects, now mutated.
This means the caller's original data is silently altered. In main() (line 259), the papers
list loaded from JSON is passed in and permanently modified. Any subsequent code that inspects the raw paper data
will see these injected keys. This is a side-effect contract violation: the function's return type suggests
transformation, but the implementation is mutation.
# Lines 180-184: modifies the dict objects from the input list
paper["primary_channel"] = channels[0]
paper["secondary_channels"] = channels[1:] if len(channels) > 1 else []
Lines 180–184
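A copy-then-annotate pattern would preserve the contract that the signature implies. A minimal sketch (annotate_paper is a hypothetical helper, not a function in the script; a shallow copy suffices because only top-level keys are injected):

```python
def annotate_paper(paper, channels):
    """Return a new dict with channel keys; the caller's dict is untouched."""
    out = dict(paper)  # shallow copy: top-level key injection only
    out["primary_channel"] = channels[0] if channels else "unassigned"
    out["secondary_channels"] = channels[1:] if len(channels) > 1 else []
    return out
```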
When a paper has no channels (line 183), it gets primary_channel = "unassigned" but the paper
is not added to channel_papers["unassigned"] because the for ch in channels loop
at line 186 iterates over an empty list. This means "unassigned" papers are invisible to all downstream
scoring functions. This is likely intentional but is not documented.
Lines 183–187
This is the downstream half of the inconsistency noted above. At line 289, mean_top10 divides by
len(top_papers_cites), which is the actual number of papers in the top-10 slice (could be 1–10).
The citation_impact sub-score stored in the same output record was computed by
compute_citation_impact which always divides by 10.
The JSON output therefore contains two numbers that claim to represent the same concept
(citation_impact and mean_top10_citations) but use different denominators.
For a channel with 3 papers, the reported mean_top10_citations will be 3.33x higher than
the value that actually drove the composite score.
# Line 289
mean_top10 = sum(top_papers_cites) / len(top_papers_cites) if top_papers_cites else 0
Line 289
Lines 285–289 re-sort and re-slice the top-10 papers per channel, duplicating what
compute_citation_impact already computed at lines 72–77. The values are not reused
from the function's return; instead, a fresh computation is done with a different formula.
This is both wasteful and the source of the inconsistency above.
Lines 285–289 vs. 72–77
Lines 311–317 mutate the dicts in rankings to add rank and round scores.
This works correctly but means the list is only valid after this loop. If any future code
moves above this loop, it will see unrounded values. Consider building the final dicts in one pass.
Lines 311–317
Lines 328–362 re-fetch lit_volume, cit_impact, crisis_ev for each channel
and recompute composite scores under multiple weighting schemes. This duplicates the work done at lines 276–282.
Not a bug, but a maintenance hazard: if the scoring formula changes, both locations must be updated.
Lines 328–362
The function validates file existence before opening, uses os.makedirs with exist_ok=True,
and exits with informative messages on missing inputs. The graceful degradation when
crisis_chronology.json is missing (lines 248–256) is well handled.
337 lines • 3 functions • Searches OpenAlex per channel, de-duplicates, and saves results
The function correctly extracts and normalizes fields from an OpenAlex work object. All .get() calls
have sensible defaults. The abstract reconstruction from abstract_inverted_index (lines 57–65)
correctly sorts by position and joins with spaces.
The abstract reconstruction builds a list of (position, word) tuples, then sorts them. For a typical
abstract of ~200 words, this is negligible. For pathological inputs with very long abstracts, the sort dominates.
No practical concern.
Lines 60–65
Line 76: work.get("title", "") returns an empty string when the work has no title.
Downstream code in search_channel does not check for this, so a paper with an empty title
and a valid ID will be included in results. This is probably fine for data collection but could
confuse humans reviewing the output.
Line 76
Line 125 constructs the filter as f">{min_citations - 1}" to mean "at least min_citations".
For min_citations=5 this produces ">4", which is correct. But the OpenAlex filter syntax
also expresses this directly via the open-ended range form "5-", so the off-by-one
encoding, while functional, is an unnecessarily clever way to write >= and easy to misread.
f">{min_citations - 1}" if min_citations > 0 else None
Line 125
Lines 137–139 and 173–175 catch bare Exception and continue/break. This means any bug in
extract_paper or search_works (e.g., a KeyError) would be silently swallowed
and logged as an "Error querying OpenAlex". Consider catching requests.exceptions.RequestException instead.
except Exception as e:
print(f" Error querying OpenAlex: {e}")
continue
Lines 137–139
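A narrower handler can be factored out so transport errors are logged and skipped while programming errors surface. A sketch, where transient is a parameter standing in for (requests.exceptions.RequestException,) in the real scripts; a KeyError from a bug in extract_paper would propagate instead of being swallowed:

```python
def safe_query(fn, transient, *args, **kwargs):
    """Run an API call, swallowing only the given transport-level errors.
    Any other exception (a genuine bug) is re-raised."""
    try:
        return fn(*args, **kwargs)
    except transient as e:
        print(f"  Error querying OpenAlex: {e}")
        return None
```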
Line 160 hard-caps pagination at 4 additional pages with the comment "Cap at 4 additional pages to be polite".
With per_page=200, this means a maximum of 200 + 4*200 = 1000 results per query.
This is a reasonable rate-limiting decision but is not exposed as a parameter or documented
in the function's docstring. A caller might expect per_channel_limit=2000 to work,
but it will silently cap at ~1000 per query (though multiple queries per channel can accumulate more).
Line 160
The function correctly parses arguments, validates inputs, iterates channels, de-duplicates across channels
via the merged_papers dict, and produces both per-channel and merged output files. The summary
statistics and low-result-channel warnings are useful operational output.
Line 224: args.year_range.split("-") splits on every hyphen. An input like
"2009-2026" splits into ["2009", "2026"] (correct), but "2009-2026-2030"
would give 3 elements and int() conversion would fail (caught by the except ValueError).
Edge case: "2009-" splits into ["2009", ""], and int("") raises
ValueError, also caught. So the error handling is actually robust here despite the simple split.
Lines 223–228
Lines 279–286: when a paper appears in multiple channels, its channels list preserves
the order in which channels were iterated from channels.items(). In Python 3.7+, this is
insertion order of the JSON object keys, which depends on the search_queries.json file structure.
This is fine but means the "primary channel" assignment in channel_mapper.py is implicitly
determined by JSON key order.
Lines 279–286
338 lines • 9 methods • OpenAlex API client with rate limiting, retries, pagination
Simple and correct. Computes min_delay from requests_per_second, initializes
last_request_time to 0. No issues.
Passing requests_per_second=0 would cause ZeroDivisionError at line 33:
self.min_delay = 1.0 / requests_per_second. Negative values would set a negative delay,
effectively disabling rate limiting (which the time_since_last < self.min_delay check
would never trigger). Not a realistic concern given the default of 10 and the constructor's usage,
but worth a guard for a reusable library class.
self.min_delay = 1.0 / requests_per_second # ZeroDivisionError if 0
Line 33
Line 42 sets self.last_request_time = time.time() after the potential sleep,
so the recorded time is the moment _rate_limit returns rather than the moment the HTTP
request actually fires. Note that time.sleep overshoot (common on Windows) can only
lengthen the gap between requests, so it cannot cause API throttling; the drift runs the
other way: any time spent constructing the request after _rate_limit returns means the
spacing between actual request sends can fall slightly below min_delay.
With a 100 ms budget and sub-millisecond setup costs this is unlikely to matter in practice,
but for strict correctness the timestamp should capture the moment the request is sent.
def _rate_limit(self):
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.min_delay:
time.sleep(self.min_delay - time_since_last)
self.last_request_time = time.time() # set after sleep, not before request
Lines 36–42
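A hardened version combining the __init__ guard noted above with the same delay logic could look like this sketch (RateLimiter is a hypothetical standalone class, not the client's actual code):

```python
import time

class RateLimiter:
    """Minimal rate limiter: rejects non-positive rates and stamps the time
    just before the caller fires the request."""
    def __init__(self, requests_per_second=10):
        if requests_per_second <= 0:  # guard suggested in the __init__ note
            raise ValueError("requests_per_second must be positive")
        self.min_delay = 1.0 / requests_per_second
        self.last_request_time = 0.0

    def wait(self):
        remaining = self.min_delay - (time.time() - self.last_request_time)
        if remaining > 0:
            time.sleep(remaining)
        self.last_request_time = time.time()  # caller fires the request next
```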
Line 91 catches requests.exceptions.Timeout for retry, but does not catch
requests.exceptions.ConnectionError. A DNS failure, refused connection, or network
drop will raise ConnectionError, which is not caught by the retry logic.
It will propagate immediately as an unhandled exception, bypassing all retry logic.
This is a confirmed bug: network interruptions are a common transient failure in long-running API scraping sessions. The retry logic exists precisely for these scenarios but fails to cover them.
except requests.exceptions.Timeout: # line 91
if attempt < max_retries - 1:
wait_time = 2 ** attempt
...
# Missing:
except requests.exceptions.ConnectionError:
# should also retry
Lines 91–97
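The retry loop can be expressed as a small wrapper that takes the transient exception types explicitly; in the client, transient would be (requests.exceptions.Timeout, requests.exceptions.ConnectionError), and the backoff mirrors the existing 2 ** attempt logic. A sketch (with_retries is a hypothetical helper):

```python
import time

def with_retries(fn, transient, max_retries=5, base_delay=1.0):
    """Call fn(), retrying the given transient failures with exponential
    backoff. The last failure is re-raised once retries are exhausted."""
    for attempt in range(max_retries):
        try:
            return fn()
        except transient:
            if attempt == max_retries - 1:
                raise  # exhausted: surface the last transient error
            time.sleep(base_delay * (2 ** attempt))
```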
Lines 77–81: on a 403 response, the code sleeps and retries but does not check whether this is
the last attempt. If all 5 retries get 403, the loop exits and falls through to line 99:
raise Exception(f"Failed after {max_retries} retries"). This is technically correct,
but the generic exception message loses the context that the server consistently returned 403.
The response body (which often contains rate-limit reset times or error details) is discarded.
Lines 77–81, 99
Line 68: urljoin(self.BASE_URL, endpoint) where BASE_URL = "https://api.openalex.org"
and endpoint = "/works". Because BASE_URL has no trailing slash, urljoin
replaces the path: urljoin("https://api.openalex.org", "/works") produces
"https://api.openalex.org/works" which is correct. However, if endpoint
were a relative path like "works" (no leading slash), urljoin would produce
"https://api.openalex.org/works" — still correct by coincidence because the base has no path.
But with a base like "https://api.openalex.org/v1", urljoin(..., "works") would
produce "https://api.openalex.org/works", dropping /v1. Since all callers use
leading-slash endpoints, this is fine today but fragile.
Line 68
Line 125: min(per_page, 200) silently clamps the value. If a caller passes per_page=500,
they will get 200 results without any warning. This is correct behavior (the API rejects values >200),
but the silent clamping can be confusing during debugging.
Line 125
Line 133: ','.join([f"{k}:{v}" for k, v in filter_params.items()]) will produce
unexpected results if any value contains a comma (e.g., {"concept.id": "C1,C2"}
would produce "concept.id:C1,C2" which the API might interpret as two separate filters).
The OpenAlex API uses commas to separate filter conditions, so embedded commas in values would
be ambiguous. In practice, the callers only pass simple values, so this is not triggered.
Line 133
Constructs the endpoint and delegates to _make_request. No issues.
The entity_id is not URL-encoded, but OpenAlex IDs are URL-safe by construction
(e.g., "W2741809807" or "https://openalex.org/W2741809807").
DOIs containing slashes would need encoding, but requests handles URL construction
via params, and the DOI appears in the path here — however, OpenAlex accepts
full URL-form DOIs in this position. Acceptable.
The docstring says "up to 50 per batch" and the code processes in chunks of 50. However, if the API returns fewer results than requested (e.g., some IDs are invalid or deleted), the function silently returns fewer results than IDs were provided. There is no warning and no way for the caller to identify which IDs failed. For a lookup function, this is a surprising contract.
Lines 178–190
Line 187: response = self._make_request(...) is called without try/except. If one
batch fails (e.g., after exhausting retries), the entire batch_lookup call fails and
all previously collected results are lost. Consider accumulating results with per-batch error handling.
Line 187
Lines 212–213 directly modify the params dict passed by the caller:
params['per-page'] = 200
params['page'] = 1
And line 234 increments it during pagination:
params['page'] += 1
If the caller passes a dict and then inspects it after the call, they will find it has been
modified with per-page set to 200 and page set to whatever the last
page was. This is a side-effect bug. The params is None check at line 209 creates
a new empty dict only when None is passed; any non-None dict is mutated in place.
Fix: Add params = dict(params) after the None check to create a shallow copy.
Lines 212–213, 234
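The fix is a one-line shallow copy at the top of the function. A sketch of the non-mutating setup (prepare_params is a hypothetical extraction of lines 209–213, not a method on the client):

```python
def prepare_params(params=None):
    """Build the pagination params without touching the caller's dict."""
    params = dict(params) if params is not None else {}  # shallow copy
    params['per-page'] = 200
    params['page'] = 1
    return params
```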
The OpenAlex API returns a maximum of 10,000 results via page-based pagination
(page * per-page must be ≤ 10,000). Beyond that, the API returns an error.
This function has no guard for this limit and will attempt to paginate beyond page 50
(50 * 200 = 10,000) if total_count exceeds 10,000. The API will return an error
which _make_request will propagate as an exception, crashing the pagination.
The docstring claims to "paginate through all results" but this is impossible for result sets larger than 10,000 via page-based pagination. The OpenAlex API requires cursor-based pagination for larger sets, which this function does not implement.
Lines 217–234
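Cursor pagination in OpenAlex sends cursor="*" on the first request and then follows meta.next_cursor until it is empty. A sketch of how paginate_all could adopt it, with make_request standing in for the client's _make_request:

```python
def paginate_all_cursor(make_request, endpoint, params=None):
    """Collect all results via cursor pagination (no 10,000-result cap)."""
    params = dict(params) if params else {}  # also avoids the mutation bug
    params['per-page'] = 200
    params['cursor'] = '*'  # first page
    results = []
    while True:
        page = make_request(endpoint, params)
        results.extend(page.get('results', []))
        next_cursor = page.get('meta', {}).get('next_cursor')
        if not next_cursor:  # API signals the end with a falsy cursor
            break
        params['cursor'] = next_cursor
    return results
```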
The function has two code paths: one for sample_size > 10000 (line 268, the large-sample
multi-request path) and one for the else branch (line 291, a single request). The single-request path
sets per-page: 200 at line 257, so the API returns at most 200 results regardless of
sample_size.
If a caller requests sample_size=5000, the code takes the else branch (5000 ≤ 10000),
makes a single API call with sample=5000, per-page=200, and returns at most 200 results.
The remaining 4800 results are silently lost.
# Line 268: only triggers for >10000
if sample_size > 10000:
# multi-request path with pagination...
else:
# Line 292: single request, max 200 results
response = self._make_request('/works', params)
return response.get('results', [])
The fix requires adding pagination in the else branch, or lowering the threshold to 200 instead of 10000.
Lines 256–293
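Following the fix suggested in the recommendations (keep the same sample and seed, increment the page), a sketch with make_request standing in for the client's _make_request; sample_works_paged is a hypothetical replacement, and it assumes the API returns consistent pages for a fixed seed:

```python
def sample_works_paged(make_request, sample_size, seed=42):
    """Page through a fixed random sample until sample_size results arrive."""
    results = []
    page = 1
    while len(results) < sample_size:
        params = {'sample': sample_size, 'seed': seed,
                  'per-page': min(sample_size, 200), 'page': page}
        batch = make_request('/works', params).get('results', [])
        if not batch:  # server returned fewer results than requested
            break
        results.extend(batch)
        page += 1
    return results[:sample_size]
```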
Even the large-sample path (lines 268–290) makes each request with per-page=200,
but sets sample to up to 10,000. This means each request returns only 200 of the
10,000 sampled works. The remaining 9,800 are inaccessible because the /works?sample=N
endpoint does not support pagination (each request is an independent random draw). So even the
"large sample" path returns at most 200 results per iteration, and the loop runs
(sample_size // 10000) + 1 times, yielding at most ~200 * (N/10000 + 1)
results for a request of N > 10,000.
Lines 268–290
Line 321: response.get('group_by', []) assumes the API returns a group_by key.
The OpenAlex API actually returns the grouped data under the key "group_by" when the
request includes the group_by parameter. However, if the API changes its response structure
or returns an error response (which would lack this key), the function silently returns an empty list
with no indication of failure. This is a minor robustness concern.
Line 321
The sample_works function in openalex_client.py silently returns fewer results
than requested for any sample_size between 201 and 10,000. If openalex_search.py or any
other script uses this function to collect a representative sample, the sample will be systematically
undersized. There is no warning, no error, and the caller's only clue is that len(results) < sample_size.
Impact: Any statistical analysis based on the "sample" would have an undisclosed sample-size bias. In a research context, this is a data integrity issue.
The three scripts have inconsistent mutation contracts:
- assign_channels (channel_mapper.py:177–189) mutates its input paper dicts in-place
- paginate_all (openalex_client.py:212–213) mutates its input params dict
- extract_paper (openalex_search.py:33–87) correctly creates a new dict
- main in openalex_search.py (line 282) creates new dicts via dict(paper)

The inconsistency means some functions are safe to call with shared data and others are not. A developer working on one script may not realize that calling functions in another script will mutate their data.
_make_request has retry logic for Timeout and HTTP 403/5xx, but not ConnectionError.
Meanwhile, search_channel wraps all calls in broad except Exception handlers.
This means ConnectionError is "handled" in search_channel (by silently skipping the query), but
if paginate_all or batch_lookup hit a ConnectionError, it propagates uncaught.
The error handling strategy is split between two layers with gaps in between.
The channel_rankings.json output contains both citation_impact (normalized, driven by
compute_citation_impact with divisor=10) and mean_top10_citations (raw, computed in
main() with divisor=len). For a channel with 3 papers having 300 total citations in top-3:
- citation_impact is based on 300/10 = 30 (then normalized)
- mean_top10_citations reports 300/3 = 100.0
A consumer of the JSON output who tries to reverse-engineer the composite score from the reported
mean_top10_citations will get the wrong answer. The two numbers claim to describe the
same thing but disagree by a factor of up to 10x.
openalex_client.py uses type hints throughout (Optional[str], Dict[str, Any], etc.),
but the other two scripts use none. This creates an inconsistent developer experience and makes
static analysis tools less effective for the pipeline as a whole.
All three scripts use print() for both informational output and error reporting.
There is no way to control verbosity, redirect errors separately, or integrate with a logging
aggregator. The print(f"Warning: ...") pattern in compute_crisis_evidence
is indistinguishable from normal output at the stream level.
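A minimal logging setup would make warnings filterable and redirectable at the stream level; the logger name and format below are illustrative, not taken from the scripts:

```python
import logging

# One logger per script; basicConfig routes records to stderr by default.
logger = logging.getLogger("channel_mapper")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# A warning is now distinguishable from ordinary progress output:
logger.warning("Unknown channel ID %r in crisis chronology", "ch_99")
logger.info("Processed %d papers", 1234)
```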
| ID | Severity | Script | Function | Line(s) | Description |
|---|---|---|---|---|---|
| B1 | BUG | openalex_client.py | sample_works | 256–293 | No pagination for sample_size 201–10000; returns max 200 results |
| B2 | BUG | openalex_client.py | paginate_all | 212–234 | Mutates caller's params dict in-place (adds per-page, page keys) |
| B3 | BUG | openalex_client.py | _make_request | 91–97 | Catches Timeout but not ConnectionError; network drops bypass retry logic |
| B4 | BUG | openalex_client.py | paginate_all | 217–234 | No guard for OpenAlex 10,000-result pagination limit; crashes beyond page 50 |
| B5 | BUG | openalex_client.py | sample_works | 268–290 | Large-sample path also limited to 200 results per request due to per-page cap |
| L1 | LOGIC | channel_mapper.py | main / compute_citation_impact | 77, 289 | mean_top10 uses different divisors: hardcoded 10 vs. len(top_papers_cites) |
| L2 | LOGIC | channel_mapper.py | assign_channels | 177–189 | Mutates input paper dicts in-place; return type suggests transformation |
| L3 | LOGIC | channel_mapper.py | compute_citation_impact | 77 | Hardcoded /10 penalizes channels with fewer than 10 papers |
| L4 | LOGIC | openalex_client.py | _make_request | 77–81 | 403 exhaustion discards response body; generic exception loses context |
| L5 | LOGIC | openalex_client.py | batch_lookup | 178–190 | Silent truncation when IDs are invalid; no caller feedback on missing results |
| L6 | LOGIC | openalex_client.py | _rate_limit | 36–42 | Timestamp set after sleep instead of before request; minor drift potential |
| L7 | LOGIC | openalex_client.py | group_by | 321 | Silently returns empty list on unexpected API response structure |
| Priority | Action | Effort |
|---|---|---|
| 1 | Fix sample_works pagination. Add a pagination loop for the 201–10,000 range, or set per-page to min(sample_size, 200) and page through the results with incrementing page numbers and the same seed. | Small |
| 2 | Fix paginate_all mutation. Add params = dict(params) after the None check to avoid mutating the caller's dict. | Trivial |
| 3 | Add ConnectionError to _make_request retry. Change the except clause to catch (requests.exceptions.Timeout, requests.exceptions.ConnectionError). | Trivial |
| 4 | Reconcile mean_top10 divisors. Decide whether the scoring function or the display function has the correct semantics, make them consistent, and document the chosen behavior. | Small |
| 5 | Add 10,000-result guard to paginate_all. Either cap at page 50 with a warning, or implement cursor-based pagination for large result sets. | Medium |
| 6 | Document mutation in assign_channels. Either copy the dicts (paper = dict(paper)) or explicitly document the in-place mutation contract. | Trivial |
| 7 | Replace print() with logging. Use Python's logging module with appropriate levels (INFO, WARNING, ERROR) in all three scripts. | Medium |
Generated by automated hostile code review • 2026-03-25 • 3 scripts • 17 functions • ~750 lines analyzed • 5 BUG • 7 LOGIC • 8 STYLE • 6 INFO • 5 PASS