LECTURE · FRAMEWORK
Build log · the SentimenTrader data layer

From a paid API to your dashboard — every file, every line

How the 10 SentimenTrader indicators get fetched (full history), reconciled against your existing CSVs without making duplicates, and folded into the AI-cycle tracker — plus a section-by-section reading of the two Python files that do it. Use Aa · Theme (top-right) to set the font, colour, size and line width; your choice is remembered.

2 new files10 indicatorsfull history auto-merged daily
01 — The cast

The files, and what each one does

Two new files do the work; three existing ones consume what they produce. Nothing downstream had to change.

Files involved in the SentimenTrader layer
FileRoleRuns
st_feed.py NEWThe daily ingester. Pulls every subscribed indicator's full history from the API and merges it into the feed CSVs. Importable (refresh_all()) and a CLI.Every morning, inside the 07:00 fetch
st_sync.py NEWThe one-time reconciler. Matches each API slug to your existing CSV (never creating slug-named duplicates), backfills full history, and writes the resolved map. Safe-by-default dry run.Once at onboarding (and to repair)
st_symbol_map.jsonThe single source of truth: slug → {file, col}. Written by st_sync, read by st_feed — so both write to the same files.Data file
sentiment_feed.pyExisting. Builds the senti-euphoria composite from the feed CSVs. Unchanged — it just sees fresher data.Inside fetch_indicators
ipo_froth_fetch.pyExisting. Its AAII leg reads the feed CSVs. Unchanged.Inside fetch_indicators
fetch_indicators.pyExisting orchestrator. Gained ~10 lines that call st_feed.refresh_all() before building the sentiment indicators.The 07:00 daily task
The one design decision that made it painless

The ingester writes into the exact CSV schema your tracker already reads (Date + <slug>_close). So the API replaces your manual downloads underneath the existing pipeline — sentiment_feed.py, the AAII leg, and the staleness gates never knew the difference. New plumbing, same taps.

02 — The data flow

From subscription to dashboard, in one path

Follow one indicator — say Total Put/Call — from the vendor to a tile on your dashboard:

  1. Subscription — the API lists what you bought: {chart_name, url} per indicator.
  2. st_sync (once) — matches slug pc_total to your pc_total.csv, records it in st_symbol_map.json.
  3. st_feed (daily) — fetches pc_total's entire history, merges it into pc_total.csv.
  4. sentiment_feed.py — reads pc_total.csv as one leg of the contrarian senti-euphoria composite.
  5. fetch_indicators.py — scores it, writes indicators_data.json + the regime block.
  6. Dashboardsenti-euphoria renders as a core tile; the value traces back, unbroken, to the API.
What changed this build

