Pipeline Documentation: Systemic Risk Channel Scoring
Pipeline Overview
This pipeline collects academic literature from the OpenAlex API for 14 systemic risk channels in digital finance, then scores and ranks each channel by combining three normalized signals: literature volume (how many papers), citation impact (how influential the top papers are), and crisis evidence (how often the channel appears in real-world crisis events, weighted by financial losses). The final composite score uses a weighted sum (0.35 / 0.35 / 0.30) with sensitivity analysis across four alternative weight schemes.
The pipeline executes in three layers:
- API Layer (
openalex_client.py) — HTTP transport, rate limiting, retry, pagination - Data Collection Layer (
openalex_search.py) — channel-wise querying, deduplication, merging - Scoring Layer (
channel_mapper.py) — channel assignment, sub-score computation, composite ranking
Data Flow Diagram
Stage 1: Data Collection
14 channels, 4 queries each
search_channel() × 14
× 14 per-channel files
deduplicated, with channels[]
Stage 2: Channel Scoring
assign + score + rank
rankings[] + weights + metadata
4 weight schemes, ranks per scheme
Internal: API Transport
OpenAlexClient class
https://api.openalex.org
Table of Contents (Pipeline Execution Order)
Layer A: API Transport — openalex_client.py
- 1__init__(email, requests_per_second)
- 2_rate_limit()
- 3_make_request(endpoint, params, max_retries)
- 4search_works(search, filter_params, ...)
- 5get_entity(entity_type, entity_id)
- 6batch_lookup(entity_type, ids, id_field)
- 7paginate_all(endpoint, params, max_results)
- 8sample_works(sample_size, seed, filter_params)
- 9group_by(entity_type, group_field, filter_params)
Layer B: Data Collection — openalex_search.py
- 10extract_paper(work)
- 11search_channel(client, channel_id, channel_info, ...)
- 12main() — data collection orchestrator
Layer C: Scoring & Ranking — channel_mapper.py
Layer A: API Transport — openalex_client.py
API LAYER Provides rate-limited, retry-capable HTTP access to the OpenAlex REST API. All data collection flows through this class.
1 OpenAlexClient.__init__(email, requests_per_second)
Constructor. Stores configuration for polite-pool access and rate limiting.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| Optional[str] | None | Email address for polite pool (10x rate boost). Appended as mailto param to every request. | |
| requests_per_second | int | 10 | Maximum requests per second. Polite pool allows 10 req/s; without email, 1 req/s. |
CALCULATION
- Computes
min_delay = 1.0 / requests_per_second(e.g. 0.1s at 10 req/s) - Initializes
last_request_time = 0
OUTPUT
Configured OpenAlexClient instance with attributes:
| Attribute | Value |
|---|---|
self.email | Stored email string |
self.requests_per_second | Rate limit integer |
self.min_delay | Minimum seconds between requests |
self.last_request_time | 0 (no requests yet) |
BASE_URL | https://api.openalex.org (class constant) |
2 OpenAlexClient._rate_limit()
Internal method called before every HTTP request. Enforces minimum inter-request delay.
INPUT
None (reads self.last_request_time and self.min_delay).
CALCULATION
- Read current time:
current_time = time.time() - Compute elapsed:
time_since_last = current_time - self.last_request_time - If
time_since_last < self.min_delay: sleep for the difference - Update
self.last_request_time = time.time()
OUTPUT
None. Side effect: blocks the calling thread until the rate limit window has passed. Updates self.last_request_time.
3 OpenAlexClient._make_request(endpoint, params, max_retries)
Core HTTP GET method. All API calls flow through here. Handles rate limiting, polite-pool email injection, exponential backoff on errors.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| endpoint | str | required | API path, e.g. /works, /authors |
| params | Optional[Dict] | None | URL query parameters |
| max_retries | int | 5 | Maximum retry attempts |
CALCULATION
- Default
paramsto{}if None - If
self.emailis set, injectparams['mailto'] = self.email - Build URL:
urljoin(BASE_URL, endpoint) - For each attempt 0..max_retries-1:
- Call
self._rate_limit() - Execute
requests.get(url, params=params, timeout=30) - HTTP 200 → return
response.json() - HTTP 403 (rate limited) → sleep
2^attemptseconds, retry - HTTP 5xx (server error) → sleep
2^attemptseconds, retry - Other 4xx → raise immediately (no retry)
- Timeout / ConnectionError → sleep
2^attemptseconds, retry (or raise on last attempt)
- Call
- If all retries exhausted → raise Exception
OUTPUT
Dict[str, Any] — Parsed JSON response body from OpenAlex API.
Raises Exception after all retries fail, or requests.HTTPError on non-retryable status codes.
4 OpenAlexClient.search_works(search, filter_params, per_page, page, sort, select)
Primary search method used by the pipeline. Constructs a filtered, sorted, paginated query against the /works endpoint. This is the method called by search_channel().
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| search | Optional[str] | None | Full-text search query string |
| filter_params | Optional[Dict] | None | Filter dict, e.g. {"publication_year":"2009-2026", "cited_by_count":">4"} |
| per_page | int | 200 | Results per page (clamped to max 200) |
| page | int | 1 | Page number for offset pagination |
| sort | Optional[str] | None | Sort expression, e.g. cited_by_count:desc |
| select | Optional[List[str]] | None | Fields to return (projection) |
Pipeline usage: Called with search=query, filter_params={publication_year, cited_by_count}, per_page=200, sort="cited_by_count:desc"
CALCULATION
- Build params dict:
{'per-page': min(per_page, 200), 'page': page} - If
search: addparams['search'] = search - If
filter_params: join as comma-separatedkey:valuestring intoparams['filter'] - If
sort: addparams['sort'] = sort - If
select: join as comma-separated string intoparams['select'] - Delegate to
self._make_request('/works', params)
Example constructed URL:
/works?search=systemic+risk+contagion&filter=publication_year:2009-2026,cited_by_count:>4&sort=cited_by_count:desc&per-page=200&page=1
OUTPUT
Dict[str, Any] — OpenAlex response containing:
| Key | Description |
|---|---|
meta | {"count": N, "db_response_time_ms": ...} — total matching works |
results | List of work objects (up to per_page items) |
5 OpenAlexClient.get_entity(entity_type, entity_id)
Fetch a single entity by ID. Not used in the main pipeline run but available for ad-hoc lookups.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', 'institutions', 'venues', 'concepts' |
| entity_id | str | required | OpenAlex ID (e.g. W2741809807) or external ID (DOI, ORCID) |
CALCULATION
Constructs endpoint /{entity_type}/{entity_id} and delegates to _make_request().
OUTPUT
Dict[str, Any] — Complete entity object from OpenAlex.
6 OpenAlexClient.batch_lookup(entity_type, ids, id_field)
Efficient batch retrieval. Looks up multiple entities using pipe-separated ID filters in groups of 50.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', etc. |
| ids | List[str] | required | List of IDs to look up |
| id_field | str | 'openalex_id' | Which ID field to filter on |
CALCULATION
- Iterate over
idsin chunks of 50 - For each chunk: join with
|separator - Construct filter:
{id_field}:{id1|id2|...|id50} - Call
_make_request(/{entity_type}, params={'filter':..., 'per-page':50}) - Accumulate all
resultsarrays
OUTPUT
List[Dict[str, Any]] — All matched entity objects (may be fewer than input IDs if some not found).
7 OpenAlexClient.paginate_all(endpoint, params, max_results)
Traverses all pages of a paginated endpoint. Respects OpenAlex's 10,000-result offset pagination limit.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| endpoint | str | required | API endpoint path |
| params | Optional[Dict] | None | Base query parameters (copied, not mutated) |
| max_results | Optional[int] | None | Stop after this many results. None = collect all. |
CALCULATION
- Copy params, set
per-page=200,page=1 - Loop:
- Fetch page via
_make_request() - Extend accumulator with
results - If
max_resultsreached → truncate and return - If
len(all_results) ≥ meta.count→ break - Increment page
- If
page × 200 > 10000→ break (OpenAlex hard limit)
- Fetch page via
OUTPUT
List[Dict[str, Any]] — All collected result objects, up to max_results or 10,000.
8 OpenAlexClient.sample_works(sample_size, seed, filter_params)
Random sampling from OpenAlex. Supports reproducible samples via seed. For large samples (>10,000), uses multiple seeds with deduplication.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| sample_size | int | required | Number of works to sample |
| seed | Optional[int] | None | Random seed for reproducibility |
| filter_params | Optional[Dict] | None | Filters to constrain sampling universe |
CALCULATION
Case A: sample_size ≤ 10,000 (standard path)
- Set
params = {sample: sample_size, per-page: 200, seed: seed} - Paginate through
ceil(sample_size / 200)pages - Deduplicate by work ID
- Truncate to requested size
Case B: sample_size > 10,000 (multi-seed path)
- Iterate over seed offsets:
current_seed = seed + i - Each seed fetches up to 10,000 results with internal pagination (max 5 pages per seed)
- Deduplicate across all seeds
- Stop when target reached
OUTPUT
List[Dict[str, Any]] — List of sampled work objects, deduplicated, truncated to sample_size.
9 OpenAlexClient.group_by(entity_type, group_field, filter_params)
Aggregation query. Returns grouped counts for a field (e.g., publication year distribution).
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| entity_type | str | required | Entity kind: 'works', 'authors', etc. |
| group_field | str | required | Field to group by, e.g. 'publication_year', 'type' |
| filter_params | Optional[Dict] | None | Filters to constrain aggregation |
CALCULATION
- Build params:
{group_by: group_field} - If
filter_params: build comma-separated filter string - Call
_make_request(/{entity_type}, params) - Extract
response['group_by']
OUTPUT
List[Dict[str, Any]] — Grouped results, each containing {key, key_display_name, count}.
Layer B: Data Collection — openalex_search.py
DATA COLLECTION LAYER Iterates over all 14 channels defined in search_queries.json, queries OpenAlex via the client, extracts and normalizes paper records, deduplicates, and writes per-channel and merged output files.
10 extract_paper(work)
Transforms a raw OpenAlex work object into a normalized paper record. Reconstructs abstracts from the inverted index format used by OpenAlex.
INPUT
| Parameter | Type | Source | Description |
|---|---|---|---|
| work | Dict | OpenAlex API response results[] | Raw work object containing all OpenAlex fields |
Key fields read from input:
work['authorships'][*]['author']['display_name']work['concepts'][*]['display_name']work['abstract_inverted_index']— dict of{word: [position, ...]}work['primary_location']['source']['display_name']
CALCULATION
- Authors: Iterate
authorships, collect eachauthor.display_name - Concepts: Iterate
concepts, collect eachdisplay_name - Abstract reconstruction:
- For each
(word, positions)in the inverted index: create tuples(position, word) - Sort by position ascending
- Join words with spaces
- For each
- Host venue: Navigate
primary_location → source → display_name
OUTPUT
Dict — Normalized paper record with keys:
| Key | Type | Description |
|---|---|---|
id | str | OpenAlex work ID (e.g. https://openalex.org/W...) |
title | str | Work title |
authors | List[str] | Author display names |
year | int|None | Publication year |
doi | str|None | DOI URL |
abstract | str|None | Reconstructed abstract text |
cited_by_count | int | Total citations (default 0) |
concepts | List[str] | Associated concept names |
referenced_works | List[str] | IDs of referenced works |
source | str | Always "openalex" |
type | str | Work type (article, review, etc.) |
host_venue | str | Journal or venue name |
11 search_channel(client, channel_id, channel_info, per_channel_limit, min_citations, year_start, year_end)
Searches OpenAlex for all queries belonging to a single channel. Iterates through each query string, paginates results sorted by citation count, deduplicates within the channel, and stops at the per-channel limit.
INPUT
| Parameter | Type | Default | Description |
|---|---|---|---|
| client | OpenAlexClient | required | Configured API client instance |
| channel_id | str | required | Channel identifier, e.g. "network_contagion" |
| channel_info | Dict | required | Channel config with name and queries (list of search strings) |
| per_channel_limit | int | required | Max papers to collect for this channel (default 200 from CLI) |
| min_citations | int | required | Minimum cited_by_count filter (default 5 from CLI) |
| year_start | int | required | Start of publication year range (default 2009) |
| year_end | int | required | End of publication year range (default 2026) |
CALCULATION
- Initialize
seen_ids = set(),channel_papers = [] - For each query in
channel_info['queries']:- If
len(channel_papers) ≥ per_channel_limit: stop - Construct filter:
{publication_year: "YYYY-YYYY", cited_by_count: ">N"} - Call
client.search_works(search=query, filter_params=..., per_page=remaining, sort="cited_by_count:desc") - Extract and deduplicate results via
extract_paper()
- If
- Pagination: If
total_available > per_pageand limit not reached:- Compute pages needed (capped at 4 additional pages)
- Fetch pages 2 through
pages_needed + 1 - Continue deduplicating and accumulating
OUTPUT
List[Dict] — Extracted paper dicts for this channel. Each paper has the schema from extract_paper(). Deduplicated by paper ID within the channel. At most per_channel_limit papers.
12 main() — data collection orchestrator [openalex_search.py]
Entry point for the data collection stage. Parses CLI arguments, iterates all 14 channels, saves per-channel files, merges with cross-channel deduplication, and prints summary statistics.
INPUT
CLI Arguments:
| Argument | Type | Default | Description |
|---|---|---|---|
| str | required | Email for OpenAlex polite pool | |
| --per-channel-limit | int | 200 | Max papers per channel |
| --min-citations | int | 5 | Minimum citation count filter |
| --year-range | str | "2009-2026" | Publication year range (START-END) |
| --output-dir | str | ../output/data/ | Directory for output files |
File Input:
| File | Path | Schema |
|---|---|---|
| search_queries.json | ../references/search_queries.json | {"channels": {"channel_id": {"name":str, "queries":[str]}}} |
CALCULATION
- Parse year range string into
year_start,year_end - Load
search_queries.json, extractchannelsdict (14 channels) - Initialize
OpenAlexClient(email=...) - For each channel:
- Call
search_channel() - Write per-channel file:
openalex_raw_{channel_id}.json - Merge into global dict keyed by paper ID
- Track which channels found each paper in
merged_papers[pid]['channels']list
- Call
- Convert merged dict to list, save as
openalex_merged.json - Print summary: per-channel counts, total unique papers, year distribution histogram, low-result warnings
OUTPUT
Files written:
| File | Count | Content |
|---|---|---|
openalex_raw_{channel_id}.json | 14 files | Array of paper dicts for that channel |
openalex_merged.json | 1 file | Deduplicated array of paper dicts, each with added channels: [str] key listing all channels that found the paper |
Console output: Per-channel paper counts, year distribution histogram, low-result channel warnings (<20 papers).
Layer C: Scoring & Ranking — channel_mapper.py
SCORING LAYER Takes the merged paper collection from Stage 1, assigns papers to channels, computes three normalized sub-scores, and produces a composite ranking. Also runs sensitivity analysis under four alternative weight schemes.
13 assign_channels(papers)
Assigns primary and secondary channels to each paper based on its channels list (set during search). Builds the reverse mapping needed by all scoring functions. Mutates input dicts in-place.
INPUT
| Parameter | Type | Source | Description |
|---|---|---|---|
| papers | List[Dict] | openalex_merged.json | Paper dicts, each with channels: [str] key listing channel IDs that found the paper (first = primary) |
CALCULATION
- Initialize
channel_papers = defaultdict(list) - For each paper:
channels = paper.get("channels", [])- If channels exist:
paper["primary_channel"] = channels[0] paper["secondary_channels"] = channels[1:](empty list if only one channel)- If no channels:
paper["primary_channel"] = "unassigned" - For each channel in the paper's channel list:
channel_papers[ch].append(paper)
Note: A paper appears in multiple channel_papers lists if it was found by multiple channel queries. This is intentional — it means the paper contributes to volume and impact scores for all channels that found it.
OUTPUT
Tuple[List[Dict], defaultdict(list)]
| Element | Type | Description |
|---|---|---|
papers | List[Dict] | Same input list, mutated with added primary_channel and secondary_channels keys |
channel_papers | defaultdict(list) | Reverse mapping: channel_id → [paper, paper, ...] |
14 compute_literature_volume(channel_papers)
Computes how much literature exists for each channel, normalized so the channel with the most papers scores 1.0.
INPUT
| Parameter | Type | Source | Description |
|---|---|---|---|
| channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts |
CALCULATION
- Count papers per channel:
counts[ch] = len(papers) - Find maximum:
max_count = max(counts.values())(fallback: 1 if empty) - Normalize each channel:
lit_volume[ch] = counts[ch] / max_count
Properties: All values in [0, 1]. Exactly one channel scores 1.0 (the one with the most papers). Monotonically increasing with paper count.
OUTPUT
Dict[str, float] — channel_id → normalized volume score [0, 1]
The channel with the most papers gets 1.0; others are proportional fractions.
15 compute_citation_impact(channel_papers)
Measures how influential each channel's top papers are. Uses the mean citation count of the top-10 most-cited papers per channel, normalized across channels.
INPUT
| Parameter | Type | Source | Description |
|---|---|---|---|
| channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts, each paper having cited_by_count |
CALCULATION
- For each channel:
- Sort papers by
cited_by_countdescending - Take the top 10 papers (or fewer if channel has <10)
- Compute mean:
sum(cited_by_count for top 10) / 10 - Important: Always divides by 10, even if fewer than 10 papers exist. This penalizes channels with few papers.
- Sort papers by
- Find maximum mean across all channels:
max_mean - If
max_mean == 0: set to 1 (prevent division by zero) - Normalize:
cit_impact[ch] = channel_mean[ch] / max_mean
Edge cases:
- Empty channel (no papers) →
mean_cites = 0 - Channel with 3 papers having 100, 50, 20 citations → mean = (100+50+20)/10 = 17.0 (not 56.7)
- All channels have zero citations → all scores = 0.0
OUTPUT
Dict[str, float] — channel_id → normalized citation impact [0, 1]
The channel whose top-10 papers have the highest mean citations gets 1.0.
16 compute_crisis_evidence(crisis_events, all_channel_ids)
Measures how prominently each channel appears in real-world crisis events, weighted by financial losses. Events with larger losses contribute more weight (via log-scale).
INPUT
| Parameter | Type | Source | Description |
|---|---|---|---|
| crisis_events | List[Dict] | crisis_chronology.json ["crisis_events"] | Event dicts with channels_activated, losses_usd, etc. |
| all_channel_ids | Set[str] | Union of search_queries.json keys and channel_papers keys | Complete set of valid channel IDs |
Crisis event schema (each event):
| Key | Type | Example |
|---|---|---|
event | str | "Mt. Gox Collapse" |
channels_activated | List[str] | ["counterparty_concentration", "information_asymmetry"] |
losses_usd | str | "460000000" or "undetermined" |
CALCULATION
- Compute median loss:
- Parse all
losses_usdstrings that are valid floats - Sort numerically, compute median
- Fallback: $100,000,000 if no numeric losses exist
- Parse all
- For each crisis event:
- Parse
losses_usd. If non-numeric (e.g. "undetermined") → use median loss - Compute weight:
log10(losses_usd) - If
losses_usd ≤ 0: weight = 0 - For each channel in
channels_activated(orchannelsfallback):- If channel in
all_channel_ids:crisis_counts[ch] += weight
- If channel in
- Parse
- Warn about channels in events that are not in
all_channel_ids - Ensure all channels have entries (default 0 for channels with no crisis events)
- Normalize:
crisis_ev[ch] = crisis_counts[ch] / max(crisis_counts)
Edge cases:
losses_usd = "undetermined"→ uses median of all known losseslosses_usd = "0"or negative → weight = 0- No crisis events at all → all channels get 0.0
- max crisis count = 0 → treated as 1 (prevents division by zero)
- Unknown channel ID in event → skipped with printed warning
OUTPUT
Dict[str, float] — channel_id → normalized crisis evidence [0, 1]
The channel most implicated in high-loss crisis events scores 1.0.
17 main() — composite scoring & sensitivity [channel_mapper.py]
Entry point for the scoring stage. Loads all data, calls scoring functions, computes weighted composite scores, produces ranked output, and runs sensitivity analysis under four weight schemes.
INPUT
CLI Arguments:
| Argument | Type | Default | Description |
|---|---|---|---|
| --input | str | ../output/data/openalex_merged.json | Merged papers file from Stage 1 |
| --queries | str | ../references/search_queries.json | Channel definitions for names and IDs |
| --crisis | str | ../references/crisis_chronology.json | Crisis events data |
| --output | str | ../output/data/channel_rankings.json | Output rankings file |
File Inputs:
| File | Key Fields Used |
|---|---|
| openalex_merged.json | channels, cited_by_count per paper |
| search_queries.json | channels.{id}.name for display names |
| crisis_chronology.json | crisis_events[].channels_activated, losses_usd |
CALCULATION
- Load data: Read all three JSON files. Crisis file is optional (missing = zero crisis scores + warning).
- Assign channels:
assign_channels(papers)→ get reverse mapping - Build channel ID set: Union of search_queries keys and any IDs found in papers
- Compute sub-scores:
compute_literature_volume(channel_papers)compute_citation_impact(channel_papers)compute_crisis_evidence(crisis_events, all_channel_ids)
- Composite score (primary weights):
- Rank: Sort channels by raw composite score descending, assign ranks 1..N, then round scores to 4 decimal places
- Enrichment: Each ranking entry also includes
paper_count,mean_top10_citations(rounded to 1 decimal), andcrisis_event_count - Sensitivity analysis: Recompute composite scores under 4 weight schemes:
| Scheme | Wlit | Wcit | Wcrisis |
|---|---|---|---|
| primary | 0.35 | 0.35 | 0.30 |
| equal | 0.333 | 0.333 | 0.333 |
| crisis_dominant | 0.25 | 0.25 | 0.50 |
| literature_dominant | 0.50 | 0.25 | 0.25 |
For each scheme, ranks are independently computed from the reweighted composite scores.
OUTPUT
Files written:
| File | Schema |
|---|---|
channel_rankings.json |
|
sensitivity_analysis.json |
|
Console output: Formatted rankings table with rank, score, paper count, citation impact, crisis evidence, and channel name for each of the 14 channels.
Appendix: Complete File-to-File Data Flow
| Stage | Input File(s) | Script / Function | Output File(s) |
|---|---|---|---|
| 1. Data Collection | references/search_queries.json | openalex_search.py → main() | output/data/openalex_raw_network_contagion.json |
| (OpenAlex API) | output/data/openalex_raw_liquidity_spirals.json output/data/openalex_raw_stablecoin_runs.json output/data/openalex_raw_oracle_manipulation.json output/data/openalex_raw_composability_risk.json output/data/openalex_raw_liquidation_cascades.json output/data/openalex_raw_counterparty_concentration.json output/data/openalex_raw_regulatory_contagion.json output/data/openalex_raw_gateway_risk.json output/data/openalex_raw_governance_failure.json output/data/openalex_raw_information_asymmetry.json output/data/openalex_raw_rwa_transmission.json output/data/openalex_raw_bridge_vulnerability.json output/data/openalex_raw_validator_concentration.json |
||
| output/data/openalex_merged.json | |||
| 2. Scoring | output/data/openalex_merged.json references/search_queries.json references/crisis_chronology.json |
channel_mapper.py → main() | output/data/channel_rankings.json |
| output/data/sensitivity_analysis.json |
Function Call Graph
| Caller | Calls | Purpose |
|---|---|---|
| openalex_search.main() | OpenAlexClient.__init__() | Create API client |
| openalex_search.main() | search_channel() × 14 | Fetch papers per channel |
| search_channel() | client.search_works() | Query OpenAlex /works |
| search_channel() | extract_paper() | Normalize each result |
| client.search_works() | client._make_request() | HTTP GET with retry |
| client._make_request() | client._rate_limit() | Throttle requests |
| channel_mapper.main() | assign_channels() | Map papers ↔ channels |
| channel_mapper.main() | compute_literature_volume() | Score: paper count |
| channel_mapper.main() | compute_citation_impact() | Score: top-10 citations |
| channel_mapper.main() | compute_crisis_evidence() | Score: crisis events |
Weight Sensitivity Summary
| Scheme | Wliterature | Wcitation | Wcrisis | Emphasis |
|---|---|---|---|---|
| primary | 0.35 | 0.35 | 0.30 | Balanced academic + crisis |
| equal | 0.333 | 0.333 | 0.333 | Equal weight baseline |
| crisis_dominant | 0.25 | 0.25 | 0.50 | Prioritize real-world evidence |
| literature_dominant | 0.50 | 0.25 | 0.25 | Prioritize research volume |
Pipeline Documentation — Systemic Risk Channel Scoring — Generated 2026-03-25