This pipeline collects academic literature from the OpenAlex API for 14 systemic risk channels in digital finance, then scores and ranks each channel by combining three normalized signals: literature volume (how many papers), citation impact (how influential the top papers are), and crisis evidence (how often the channel appears in real-world crisis events, weighted by financial losses). The final composite score uses a weighted sum (0.35 / 0.35 / 0.30) with sensitivity analysis across four alternative weight schemes.
The pipeline executes in three layers:
- Layer A (openalex_client.py) — HTTP transport, rate limiting, retry, pagination
- Layer B (openalex_search.py) — channel-wise querying, deduplication, merging
- Layer C (channel_mapper.py) — channel assignment, sub-score computation, composite ranking

Stage 1: Data Collection
Stage 2: Channel Scoring
Internal: API Transport
Layer A: API Transport — openalex_client.py
Layer B: Data Collection — openalex_search.py
Layer C: Scoring & Ranking — channel_mapper.py
API LAYER: Provides rate-limited, retry-capable HTTP access to the OpenAlex REST API. All data collection flows through this class.
Constructor. Stores configuration for polite-pool access and rate limiting.
| Parameter | Type | Default | Description |
|---|---|---|---|
| email | Optional[str] | None | Email address for polite pool (10x rate boost). Appended as mailto param to every request. |
| requests_per_second | int | 10 | Maximum requests per second. Polite pool allows 10 req/s; without email, 1 req/s. |
Initialization:
- min_delay = 1.0 / requests_per_second (e.g. 0.1s at 10 req/s)
- last_request_time = 0

Configured OpenAlexClient instance with attributes:
| Attribute | Value |
|---|---|
self.email | Stored email string |
self.requests_per_second | Rate limit integer |
self.min_delay | Minimum seconds between requests |
self.last_request_time | 0 (no requests yet) |
BASE_URL | https://api.openalex.org (class constant) |
Internal method called before every HTTP request. Enforces minimum inter-request delay.
None (reads self.last_request_time and self.min_delay).
Algorithm:
1. current_time = time.time()
2. time_since_last = current_time - self.last_request_time
3. If time_since_last < self.min_delay: sleep for the difference
4. self.last_request_time = time.time()

None. Side effect: blocks the calling thread until the rate limit window has passed. Updates self.last_request_time.
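The throttle above is small enough to sketch in full. The RateLimiter wrapper class is illustrative only; in the real client this logic lives inside OpenAlexClient._rate_limit():

```python
import time

class RateLimiter:
    """Minimal sketch of the _rate_limit logic described above.
    Attribute names (min_delay, last_request_time) follow the doc."""

    def __init__(self, requests_per_second: int = 10):
        self.min_delay = 1.0 / requests_per_second
        self.last_request_time = 0.0

    def wait(self) -> None:
        # Block until at least min_delay has elapsed since the last request.
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)
        self.last_request_time = time.time()
```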
Core HTTP GET method. All API calls flow through here. Handles rate limiting, polite-pool email injection, exponential backoff on errors.
| Parameter | Type | Default | Description |
|---|---|---|---|
| endpoint | str | required | API path, e.g. /works, /authors |
| params | Optional[Dict] | None | URL query parameters |
| max_retries | int | 5 | Maximum retry attempts |
Algorithm:
1. Default params to {} if None
2. If self.email is set, inject params['mailto'] = self.email
3. Build the URL via urljoin(BASE_URL, endpoint)
4. Call self._rate_limit()
5. requests.get(url, params=params, timeout=30)
6. On success: return response.json()
7. On a retryable error: wait 2^attempt seconds, retry (or raise on last attempt)

Dict[str, Any] — Parsed JSON response body from OpenAlex API.
Raises Exception after all retries fail, or requests.HTTPError on non-retryable status codes.
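A minimal sketch of the backoff loop, with `fetch` standing in for the HTTP call (it raises on failure) and `base_delay` added purely as a knob for testing; the real method always waits 2^attempt seconds:

```python
import time

def get_with_retry(fetch, max_retries: int = 5, base_delay: float = 1.0):
    """Sketch of the retry loop in _make_request (hypothetical helper name).
    With base_delay=1.0 the waits are 1s, 2s, 4s, ... as described above."""
    for attempt in range(max_retries):
        try:
            return fetch()
        except Exception:
            if attempt == max_retries - 1:
                raise  # retries exhausted: propagate the last error
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff
```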
Primary search method used by the pipeline. Constructs a filtered, sorted, paginated query against the /works endpoint. This is the method called by search_channel().
| Parameter | Type | Default | Description |
|---|---|---|---|
| search | Optional[str] | None | Full-text search query string |
| filter_params | Optional[Dict] | None | Filter dict, e.g. {"publication_year":"2009-2026", "cited_by_count":">4"} |
| per_page | int | 200 | Results per page (clamped to max 200) |
| page | int | 1 | Page number for offset pagination |
| sort | Optional[str] | None | Sort expression, e.g. cited_by_count:desc |
| select | Optional[List[str]] | None | Fields to return (projection) |
Pipeline usage: Called with search=query, filter_params={publication_year, cited_by_count}, per_page=200, sort="cited_by_count:desc"
Algorithm:
1. Start with {'per-page': min(per_page, 200), 'page': page}
2. If search: add params['search'] = search
3. If filter_params: join as a comma-separated key:value string into params['filter']
4. If sort: add params['sort'] = sort
5. If select: join as a comma-separated string into params['select']
6. Delegate to self._make_request('/works', params)

Example constructed URL:
/works?search=systemic+risk+contagion&filter=publication_year:2009-2026,cited_by_count:>4&sort=cited_by_count:desc&per-page=200&page=1
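The parameter assembly can be sketched as a pure function. build_works_params is a hypothetical name; the real method builds the dict inline before delegating to _make_request:

```python
def build_works_params(search=None, filter_params=None, per_page=200,
                       page=1, sort=None, select=None):
    """Sketch of the query-parameter assembly for the /works endpoint."""
    params = {"per-page": min(per_page, 200), "page": page}
    if search:
        params["search"] = search
    if filter_params:
        # OpenAlex expects filters as one comma-separated key:value string.
        params["filter"] = ",".join(f"{k}:{v}" for k, v in filter_params.items())
    if sort:
        params["sort"] = sort
    if select:
        params["select"] = ",".join(select)
    return params
```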
Dict[str, Any] — OpenAlex response containing:
| Key | Description |
|---|---|
meta | {"count": N, "db_response_time_ms": ...} — total matching works |
results | List of work objects (up to per_page items) |
Fetch a single entity by ID. Not used in the main pipeline run but available for ad-hoc lookups.
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', 'institutions', 'venues', 'concepts' |
| entity_id | str | required | OpenAlex ID (e.g. W2741809807) or external ID (DOI, ORCID) |
Constructs endpoint /{entity_type}/{entity_id} and delegates to _make_request().
Dict[str, Any] — Complete entity object from OpenAlex.
Efficient batch retrieval. Looks up multiple entities using pipe-separated ID filters in groups of 50.
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', etc. |
| ids | List[str] | required | List of IDs to look up |
| id_field | str | 'openalex_id' | Which ID field to filter on |
Algorithm:
1. Split ids into chunks of 50
2. Join each chunk with the | separator into a filter value: {id_field}:{id1|id2|...|id50}
3. Call _make_request(/{entity_type}, params={'filter': ..., 'per-page': 50}) per chunk
4. Concatenate the results arrays

List[Dict[str, Any]] — All matched entity objects (may be fewer than input IDs if some not found).
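The chunk-and-join step can be sketched as a small generator (chunked_id_filters is a hypothetical helper name):

```python
def chunked_id_filters(ids, id_field="openalex_id", chunk_size=50):
    """Yield one pipe-joined filter expression per chunk of up to 50 IDs,
    matching the {id_field}:{id1|id2|...} pattern described above."""
    for i in range(0, len(ids), chunk_size):
        chunk = ids[i:i + chunk_size]
        yield f"{id_field}:{'|'.join(chunk)}"
```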
Traverses all pages of a paginated endpoint. Respects OpenAlex's 10,000-result offset pagination limit.
| Parameter | Type | Default | Description |
|---|---|---|---|
| endpoint | str | required | API endpoint path |
| params | Optional[Dict] | None | Base query parameters (copied, not mutated) |
| max_results | Optional[int] | None | Stop after this many results. None = collect all. |
Algorithm:
1. Start with per-page=200, page=1
2. Call _make_request() and append results
3. If max_results reached → truncate and return
4. If len(all_results) ≥ meta.count → break
5. If page × 200 > 10000 → break (OpenAlex hard limit)
6. Otherwise increment page and repeat from step 2

List[Dict[str, Any]] — All collected result objects, up to max_results or 10,000.
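A sketch of the loop, with fetch_page standing in for _make_request (it must return the {'meta': {'count': N}, 'results': [...]} shape shown earlier):

```python
def paginate_all(fetch_page, max_results=None, per_page=200, offset_limit=10_000):
    """Sketch of the pagination loop described above (hypothetical helper)."""
    all_results, page = [], 1
    while True:
        resp = fetch_page(page)
        all_results.extend(resp["results"])
        if max_results is not None and len(all_results) >= max_results:
            return all_results[:max_results]  # truncate at the caller's cap
        if len(all_results) >= resp["meta"]["count"]:
            break  # everything collected
        if page * per_page >= offset_limit:
            break  # OpenAlex offset-pagination hard limit (10,000 results)
        page += 1
    return all_results
```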
Random sampling from OpenAlex. Supports reproducible samples via seed. For large samples (>10,000), uses multiple seeds with deduplication.
| Parameter | Type | Default | Description |
|---|---|---|---|
| sample_size | int | required | Number of works to sample |
| seed | Optional[int] | None | Random seed for reproducibility |
| filter_params | Optional[Dict] | None | Filters to constrain sampling universe |
Case A: sample_size ≤ 10,000 (standard path)
- Build params = {sample: sample_size, per-page: 200, seed: seed}
- Fetch ceil(sample_size / 200) pages

Case B: sample_size > 10,000 (multi-seed path)
- Repeat the standard path with successive seeds (current_seed = seed + i), deduplicating across batches

List[Dict[str, Any]] — List of sampled work objects, deduplicated, truncated to sample_size.
Aggregation query. Returns grouped counts for a field (e.g., publication year distribution).
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', etc. |
| group_field | str | required | Field to group by, e.g. 'publication_year', 'type' |
| filter_params | Optional[Dict] | None | Filters to constrain aggregation |
Algorithm:
1. Build params {group_by: group_field}
2. If filter_params: build comma-separated filter string
3. Call _make_request(/{entity_type}, params)
4. Return response['group_by']

List[Dict[str, Any]] — Grouped results, each containing {key, key_display_name, count}.
DATA COLLECTION LAYER: Iterates over all 14 channels defined in search_queries.json, queries OpenAlex via the client, extracts and normalizes paper records, deduplicates, and writes per-channel and merged output files.
Transforms a raw OpenAlex work object into a normalized paper record. Reconstructs abstracts from the inverted index format used by OpenAlex.
| Parameter | Type | Source | Description |
|---|---|---|---|
| work | Dict | OpenAlex API response results[] | Raw work object containing all OpenAlex fields |
Key fields read from input:
- work['authorships'][*]['author']['display_name']
- work['concepts'][*]['display_name']
- work['abstract_inverted_index'] — dict of {word: [position, ...]}
- work['primary_location']['source']['display_name']

Algorithm:
1. Iterate authorships, collect each author.display_name
2. Iterate concepts, collect each display_name
3. For each (word, positions) in the inverted index: create tuples (position, word), sort by position, and join the words into the abstract text
4. Resolve the venue via primary_location → source → display_name

Dict — Normalized paper record with keys:
| Key | Type | Description |
|---|---|---|
id | str | OpenAlex work ID (e.g. https://openalex.org/W...) |
title | str | Work title |
authors | List[str] | Author display names |
year | int|None | Publication year |
doi | str|None | DOI URL |
abstract | str|None | Reconstructed abstract text |
cited_by_count | int | Total citations (default 0) |
concepts | List[str] | Associated concept names |
referenced_works | List[str] | IDs of referenced works |
source | str | Always "openalex" |
type | str | Work type (article, review, etc.) |
host_venue | str | Journal or venue name |
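The abstract reconstruction step can be sketched on its own. reconstruct_abstract is a hypothetical helper name; extract_paper performs this inline:

```python
def reconstruct_abstract(inverted_index):
    """Rebuild abstract text from OpenAlex's inverted-index format,
    a dict of {word: [position, ...]} as described above."""
    if not inverted_index:
        return None
    # Expand to (position, word) pairs, sort by position, re-join the words.
    pairs = [(pos, word)
             for word, positions in inverted_index.items()
             for pos in positions]
    return " ".join(word for _, word in sorted(pairs))
```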
Searches OpenAlex for all queries belonging to a single channel. Iterates through each query string, paginates results sorted by citation count, deduplicates within the channel, and stops at the per-channel limit.
| Parameter | Type | Default | Description |
|---|---|---|---|
| client | OpenAlexClient | required | Configured API client instance |
| channel_id | str | required | Channel identifier, e.g. "network_contagion" |
| channel_info | Dict | required | Channel config with name and queries (list of search strings) |
| per_channel_limit | int | required | Max papers to collect for this channel (default 200 from CLI) |
| min_citations | int | required | Minimum cited_by_count filter (default 5 from CLI) |
| year_start | int | required | Start of publication year range (default 2009) |
| year_end | int | required | End of publication year range (default 2026) |
Algorithm:
1. Initialize seen_ids = set(), channel_papers = []
2. For each query in channel_info['queries']:
   - If len(channel_papers) ≥ per_channel_limit: stop
   - Build filter {publication_year: "YYYY-YYYY", cited_by_count: ">N"}
   - Call client.search_works(search=query, filter_params=..., per_page=remaining, sort="cited_by_count:desc")
   - Normalize each result via extract_paper(), skipping IDs already in seen_ids
   - If total_available > per_page and limit not reached: fetch additional pages, up to pages_needed + 1

List[Dict] — Extracted paper dicts for this channel. Each paper has the schema from extract_paper(). Deduplicated by paper ID within the channel. At most per_channel_limit papers.
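The dedup-and-cap behavior can be sketched in isolation (dedupe_and_cap is a hypothetical helper name; search_channel applies this inline while paginating):

```python
def dedupe_and_cap(papers, per_channel_limit):
    """Keep papers in arrival order (already sorted by citations),
    drop duplicates by id, and stop at the per-channel limit."""
    seen_ids, channel_papers = set(), []
    for paper in papers:
        if paper["id"] in seen_ids:
            continue
        seen_ids.add(paper["id"])
        channel_papers.append(paper)
        if len(channel_papers) >= per_channel_limit:
            break
    return channel_papers
```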
Entry point for the data collection stage. Parses CLI arguments, iterates all 14 channels, saves per-channel files, merges with cross-channel deduplication, and prints summary statistics.
CLI Arguments:
| Argument | Type | Default | Description |
|---|---|---|---|
| --email | str | required | Email for OpenAlex polite pool |
| --per-channel-limit | int | 200 | Max papers per channel |
| --min-citations | int | 5 | Minimum citation count filter |
| --year-range | str | "2009-2026" | Publication year range (START-END) |
| --output-dir | str | ../output/data/ | Directory for output files |
File Input:
| File | Path | Schema |
|---|---|---|
| search_queries.json | ../references/search_queries.json | {"channels": {"channel_id": {"name":str, "queries":[str]}}} |
Algorithm:
1. Parse --year-range into year_start, year_end
2. Load search_queries.json, extract channels dict (14 channels)
3. Create OpenAlexClient(email=...)
4. For each channel: run search_channel() and save openalex_raw_{channel_id}.json
5. Merge across channels, deduplicating by paper ID; record each finding channel in the merged_papers[pid]['channels'] list
6. Write openalex_merged.json

Files written:
| File | Count | Content |
|---|---|---|
openalex_raw_{channel_id}.json | 14 files | Array of paper dicts for that channel |
openalex_merged.json | 1 file | Deduplicated array of paper dicts, each with added channels: [str] key listing all channels that found the paper |
Console output: Per-channel paper counts, year distribution histogram, low-result channel warnings (<20 papers).
SCORING LAYER: Takes the merged paper collection from Stage 1, assigns papers to channels, computes three normalized sub-scores, and produces a composite ranking. Also runs sensitivity analysis under four alternative weight schemes.
Assigns primary and secondary channels to each paper based on its channels list (set during search). Builds the reverse mapping needed by all scoring functions. Mutates input dicts in-place.
| Parameter | Type | Source | Description |
|---|---|---|---|
| papers | List[Dict] | openalex_merged.json | Paper dicts, each with channels: [str] key listing channel IDs that found the paper (first = primary) |
Algorithm:
1. Initialize channel_papers = defaultdict(list)
2. For each paper: read channels = paper.get("channels", [])
3. If non-empty: set paper["primary_channel"] = channels[0] and paper["secondary_channels"] = channels[1:] (empty list if only one channel)
4. If empty: set paper["primary_channel"] = "unassigned"
5. For each ch in channels: channel_papers[ch].append(paper)

Note: A paper appears in multiple channel_papers lists if it was found by multiple channel queries. This is intentional — it means the paper contributes to volume and impact scores for all channels that found it.
Tuple[List[Dict], defaultdict(list)]
| Element | Type | Description |
|---|---|---|
papers | List[Dict] | Same input list, mutated with added primary_channel and secondary_channels keys |
channel_papers | defaultdict(list) | Reverse mapping: channel_id → [paper, paper, ...] |
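A minimal sketch of the assignment logic, assuming the paper dict shape described above (the secondary_channels default in the empty branch is an assumption):

```python
from collections import defaultdict

def assign_channels(papers):
    """Mutate papers in place with primary/secondary channels and build
    the reverse channel → papers mapping, as described above."""
    channel_papers = defaultdict(list)
    for paper in papers:
        channels = paper.get("channels", [])
        if channels:
            paper["primary_channel"] = channels[0]
            paper["secondary_channels"] = channels[1:]
        else:
            paper["primary_channel"] = "unassigned"
            paper["secondary_channels"] = []
        for ch in channels:
            channel_papers[ch].append(paper)
    return papers, channel_papers
```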
Computes how much literature exists for each channel, normalized so the channel with the most papers scores 1.0.
| Parameter | Type | Source | Description |
|---|---|---|---|
| channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts |
Algorithm:
1. counts[ch] = len(papers) for each channel
2. max_count = max(counts.values()) (fallback: 1 if empty)
3. lit_volume[ch] = counts[ch] / max_count

Properties: All values in [0, 1]. At least one channel (the one with the most papers) scores 1.0. Monotonically increasing with paper count.
Dict[str, float] — channel_id → normalized volume score [0, 1]
The channel with the most papers gets 1.0; others are proportional fractions.
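The normalization is small enough to sketch in full:

```python
def compute_literature_volume(channel_papers):
    """Volume sub-score: per-channel paper counts normalized by the
    maximum count, as described above."""
    counts = {ch: len(papers) for ch, papers in channel_papers.items()}
    max_count = max(counts.values()) if counts else 1
    return {ch: c / max_count for ch, c in counts.items()}
```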
Measures how influential each channel's top papers are. Uses the mean citation count of the top-10 most-cited papers per channel, normalized across channels.
| Parameter | Type | Source | Description |
|---|---|---|---|
| channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts, each paper having cited_by_count |
Algorithm:
1. Sort each channel's papers by cited_by_count descending
2. channel_mean[ch] = sum(cited_by_count for top 10) / 10
3. max_mean = maximum channel mean; if max_mean == 0: set to 1 (prevent division by zero)
4. cit_impact[ch] = channel_mean[ch] / max_mean

Edge cases:
- Channel with no papers → mean_cites = 0

Dict[str, float] — channel_id → normalized citation impact [0, 1]
The channel whose top-10 papers have the highest mean citations gets 1.0.
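A sketch of the computation with top_n = 10. One detail is an inference from the sum/10 formula above: channels with fewer than 10 papers still divide by 10, so thin channels are penalized:

```python
def compute_citation_impact(channel_papers, top_n=10):
    """Impact sub-score: mean cited_by_count of each channel's top-N
    papers, normalized by the cross-channel maximum."""
    means = {}
    for ch, papers in channel_papers.items():
        top = sorted((p.get("cited_by_count", 0) for p in papers),
                     reverse=True)[:top_n]
        means[ch] = sum(top) / top_n  # empty channel → 0.0
    max_mean = max(means.values()) if means else 0.0
    if max_mean == 0:
        max_mean = 1  # prevent division by zero
    return {ch: m / max_mean for ch, m in means.items()}
```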
Measures how prominently each channel appears in real-world crisis events, weighted by financial losses. Events with larger losses contribute more weight (via log-scale).
| Parameter | Type | Source | Description |
|---|---|---|---|
| crisis_events | List[Dict] | crisis_chronology.json ["crisis_events"] | Event dicts with channels_activated, losses_usd, etc. |
| all_channel_ids | Set[str] | Union of search_queries.json keys and channel_papers keys | Complete set of valid channel IDs |
Crisis event schema (each event):
| Key | Type | Example |
|---|---|---|
event | str | "Mt. Gox Collapse" |
channels_activated | List[str] | ["counterparty_concentration", "information_asymmetry"] |
losses_usd | str | "460000000" or "undetermined" |
Algorithm:
1. Compute the median of all losses_usd strings that are valid floats
2. Parse each event's losses_usd. If non-numeric (e.g. "undetermined") → use median loss
3. weight = log10(losses_usd); if losses_usd ≤ 0: weight = 0
4. For each ch in channels_activated (or channels fallback): if ch is in all_channel_ids, crisis_counts[ch] += weight
5. Channels in all_channel_ids that appear in no event keep a zero count
6. Normalize: crisis_ev[ch] = crisis_counts[ch] / max(crisis_counts)

Edge cases:
- losses_usd = "undetermined" → uses median of all known losses
- losses_usd = "0" or negative → weight = 0

Dict[str, float] — channel_id → normalized crisis evidence [0, 1]
The channel most implicated in high-loss crisis events scores 1.0.
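A sketch under the event schema above; the exact parsing and error-handling details are assumptions:

```python
import math
import statistics

def compute_crisis_evidence(crisis_events, all_channel_ids):
    """Loss-weighted crisis sub-score: log10 of each event's USD loss is
    added to every activated channel, then counts are max-normalized."""
    known_losses = []
    for event in crisis_events:
        try:
            known_losses.append(float(event["losses_usd"]))
        except (KeyError, ValueError):
            pass  # e.g. "undetermined"
    median_loss = statistics.median(known_losses) if known_losses else 1.0

    counts = {ch: 0.0 for ch in all_channel_ids}
    for event in crisis_events:
        try:
            loss = float(event["losses_usd"])
        except (KeyError, ValueError):
            loss = median_loss  # non-numeric losses fall back to the median
        weight = math.log10(loss) if loss > 0 else 0.0
        for ch in event.get("channels_activated", event.get("channels", [])):
            if ch in counts:
                counts[ch] += weight

    max_count = max(counts.values()) if counts else 1.0
    max_count = max_count or 1.0  # avoid division by zero
    return {ch: c / max_count for ch, c in counts.items()}
```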
Entry point for the scoring stage. Loads all data, calls scoring functions, computes weighted composite scores, produces ranked output, and runs sensitivity analysis under four weight schemes.
CLI Arguments:
| Argument | Type | Default | Description |
|---|---|---|---|
| --input | str | ../output/data/openalex_merged.json | Merged papers file from Stage 1 |
| --queries | str | ../references/search_queries.json | Channel definitions for names and IDs |
| --crisis | str | ../references/crisis_chronology.json | Crisis events data |
| --output | str | ../output/data/channel_rankings.json | Output rankings file |
File Inputs:
| File | Key Fields Used |
|---|---|
| openalex_merged.json | channels, cited_by_count per paper |
| search_queries.json | channels.{id}.name for display names |
| crisis_chronology.json | crisis_events[].channels_activated, losses_usd |
Algorithm:
1. assign_channels(papers) → get reverse mapping
2. compute_literature_volume(channel_papers)
3. compute_citation_impact(channel_papers)
4. compute_crisis_evidence(crisis_events, all_channel_ids)
5. Compute weighted composite scores; attach per-channel diagnostics: paper_count, mean_top10_citations (rounded to 1 decimal), and crisis_event_count
6. Run sensitivity analysis under the four weight schemes:

| Scheme | Wlit | Wcit | Wcrisis |
|---|---|---|---|
| primary | 0.35 | 0.35 | 0.30 |
| equal | 0.333 | 0.333 | 0.333 |
| crisis_dominant | 0.25 | 0.25 | 0.50 |
| literature_dominant | 0.50 | 0.25 | 0.25 |
For each scheme, ranks are independently computed from the reweighted composite scores.
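The composite and scheme definitions can be sketched as follows (rank_channels is a hypothetical helper; the real script writes full ranking records):

```python
def composite_scores(lit, cit, crisis, weights=(0.35, 0.35, 0.30)):
    """Weighted sum of the three normalized sub-scores; the default
    weights are the primary scheme from the table above."""
    w_lit, w_cit, w_crisis = weights
    return {ch: w_lit * lit[ch] + w_cit * cit[ch] + w_crisis * crisis[ch]
            for ch in lit}

# Weight schemes from the sensitivity analysis: (lit, cit, crisis).
SCHEMES = {
    "primary": (0.35, 0.35, 0.30),
    "equal": (0.333, 0.333, 0.333),
    "crisis_dominant": (0.25, 0.25, 0.50),
    "literature_dominant": (0.50, 0.25, 0.25),
}

def rank_channels(scores):
    """Channel IDs sorted by composite score, highest first."""
    return sorted(scores, key=scores.get, reverse=True)
```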
Files written:
| File | Schema |
|---|---|
| channel_rankings.json | Ranked channels with composite score and the three sub-scores |
| sensitivity_analysis.json | Composite scores and ranks recomputed under each of the four weight schemes |
Console output: Formatted rankings table with rank, score, paper count, citation impact, crisis evidence, and channel name for each of the 14 channels.
| Stage | Input File(s) | Script / Function | Output File(s) |
|---|---|---|---|
| 1. Data Collection (OpenAlex API) | references/search_queries.json | openalex_search.py → main() | output/data/openalex_raw_{channel_id}.json for all 14 channels (network_contagion, liquidity_spirals, stablecoin_runs, oracle_manipulation, composability_risk, liquidation_cascades, counterparty_concentration, regulatory_contagion, gateway_risk, governance_failure, information_asymmetry, rwa_transmission, bridge_vulnerability, validator_concentration); output/data/openalex_merged.json |
| 2. Scoring | output/data/openalex_merged.json; references/search_queries.json; references/crisis_chronology.json | channel_mapper.py → main() | output/data/channel_rankings.json; output/data/sensitivity_analysis.json |
| Caller | Calls | Purpose |
|---|---|---|
| openalex_search.main() | OpenAlexClient.__init__() | Create API client |
| openalex_search.main() | search_channel() × 14 | Fetch papers per channel |
| search_channel() | client.search_works() | Query OpenAlex /works |
| search_channel() | extract_paper() | Normalize each result |
| client.search_works() | client._make_request() | HTTP GET with retry |
| client._make_request() | client._rate_limit() | Throttle requests |
| channel_mapper.main() | assign_channels() | Map papers ↔ channels |
| channel_mapper.main() | compute_literature_volume() | Score: paper count |
| channel_mapper.main() | compute_citation_impact() | Score: top-10 citations |
| channel_mapper.main() | compute_crisis_evidence() | Score: crisis events |
| Scheme | Wliterature | Wcitation | Wcrisis | Emphasis |
|---|---|---|---|---|
| primary | 0.35 | 0.35 | 0.30 | Balanced academic + crisis |
| equal | 0.333 | 0.333 | 0.333 | Equal weight baseline |
| crisis_dominant | 0.25 | 0.25 | 0.50 | Prioritize real-world evidence |
| literature_dominant | 0.50 | 0.25 | 0.25 | Prioritize research volume |
Pipeline Documentation — Systemic Risk Channel Scoring — Generated 2026-03-25