The fetch now defaults to FULL history (the API's /getSymbolData) instead of a 30-day window — so every CSV holds as much history as the vendor provides, and each daily run is self-healing (it re-fills any gap and refreshes revised values). The backfill numbers are in §06.

03 — The API (and where the PDF was wrong)

Three surprises the official doc didn't mention

The integration is small; the friction was entirely in the API's real behaviour diverging from its v2.4 PDF. Each surprise is handled in code so you never see it.

▼ What the PDF said
  • Response is JSON you can use directly
  • Symbol field is chart_url
  • History is {date: close} (flat)
  • Auth via an API key
▲ What it actually is (handled)
  • Double-encoded: r.json() returns a string of JSON — decode twice
  • Field is url (we accept both)
  • History is {date: {"close": v}} (nested), ISO datetimes
  • HTTP Basic auth with your website login (email + password)
Bonus gotcha — a wrong slug

The Smart Money Index's real slug is vol_smi (an index level ~20,800), not model_smart_money (a 0–1 confidence reading). They share name-words, so the reconciler's fuzzy matcher nearly merged SMI's index values into your confidence file — which is exactly why st_sync refuses to auto-merge a fuzzy match (§05).

The debugging ladder that found it

401 → credentials/account (was a wrong login). 404 → wrong URL. Timeout → network. 200 but AttributeError: 'str' has no .get → the body parsed to a string, i.e. double-encoded. Each status code points at a different layer; read the code before guessing.

04 — st_feed.py, section by section

The daily ingester, read top to bottom

Each block below is the real code on disk, followed by what each part does.

§1 Imports & config

import os
import sys
import json
import time
import datetime as dt
from pathlib import Path

import requests
import pandas as pd

# --- .env loader with inline-comment stripping (house pattern) -------------
def load_env_file(path: Path) -> None:
    """Minimal KEY=VALUE .env reader -> os.environ (real env vars win).
    Strips inline # comments. NEVER put a comment after a secret value."""
    if not path.exists():
        return
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, rest = line.partition("=")
        val = rest.strip()
        for sep in (" #", "\t#"):
            if sep in val:
                val = val[: val.index(sep)].strip()
                break
        val = val.strip('"').strip("'")
        os.environ.setdefault(key.strip(), val)


_THIS_DIR = Path(__file__).parent
load_env_file(_THIS_DIR / ".env")

ST_USERNAME = os.environ.get("SENTIMENTRADER_USERNAME")
ST_PASSWORD = os.environ.get("SENTIMENTRADER_PASSWORD")
ST_BASE_URL = "https://api.sentimentrader.com/ws"

FEED_DIR = Path(r"G:\My Drive\Trading\Build Alpha\data\sentimentdatadailyfeed")

INCREMENTAL_DAYS = 30      # getSymbolDataLimitDays window for daily refresh
COURTESY_SLEEP_S = 0.6     # pause between symbol calls (no hard rate limit,
                           # but the doc asks API users to be courteous)
TIMEOUT_S = 30
  • load_env_file() — a tiny .env reader. It splits each KEY=VALUE line, then strips an inline # comment from the value (the bug that once broke the Finnhub key) and trims surrounding quotes.
  • os.environ.setdefault(...) — real OS environment variables win over the .env file, so you can override per-machine.
  • ST_USERNAME / ST_PASSWORD — loaded from .env; used for HTTP Basic auth (the API has no key).
  • FEED_DIR — the one folder every feed CSV lives in.
  • INCREMENTAL_DAYS=30 — the window for the fast path; COURTESY_SLEEP_S=0.6 — a polite pause between calls (the doc says there's no hard rate limit, but to be courteous).

§2 Symbol map (slug → your file)

# The local FILENAME sometimes differs from the API slug (legacy names from
# the manual-download era), but the VALUE COLUMN is always "<slug>_close" —
# that is how these mappings were confirmed against the existing files.
# Default for any subscribed slug not listed here: <slug>.csv / <slug>_close.
#
#   slug (chart_url)            -> (local filename,               value column)
ST_SYMBOL_MAP: dict[str, tuple[str, str]] = {
    # --- the Light-API purchase (10) ---
    "vol_vix_bonds":            ("vol_vix_bonds.csv",            "vol_vix_bonds_close"),             # Bond Market VIX (MOVE)
    "model_panic_euphoria":     ("model_panic_euphoria.csv",     "model_panic_euphoria_close"),      # Panic/Euphoria Model
    "pc_total":                 ("pc_total.csv",                 "pc_total_close"),                  # Total Put/Call Ratio
    "vol_stock_bond_ratio":     ("vol_stock_bond_ratio.csv",     "vol_stock_bond_ratio_close"),      # Stock/Bond Ratio
    "vol_smi":                  ("vol_smi.csv",                  "vol_smi_close"),                   # Smart Money Index (SMI) — NEW series (no legacy CSV; ~20,800 index level)
    "model_smart_dumb_spread":  ("smart_dumb_spread.csv",        "model_smart_dumb_spread_close"),   # Smart/Dumb Confidence Spread (legacy filename!)
    "model_cnn_fear_greed":     ("model_cnn_fear_greed.csv",     "model_cnn_fear_greed_close"),      # Fear & Greed Model
    "bitcoin_fng":              ("bitcoin_fng.csv",              "bitcoin_fng_close"),               # Crypto Fear & Greed
    "model_risk_stocks_short":  ("model_risk_stocks_short.csv",  "model_risk_stocks_short_close"),   # Short-Term Risk Levels
    "model_risk_stocks_medium": ("model_risk_stocks_medium.csv", "model_risk_stocks_medium_close"),  # Medium-Term Risk Level
    # --- map these too if ever added to the subscription ---
    "survey_aaii_bulls":        ("survey_aaii_bulls.csv",        "survey_aaii_bulls_close"),         # AAII bulls (ipo leg)
    "survey_aaii_bears":        ("survey_aaii_bears.csv",        "survey_aaii_bears_close"),         # AAII bears (ipo leg)
    "etf_spy":                  ("spy_optix.csv",                "etf_spy_close"),                   # SPY Optix (benched; legacy filename)
    "model_bear_macro_spread":  ("model_bear_macro_spread.csv",  "model_bear_macro_spread_close"),   # credit-leverage input (dormant)
}

# st_sync.py reconciles the subscription against your existing CSVs and writes
# the resolved mapping here; if present it OVERRIDES the hardcoded defaults
# above so the daily refresh writes to the very same files you matched.
_RESOLVED_MAP = _THIS_DIR / "st_symbol_map.json"
if _RESOLVED_MAP.exists():
    try:
        import json as _json
        for _slug, _v in _json.loads(_RESOLVED_MAP.read_text(encoding="utf-8")).items():
            if isinstance(_v, dict) and _v.get("file") and _v.get("col"):
                ST_SYMBOL_MAP[_slug] = (_v["file"], _v["col"])
    except Exception as _e:
        print(f"  [st_feed] could not load st_symbol_map.json: {_e}")
  • ST_SYMBOL_MAP — the fallback mapping from each API slug to (filename, value-column). The filename often differs from the slug (legacy names from the manual-download era), but the column is always <slug>_close — that's how the mappings were confirmed against your files.
  • The _RESOLVED_MAP block — if st_symbol_map.json exists (written by st_sync), it overrides these hardcoded defaults. That's the single-source-of-truth link: reconcile once in st_sync, and the daily feed writes to the very same files.

§3 API client — the heart of it

def _auth() -> requests.auth.HTTPBasicAuth:
    if not (ST_USERNAME and ST_PASSWORD):
        raise RuntimeError(
            "SentimenTrader credentials not set. Add to the project .env "
            "(no inline comments after the values!):\n"
            "  SENTIMENTRADER_USERNAME=your-sentimentrader.com-login-email\n"
            "  SENTIMENTRADER_PASSWORD=your-sentimentrader.com-password"
        )
    return requests.auth.HTTPBasicAuth(ST_USERNAME, ST_PASSWORD)


def _get(endpoint: str, params: dict | None = None):
    """GET an API endpoint, return parsed JSON. Raises on HTTP/auth errors."""
    url = f"{ST_BASE_URL}{endpoint}"
    r = requests.get(url, params=params or {}, auth=_auth(), timeout=TIMEOUT_S)
    if r.status_code == 401:
        raise RuntimeError(
            "SentimenTrader API: 401 Unauthorized. The server is reachable and the "
            "Basic-auth header was sent, so this is a CREDENTIALS / ACCOUNT issue, not a "
            "code or .env-parsing bug. Verify in order: (1) the EXACT same email + password "
            "log into sentimentrader.com in a browser; (2) API access is enabled on your "
            "account — the Light API can need activation after purchase (their docs invite "
            "setup questions); (3) the username is the website LOGIN (often an email), not a "
            "display name. .env keys: SENTIMENTRADER_USERNAME / SENTIMENTRADER_PASSWORD. "
            f"Server response: {(r.text or '')[:120]!r}")
    if r.status_code != 200:
        raise RuntimeError(f"SentimenTrader API HTTP {r.status_code} on "
                           f"{endpoint}: {r.text[:200]}")
    raw = r.json()
    # SentimenTrader DOUBLE-ENCODES: the JSON body is itself a JSON *string*
    # (e.g. r.json() -> '"[{...}]"'), so decode a second time when needed.
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except Exception:
            pass
    return raw


def list_subscribed() -> list[dict]:
    """Normalised [{chart_name, chart_url}] for every subscribed indicator.
    NOTE: the live API field is 'url' (the v2.4 PDF says 'chart_url'); we accept
    both. Double-encoding is already undone in _get()."""
    data = _get("/getAllUserSelectedSymbols")
    items = data if isinstance(data, list) else (
        data.get("data", []) if isinstance(data, dict) else [])
    out = []
    for it in items:
        if not isinstance(it, dict):
            continue
        out.append({
            "chart_name": it.get("chart_name") or it.get("name") or "",
            "chart_url": it.get("chart_url") or it.get("url") or "",
        })
    return out


def _parse_rows(data) -> pd.DataFrame:
    """Normalise an API history payload into a [Date, close] frame.

    The doc says JSON 'in the format of "date: close"' — handle both shapes:
      list of {date:..., close:...} objects, OR a {date: close} mapping.
    """
    rows: list[tuple[str, float]] = []
    if isinstance(data, dict):
        # Live shape is {date: {"close": value}}; also accept flat {date: value}.
        for k, v in data.items():
            c = v.get("close", v.get("Close")) if isinstance(v, dict) else v
            if c is not None:
                rows.append((k, c))
    elif isinstance(data, list):
        for item in data:
            if isinstance(item, dict):
                d_ = item.get("date") or item.get("Date") or item.get("datetimestamp")
                c_ = item.get("close") if "close" in item else item.get("Close")
                if d_ is not None and c_ is not None:
                    rows.append((d_, c_))
    df = pd.DataFrame(rows, columns=["Date", "close"])
    if df.empty:
        return df
    df["Date"] = pd.to_datetime(df["Date"], format="mixed", errors="coerce")
    df["close"] = pd.to_numeric(df["close"], errors="coerce")
    df = df.dropna().sort_values("Date").reset_index(drop=True)
    return df


def fetch_symbol(slug: str, full: bool = True) -> pd.DataFrame:
    """Fetch a symbol's history. Default full=True -> /getSymbolData returns
    the WHOLE history the API has for the slug (as long as it provides). Pass
    full=False for the fast last-30-day tail (/getSymbolDataLimitDays)."""
    if full:
        data = _get(f"/getSymbolData/{slug}")
    else:
        data = _get(f"/getSymbolDataLimitDays/{slug}", {"days": INCREMENTAL_DAYS})
    return _parse_rows(data)
  • _auth() — builds the HTTPBasicAuth object, or raises a clear setup message if the credentials are missing.
  • _get() — one GET helper for every endpoint. The 401 branch explains it's an account/login issue (not a code bug). The crucial lines: after r.json(), if the result is a string, it's double-encoded → json.loads() it again. Every endpoint flows through here, so the double-decode is fixed in exactly one place.
  • list_subscribed() — calls /getAllUserSelectedSymbols and normalises each item to {chart_name, chart_url}, accepting the real url field or the PDF's chart_url.
  • _parse_rows() — turns a history payload into a tidy [Date, close] DataFrame. The dict branch handles the real nested shape {date: {"close": v}} (and flat {date: v}); dates are parsed with format="mixed" so ISO datetimes and US dates both work; non-numeric/NaT rows are dropped.
  • fetch_symbol(slug, full=True)full=True (the new default) calls /getSymbolData = the whole history the API has; full=False is the 30-day tail.

§4 CSV merge-writer

def merge_into_csv(slug: str, new_df: pd.DataFrame) -> tuple[Path, int, str]:
    """Merge fetched rows into the canonical feed CSV (Date, <value_col>).

    - Existing rows are kept; API rows win on date collisions (keep='last').
    - `_old.csv` companions are NEVER touched (history archives).
    Returns (path, n_new_rows, latest_date_str).
    """
    fname, value_col = ST_SYMBOL_MAP.get(slug, (f"{slug}.csv", f"{slug}_close"))
    path = FEED_DIR / fname

    incoming = new_df.rename(columns={"close": value_col})[["Date", value_col]]

    if path.exists():
        try:
            existing = pd.read_csv(path)
            dcol = next((c for c in existing.columns if c.strip().lower() == "date"), None)
            vcol = next((c for c in existing.columns if c != dcol), value_col)
            existing = existing.rename(columns={dcol: "Date", vcol: value_col})
            existing["Date"] = pd.to_datetime(existing["Date"], format="mixed",
                                              errors="coerce")
            existing = existing.dropna(subset=["Date"])
        except Exception:
            existing = pd.DataFrame(columns=["Date", value_col])
    else:
        existing = pd.DataFrame(columns=["Date", value_col])

    before = len(existing)
    merged = (pd.concat([existing, incoming])
              .drop_duplicates(subset="Date", keep="last")
              .sort_values("Date").reset_index(drop=True))
    n_new = len(merged) - before

    out = merged.copy()
    out["Date"] = out["Date"].dt.strftime("%Y-%m-%d")   # ISO, matches recent files
    out.to_csv(path, index=False)
    latest = out["Date"].iloc[-1] if len(out) else "—"
    return path, n_new, latest
  • merge_into_csv() — writes fetched rows into the canonical CSV without losing yours.
  • It reads the existing file, renames its columns to Date/<value_col>, then concat + drop_duplicates(subset="Date", keep="last") — so API rows win on overlapping dates while your non-overlapping history is preserved.
  • Dates are written back as YYYY-MM-DD (matching your recent files), and _old.csv archives are never touched.

§5 Orchestration

def refresh_all(full: bool = True, quiet: bool = False) -> dict:
    """Fetch every subscribed symbol (FULL history by default) and merge into
    the feed CSVs.

    Returns {slug: {"file":..., "new_rows":..., "latest":...}} (or {"error":..}).
    Never raises on a single-symbol failure — fetches the rest.
    """
    say = (lambda *a: None) if quiet else print
    subs = list_subscribed()
    say(f"  [st_feed] subscription has {len(subs)} symbols")
    unmapped = [s.get("chart_url") for s in subs if s.get("chart_url") not in ST_SYMBOL_MAP]
    if unmapped:
        say(f"  [st_feed] NOTE — unmapped slugs (will write <slug>.csv defaults): {unmapped}")

    results: dict[str, dict] = {}
    for s in subs:
        slug = s.get("chart_url")
        if not slug:
            continue
        try:
            df = fetch_symbol(slug, full=full)
            if df.empty:
                results[slug] = {"error": "empty payload"}
                say(f"  [st_feed] {slug}: EMPTY payload")
            else:
                path, n_new, latest = merge_into_csv(slug, df)
                results[slug] = {"file": path.name, "new_rows": n_new, "latest": latest}
                say(f"  [st_feed] {slug:26} -> {path.name:30} +{n_new} rows, latest {latest}")
        except Exception as e:
            results[slug] = {"error": str(e)}
            say(f"  [st_feed] {slug}: FAILED — {e}")
        time.sleep(COURTESY_SLEEP_S)
    return results


def check() -> bool:
    """Credential + connectivity check. True if the API is reachable."""
    try:
        subs = list_subscribed()
    except RuntimeError as e:
        print(f"[st_feed] CHECK FAILED: {e}")
        return False
    print(f"[st_feed] OK — authenticated. {len(subs)} subscribed symbols:")
    for s in subs:
        slug = s.get("chart_url", "?")
        mapped = ST_SYMBOL_MAP.get(slug, (f"{slug}.csv (default)",))[0]
        print(f"  {s.get('chart_name','?'):42} slug={slug:28} -> {mapped}")
    return True
  • refresh_all(full=True) — the daily entry point: list the subscription, then for each slug fetch + merge, collecting a per-slug result. A single symbol failing never stops the rest (it's caught and recorded).
  • check() — the --check credential/connectivity probe: authenticates and prints every subscribed symbol next to the file it maps to.

§6 CLI

if __name__ == "__main__":
    args = set(sys.argv[1:])
    if "--check" in args:
        sys.exit(0 if check() else 1)
    full = "--incremental" not in args   # DEFAULT = full history
    print(f"[st_feed] {'FULL-history' if full else 'incremental (30d)'} refresh starting...")
    res = refresh_all(full=full)
    fails = {k: v for k, v in res.items() if "error" in v}
    print(f"[st_feed] done: {len(res) - len(fails)} ok, {len(fails)} failed")
    sys.exit(1 if fails and not (len(res) - len(fails)) else 0)
  • Run directly: default is now full history; --incremental is the fast 30-day path; --check verifies auth. The exit code is non-zero only if everything failed.
05 — st_sync.py, section by section

The reconciler — match without making duplicates

This is the file that honours your rule: prefer YOUR naming, ignore slug-named duplicates, never corrupt a file on a fuzzy guess.

§1 Imports

import sys
import json
import time
import datetime as dt
from pathlib import Path

import pandas as pd

import st_feed  # reuse: list_subscribed, fetch_symbol, FEED_DIR, creds, sleep

FEED_DIR = st_feed.FEED_DIR
THIS_DIR = Path(__file__).parent
MAP_FILE = THIS_DIR / "st_symbol_map.json"
OVERRIDE_FILE = THIS_DIR / "st_sync_overrides.json"
PLAN_FILE = THIS_DIR / "st_sync_plan.json"

# filename artifacts that mark a likely duplicate/copy (deprioritised)
_ARTIFACT_MARKERS = (" (1)", " (2)", " (3)", " - copy", " copy", "- copy")
  • It reuses st_feed's API client (import st_feed) — no duplicated networking. Defines the paths for the resolved map, your overrides, and the saved plan, plus the list of filename _ARTIFACT_MARKERS ((1), - copy, …) used to spot duplicate copies.

§2 Index your existing CSVs (cheap)

def _is_artifact(name: str) -> bool:
    """True for copy/duplicate artifacts: 'x (1).csv', 'x - Copy.csv', 'x .csv'."""
    stem = name[:-4] if name.lower().endswith(".csv") else name
    low = stem.lower()
    return stem != stem.strip() or any(m in low for m in _ARTIFACT_MARKERS)


def build_csv_index() -> list[dict]:
    """Scan the feed folder once; return one record per CSV (ignoring *_old.csv).
    Reads ONLY the header row to learn each file's columns."""
    index = []
    for p in sorted(FEED_DIR.glob("*.csv")):
        if p.name.lower().endswith("_old.csv"):
            continue
        try:
            cols = list(pd.read_csv(p, nrows=0).columns)
        except Exception:
            cols = []
        stem = p.name[:-4]
        index.append({
            "path": p, "name": p.name, "stem": stem,
            "cols": cols, "cols_low": [c.strip().lower() for c in cols],
            "artifact": _is_artifact(p.name),
        })
    return index


def _date_col(rec: dict) -> str | None:
    for c in rec["cols"]:
        if c.strip().lower() in ("date", "datetime"):
            return c
    return None


def _file_stats(rec: dict, value_col: str) -> tuple[int, str | None]:
    """(row_count, latest_date_iso) for tie-breaks/report. Reads the file."""
    dcol = _date_col(rec)
    if not dcol:
        return 0, None
    try:
        df = pd.read_csv(rec["path"], usecols=[dcol])
        dts = pd.to_datetime(df[dcol], format="mixed", errors="coerce").dropna()
        if dts.empty:
            return 0, None
        return len(dts), dts.max().strftime("%Y-%m-%d")
    except Exception:
        return 0, None
  • _is_artifact() — flags copy/duplicate files by name (x (1).csv, x - Copy.csv, trailing-space).
  • build_csv_index() — scans the folder once, reading only each file's header row (fast) to learn its columns; skips _old.csv archives.
  • _file_stats() — for the few real candidates, reads the date column to get row-count and latest date (used for tie-breaks and the report).

§3 The matcher (the core logic)

def match_slug(slug: str, chart_name: str, index: list[dict],
               overrides: dict) -> dict:
    """Return the match decision for one slug:
       {slug, chart_name, value_col, chosen, candidates[], ignored_dupes[], reason}
    """
    val_col = f"{slug}_close"

    # 0) explicit override wins, no questions asked
    if slug in overrides:
        rec = next((r for r in index if r["name"] == overrides[slug]), None)
        col = (val_col if rec and val_col in rec["cols"]
               else (rec["cols"][1] if rec and len(rec["cols"]) > 1 else val_col))
        return {"slug": slug, "chart_name": chart_name, "value_col": col,
                "chosen": overrides[slug], "candidates": [], "ignored_dupes": [],
                "reason": "pinned in st_sync_overrides.json"}

    # 1) candidates: data-column match (strongest), else filename, else fuzzy
    val_matches = [r for r in index if val_col in r["cols"]]
    name_matches = [r for r in index if r["stem"] == slug]
    if val_matches:
        pool, basis = val_matches, "data column <slug>_close"
    elif name_matches:
        pool, basis = name_matches, "filename == slug"
    else:
        toks = set(slug.lower().split("_")) | set(chart_name.lower().split())
        pool = [r for r in index
                if slug.lower() in r["stem"].lower()
                or r["stem"].lower() in slug.lower()
                or len(toks & set(r["stem"].lower().replace("_", " ").split())) >= 2]
        basis = "fuzzy name/token overlap"

    if not pool:
        return {"slug": slug, "chart_name": chart_name, "value_col": val_col,
                "chosen": None, "candidates": [], "ignored_dupes": [],
                "reason": "NO existing CSV matched (new series; create with --create-new)"}

    # Fuzzy name overlap is NOT safe to auto-merge: different series share name
    # tokens (e.g. vol_smi 'Smart Money Index' ~20,800 vs model_smart_money
    # 'Smart Money Confidence' 0-1). Surface the candidate but treat as
    # UNMATCHED, so apply creates a fresh <slug>.csv instead of corrupting a file.
    if basis.startswith("fuzzy"):
        sugg = [r["name"] for r in pool][:3]
        return {"slug": slug, "chart_name": chart_name, "value_col": val_col,
                "chosen": None,
                "candidates": [{"file": s, "score": "fuzzy", "rows": "-", "latest": "-"}
                               for s in sugg],
                "ignored_dupes": [],
                "reason": (f"NO safe match — only fuzzy name overlap with {sugg} "
                           f"(different data column). Will CREATE {slug}.csv on "
                           f"apply --create-new; pin an override to merge instead.")}

    # 2) score each candidate
    # a deliberate (non-artifact) custom-named match = your own naming for this series
    has_custom_named = any(r["stem"] != slug and not r["artifact"] for r in pool)
    scored = []
    for r in pool:
        rows, latest = _file_stats(r, val_col if val_col in r["cols"] else "")
        score = 0.0
        if val_col in r["cols"]:
            score += 1000
        if r["stem"] == slug:
            score += 200
            # "ignore the slug-named duplicate when MY naming exists" (user's rule)
            if has_custom_named and val_col in r["cols"]:
                score -= 600
        # copy/duplicate artifacts must never beat a clean file (only chosen if
        # they are the ONLY candidate, where the penalty cancels out across them)
        score += -10000 if r["artifact"] else 40
        score += min(rows, 20000) / 1000.0                # longer history
        if latest:                                        # fresher wins ties
            try:
                age = (dt.date.today() - dt.date.fromisoformat(latest)).days
                score += max(0, 90 - age) / 30.0
            except Exception:
                pass
        scored.append({"rec": r, "score": round(score, 2),
                       "rows": rows, "latest": latest})
    scored.sort(key=lambda x: x["score"], reverse=True)
    best = scored[0]

    col = (val_col if val_col in best["rec"]["cols"]
           else (best["rec"]["cols"][1] if len(best["rec"]["cols"]) > 1 else val_col))
    ignored = [s["rec"]["name"] for s in scored[1:]
               if s["rec"]["stem"] == slug and val_col in s["rec"]["cols"]]
    return {
        "slug": slug, "chart_name": chart_name, "value_col": col,
        "chosen": best["rec"]["name"],
        "chosen_rows": best["rows"], "chosen_latest": best["latest"],
        "candidates": [{"file": s["rec"]["name"], "score": s["score"],
                        "rows": s["rows"], "latest": s["latest"]} for s in scored],
        "ignored_dupes": ignored,
        "reason": f"matched by {basis}" + (
            f"; ignoring slug-named duplicate(s) in favour of your naming"
            if ignored else ""),
    }


def build_plan() -> list[dict]:
    subs = st_feed.list_subscribed()
    index = build_csv_index()
    overrides = {}
    if OVERRIDE_FILE.exists():
        try:
            overrides = json.loads(OVERRIDE_FILE.read_text(encoding="utf-8"))
        except Exception as e:
            print(f"  [st_sync] could not read overrides: {e}")
    plan = []
    for s in subs:
        slug = s.get("chart_url")
        if not slug:
            continue
        plan.append(match_slug(slug, s.get("chart_name", ""), index, overrides))
    return plan
  • 0) Override — if you pinned a file in st_sync_overrides.json, that wins immediately.
  • 1) Candidate pool — strongest signal first: files whose data column is <slug>_close (same series), else exact filename, else fuzzy name/token overlap.
  • Fuzzy guard — if the only basis is fuzzy, it refuses to auto-merge (returns unmatched) and suggests creating <slug>.csv instead. This is what stopped SMI (vol_smi) from corrupting model_smart_money.csv.
  • 2) Scoring — data-column match +1000; exact slug-name +200 but −600 when a non-artifact custom-named file also matches (so YOUR name wins and the slug-named duplicate is demoted); artifacts −10000 (a clean file always beats a copy); small bonuses for longer history and freshness.
  • ignored_dupes — the slug-named files it deliberately leaves untouched, surfaced in the report so the behaviour is visible.
  • build_plan() — runs the matcher over every subscribed slug and returns the full plan.

