The files, and what each one does
Two new files do the work; three existing ones consume what they produce. Downstream, the only change is about ten lines in the orchestrator.
| File | Role | Runs |
|---|---|---|
| st_feed.py NEW | The daily ingester. Pulls every subscribed indicator's full history from the API and merges it into the feed CSVs. Importable (refresh_all()) and a CLI. | Every morning, inside the 07:00 fetch |
| st_sync.py NEW | The one-time reconciler. Matches each API slug to your existing CSV (never creating slug-named duplicates), backfills full history, and writes the resolved map. Safe-by-default dry run. | Once at onboarding (and to repair) |
| st_symbol_map.json | The single source of truth: slug → {file, col}. Written by st_sync, read by st_feed — so both write to the same files. | Data file |
| sentiment_feed.py | Existing. Builds the senti-euphoria composite from the feed CSVs. Unchanged — it just sees fresher data. | Inside fetch_indicators |
| ipo_froth_fetch.py | Existing. Its AAII leg reads the feed CSVs. Unchanged. | Inside fetch_indicators |
| fetch_indicators.py | Existing orchestrator. Gained ~10 lines that call st_feed.refresh_all() before building the sentiment indicators. | The 07:00 daily task |
The ingester writes into the exact CSV schema your tracker already reads (Date + <slug>_close). So the API replaces your manual downloads underneath the existing pipeline — sentiment_feed.py, the AAII leg, and the staleness gates never knew the difference. New plumbing, same taps.
From subscription to dashboard, in one path
Follow one indicator — say Total Put/Call — from the vendor to a tile on your dashboard:
- Subscription: the API lists what you bought, {chart_name, url} per indicator.
- st_sync (once): matches slug pc_total to your pc_total.csv and records it in st_symbol_map.json.
- st_feed (daily): fetches pc_total's entire history and merges it into pc_total.csv.
- sentiment_feed.py: reads pc_total.csv as one leg of the contrarian senti-euphoria composite.
- fetch_indicators.py: scores it, writes indicators_data.json + the regime block.
- Dashboard: senti-euphoria renders as a core tile; the value traces back, unbroken, to the API.
The fetch now defaults to FULL history (the API's /getSymbolData) instead of a 30-day window — so every CSV holds as much history as the vendor provides, and each daily run is self-healing (it re-fills any gap and refreshes revised values). The backfill numbers are in §06.
Three surprises the official doc didn't mention
The integration is small; the friction was entirely in the API's real behaviour diverging from its v2.4 PDF. Each surprise is handled in code so you never see it.
| What the v2.4 PDF says | What the live API does |
|---|---|
| Response is JSON you can use directly | Double-encoded: r.json() returns a string of JSON, so decode twice |
| Symbol field is chart_url | Field is url (we accept both) |
| History is {date: close} (flat) | History is {date: {"close": v}} (nested), ISO datetimes |
| Auth via an API key | HTTP Basic auth with your website login (email + password) |
The Smart Money Index's real slug is vol_smi (an index level ~20,800), not model_smart_money (a 0–1 confidence reading). They share name-words, so the reconciler's fuzzy matcher nearly merged SMI's index values into your confidence file — which is exactly why st_sync refuses to auto-merge a fuzzy match (§05).
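To see why these two series collide, here is a minimal standalone sketch of the token-overlap test the reconciler uses for fuzzy candidates. The helper name shared_tokens is hypothetical, and the chart name "Smart Money Index" is assumed from the symbol-map comment rather than taken from a live API response.

```python
def shared_tokens(slug: str, chart_name: str, file_stem: str) -> set[str]:
    """Tokens common to (slug + chart name) and a CSV filename stem."""
    wanted = set(slug.lower().split("_")) | set(chart_name.lower().split())
    have = set(file_stem.lower().replace("_", " ").split())
    return wanted & have


# Assumed chart name for vol_smi; model_smart_money is the confidence file's stem.
overlap = shared_tokens("vol_smi", "Smart Money Index", "model_smart_money")
print(sorted(overlap))      # ['money', 'smart']
print(len(overlap) >= 2)    # True: enough shared words to count as a fuzzy candidate
```

Two shared words are enough to make the file a candidate even though the slug itself (vol_smi) shares nothing with the filename, which is why a fuzzy hit is only ever reported, never merged.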
401 → credentials/account (was a wrong login). 404 → wrong URL. Timeout → network. 200 but AttributeError: 'str' has no .get → the body parsed to a string, i.e. double-encoded. Each status code points at a different layer; read the code before guessing.
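The double-encoding symptom can be reproduced without touching the network. The sketch below uses a made-up payload (the date and value are illustrative, not real API output) and only the standard json module.

```python
import json

# Hypothetical body: a JSON document whose top-level value is itself a JSON string.
inner = {"2026-06-10T00:00:00": {"close": 0.92}}
body = json.dumps(json.dumps(inner))

first = json.loads(body)        # roughly what r.json() hands back
print(type(first).__name__)     # str, so first.get(...) would raise AttributeError

# Decode a second time only when the first pass produced a string.
decoded = json.loads(first) if isinstance(first, str) else first
print(decoded["2026-06-10T00:00:00"]["close"])   # 0.92
```

Guarding the second decode with isinstance keeps the helper safe if the vendor ever fixes the encoding and starts returning a plain object.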
The daily ingester, read top to bottom
Each block below is the real code on disk, followed by what each part does.
§1 Imports & config
import os
import sys
import json
import time
import datetime as dt
from pathlib import Path
import requests
import pandas as pd
# --- .env loader with inline-comment stripping (house pattern) -------------
def load_env_file(path: Path) -> None:
"""Minimal KEY=VALUE .env reader -> os.environ (real env vars win).
Strips inline # comments. NEVER put a comment after a secret value."""
if not path.exists():
return
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, rest = line.partition("=")
val = rest.strip()
for sep in (" #", "\t#"):
if sep in val:
val = val[: val.index(sep)].strip()
break
val = val.strip('"').strip("'")
os.environ.setdefault(key.strip(), val)
_THIS_DIR = Path(__file__).parent
load_env_file(_THIS_DIR / ".env")
ST_USERNAME = os.environ.get("SENTIMENTRADER_USERNAME")
ST_PASSWORD = os.environ.get("SENTIMENTRADER_PASSWORD")
ST_BASE_URL = "https://api.sentimentrader.com/ws"
FEED_DIR = Path(r"G:\My Drive\Trading\Build Alpha\data\sentimentdatadailyfeed")
INCREMENTAL_DAYS = 30 # getSymbolDataLimitDays window for daily refresh
COURTESY_SLEEP_S = 0.6 # pause between symbol calls (no hard rate limit,
# but the doc asks API users to be courteous)
TIMEOUT_S = 30
- load_env_file(): a tiny .env reader. It splits each KEY=VALUE line, then strips an inline # comment from the value (the bug that once broke the Finnhub key) and trims surrounding quotes.
- os.environ.setdefault(...): real OS environment variables win over the .env file, so you can override per-machine.
- ST_USERNAME / ST_PASSWORD: loaded from .env; used for HTTP Basic auth (the API has no key).
- FEED_DIR: the one folder every feed CSV lives in.
- INCREMENTAL_DAYS=30: the window for the fast path.
- COURTESY_SLEEP_S=0.6: a polite pause between calls (the doc says there's no hard rate limit, but to be courteous).
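The value clean-up is easiest to judge on concrete inputs. This is a standalone sketch of that one step; the function name parse_env_value is hypothetical and the sample values are invented.

```python
def parse_env_value(rest: str) -> str:
    """Same clean-up as load_env_file(): cut at ' #' or tab-#, then trim quotes."""
    val = rest.strip()
    for sep in (" #", "\t#"):
        if sep in val:
            val = val[: val.index(sep)].strip()
            break
    return val.strip('"').strip("'")


print(parse_env_value("abc123   # rotated in May"))   # abc123
print(parse_env_value('"quoted value"'))              # quoted value
print(parse_env_value("p@ss #word"))                  # p@ss
```

The last call shows the cost of the convenience: a secret that happens to contain a space followed by # is truncated, which is a second reason to keep comments, and that character sequence, away from secret values.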
§2 Symbol map (slug → your file)
# The local FILENAME sometimes differs from the API slug (legacy names from
# the manual-download era), but the VALUE COLUMN is always "<slug>_close" —
# that is how these mappings were confirmed against the existing files.
# Default for any subscribed slug not listed here: <slug>.csv / <slug>_close.
#
# slug (chart_url) -> (local filename, value column)
ST_SYMBOL_MAP: dict[str, tuple[str, str]] = {
# --- the Light-API purchase (10) ---
"vol_vix_bonds": ("vol_vix_bonds.csv", "vol_vix_bonds_close"), # Bond Market VIX (MOVE)
"model_panic_euphoria": ("model_panic_euphoria.csv", "model_panic_euphoria_close"), # Panic/Euphoria Model
"pc_total": ("pc_total.csv", "pc_total_close"), # Total Put/Call Ratio
"vol_stock_bond_ratio": ("vol_stock_bond_ratio.csv", "vol_stock_bond_ratio_close"), # Stock/Bond Ratio
"vol_smi": ("vol_smi.csv", "vol_smi_close"), # Smart Money Index (SMI) — NEW series (no legacy CSV; ~20,800 index level)
"model_smart_dumb_spread": ("smart_dumb_spread.csv", "model_smart_dumb_spread_close"), # Smart/Dumb Confidence Spread (legacy filename!)
"model_cnn_fear_greed": ("model_cnn_fear_greed.csv", "model_cnn_fear_greed_close"), # Fear & Greed Model
"bitcoin_fng": ("bitcoin_fng.csv", "bitcoin_fng_close"), # Crypto Fear & Greed
"model_risk_stocks_short": ("model_risk_stocks_short.csv", "model_risk_stocks_short_close"), # Short-Term Risk Levels
"model_risk_stocks_medium": ("model_risk_stocks_medium.csv", "model_risk_stocks_medium_close"), # Medium-Term Risk Level
# --- map these too if ever added to the subscription ---
"survey_aaii_bulls": ("survey_aaii_bulls.csv", "survey_aaii_bulls_close"), # AAII bulls (ipo leg)
"survey_aaii_bears": ("survey_aaii_bears.csv", "survey_aaii_bears_close"), # AAII bears (ipo leg)
"etf_spy": ("spy_optix.csv", "etf_spy_close"), # SPY Optix (benched; legacy filename)
"model_bear_macro_spread": ("model_bear_macro_spread.csv", "model_bear_macro_spread_close"), # credit-leverage input (dormant)
}
# st_sync.py reconciles the subscription against your existing CSVs and writes
# the resolved mapping here; if present it OVERRIDES the hardcoded defaults
# above so the daily refresh writes to the very same files you matched.
_RESOLVED_MAP = _THIS_DIR / "st_symbol_map.json"
if _RESOLVED_MAP.exists():
try:
import json as _json
for _slug, _v in _json.loads(_RESOLVED_MAP.read_text(encoding="utf-8")).items():
if isinstance(_v, dict) and _v.get("file") and _v.get("col"):
ST_SYMBOL_MAP[_slug] = (_v["file"], _v["col"])
except Exception as _e:
print(f" [st_feed] could not load st_symbol_map.json: {_e}")
- ST_SYMBOL_MAP: the fallback mapping from each API slug to (filename, value-column). The filename sometimes differs from the slug (legacy names from the manual-download era), but the column is always <slug>_close; that's how the mappings were confirmed against your files.
- The _RESOLVED_MAP block: if st_symbol_map.json exists (written by st_sync), it overrides these hardcoded defaults. That's the single-source-of-truth link: reconcile once in st_sync, and the daily feed writes to the very same files.
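A small sketch of the override step, run on in-memory data instead of the real file. The JSON content here is illustrative: MOVE.csv is the target the backfill table reports for vol_vix_bonds, and the pc_total entry is deliberately malformed to show that incomplete entries are skipped.

```python
import json

# Hardcoded default, as in ST_SYMBOL_MAP.
defaults = {"vol_vix_bonds": ("vol_vix_bonds.csv", "vol_vix_bonds_close")}

# Hypothetical st_symbol_map.json content.
resolved_text = json.dumps({
    "vol_vix_bonds": {"file": "MOVE.csv", "col": "vol_vix_bonds_close"},
    "pc_total": {"file": "pc_total.csv"},   # no "col" key, so it is ignored
})

symbol_map = dict(defaults)
for slug, entry in json.loads(resolved_text).items():
    if isinstance(entry, dict) and entry.get("file") and entry.get("col"):
        symbol_map[slug] = (entry["file"], entry["col"])

print(symbol_map["vol_vix_bonds"])   # ('MOVE.csv', 'vol_vix_bonds_close')
print("pc_total" in symbol_map)      # False
```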
§3 API client — the heart of it
def _auth() -> requests.auth.HTTPBasicAuth:
if not (ST_USERNAME and ST_PASSWORD):
raise RuntimeError(
"SentimenTrader credentials not set. Add to the project .env "
"(no inline comments after the values!):\n"
" SENTIMENTRADER_USERNAME=your-sentimentrader.com-login-email\n"
" SENTIMENTRADER_PASSWORD=your-sentimentrader.com-password"
)
return requests.auth.HTTPBasicAuth(ST_USERNAME, ST_PASSWORD)
def _get(endpoint: str, params: dict | None = None):
"""GET an API endpoint, return parsed JSON. Raises on HTTP/auth errors."""
url = f"{ST_BASE_URL}{endpoint}"
r = requests.get(url, params=params or {}, auth=_auth(), timeout=TIMEOUT_S)
if r.status_code == 401:
raise RuntimeError(
"SentimenTrader API: 401 Unauthorized. The server is reachable and the "
"Basic-auth header was sent, so this is a CREDENTIALS / ACCOUNT issue, not a "
"code or .env-parsing bug. Verify in order: (1) the EXACT same email + password "
"log into sentimentrader.com in a browser; (2) API access is enabled on your "
"account — the Light API can need activation after purchase (their docs invite "
"setup questions); (3) the username is the website LOGIN (often an email), not a "
"display name. .env keys: SENTIMENTRADER_USERNAME / SENTIMENTRADER_PASSWORD. "
f"Server response: {(r.text or '')[:120]!r}")
if r.status_code != 200:
raise RuntimeError(f"SentimenTrader API HTTP {r.status_code} on "
f"{endpoint}: {r.text[:200]}")
raw = r.json()
# SentimenTrader DOUBLE-ENCODES: the JSON body is itself a JSON *string*
# (e.g. r.json() -> '"[{...}]"'), so decode a second time when needed.
if isinstance(raw, str):
try:
raw = json.loads(raw)
except Exception:
pass
return raw
def list_subscribed() -> list[dict]:
"""Normalised [{chart_name, chart_url}] for every subscribed indicator.
NOTE: the live API field is 'url' (the v2.4 PDF says 'chart_url'); we accept
both. Double-encoding is already undone in _get()."""
data = _get("/getAllUserSelectedSymbols")
items = data if isinstance(data, list) else (
data.get("data", []) if isinstance(data, dict) else [])
out = []
for it in items:
if not isinstance(it, dict):
continue
out.append({
"chart_name": it.get("chart_name") or it.get("name") or "",
"chart_url": it.get("chart_url") or it.get("url") or "",
})
return out
def _parse_rows(data) -> pd.DataFrame:
"""Normalise an API history payload into a [Date, close] frame.
The doc says JSON 'in the format of "date: close"' — handle both shapes:
list of {date:..., close:...} objects, OR a {date: close} mapping.
"""
rows: list[tuple[str, float]] = []
if isinstance(data, dict):
# Live shape is {date: {"close": value}}; also accept flat {date: value}.
for k, v in data.items():
c = v.get("close", v.get("Close")) if isinstance(v, dict) else v
if c is not None:
rows.append((k, c))
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
d_ = item.get("date") or item.get("Date") or item.get("datetimestamp")
c_ = item.get("close") if "close" in item else item.get("Close")
if d_ is not None and c_ is not None:
rows.append((d_, c_))
df = pd.DataFrame(rows, columns=["Date", "close"])
if df.empty:
return df
df["Date"] = pd.to_datetime(df["Date"], format="mixed", errors="coerce")
df["close"] = pd.to_numeric(df["close"], errors="coerce")
df = df.dropna().sort_values("Date").reset_index(drop=True)
return df
def fetch_symbol(slug: str, full: bool = True) -> pd.DataFrame:
"""Fetch a symbol's history. Default full=True -> /getSymbolData returns
the WHOLE history the API has for the slug (as long as it provides). Pass
full=False for the fast last-30-day tail (/getSymbolDataLimitDays)."""
if full:
data = _get(f"/getSymbolData/{slug}")
else:
data = _get(f"/getSymbolDataLimitDays/{slug}", {"days": INCREMENTAL_DAYS})
return _parse_rows(data)
- _auth(): builds the HTTPBasicAuth object, or raises a clear setup message if the credentials are missing.
- _get(): one GET helper for every endpoint. The 401 branch explains it's an account/login issue (not a code bug). The crucial lines: after r.json(), if the result is a string, it's double-encoded, so json.loads() it again. Every endpoint flows through here, so the double-decode is fixed in exactly one place.
- list_subscribed(): calls /getAllUserSelectedSymbols and normalises each item to {chart_name, chart_url}, accepting the real url field or the PDF's chart_url.
- _parse_rows(): turns a history payload into a tidy [Date, close] DataFrame. The dict branch handles the real nested shape {date: {"close": v}} (and flat {date: v}); dates are parsed with format="mixed" so ISO datetimes and US dates both work; non-numeric/NaT rows are dropped.
- fetch_symbol(slug, full=True): full=True (the new default) calls /getSymbolData, the whole history the API has; full=False is the 30-day tail.
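The shape handling in _parse_rows() can be illustrated without pandas. The following is a simplified, stdlib-only sketch, not the code on disk: rows_from_payload is a hypothetical name, the payloads are invented, and unlike the pandas version it only understands ISO dates.

```python
from datetime import datetime


def rows_from_payload(data) -> list[tuple[str, float]]:
    """Accept nested {date: {"close": v}}, flat {date: v}, or a list of
    {"date": ..., "close": ...}; return sorted (YYYY-MM-DD, close) pairs."""
    pairs = []
    if isinstance(data, dict):
        for key, val in data.items():
            close = val.get("close") if isinstance(val, dict) else val
            pairs.append((key, close))
    elif isinstance(data, list):
        pairs = [(d.get("date"), d.get("close")) for d in data if isinstance(d, dict)]
    out = []
    for raw_date, close in pairs:
        try:
            day = datetime.fromisoformat(str(raw_date)).date().isoformat()
            out.append((day, float(close)))
        except (TypeError, ValueError):
            continue   # drop unparseable dates and non-numeric closes
    return sorted(out)


nested = {"2026-06-10T00:00:00": {"close": 0.92}, "2026-06-09T00:00:00": {"close": "0.88"}}
flat = {"2026-06-10": 0.92, "bad-date": 1.0}
print(rows_from_payload(nested))   # [('2026-06-09', 0.88), ('2026-06-10', 0.92)]
print(rows_from_payload(flat))     # [('2026-06-10', 0.92)]
```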
§4 CSV merge-writer
def merge_into_csv(slug: str, new_df: pd.DataFrame) -> tuple[Path, int, str]:
"""Merge fetched rows into the canonical feed CSV (Date, <value_col>).
- Existing rows are kept; API rows win on date collisions (keep='last').
- `_old.csv` companions are NEVER touched (history archives).
Returns (path, n_new_rows, latest_date_str).
"""
fname, value_col = ST_SYMBOL_MAP.get(slug, (f"{slug}.csv", f"{slug}_close"))
path = FEED_DIR / fname
incoming = new_df.rename(columns={"close": value_col})[["Date", value_col]]
if path.exists():
try:
existing = pd.read_csv(path)
dcol = next((c for c in existing.columns if c.strip().lower() == "date"), None)
vcol = next((c for c in existing.columns if c != dcol), value_col)
existing = existing.rename(columns={dcol: "Date", vcol: value_col})
existing["Date"] = pd.to_datetime(existing["Date"], format="mixed",
errors="coerce")
existing = existing.dropna(subset=["Date"])
except Exception:
existing = pd.DataFrame(columns=["Date", value_col])
else:
existing = pd.DataFrame(columns=["Date", value_col])
before = len(existing)
merged = (pd.concat([existing, incoming])
.drop_duplicates(subset="Date", keep="last")
.sort_values("Date").reset_index(drop=True))
n_new = len(merged) - before
out = merged.copy()
out["Date"] = out["Date"].dt.strftime("%Y-%m-%d") # ISO, matches recent files
out.to_csv(path, index=False)
latest = out["Date"].iloc[-1] if len(out) else "—"
return path, n_new, latest
- merge_into_csv(): writes fetched rows into the canonical CSV without losing yours.
- It reads the existing file, renames its columns to Date/<value_col>, then concat + drop_duplicates(subset="Date", keep="last"), so API rows win on overlapping dates while your non-overlapping history is preserved.
- Dates are written back as YYYY-MM-DD (matching your recent files), and _old.csv archives are never touched.
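The merge rule (keep old rows, let API rows win on shared dates) can be shown with plain dictionaries. This is an equivalent sketch of the semantics, not the pandas implementation; merge_rows is a hypothetical name and the values are made up.

```python
def merge_rows(existing: dict[str, float], incoming: dict[str, float]) -> dict[str, float]:
    """Date-keyed merge: keep old rows, let incoming rows win on shared dates."""
    merged = {**existing, **incoming}
    return dict(sorted(merged.items()))   # ISO date strings sort chronologically


existing = {"2026-06-08": 0.90, "2026-06-09": 0.85}
incoming = {"2026-06-09": 0.88, "2026-06-10": 0.92}   # 06-09 is a revised value

merged = merge_rows(existing, incoming)
print(merged)
print(len(merged) - len(existing))   # 1
```

The count printed at the end is the same "new rows" figure merge_into_csv() reports: a revised value replaces a row without adding one.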
§5 Orchestration
def refresh_all(full: bool = True, quiet: bool = False) -> dict:
"""Fetch every subscribed symbol (FULL history by default) and merge into
the feed CSVs.
Returns {slug: {"file":..., "new_rows":..., "latest":...}} (or {"error":..}).
Never raises on a single-symbol failure — fetches the rest.
"""
say = (lambda *a: None) if quiet else print
subs = list_subscribed()
say(f" [st_feed] subscription has {len(subs)} symbols")
unmapped = [s.get("chart_url") for s in subs if s.get("chart_url") not in ST_SYMBOL_MAP]
if unmapped:
say(f" [st_feed] NOTE — unmapped slugs (will write <slug>.csv defaults): {unmapped}")
results: dict[str, dict] = {}
for s in subs:
slug = s.get("chart_url")
if not slug:
continue
try:
df = fetch_symbol(slug, full=full)
if df.empty:
results[slug] = {"error": "empty payload"}
say(f" [st_feed] {slug}: EMPTY payload")
else:
path, n_new, latest = merge_into_csv(slug, df)
results[slug] = {"file": path.name, "new_rows": n_new, "latest": latest}
say(f" [st_feed] {slug:26} -> {path.name:30} +{n_new} rows, latest {latest}")
except Exception as e:
results[slug] = {"error": str(e)}
say(f" [st_feed] {slug}: FAILED — {e}")
time.sleep(COURTESY_SLEEP_S)
return results
def check() -> bool:
"""Credential + connectivity check. True if the API is reachable."""
try:
subs = list_subscribed()
except RuntimeError as e:
print(f"[st_feed] CHECK FAILED: {e}")
return False
print(f"[st_feed] OK — authenticated. {len(subs)} subscribed symbols:")
for s in subs:
slug = s.get("chart_url", "?")
mapped = ST_SYMBOL_MAP.get(slug, (f"{slug}.csv (default)",))[0]
print(f" {s.get('chart_name','?'):42} slug={slug:28} -> {mapped}")
return True
- refresh_all(full=True): the daily entry point. It lists the subscription, then for each slug fetches and merges, collecting a per-slug result. A single symbol failing never stops the rest (it's caught and recorded).
- check(): the --check credential/connectivity probe. It authenticates and prints every subscribed symbol next to the file it maps to.
§6 CLI
if __name__ == "__main__":
args = set(sys.argv[1:])
if "--check" in args:
sys.exit(0 if check() else 1)
full = "--incremental" not in args # DEFAULT = full history
print(f"[st_feed] {'FULL-history' if full else 'incremental (30d)'} refresh starting...")
res = refresh_all(full=full)
fails = {k: v for k, v in res.items() if "error" in v}
print(f"[st_feed] done: {len(res) - len(fails)} ok, {len(fails)} failed")
sys.exit(1 if fails and not (len(res) - len(fails)) else 0)
- Run directly: default is now full history; --incremental is the fast 30-day path; --check verifies auth. The exit code is non-zero only if everything failed.
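The exit-code rule is compact enough to misread, so here it is isolated as a function with three sample result sets. exit_code is a hypothetical helper name; the result dictionaries are invented but follow the shape refresh_all() returns.

```python
def exit_code(results: dict[str, dict]) -> int:
    """1 only when there were failures and nothing succeeded; otherwise 0."""
    fails = [slug for slug, r in results.items() if "error" in r]
    ok = len(results) - len(fails)
    return 1 if fails and not ok else 0


print(exit_code({"pc_total": {"new_rows": 3}, "vol_smi": {"error": "timeout"}}))   # 0
print(exit_code({"pc_total": {"error": "401"}, "vol_smi": {"error": "401"}}))       # 1
print(exit_code({}))                                                                # 0
```

A partial failure therefore still exits 0, so a scheduler that only watches the exit code will not notice a single broken symbol; the per-slug log lines are the place to look.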
The reconciler — match without making duplicates
This is the file that honours your rule: prefer YOUR naming, ignore slug-named duplicates, never corrupt a file on a fuzzy guess.
§1 Imports
import sys
import json
import time
import datetime as dt
from pathlib import Path
import pandas as pd
import st_feed # reuse: list_subscribed, fetch_symbol, FEED_DIR, creds, sleep
FEED_DIR = st_feed.FEED_DIR
THIS_DIR = Path(__file__).parent
MAP_FILE = THIS_DIR / "st_symbol_map.json"
OVERRIDE_FILE = THIS_DIR / "st_sync_overrides.json"
PLAN_FILE = THIS_DIR / "st_sync_plan.json"
# filename artifacts that mark a likely duplicate/copy (deprioritised)
_ARTIFACT_MARKERS = (" (1)", " (2)", " (3)", " - copy", " copy", "- copy")
- It reuses st_feed's API client (import st_feed), so no networking is duplicated. Defines the paths for the resolved map, your overrides, and the saved plan, plus the list of filename _ARTIFACT_MARKERS ((1), - copy, …) used to spot duplicate copies.
§2 Index your existing CSVs (cheap)
def _is_artifact(name: str) -> bool:
"""True for copy/duplicate artifacts: 'x (1).csv', 'x - Copy.csv', 'x .csv'."""
stem = name[:-4] if name.lower().endswith(".csv") else name
low = stem.lower()
return stem != stem.strip() or any(m in low for m in _ARTIFACT_MARKERS)
def build_csv_index() -> list[dict]:
"""Scan the feed folder once; return one record per CSV (ignoring *_old.csv).
Reads ONLY the header row to learn each file's columns."""
index = []
for p in sorted(FEED_DIR.glob("*.csv")):
if p.name.lower().endswith("_old.csv"):
continue
try:
cols = list(pd.read_csv(p, nrows=0).columns)
except Exception:
cols = []
stem = p.name[:-4]
index.append({
"path": p, "name": p.name, "stem": stem,
"cols": cols, "cols_low": [c.strip().lower() for c in cols],
"artifact": _is_artifact(p.name),
})
return index
def _date_col(rec: dict) -> str | None:
for c in rec["cols"]:
if c.strip().lower() in ("date", "datetime"):
return c
return None
def _file_stats(rec: dict, value_col: str) -> tuple[int, str | None]:
"""(row_count, latest_date_iso) for tie-breaks/report. Reads the file."""
dcol = _date_col(rec)
if not dcol:
return 0, None
try:
df = pd.read_csv(rec["path"], usecols=[dcol])
dts = pd.to_datetime(df[dcol], format="mixed", errors="coerce").dropna()
if dts.empty:
return 0, None
return len(dts), dts.max().strftime("%Y-%m-%d")
except Exception:
return 0, None
- _is_artifact(): flags copy/duplicate files by name (x (1).csv, x - Copy.csv, trailing-space).
- build_csv_index(): scans the folder once, reading only each file's header row (fast) to learn its columns; skips _old.csv archives.
- _file_stats(): for the few real candidates, reads the date column to get row-count and latest date (used for tie-breaks and the report).
§3 The matcher (the core logic)
def match_slug(slug: str, chart_name: str, index: list[dict],
overrides: dict) -> dict:
"""Return the match decision for one slug:
{slug, chart_name, value_col, chosen, candidates[], ignored_dupes[], reason}
"""
val_col = f"{slug}_close"
# 0) explicit override wins, no questions asked
if slug in overrides:
rec = next((r for r in index if r["name"] == overrides[slug]), None)
col = (val_col if rec and val_col in rec["cols"]
else (rec["cols"][1] if rec and len(rec["cols"]) > 1 else val_col))
return {"slug": slug, "chart_name": chart_name, "value_col": col,
"chosen": overrides[slug], "candidates": [], "ignored_dupes": [],
"reason": "pinned in st_sync_overrides.json"}
# 1) candidates: data-column match (strongest), else filename, else fuzzy
val_matches = [r for r in index if val_col in r["cols"]]
name_matches = [r for r in index if r["stem"] == slug]
if val_matches:
pool, basis = val_matches, "data column <slug>_close"
elif name_matches:
pool, basis = name_matches, "filename == slug"
else:
toks = set(slug.lower().split("_")) | set(chart_name.lower().split())
pool = [r for r in index
if slug.lower() in r["stem"].lower()
or r["stem"].lower() in slug.lower()
or len(toks & set(r["stem"].lower().replace("_", " ").split())) >= 2]
basis = "fuzzy name/token overlap"
if not pool:
return {"slug": slug, "chart_name": chart_name, "value_col": val_col,
"chosen": None, "candidates": [], "ignored_dupes": [],
"reason": "NO existing CSV matched (new series; create with --create-new)"}
# Fuzzy name overlap is NOT safe to auto-merge: different series share name
# tokens (e.g. vol_smi 'Smart Money Index' ~20,800 vs model_smart_money
# 'Smart Money Confidence' 0-1). Surface the candidate but treat as
# UNMATCHED, so apply creates a fresh <slug>.csv instead of corrupting a file.
if basis.startswith("fuzzy"):
sugg = [r["name"] for r in pool][:3]
return {"slug": slug, "chart_name": chart_name, "value_col": val_col,
"chosen": None,
"candidates": [{"file": s, "score": "fuzzy", "rows": "-", "latest": "-"}
for s in sugg],
"ignored_dupes": [],
"reason": (f"NO safe match — only fuzzy name overlap with {sugg} "
f"(different data column). Will CREATE {slug}.csv on "
f"apply --create-new; pin an override to merge instead.")}
# 2) score each candidate
# a deliberate (non-artifact) custom-named match = your own naming for this series
has_custom_named = any(r["stem"] != slug and not r["artifact"] for r in pool)
scored = []
for r in pool:
rows, latest = _file_stats(r, val_col if val_col in r["cols"] else "")
score = 0.0
if val_col in r["cols"]:
score += 1000
if r["stem"] == slug:
score += 200
# "ignore the slug-named duplicate when MY naming exists" (user's rule)
if has_custom_named and val_col in r["cols"]:
score -= 600
# copy/duplicate artifacts must never beat a clean file (only chosen if
# they are the ONLY candidate, where the penalty cancels out across them)
score += -10000 if r["artifact"] else 40
score += min(rows, 20000) / 1000.0 # longer history
if latest: # fresher wins ties
try:
age = (dt.date.today() - dt.date.fromisoformat(latest)).days
score += max(0, 90 - age) / 30.0
except Exception:
pass
scored.append({"rec": r, "score": round(score, 2),
"rows": rows, "latest": latest})
scored.sort(key=lambda x: x["score"], reverse=True)
best = scored[0]
col = (val_col if val_col in best["rec"]["cols"]
else (best["rec"]["cols"][1] if len(best["rec"]["cols"]) > 1 else val_col))
ignored = [s["rec"]["name"] for s in scored[1:]
if s["rec"]["stem"] == slug and val_col in s["rec"]["cols"]]
return {
"slug": slug, "chart_name": chart_name, "value_col": col,
"chosen": best["rec"]["name"],
"chosen_rows": best["rows"], "chosen_latest": best["latest"],
"candidates": [{"file": s["rec"]["name"], "score": s["score"],
"rows": s["rows"], "latest": s["latest"]} for s in scored],
"ignored_dupes": ignored,
"reason": f"matched by {basis}" + (
f"; ignoring slug-named duplicate(s) in favour of your naming"
if ignored else ""),
}
def build_plan() -> list[dict]:
subs = st_feed.list_subscribed()
index = build_csv_index()
overrides = {}
if OVERRIDE_FILE.exists():
try:
overrides = json.loads(OVERRIDE_FILE.read_text(encoding="utf-8"))
except Exception as e:
print(f" [st_sync] could not read overrides: {e}")
plan = []
for s in subs:
slug = s.get("chart_url")
if not slug:
continue
plan.append(match_slug(slug, s.get("chart_name", ""), index, overrides))
return plan
- 0) Override: if you pinned a file in st_sync_overrides.json, that wins immediately.
- 1) Candidate pool, strongest signal first: files whose data column is <slug>_close (same series), else exact filename, else fuzzy name/token overlap.
- Fuzzy guard: if the only basis is fuzzy, it refuses to auto-merge (returns unmatched) and suggests creating <slug>.csv instead. This is what stopped SMI (vol_smi) from corrupting model_smart_money.csv.
- 2) Scoring: data-column match +1000; exact slug-name +200 but −600 when a non-artifact custom-named file also matches (so YOUR name wins and the slug-named duplicate is demoted); artifacts −10000 (a clean file always beats a copy); small bonuses for longer history and freshness.
- ignored_dupes: the slug-named files it deliberately leaves untouched, surfaced in the report so the behaviour is visible.
- build_plan(): runs the matcher over every subscribed slug and returns the full plan.
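A worked example makes the weights concrete. The sketch below covers only the name and column terms and follows the prose description above, in which the −600 demotion applies to the slug-named file; that nesting is an assumption, since the listing's indentation is not visible. base_score is a hypothetical name, and the three filenames are invented candidates for the model_smart_dumb_spread slug.

```python
def base_score(stem: str, slug: str, has_value_col: bool, is_artifact: bool,
               custom_named_exists: bool) -> int:
    """Name/column part of the score only; history and freshness bonuses omitted."""
    score = 0
    if has_value_col:
        score += 1000
    if stem == slug:
        score += 200
        if custom_named_exists and has_value_col:
            score -= 600            # demote the slug-named duplicate
    score += -10000 if is_artifact else 40
    return score


slug = "model_smart_dumb_spread"
candidates = {                       # stem -> is_artifact (all carry <slug>_close)
    "smart_dumb_spread": False,      # your own naming
    "model_smart_dumb_spread": False,
    "smart_dumb_spread (1)": True,   # browser-download copy
}
custom = any(stem != slug and not art for stem, art in candidates.items())
scores = {stem: base_score(stem, slug, True, art, custom)
          for stem, art in candidates.items()}
print(scores)
print(max(scores, key=scores.get))   # smart_dumb_spread
```

Under these assumptions the custom-named file scores 1040, the slug-named duplicate 640 and the copy −9000, so the history and freshness bonuses (a few points at most) can only break ties between otherwise equal files.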
§4 Report + the resolved map
def print_plan(plan: list[dict]) -> None:
print(f"\n {'SLUG':26} {'->':2} {'CHOSEN CSV':32} {'col':30} note")
print(" " + "-" * 110)
for m in plan:
chosen = m["chosen"] or "(none — unmatched)"
line = f" {m['slug']:26} -> {chosen:32} {m['value_col']:30} {m['reason']}"
print(line)
if m["chosen"]:
print(f" rows={m.get('chosen_rows','?')} latest={m.get('chosen_latest','?')}")
if m["ignored_dupes"]:
print(f" IGNORED slug-duplicate(s): {', '.join(m['ignored_dupes'])}")
if len(m["candidates"]) > 1:
alts = "; ".join(f"{c['file']}({c['score']})" for c in m["candidates"][1:4])
print(f" other candidates: {alts}")
unmatched = [m["slug"] for m in plan if not m["chosen"]]
if unmatched:
print(f"\n UNMATCHED (skipped unless --create-new): {', '.join(unmatched)}")
def resolved_map(plan: list[dict], create_new: bool = False) -> dict:
"""slug -> {file, col} for the daily refresh. Includes newly-created
<slug>.csv files only when create_new (so the map matches what apply wrote)."""
mp = {}
for m in plan:
f = m["chosen"] or (f"{m['slug']}.csv" if create_new else None)
if f:
mp[m["slug"]] = {"file": f, "col": m["value_col"]}
return mp
def write_symbol_map(mp: dict) -> None:
MAP_FILE.write_text(json.dumps(mp, indent=2), encoding="utf-8")
print(f" [st_sync] wrote resolved map -> {MAP_FILE.name} "
f"({len(mp)} slugs) — st_feed.py uses it for the daily refresh")
- print_plan(): the human-readable dry-run table (chosen file, rows, latest, ignored duplicates, other candidates).
- resolved_map() / write_symbol_map(): write st_symbol_map.json (slug → {file, col}), including newly-created files when --create-new is passed, so st_feed's daily run targets exactly what was applied.
§5 Apply — backfill full history
def merge_history(target: Path, value_col: str, new_df: pd.DataFrame,
overwrite: bool, force: bool) -> tuple[int, str, str]:
"""Write fetched history into `target` (Date,<value_col>). Returns
(n_added, latest, mode). Refuses multi-column files unless --force."""
incoming = new_df.rename(columns={"close": value_col})[["Date", value_col]].copy()
if target.exists() and not overwrite:
existing = pd.read_csv(target)
dcol = next((c for c in existing.columns if c.strip().lower() == "date"), None)
if dcol is None or len(existing.columns) != 2:
if not force:
raise RuntimeError(
f"{target.name} is not a clean 2-column Date/value file "
f"(cols={list(existing.columns)}); refusing to merge without --force")
vcol = next((c for c in existing.columns if c != dcol), value_col)
existing = existing.rename(columns={dcol: "Date", vcol: value_col})
existing["Date"] = pd.to_datetime(existing["Date"], format="mixed",
errors="coerce")
existing = existing.dropna(subset=["Date"])
before = len(existing)
merged = (pd.concat([existing, incoming])
.drop_duplicates(subset="Date", keep="last")
.sort_values("Date").reset_index(drop=True))
mode = "merged (append + replace overlap)"
else:
merged = incoming.sort_values("Date").reset_index(drop=True)
before = 0
mode = "overwrote (full rebuild)" if target.exists() else "created"
out = merged.copy()
out["Date"] = out["Date"].dt.strftime("%Y-%m-%d")
out.to_csv(target, index=False)
latest = out["Date"].iloc[-1] if len(out) else "—"
return len(merged) - before, latest, mode
def apply_plan(plan: list[dict], overwrite: bool, create_new: bool,
force: bool) -> None:
write_symbol_map(resolved_map(plan, create_new))
print(f"\n [st_sync] APPLY — full-history "
f"{'OVERWRITE' if overwrite else 'merge'} into matched files\n")
for m in plan:
slug = m["slug"]
target_name = m["chosen"]
if not target_name:
if not create_new:
print(f" {slug:26} SKIP (unmatched; use --create-new to make {slug}.csv)")
continue
target_name = f"{slug}.csv"
target = FEED_DIR / target_name
try:
df = st_feed.fetch_symbol(slug, full=True)
if df.empty:
print(f" {slug:26} EMPTY payload — skipped")
continue
n_add, latest, mode = merge_history(target, m["value_col"], df,
overwrite, force)
print(f" {slug:26} -> {target_name:32} {mode}: +{n_add} rows, "
f"now {len(df)}+ pts, latest {latest}")
if m["ignored_dupes"]:
print(f" (left untouched: {', '.join(m['ignored_dupes'])})")
except Exception as e:
print(f" {slug:26} FAILED — {e}")
time.sleep(st_feed.COURTESY_SLEEP_S)
print("\n [st_sync] done. The daily 07:00 fetch will now keep these files current.")
- merge_history(): the writer. Default = merge (append new dates + replace overlapping); --overwrite rebuilds the file clean. It refuses a multi-column / OHLCV file unless --force, so a stray match can't clobber a price file.
- apply_plan(): writes the map, then for each match fetches full history (fetch_symbol(full=True)) and merges it; unmatched slugs are skipped unless --create-new.
§6 CLI
def main():
args = sys.argv[1:]
cmd = next((a for a in args if not a.startswith("-")), "plan")
overwrite = "--overwrite" in args
create_new = "--create-new" in args
force = "--force" in args
if not (st_feed.ST_USERNAME and st_feed.ST_PASSWORD):
print("[st_sync] SentimenTrader credentials missing. Add to .env "
"(no inline comments):\n"
" SENTIMENTRADER_USERNAME=your-sentimentrader.com-login-email\n"
" SENTIMENTRADER_PASSWORD=your-website-password")
return 1
if cmd == "symbols":
subs = st_feed.list_subscribed()
print(f"[st_sync] {len(subs)} subscribed symbols:")
for s in subs:
print(f" {s.get('chart_name','?'):44} slug = {s.get('chart_url','?')}")
return 0
plan = build_plan()
if cmd == "apply":
apply_plan(plan, overwrite, create_new, force)
else: # plan / default
print("[st_sync] DRY RUN (plan) — no files written. Review, then run "
"`python st_sync.py apply`.")
print_plan(plan)
write_symbol_map(resolved_map(plan, create_new))
PLAN_FILE.write_text(json.dumps(plan, indent=2, default=str), encoding="utf-8")
print(f" [st_sync] plan saved -> {PLAN_FILE.name}")
return 0
if __name__ == "__main__":
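sys.exit(main())
- Sub-commands: symbols (list), plan (the default: a dry run that touches no feed CSV, although it does save st_symbol_map.json and st_sync_plan.json), apply (with --overwrite / --create-new / --force). Safe-by-default: you always preview before any feed file is written.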
"As much history as the API provides" — the result
The change was four small edits: fetch_symbol and refresh_all now default to full=True, the daily caller passes full=True, and the CLI flips to --incremental as the opt-out. The first full run pulled, per indicator:
| Slug → file | Rows pulled | Latest |
|---|---|---|
| vol_stock_bond_ratio → vol_stock_bond_ratio.csv | +14,163 | 2026-06-10 |
| vol_smi → vol_smi.csv (new) | +13,898 | 2026-06-09 |
| vol_vix_bonds → MOVE.csv | +9,354 | 2026-06-10 |
| pc_total → pc_total.csv | +7,715 | 2026-06-10 |
| model_cnn_fear_greed → model_cnn_fear_greed.csv | +7,083 | 2026-06-10 |
| model_smart_dumb_spread → smart_dumb_spread.csv | +6,857 | 2026-06-10 |
| model_risk_stocks_short / medium | +5,578 / +5,572 | 2026-06-10 |
| bitcoin_fng → bitcoin_fng.csv | +2,927 | 2026-06-11 |
| model_panic_euphoria → model_panic_euphoria.csv | +1,951 | 2026-06-05 |
Every contrarian percentile in senti-euphoria and every band on the coming move-vix tile is computed against history. A 30-day window gives a noisy rank; decades give a calibrated one. Pulling full history isn't hoarding data — it sharpens every reading built on top of it.
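The effect of window length on a percentile rank can be shown on synthetic numbers. Everything in this sketch is invented for illustration (the series, the values, and the helper name percentile_rank); it is not how senti-euphoria computes its percentiles, only a demonstration of the principle.

```python
def percentile_rank(value: float, history: list[float]) -> float:
    """Share of historical observations at or below `value`, in percent."""
    if not history:
        raise ValueError("history is empty")
    at_or_below = sum(1 for h in history if h <= value)
    return 100.0 * at_or_below / len(history)


# Synthetic series: a long calm stretch, then 30 days of steadily rising readings.
calm = [0.85] * 5000
recent = [1.00 + 0.01 * i for i in range(30)]
history = calm + recent
reading = recent[20]                              # a reading from inside the run-up

short = percentile_rank(reading, history[-30:])   # ranked against 30 days only
full = percentile_rank(reading, history)          # ranked against everything
print(short)            # 70.0
print(round(full, 1))   # 99.8
```

Against the 30-day window the reading looks unremarkable; against the full series it is an extreme. The short window cannot tell that the whole recent stretch is elevated.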
One source of truth, and what's next
The link that keeps it coherent: st_sync writes st_symbol_map.json; st_feed reads it. Reconcile once, and the daily 07:00 task keeps all ten files current automatically — your manual downloads (and the old convert-notebook, for these slugs) are now obsolete.
Done:
- Auth, double-decode, nested parse: all handled
- 10 symbols matched to your files, no duplicates created
- Full history backfilled + merged daily
- Downstream (senti-euphoria, AAII leg) unchanged
Next:
- move-vix core tile from MOVE.csv percentiles
- senti-euphoria v2: swap in ST Panic/Euphoria + Put/Call + Smart-Dumb legs
- Composite v2.0 stamp + 4-week parallel run
- Retire RunConvertSentimentNotebook for these slugs
Collection is cheap; attention is the scarce resource. This layer makes the data effortless and always-current so the judgment — which regime are we in, what beats or misses what's priced — stays the thing you spend on. Better inputs, same disciplined read.