Pipeline Documentation: Systemic Risk Channel Scoring

Generated: 2026-03-25 | Scripts: 3 | Functions: 17 | Channels: 14

Pipeline Overview

This pipeline collects academic literature from the OpenAlex API for 14 systemic risk channels in digital finance, then scores and ranks each channel by combining three normalized signals: literature volume (how many papers), citation impact (how influential the top papers are), and crisis evidence (how often the channel appears in real-world crisis events, weighted by financial losses). The final composite score uses a weighted sum (0.35 / 0.35 / 0.30) with sensitivity analysis across four alternative weight schemes.

The pipeline executes in three layers:

  1. API Layer (openalex_client.py) — HTTP transport, rate limiting, retry, pagination
  2. Data Collection Layer (openalex_search.py) — channel-wise querying, deduplication, merging
  3. Scoring Layer (channel_mapper.py) — channel assignment, sub-score computation, composite ranking

Data Flow Diagram

Stage 1: Data Collection

search_queries.json
14 channels, 4 queries each
openalex_search.py
search_channel() × 14
openalex_raw_{channel_id}.json
× 14 per-channel files
openalex_merged.json
deduplicated, with channels[]

Stage 2: Channel Scoring

openalex_merged.json
search_queries.json
crisis_chronology.json
channel_mapper.py
assign + score + rank
channel_rankings.json
rankings[] + weights + metadata
sensitivity_analysis.json
4 weight schemes, ranks per scheme

Internal: API Transport

openalex_client.py
OpenAlexClient class
OpenAlex REST API
https://api.openalex.org

Table of Contents (Pipeline Execution Order)

Layer A: API Transport — openalex_client.py

  1. __init__(email, requests_per_second)
  2. _rate_limit()
  3. _make_request(endpoint, params, max_retries)
  4. search_works(search, filter_params, ...)
  5. get_entity(entity_type, entity_id)
  6. batch_lookup(entity_type, ids, id_field)
  7. paginate_all(endpoint, params, max_results)
  8. sample_works(sample_size, seed, filter_params)
  9. group_by(entity_type, group_field, filter_params)

Layer B: Data Collection — openalex_search.py

  10. extract_paper(work)
  11. search_channel(client, channel_id, channel_info, ...)
  12. main() — data collection orchestrator

Layer C: Scoring & Ranking — channel_mapper.py

  13. assign_channels(papers)
  14. compute_literature_volume(channel_papers)
  15. compute_citation_impact(channel_papers)
  16. compute_crisis_evidence(crisis_events, all_channel_ids)
  17. main() — composite scoring & sensitivity

Layer A: API Transport — openalex_client.py

API LAYER Provides rate-limited, retry-capable HTTP access to the OpenAlex REST API. All data collection flows through this class.

1 OpenAlexClient.__init__(email, requests_per_second)

Constructor. Stores configuration for polite-pool access and rate limiting.

INPUT

Parameter | Type | Default | Description
email | Optional[str] | None | Email address for polite pool (10x rate boost). Appended as mailto param to every request.
requests_per_second | int | 10 | Maximum requests per second. Polite pool allows 10 req/s; without email, 1 req/s.

CALCULATION

  • Computes min_delay = 1.0 / requests_per_second (e.g. 0.1s at 10 req/s)
  • Initializes last_request_time = 0

OUTPUT

Configured OpenAlexClient instance with attributes:

Attribute | Value
self.email | Stored email string
self.requests_per_second | Rate limit integer
self.min_delay | Minimum seconds between requests
self.last_request_time | 0 (no requests yet)
BASE_URL | https://api.openalex.org (class constant)

2 OpenAlexClient._rate_limit()

Internal method called before every HTTP request. Enforces minimum inter-request delay.

INPUT

None (reads self.last_request_time and self.min_delay).

CALCULATION

  1. Read current time: current_time = time.time()
  2. Compute elapsed: time_since_last = current_time - self.last_request_time
  3. If time_since_last < self.min_delay: sleep for the difference
  4. Update self.last_request_time = time.time()
sleep_time = max(0, min_delay - (now - last_request_time))

OUTPUT

None. Side effect: blocks the calling thread until the rate limit window has passed. Updates self.last_request_time.
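The delay enforcement described above can be sketched as follows (class and method names are illustrative, not the pipeline's actual code):

```python
import time

class RateLimiter:
    """Minimal sketch of the inter-request throttle described above."""

    def __init__(self, requests_per_second: int = 10):
        self.min_delay = 1.0 / requests_per_second  # 0.1 s at 10 req/s
        self.last_request_time = 0.0                # no requests yet

    def wait(self) -> float:
        """Block until min_delay has elapsed since the last request.
        Returns the number of seconds actually slept."""
        now = time.time()
        sleep_time = max(0.0, self.min_delay - (now - self.last_request_time))
        if sleep_time > 0:
            time.sleep(sleep_time)
        self.last_request_time = time.time()
        return sleep_time
```

Calling wait() before every HTTP request guarantees at most requests_per_second calls per second regardless of how fast the caller loops.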

3 OpenAlexClient._make_request(endpoint, params, max_retries)

Core HTTP GET method. All API calls flow through here. Handles rate limiting, polite-pool email injection, exponential backoff on errors.

INPUT

Parameter | Type | Default | Description
endpoint | str | required | API path, e.g. /works, /authors
params | Optional[Dict] | None | URL query parameters
max_retries | int | 5 | Maximum retry attempts

CALCULATION

  1. Default params to {} if None
  2. If self.email is set, inject params['mailto'] = self.email
  3. Build URL: urljoin(BASE_URL, endpoint)
  4. For each attempt 0..max_retries-1:
    • Call self._rate_limit()
    • Execute requests.get(url, params=params, timeout=30)
    • HTTP 200 → return response.json()
    • HTTP 403 (rate limited) → sleep 2^attempt seconds, retry
    • HTTP 5xx (server error) → sleep 2^attempt seconds, retry
    • Other 4xx → raise immediately (no retry)
    • Timeout / ConnectionError → sleep 2^attempt seconds, retry (or raise on last attempt)
  5. If all retries exhausted → raise Exception
backoff_delay(attempt) = 2^attempt seconds  (1s, 2s, 4s, 8s, 16s)

OUTPUT

Dict[str, Any] — Parsed JSON response body from OpenAlex API.

Raises Exception after all retries fail, or requests.HTTPError on non-retryable status codes.
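The retry policy in steps 4–5 can be sketched with an injectable fetch function (the helper name and the (status, body) tuple are assumptions for illustration; the real method wraps requests.get):

```python
import time

def request_with_backoff(fetch, max_retries=5, sleep=time.sleep):
    """Sketch of the retry policy above: retry HTTP 403, 5xx, and
    network errors with 2**attempt backoff; fail fast on other 4xx."""
    for attempt in range(max_retries):
        try:
            status, body = fetch()
        except (TimeoutError, ConnectionError):
            if attempt == max_retries - 1:
                raise                       # last attempt: propagate
            sleep(2 ** attempt)
            continue
        if status == 200:
            return body
        if status == 403 or status >= 500:  # retryable
            sleep(2 ** attempt)
            continue
        raise RuntimeError(f"non-retryable HTTP {status}")
    raise RuntimeError("all retries exhausted")
```

With max_retries=5 the backoff sequence between attempts is 1 s, 2 s, 4 s, 8 s, matching the formula above.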

4 OpenAlexClient.search_works(search, filter_params, per_page, page, sort, select)

Primary search method used by the pipeline. Constructs a filtered, sorted, paginated query against the /works endpoint. This is the method called by search_channel().

INPUT

Parameter | Type | Default | Description
search | Optional[str] | None | Full-text search query string
filter_params | Optional[Dict] | None | Filter dict, e.g. {"publication_year": "2009-2026", "cited_by_count": ">4"}
per_page | int | 200 | Results per page (clamped to max 200)
page | int | 1 | Page number for offset pagination
sort | Optional[str] | None | Sort expression, e.g. cited_by_count:desc
select | Optional[List[str]] | None | Fields to return (projection)

Pipeline usage: Called with search=query, filter_params={publication_year, cited_by_count}, per_page=200, sort="cited_by_count:desc"

CALCULATION

  1. Build params dict: {'per-page': min(per_page, 200), 'page': page}
  2. If search: add params['search'] = search
  3. If filter_params: join as comma-separated key:value string into params['filter']
  4. If sort: add params['sort'] = sort
  5. If select: join as comma-separated string into params['select']
  6. Delegate to self._make_request('/works', params)
Example constructed URL:
/works?search=systemic+risk+contagion&filter=publication_year:2009-2026,cited_by_count:>4&sort=cited_by_count:desc&per-page=200&page=1

OUTPUT

Dict[str, Any] — OpenAlex response containing:

Key | Description
meta | {"count": N, "db_response_time_ms": ...} — total matching works
results | List of work objects (up to per_page items)

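Steps 1–5 amount to the following parameter construction (a sketch; the helper name is invented and the real method passes the result straight to _make_request):

```python
def build_works_params(search=None, filter_params=None, per_page=200,
                       page=1, sort=None, select=None):
    """Sketch of the /works query construction described above."""
    params = {"per-page": min(per_page, 200), "page": page}  # clamp to API max
    if search:
        params["search"] = search
    if filter_params:
        # comma-separated key:value pairs, per the OpenAlex filter syntax
        params["filter"] = ",".join(f"{k}:{v}" for k, v in filter_params.items())
    if sort:
        params["sort"] = sort
    if select:
        params["select"] = ",".join(select)
    return params
```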
5 OpenAlexClient.get_entity(entity_type, entity_id)

Fetch a single entity by ID. Not used in the main pipeline run but available for ad-hoc lookups.

INPUT

Parameter | Type | Default | Description
entity_type | str | required | Entity kind: 'works', 'authors', 'institutions', 'venues', 'concepts'
entity_id | str | required | OpenAlex ID (e.g. W2741809807) or external ID (DOI, ORCID)

CALCULATION

Constructs endpoint /{entity_type}/{entity_id} and delegates to _make_request().

OUTPUT

Dict[str, Any] — Complete entity object from OpenAlex.

6 OpenAlexClient.batch_lookup(entity_type, ids, id_field)

Efficient batch retrieval. Looks up multiple entities using pipe-separated ID filters in groups of 50.

INPUT

Parameter | Type | Default | Description
entity_type | str | required | Entity kind: 'works', 'authors', etc.
ids | List[str] | required | List of IDs to look up
id_field | str | 'openalex_id' | Which ID field to filter on

CALCULATION

  1. Iterate over ids in chunks of 50
  2. For each chunk: join with | separator
  3. Construct filter: {id_field}:{id1|id2|...|id50}
  4. Call _make_request(/{entity_type}, params={'filter':..., 'per-page':50})
  5. Accumulate all results arrays
requests_needed = ceil(len(ids) / 50)

OUTPUT

List[Dict[str, Any]] — All matched entity objects (may be fewer than input IDs if some not found).
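The chunking and filter construction in steps 1–3 can be sketched as (helper name hypothetical):

```python
def chunked_filters(ids, id_field="openalex_id", chunk_size=50):
    """Build one pipe-separated ID filter string per chunk of 50 IDs,
    as described in the batch_lookup calculation above."""
    filters = []
    for i in range(0, len(ids), chunk_size):
        chunk = ids[i:i + chunk_size]
        filters.append(f"{id_field}:{'|'.join(chunk)}")
    return filters
```

Each returned string becomes the filter parameter of one request, so 120 IDs cost ceil(120 / 50) = 3 requests.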

7 OpenAlexClient.paginate_all(endpoint, params, max_results)

Traverses all pages of a paginated endpoint. Respects OpenAlex's 10,000-result offset pagination limit.

INPUT

Parameter | Type | Default | Description
endpoint | str | required | API endpoint path
params | Optional[Dict] | None | Base query parameters (copied, not mutated)
max_results | Optional[int] | None | Stop after this many results. None = collect all.

CALCULATION

  1. Copy params, set per-page=200, page=1
  2. Loop:
    • Fetch page via _make_request()
    • Extend accumulator with results
    • If max_results reached → truncate and return
    • If len(all_results) ≥ meta.count → break
    • Increment page
    • If page × 200 > 10000 → break (OpenAlex hard limit)
max_retrievable = min(meta.count, max_results or inf, 10000)

OUTPUT

List[Dict[str, Any]] — All collected result objects, up to max_results or 10,000.
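The pagination loop above can be sketched as follows, where fetch_page(page) stands in for _make_request and returns (meta_count, results) — an assumed interface for illustration:

```python
def paginate_all(fetch_page, max_results=None, per_page=200, hard_limit=10_000):
    """Sketch of the offset-pagination loop described above."""
    all_results, page = [], 1
    while True:
        total, results = fetch_page(page)
        all_results.extend(results)
        if max_results is not None and len(all_results) >= max_results:
            return all_results[:max_results]   # truncate and stop
        if len(all_results) >= total:
            break                              # collected everything
        page += 1
        if page * per_page > hard_limit:
            break                              # OpenAlex offset limit
    return all_results
```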

8 OpenAlexClient.sample_works(sample_size, seed, filter_params)

Random sampling from OpenAlex. Supports reproducible samples via seed. For large samples (>10,000), uses multiple seeds with deduplication.

INPUT

Parameter | Type | Default | Description
sample_size | int | required | Number of works to sample
seed | Optional[int] | None | Random seed for reproducibility
filter_params | Optional[Dict] | None | Filters to constrain sampling universe

CALCULATION

Case A: sample_size ≤ 10,000 (standard path)

  1. Set params = {sample: sample_size, per-page: 200, seed: seed}
  2. Paginate through ceil(sample_size / 200) pages
  3. Deduplicate by work ID
  4. Truncate to requested size

Case B: sample_size > 10,000 (multi-seed path)

  1. Iterate over seed offsets: current_seed = seed + i
  2. Each seed fetches up to 10,000 results with internal pagination (max 5 pages per seed)
  3. Deduplicate across all seeds
  4. Stop when target reached

OUTPUT

List[Dict[str, Any]] — List of sampled work objects, deduplicated, truncated to sample_size.

9 OpenAlexClient.group_by(entity_type, group_field, filter_params)

Aggregation query. Returns grouped counts for a field (e.g., publication year distribution).

INPUT

Parameter | Type | Default | Description
entity_type | str | required | Entity kind: 'works', 'authors', etc.
group_field | str | required | Field to group by, e.g. 'publication_year', 'type'
filter_params | Optional[Dict] | None | Filters to constrain aggregation

CALCULATION

  1. Build params: {group_by: group_field}
  2. If filter_params: build comma-separated filter string
  3. Call _make_request(/{entity_type}, params)
  4. Extract response['group_by']

OUTPUT

List[Dict[str, Any]] — Grouped results, each containing {key, key_display_name, count}.

Layer B: Data Collection — openalex_search.py

DATA COLLECTION LAYER Iterates over all 14 channels defined in search_queries.json, queries OpenAlex via the client, extracts and normalizes paper records, deduplicates, and writes per-channel and merged output files.

10 extract_paper(work)

Transforms a raw OpenAlex work object into a normalized paper record. Reconstructs abstracts from the inverted index format used by OpenAlex.

INPUT

Parameter | Type | Source | Description
work | Dict | OpenAlex API response results[] | Raw work object containing all OpenAlex fields

Key fields read from input:

  • work['authorships'][*]['author']['display_name']
  • work['concepts'][*]['display_name']
  • work['abstract_inverted_index'] — dict of {word: [position, ...]}
  • work['primary_location']['source']['display_name']

CALCULATION

  1. Authors: Iterate authorships, collect each author.display_name
  2. Concepts: Iterate concepts, collect each display_name
  3. Abstract reconstruction:
    • For each (word, positions) in the inverted index: create tuples (position, word)
    • Sort by position ascending
    • Join words with spaces
  4. Host venue: Navigate primary_location → source → display_name
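The abstract reconstruction in step 3 can be sketched as (function name hypothetical):

```python
def reconstruct_abstract(inverted_index):
    """Rebuild plain text from OpenAlex's abstract_inverted_index,
    a dict of {word: [position, ...]}, as described above."""
    if not inverted_index:
        return None
    # expand to (position, word) tuples, then sort by position
    positioned = [(pos, word)
                  for word, positions in inverted_index.items()
                  for pos in positions]
    return " ".join(word for _, word in sorted(positioned))
```

A word occurring at several positions (common for stop words) is emitted once per position, which is exactly what restores the original word order.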

OUTPUT

Dict — Normalized paper record with keys:

Key | Type | Description
id | str | OpenAlex work ID (e.g. https://openalex.org/W...)
title | str | Work title
authors | List[str] | Author display names
year | int or None | Publication year
doi | str or None | DOI URL
abstract | str or None | Reconstructed abstract text
cited_by_count | int | Total citations (default 0)
concepts | List[str] | Associated concept names
referenced_works | List[str] | IDs of referenced works
source | str | Always "openalex"
type | str | Work type (article, review, etc.)
host_venue | str | Journal or venue name

11 search_channel(client, channel_id, channel_info, per_channel_limit, min_citations, year_start, year_end)

Searches OpenAlex for all queries belonging to a single channel. Iterates through each query string, paginates results sorted by citation count, deduplicates within the channel, and stops at the per-channel limit.

INPUT

Parameter | Type | Default | Description
client | OpenAlexClient | required | Configured API client instance
channel_id | str | required | Channel identifier, e.g. "network_contagion"
channel_info | Dict | required | Channel config with name and queries (list of search strings)
per_channel_limit | int | required | Max papers to collect for this channel (default 200 from CLI)
min_citations | int | required | Minimum cited_by_count filter (default 5 from CLI)
year_start | int | required | Start of publication year range (default 2009)
year_end | int | required | End of publication year range (default 2026)

CALCULATION

  1. Initialize seen_ids = set(), channel_papers = []
  2. For each query in channel_info['queries']:
    • If len(channel_papers) ≥ per_channel_limit: stop
    • Construct filter: {publication_year: "YYYY-YYYY", cited_by_count: ">N"}
    • Call client.search_works(search=query, filter_params=..., per_page=remaining, sort="cited_by_count:desc")
    • Extract and deduplicate results via extract_paper()
  3. Pagination: If total_available > per_page and limit not reached:
    • Compute pages needed (capped at 4 additional pages)
    • Fetch pages 2 through pages_needed + 1
    • Continue deduplicating and accumulating
max_pages_per_query = 1 (initial) + min(4, pages_needed) = up to 5 pages
max_papers_per_query_page = min(200, per_channel_limit - len(collected))

OUTPUT

List[Dict] — Extracted paper dicts for this channel. Each paper has the schema from extract_paper(). Deduplicated by paper ID within the channel. At most per_channel_limit papers.
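The within-channel deduplication and limit check from step 2 can be sketched as (helper name hypothetical; the real code inlines this loop):

```python
def dedupe_accumulate(results, seen_ids, collected, limit):
    """Add results to `collected`, skipping duplicate work IDs and
    stopping at the per-channel limit, per search_channel() above."""
    for work in results:
        if len(collected) >= limit:
            break
        wid = work.get("id")
        if wid and wid not in seen_ids:
            seen_ids.add(wid)
            collected.append(work)
    return collected
```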

12 main() — data collection orchestrator [openalex_search.py]

Entry point for the data collection stage. Parses CLI arguments, iterates all 14 channels, saves per-channel files, merges with cross-channel deduplication, and prints summary statistics.

INPUT

CLI Arguments:

Argument | Type | Default | Description
--email | str | required | Email for OpenAlex polite pool
--per-channel-limit | int | 200 | Max papers per channel
--min-citations | int | 5 | Minimum citation count filter
--year-range | str | "2009-2026" | Publication year range (START-END)
--output-dir | str | ../output/data/ | Directory for output files

File Input:

File | Path | Schema
search_queries.json | ../references/search_queries.json | {"channels": {"channel_id": {"name": str, "queries": [str]}}}

CALCULATION

  1. Parse year range string into year_start, year_end
  2. Load search_queries.json, extract channels dict (14 channels)
  3. Initialize OpenAlexClient(email=...)
  4. For each channel:
    • Call search_channel()
    • Write per-channel file: openalex_raw_{channel_id}.json
    • Merge into global dict keyed by paper ID
    • Track which channels found each paper in merged_papers[pid]['channels'] list
  5. Convert merged dict to list, save as openalex_merged.json
  6. Print summary: per-channel counts, total unique papers, year distribution histogram, low-result warnings
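The cross-channel merge in step 4 can be sketched as (function name hypothetical):

```python
def merge_channels(per_channel_results):
    """Dedupe papers by ID across channels and record every channel
    that found each paper, per the merge step described above."""
    merged = {}
    for channel_id, papers in per_channel_results.items():
        for paper in papers:
            pid = paper["id"]
            if pid not in merged:
                merged[pid] = dict(paper, channels=[])  # copy + channels key
            merged[pid]["channels"].append(channel_id)
    return list(merged.values())
```

The resulting channels list is what Stage 2's assign_channels() later reads, with the first entry treated as the primary channel.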

OUTPUT

Files written:

File | Count | Content
openalex_raw_{channel_id}.json | 14 files | Array of paper dicts for that channel
openalex_merged.json | 1 file | Deduplicated array of paper dicts, each with added channels: [str] key listing all channels that found the paper

Console output: Per-channel paper counts, year distribution histogram, low-result channel warnings (<20 papers).

Layer C: Scoring & Ranking — channel_mapper.py

SCORING LAYER Takes the merged paper collection from Stage 1, assigns papers to channels, computes three normalized sub-scores, and produces a composite ranking. Also runs sensitivity analysis under four alternative weight schemes.

13 assign_channels(papers)

Assigns primary and secondary channels to each paper based on its channels list (set during search). Builds the reverse mapping needed by all scoring functions. Mutates input dicts in-place.

INPUT

Parameter | Type | Source | Description
papers | List[Dict] | openalex_merged.json | Paper dicts, each with channels: [str] key listing channel IDs that found the paper (first = primary)

CALCULATION

  1. Initialize channel_papers = defaultdict(list)
  2. For each paper:
    • channels = paper.get("channels", [])
    • If channels exist: paper["primary_channel"] = channels[0]
    • paper["secondary_channels"] = channels[1:] (empty list if only one channel)
    • If no channels: paper["primary_channel"] = "unassigned"
    • For each channel in the paper's channel list: channel_papers[ch].append(paper)

Note: A paper appears in multiple channel_papers lists if it was found by multiple channel queries. This is intentional — it means the paper contributes to volume and impact scores for all channels that found it.
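The assignment logic above can be sketched as a reconstruction from this description (not the pipeline's verbatim code):

```python
from collections import defaultdict

def assign_channels(papers):
    """Tag each paper with primary/secondary channels and build the
    reverse channel -> papers mapping; mutates papers in place."""
    channel_papers = defaultdict(list)
    for paper in papers:
        channels = paper.get("channels", [])
        paper["primary_channel"] = channels[0] if channels else "unassigned"
        paper["secondary_channels"] = channels[1:]
        for ch in channels:
            channel_papers[ch].append(paper)   # multi-channel papers count everywhere
    return papers, channel_papers
```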

OUTPUT

Tuple[List[Dict], defaultdict(list)]

Element | Type | Description
papers | List[Dict] | Same input list, mutated with added primary_channel and secondary_channels keys
channel_papers | defaultdict(list) | Reverse mapping: channel_id → [paper, paper, ...]

14 compute_literature_volume(channel_papers)

Computes how much literature exists for each channel, normalized so the channel with the most papers scores 1.0.

INPUT

Parameter | Type | Source | Description
channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts

CALCULATION

  1. Count papers per channel: counts[ch] = len(papers)
  2. Find maximum: max_count = max(counts.values()) (fallback: 1 if empty)
  3. Normalize each channel: lit_volume[ch] = counts[ch] / max_count
lit_volume(ch) = |papers_ch| ÷ max_ch′ |papers_ch′|

Properties: All values in [0, 1]. Exactly one channel scores 1.0 (the one with the most papers). Monotonically increasing with paper count.

OUTPUT

Dict[str, float] — channel_id → normalized volume score [0, 1]

The channel with the most papers gets 1.0; others are proportional fractions.
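The calculation above reduces to a few lines (a sketch reconstructed from the description):

```python
def compute_literature_volume(channel_papers):
    """Normalize per-channel paper counts so the largest channel scores 1.0."""
    counts = {ch: len(papers) for ch, papers in channel_papers.items()}
    max_count = max(counts.values()) if counts else 1  # fallback per step 2
    return {ch: n / max_count for ch, n in counts.items()}
```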

15 compute_citation_impact(channel_papers)

Measures how influential each channel's top papers are. Uses the mean citation count of the top-10 most-cited papers per channel, normalized across channels.

INPUT

Parameter | Type | Source | Description
channel_papers | Dict[str, List[Dict]] | assign_channels() output | Mapping of channel_id to list of paper dicts, each paper having cited_by_count

CALCULATION

  1. For each channel:
    • Sort papers by cited_by_count descending
    • Take the top 10 papers (or fewer if channel has <10)
    • Compute mean: sum(cited_by_count for top 10) / 10
    • Important: Always divides by 10, even if fewer than 10 papers exist. This penalizes channels with few papers.
  2. Find maximum mean across all channels: max_mean
  3. If max_mean == 0: set to 1 (prevent division by zero)
  4. Normalize: cit_impact[ch] = channel_mean[ch] / max_mean
cit_impact(ch) = [ Σ(cited_by_count for top-10 papers) ÷ 10 ] ÷ max_across_channels

Edge cases:

  • Empty channel (no papers) → mean_cites = 0
  • Channel with 3 papers having 100, 50, 20 citations → mean = (100+50+20)/10 = 17.0 (not 56.7)
  • All channels have zero citations → all scores = 0.0

OUTPUT

Dict[str, float] — channel_id → normalized citation impact [0, 1]

The channel whose top-10 papers have the highest mean citations gets 1.0.
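The calculation and its fixed-divisor edge case can be sketched as (a reconstruction from the description above):

```python
def compute_citation_impact(channel_papers):
    """Mean citations of each channel's top-10 papers, always divided
    by 10 (penalizing thin channels), normalized to the best channel."""
    means = {}
    for ch, papers in channel_papers.items():
        top = sorted((p.get("cited_by_count", 0) for p in papers),
                     reverse=True)[:10]
        means[ch] = sum(top) / 10          # fixed divisor, per the note above
    max_mean = max(means.values()) if means else 0
    if max_mean == 0:
        max_mean = 1                       # avoid division by zero
    return {ch: m / max_mean for ch, m in means.items()}
```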

16 compute_crisis_evidence(crisis_events, all_channel_ids)

Measures how prominently each channel appears in real-world crisis events, weighted by financial losses. Events with larger losses contribute more weight (via log-scale).

INPUT

Parameter | Type | Source | Description
crisis_events | List[Dict] | crisis_chronology.json ["crisis_events"] | Event dicts with channels_activated, losses_usd, etc.
all_channel_ids | Set[str] | Union of search_queries.json keys and channel_papers keys | Complete set of valid channel IDs

Crisis event schema (each event):

Key | Type | Example
event | str | "Mt. Gox Collapse"
channels_activated | List[str] | ["counterparty_concentration", "information_asymmetry"]
losses_usd | str | "460000000" or "undetermined"

CALCULATION

  1. Compute median loss:
    • Parse all losses_usd strings that are valid floats
    • Sort numerically, compute median
    • Fallback: $100,000,000 if no numeric losses exist
  2. For each crisis event:
    • Parse losses_usd. If non-numeric (e.g. "undetermined") → use median loss
    • Compute weight: log10(losses_usd)
    • If losses_usd ≤ 0: weight = 0
    • For each channel in channels_activated (or channels fallback):
      • If channel in all_channel_ids: crisis_counts[ch] += weight
  3. Warn about channels in events that are not in all_channel_ids
  4. Ensure all channels have entries (default 0 for channels with no crisis events)
  5. Normalize: crisis_ev[ch] = crisis_counts[ch] / max(crisis_counts)
weight(event) = log10(losses_usd)   [e.g. $460M → log10(4.6×10⁸) ≈ 8.66]
crisis_ev(ch) = Σ_e weight(e)·[ch ∈ e.channels] ÷ max_ch′ Σ_e weight(e)·[ch′ ∈ e.channels]

Edge cases:

  • losses_usd = "undetermined" → uses median of all known losses
  • losses_usd = "0" or negative → weight = 0
  • No crisis events at all → all channels get 0.0
  • max crisis count = 0 → treated as 1 (prevents division by zero)
  • Unknown channel ID in event → skipped with printed warning

OUTPUT

Dict[str, float] — channel_id → normalized crisis evidence [0, 1]

The channel most implicated in high-loss crisis events scores 1.0.
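The log-loss weighting and its edge cases can be sketched as (a reconstruction from the description, not the pipeline's verbatim code):

```python
import math
from statistics import median

def compute_crisis_evidence(crisis_events, all_channel_ids):
    """Log10-of-loss weighted crisis counts per channel, normalized."""
    # median of the numeric losses, used for "undetermined" events
    numeric = []
    for e in crisis_events:
        try:
            numeric.append(float(e.get("losses_usd", "")))
        except ValueError:
            pass
    fallback = median(numeric) if numeric else 1e8   # $100M default
    counts = {ch: 0.0 for ch in all_channel_ids}
    for e in crisis_events:
        try:
            loss = float(e.get("losses_usd", ""))
        except ValueError:
            loss = fallback                          # e.g. "undetermined"
        weight = math.log10(loss) if loss > 0 else 0.0
        for ch in e.get("channels_activated", []):
            if ch in counts:                         # skip unknown channel IDs
                counts[ch] += weight
    max_count = max(counts.values()) if counts else 0
    if max_count == 0:
        max_count = 1                                # avoid division by zero
    return {ch: c / max_count for ch, c in counts.items()}
```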

17 main() — composite scoring & sensitivity [channel_mapper.py]

Entry point for the scoring stage. Loads all data, calls scoring functions, computes weighted composite scores, produces ranked output, and runs sensitivity analysis under four weight schemes.

INPUT

CLI Arguments:

Argument | Type | Default | Description
--input | str | ../output/data/openalex_merged.json | Merged papers file from Stage 1
--queries | str | ../references/search_queries.json | Channel definitions for names and IDs
--crisis | str | ../references/crisis_chronology.json | Crisis events data
--output | str | ../output/data/channel_rankings.json | Output rankings file

File Inputs:

File | Key Fields Used
openalex_merged.json | channels, cited_by_count per paper
search_queries.json | channels.{id}.name for display names
crisis_chronology.json | crisis_events[].channels_activated, losses_usd

CALCULATION

  1. Load data: Read all three JSON files. Crisis file is optional (missing = zero crisis scores + warning).
  2. Assign channels: assign_channels(papers) → get reverse mapping
  3. Build channel ID set: Union of search_queries keys and any IDs found in papers
  4. Compute sub-scores:
    • compute_literature_volume(channel_papers)
    • compute_citation_impact(channel_papers)
    • compute_crisis_evidence(crisis_events, all_channel_ids)
  5. Composite score (primary weights):
composite(ch) = 0.35 × lit_volume(ch) + 0.35 × cit_impact(ch) + 0.30 × crisis_evidence(ch)
  6. Rank: Sort channels by raw composite score descending, assign ranks 1..N, then round scores to 4 decimal places
  7. Enrichment: Each ranking entry also includes paper_count, mean_top10_citations (rounded to 1 decimal), and crisis_event_count
  8. Sensitivity analysis: Recompute composite scores under 4 weight schemes:

Scheme | W_lit | W_cit | W_crisis
primary | 0.35 | 0.35 | 0.30
equal | 0.333 | 0.333 | 0.333
crisis_dominant | 0.25 | 0.25 | 0.50
literature_dominant | 0.50 | 0.25 | 0.25

For each scheme, ranks are independently computed from the reweighted composite scores.
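The composite scoring and per-scheme re-ranking can be sketched as (function name hypothetical; the weight values are taken from the table above):

```python
WEIGHT_SCHEMES = {
    "primary":             {"literature_volume": 0.35,  "citation_impact": 0.35,  "crisis_evidence": 0.30},
    "equal":               {"literature_volume": 0.333, "citation_impact": 0.333, "crisis_evidence": 0.333},
    "crisis_dominant":     {"literature_volume": 0.25,  "citation_impact": 0.25,  "crisis_evidence": 0.50},
    "literature_dominant": {"literature_volume": 0.50,  "citation_impact": 0.25,  "crisis_evidence": 0.25},
}

def rank_under_scheme(sub_scores, weights):
    """Weighted-sum composites and 1..N ranks (descending score)."""
    composites = {ch: sum(weights[k] * v for k, v in scores.items())
                  for ch, scores in sub_scores.items()}
    ordered = sorted(composites, key=composites.get, reverse=True)
    return {ch: rank for rank, ch in enumerate(ordered, start=1)}, composites
```

A two-channel example shows why the sensitivity analysis matters: a literature-heavy channel and a crisis-heavy channel can swap ranks between the primary and crisis_dominant schemes.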

OUTPUT

Files written:

File | Schema
channel_rankings.json
{
  "weights": {"literature_volume": 0.35, "citation_impact": 0.35, "crisis_evidence": 0.30},
  "rankings": [
    {
      "rank": 1,
      "channel_id": "...",
      "channel_name": "...",
      "composite_score": 0.XXXX,
      "literature_volume": 0.XXXX,
      "citation_impact": 0.XXXX,
      "crisis_evidence": 0.XXXX,
      "paper_count": N,
      "mean_top10_citations": N.N,
      "crisis_event_count": N
    }, ...
  ],
  "total_papers": N,
  "total_channels": N,
  "warnings": [...]
}
sensitivity_analysis.json
{
  "weight_schemes": {
    "primary": {"literature_volume": 0.35, ...},
    "equal": {...}, "crisis_dominant": {...}, "literature_dominant": {...}
  },
  "channels": {
    "channel_id": {
      "channel_name": "...",
      "sub_scores": {"literature_volume": ..., "citation_impact": ..., "crisis_evidence": ...},
      "composites": {"primary": ..., "equal": ..., ...},
      "ranks": {"primary": N, "equal": N, ...}
    }, ...
  }
}

Console output: Formatted rankings table with rank, score, paper count, citation impact, crisis evidence, and channel name for each of the 14 channels.

Appendix: Complete File-to-File Data Flow

Stage 1: Data Collection
  Inputs:  references/search_queries.json (+ OpenAlex API)
  Script:  openalex_search.py → main()
  Outputs: output/data/openalex_raw_network_contagion.json
           output/data/openalex_raw_liquidity_spirals.json
           output/data/openalex_raw_stablecoin_runs.json
           output/data/openalex_raw_oracle_manipulation.json
           output/data/openalex_raw_composability_risk.json
           output/data/openalex_raw_liquidation_cascades.json
           output/data/openalex_raw_counterparty_concentration.json
           output/data/openalex_raw_regulatory_contagion.json
           output/data/openalex_raw_gateway_risk.json
           output/data/openalex_raw_governance_failure.json
           output/data/openalex_raw_information_asymmetry.json
           output/data/openalex_raw_rwa_transmission.json
           output/data/openalex_raw_bridge_vulnerability.json
           output/data/openalex_raw_validator_concentration.json
           output/data/openalex_merged.json

Stage 2: Scoring
  Inputs:  output/data/openalex_merged.json
           references/search_queries.json
           references/crisis_chronology.json
  Script:  channel_mapper.py → main()
  Outputs: output/data/channel_rankings.json
           output/data/sensitivity_analysis.json

Function Call Graph

Caller | Calls | Purpose
openalex_search.main() | OpenAlexClient.__init__() | Create API client
openalex_search.main() | search_channel() × 14 | Fetch papers per channel
search_channel() | client.search_works() | Query OpenAlex /works
search_channel() | extract_paper() | Normalize each result
client.search_works() | client._make_request() | HTTP GET with retry
client._make_request() | client._rate_limit() | Throttle requests
channel_mapper.main() | assign_channels() | Map papers ↔ channels
channel_mapper.main() | compute_literature_volume() | Score: paper count
channel_mapper.main() | compute_citation_impact() | Score: top-10 citations
channel_mapper.main() | compute_crisis_evidence() | Score: crisis events

Weight Sensitivity Summary

Scheme | W_literature | W_citation | W_crisis | Emphasis
primary | 0.35 | 0.35 | 0.30 | Balanced academic + crisis
equal | 0.333 | 0.333 | 0.333 | Equal weight baseline
crisis_dominant | 0.25 | 0.25 | 0.50 | Prioritize real-world evidence
literature_dominant | 0.50 | 0.25 | 0.25 | Prioritize research volume