§4 Report + the resolved map

def print_plan(plan: list[dict]) -> None:
    print(f"\n  {'SLUG':26} {'->':2} {'CHOSEN CSV':32} {'col':30} note")
    print("  " + "-" * 110)
    for m in plan:
        chosen = m["chosen"] or "(none — unmatched)"
        line = f"  {m['slug']:26} -> {chosen:32} {m['value_col']:30} {m['reason']}"
        print(line)
        if m["chosen"]:
            print(f"      rows={m.get('chosen_rows','?')} latest={m.get('chosen_latest','?')}")
        if m["ignored_dupes"]:
            print(f"      IGNORED slug-duplicate(s): {', '.join(m['ignored_dupes'])}")
        if len(m["candidates"]) > 1:
            alts = "; ".join(f"{c['file']}({c['score']})" for c in m["candidates"][1:4])
            print(f"      other candidates: {alts}")
    unmatched = [m["slug"] for m in plan if not m["chosen"]]
    if unmatched:
        print(f"\n  UNMATCHED (skipped unless --create-new): {', '.join(unmatched)}")


def resolved_map(plan: list[dict], create_new: bool = False) -> dict:
    """slug -> {file, col} for the daily refresh. Includes newly-created
    <slug>.csv files only when create_new (so the map matches what apply wrote)."""
    mp = {}
    for m in plan:
        f = m["chosen"] or (f"{m['slug']}.csv" if create_new else None)
        if f:
            mp[m["slug"]] = {"file": f, "col": m["value_col"]}
    return mp


def write_symbol_map(mp: dict) -> None:
    MAP_FILE.write_text(json.dumps(mp, indent=2), encoding="utf-8")
    print(f"  [st_sync] wrote resolved map -> {MAP_FILE.name} "
          f"({len(mp)} slugs) — st_feed.py uses it for the daily refresh")
  • print_plan() — the human-readable dry-run table (chosen file, rows, latest, ignored duplicates, other candidates).
  • resolved_map() / write_symbol_map() — write st_symbol_map.json (slug → {file, col}), including newly-created files when --create-new — so st_feed's daily run targets exactly what was applied.

§5 Apply — backfill full history

def merge_history(target: Path, value_col: str, new_df: pd.DataFrame,
                  overwrite: bool, force: bool) -> tuple[int, str, str]:
    """Write fetched history into `target` (Date,<value_col>). Returns
    (n_added, latest, mode). Refuses multi-column files unless --force."""
    incoming = new_df.rename(columns={"close": value_col})[["Date", value_col]].copy()

    if target.exists() and not overwrite:
        existing = pd.read_csv(target)
        dcol = next((c for c in existing.columns if c.strip().lower() == "date"), None)
        if dcol is None or len(existing.columns) != 2:
            if not force:
                raise RuntimeError(
                    f"{target.name} is not a clean 2-column Date/value file "
                    f"(cols={list(existing.columns)}); refusing to merge without --force")
        vcol = next((c for c in existing.columns if c != dcol), value_col)
        existing = existing.rename(columns={dcol: "Date", vcol: value_col})
        existing["Date"] = pd.to_datetime(existing["Date"], format="mixed",
                                          errors="coerce")
        existing = existing.dropna(subset=["Date"])
        before = len(existing)
        merged = (pd.concat([existing, incoming])
                  .drop_duplicates(subset="Date", keep="last")
                  .sort_values("Date").reset_index(drop=True))
        mode = "merged (append + replace overlap)"
    else:
        merged = incoming.sort_values("Date").reset_index(drop=True)
        before = 0
        mode = "overwrote (full rebuild)" if target.exists() else "created"

    out = merged.copy()
    out["Date"] = out["Date"].dt.strftime("%Y-%m-%d")
    out.to_csv(target, index=False)
    latest = out["Date"].iloc[-1] if len(out) else "—"
    return len(merged) - before, latest, mode


def apply_plan(plan: list[dict], overwrite: bool, create_new: bool,
               force: bool) -> None:
    write_symbol_map(resolved_map(plan, create_new))
    print(f"\n  [st_sync] APPLY — full-history "
          f"{'OVERWRITE' if overwrite else 'merge'} into matched files\n")
    for m in plan:
        slug = m["slug"]
        target_name = m["chosen"]
        if not target_name:
            if not create_new:
                print(f"  {slug:26} SKIP (unmatched; use --create-new to make {slug}.csv)")
                continue
            target_name = f"{slug}.csv"
        target = FEED_DIR / target_name
        try:
            df = st_feed.fetch_symbol(slug, full=True)
            if df.empty:
                print(f"  {slug:26} EMPTY payload — skipped")
                continue
            n_add, latest, mode = merge_history(target, m["value_col"], df,
                                                overwrite, force)
            print(f"  {slug:26} -> {target_name:32} {mode}: +{n_add} rows, "
                  f"now {len(df)}+ pts, latest {latest}")
            if m["ignored_dupes"]:
                print(f"      (left untouched: {', '.join(m['ignored_dupes'])})")
        except Exception as e:
            print(f"  {slug:26} FAILED — {e}")
        time.sleep(st_feed.COURTESY_SLEEP_S)
    print("\n  [st_sync] done. The daily 07:00 fetch will now keep these files current.")
  • merge_history() — the writer. Default = merge (append new dates + replace overlapping); --overwrite rebuilds the file clean. It refuses a multi-column / OHLCV file unless --force, so a stray match can't clobber a price file.
  • apply_plan() — writes the map, then for each match fetches full history (fetch_symbol(full=True)) and merges it; unmatched slugs are skipped unless --create-new.

§6 CLI

def main():
    args = sys.argv[1:]
    cmd = next((a for a in args if not a.startswith("-")), "plan")
    overwrite = "--overwrite" in args
    create_new = "--create-new" in args
    force = "--force" in args

    if not (st_feed.ST_USERNAME and st_feed.ST_PASSWORD):
        print("[st_sync] SentimenTrader credentials missing. Add to .env "
              "(no inline comments):\n"
              "  SENTIMENTRADER_USERNAME=your-sentimentrader.com-login-email\n"
              "  SENTIMENTRADER_PASSWORD=your-website-password")
        return 1

    if cmd == "symbols":
        subs = st_feed.list_subscribed()
        print(f"[st_sync] {len(subs)} subscribed symbols:")
        for s in subs:
            print(f"  {s.get('chart_name','?'):44} slug = {s.get('chart_url','?')}")
        return 0

    plan = build_plan()
    if cmd == "apply":
        apply_plan(plan, overwrite, create_new, force)
    else:  # plan / default
        print("[st_sync] DRY RUN (plan) — no files written. Review, then run "
              "`python st_sync.py apply`.")
        print_plan(plan)
        write_symbol_map(resolved_map(plan, create_new))
        PLAN_FILE.write_text(json.dumps(plan, indent=2, default=str), encoding="utf-8")
        print(f"  [st_sync] plan saved -> {PLAN_FILE.name}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
  • Sub-commands: symbols (list), plan (default — dry run, writes nothing), apply (with --overwrite/--create-new/--force). Safe-by-default: you always preview before any write.
06 — The full-history upgrade

"As much history as the API provides" — the result

The change was four small edits: fetch_symbol and refresh_all now default to full=True, the daily caller passes full=True, and the CLI flips to --incremental as the opt-out. The first full run pulled, per indicator:

First full-history merge (rows added, latest date)
Slug → fileRows pulledLatest
vol_stock_bond_ratio → vol_stock_bond_ratio.csv+14,1632026-06-10
vol_smi → vol_smi.csv (new)+13,8982026-06-09
vol_vix_bonds → MOVE.csv+9,3542026-06-10
pc_total → pc_total.csv+7,7152026-06-10
model_cnn_fear_greed → model_cnn_fear_greed.csv+7,0832026-06-10
model_smart_dumb_spread → smart_dumb_spread.csv+6,8572026-06-10
model_risk_stocks_short / medium+5,578 / +5,5722026-06-10
bitcoin_fng → bitcoin_fng.csv+2,9272026-06-11
model_panic_euphoria → model_panic_euphoria.csv+1,9512026-06-05
Why deep history matters beyond completeness

Every contrarian percentile in senti-euphoria and every band on the coming move-vix tile is computed against history. A 30-day window gives a noisy rank; decades give a calibrated one. Pulling full history isn't hoarding data — it sharpens every reading built on top of it.

07 — How it plugs into the tracker

One source of truth, and what's next

The link that keeps it coherent: st_sync writes st_symbol_map.json; st_feed reads it. Reconcile once, and the daily 07:00 task keeps all ten files current automatically — your manual downloads (and the old convert-notebook, for these slugs) are now obsolete.

▲ Done
  • Auth, double-decode, nested parse — all handled
  • 10 symbols matched to your files, no duplicates created
  • Full history backfilled + merged daily
  • Downstream (senti-euphoria, AAII leg) unchanged
▼ Phase 2 (next)
  • move-vix core tile from MOVE.csv percentiles
  • senti-euphoria v2: swap in ST Panic/Euphoria + Put/Call + Smart-Dumb legs
  • Composite v2.0 stamp + 4-week parallel run
  • Retire RunConvertSentimentNotebook for these slugs
The principle behind the whole layer

Collection is cheap; attention is the scarce resource. This layer makes the data effortless and always-current so the judgment — which regime are we in, what beats or misses what's priced — stays the thing you spend on. Better inputs, same disciplined read.