#!/usr/bin/env python3 from __future__ import annotations import argparse import dataclasses import hashlib import json import logging import os import re import select import shutil import signal import sqlite3 import subprocess import sys import time import urllib.error import urllib.parse import urllib.request from datetime import UTC, datetime, timedelta from pathlib import Path from typing import Any from zoneinfo import ZoneInfo PIPELINE_VERSION = "1.0.0" AUDIO_EXTENSIONS = {".m4a", ".mp3", ".wav", ".aiff", ".aif", ".caf", ".mp4", ".m4b"} DEFAULT_OBSIDIAN_DIR = Path("/Users/maddin/Obsidian/Maddin/Transkripte") DEFAULT_MODEL = "gpt-5.2" DEFAULT_DEBOUNCE_SECONDS = 10 DEFAULT_RETENTION_DAYS = 7 DIRECT_TRANSCRIPT_CHAR_LIMIT = 120_000 CHUNK_TARGET_CHARS = 60_000 MEMOS_INDEX_PAGE_SIZE = 20 class PipelineError(RuntimeError): pass class OpenAIError(PipelineError): pass @dataclasses.dataclass(slots=True) class Settings: base_dir: Path watch_dir: Path obsidian_dir: Path archive_dir: Path memos_enabled: bool memos_site_url: str memos_content_dir: Path memos_quartz_dir: Path memos_output_dir: Path memos_build_command: str | None memos_rclone_remote: str | None memos_rclone_excludes: tuple[str, ...] memos_sync_htpasswd: bool memos_remote_htpasswd_path: str | None memos_basic_auth_user: str | None memos_basic_auth_password: str | None memos_basic_auth_htpasswd_path: Path prompt_path: Path state_db_path: Path log_path: Path openai_api_key: str | None openai_model: str debounce_seconds: int retention_days: int request_timeout_seconds: int ffprobe_bin: str fswatch_bin: str | None rclone_bin: str rclone_remote: str ntfy_base_url: str ntfy_topic: str ntfy_access_token: str | None @classmethod def from_env(cls, base_dir: Path) -> "Settings": watch_dir = Path(os.environ.get("WATCH_DIR", str(base_dir))).expanduser() archive_dir = Path(os.environ.get("ARCHIVE_DIR", str(base_dir / "archive"))).expanduser() memos_enabled = os.environ.get("MEMOS_ENABLED", "").strip().lower() in {"1", "true", "yes", "on"} memos_content_dir = Path( os.environ.get("MEMOS_CONTENT_DIR", str(base_dir / "memos-content")) ).expanduser() memos_quartz_dir = Path( os.environ.get("MEMOS_QUARTZ_DIR", str(base_dir / "memos-quartz")) ).expanduser() memos_output_dir = Path( os.environ.get("MEMOS_OUTPUT_DIR", str(base_dir / "memos-site")) ).expanduser() prompt_path = Path( os.environ.get("PROMPT_PATH", str(base_dir / "transkript-zusammenfassung-prompt.md")) ).expanduser() obsidian_dir = Path( os.environ.get("OBSIDIAN_DIR", str(DEFAULT_OBSIDIAN_DIR)) ).expanduser() state_db_path = Path( os.environ.get("STATE_DB_PATH", str(base_dir / "pipeline_state.sqlite3")) ).expanduser() log_path = Path(os.environ.get("LOG_PATH", str(base_dir / "pipeline.log"))).expanduser() return cls( base_dir=base_dir, watch_dir=watch_dir, obsidian_dir=obsidian_dir, archive_dir=archive_dir, memos_enabled=memos_enabled, memos_site_url=os.environ.get("MEMOS_SITE_URL", "https://memos.maddin.app").strip(), memos_content_dir=memos_content_dir, memos_quartz_dir=memos_quartz_dir, memos_output_dir=memos_output_dir, memos_build_command=os.environ.get("MEMOS_BUILD_COMMAND", "").strip() or None, memos_rclone_remote=os.environ.get("MEMOS_RCLONE_REMOTE", "").strip() or None, memos_rclone_excludes=tuple( part.strip() for part in os.environ.get("MEMOS_RCLONE_EXCLUDES", "").split(",") if part.strip() ), memos_sync_htpasswd=os.environ.get("MEMOS_SYNC_HTPASSWD", "").strip().lower() in {"1", "true", "yes", "on"}, memos_remote_htpasswd_path=os.environ.get("MEMOS_REMOTE_HTPASSWD_PATH", "").strip() or None, memos_basic_auth_user=os.environ.get("MEMOS_BASIC_AUTH_USER", "").strip() or None, memos_basic_auth_password=os.environ.get("MEMOS_BASIC_AUTH_PASSWORD", "").strip() or None, memos_basic_auth_htpasswd_path=Path( os.environ.get("MEMOS_BASIC_AUTH_HTPASSWD_PATH", str(base_dir / "deploy/nginx/memos.htpasswd")) ).expanduser(), prompt_path=prompt_path, state_db_path=state_db_path, log_path=log_path, openai_api_key=os.environ.get("OPENAI_API_KEY"), openai_model=os.environ.get("OPENAI_MODEL", DEFAULT_MODEL), debounce_seconds=int(os.environ.get("DEBOUNCE_SECONDS", str(DEFAULT_DEBOUNCE_SECONDS))), retention_days=int(os.environ.get("RETENTION_DAYS", str(DEFAULT_RETENTION_DAYS))), request_timeout_seconds=int(os.environ.get("OPENAI_TIMEOUT_SECONDS", "600")), ffprobe_bin=os.environ.get("FFPROBE_BIN", shutil.which("ffprobe") or "/opt/homebrew/bin/ffprobe"), fswatch_bin=os.environ.get("FSWATCH_BIN", shutil.which("fswatch") or "/opt/homebrew/bin/fswatch"), rclone_bin=os.environ.get("RCLONE_BIN", shutil.which("rclone") or "/opt/homebrew/bin/rclone"), rclone_remote=os.environ.get("RCLONE_REMOTE", "transkripte:/"), ntfy_base_url=os.environ.get("NTFY_BASE_URL", "https://ntfy.maddin.app").strip(), ntfy_topic=os.environ.get("NTFY_TOPIC", "Transkript").strip(), ntfy_access_token=os.environ.get("NTFY_ACCESS_TOKEN", "").strip() or None, ) def ensure_directories(self) -> None: self.watch_dir.mkdir(parents=True, exist_ok=True) self.obsidian_dir.mkdir(parents=True, exist_ok=True) (self.archive_dir / "processed").mkdir(parents=True, exist_ok=True) (self.archive_dir / "failed").mkdir(parents=True, exist_ok=True) if self.memos_enabled: self.memos_content_dir.mkdir(parents=True, exist_ok=True) self.memos_output_dir.mkdir(parents=True, exist_ok=True) self.memos_basic_auth_htpasswd_path.parent.mkdir(parents=True, exist_ok=True) self.state_db_path.parent.mkdir(parents=True, exist_ok=True) self.log_path.parent.mkdir(parents=True, exist_ok=True) @dataclasses.dataclass(slots=True) class SourcePair: basename: str audio_path: Path transcript_path: Path @property def audio_signature(self) -> str: return build_file_signature(self.audio_path) @property def transcript_signature(self) -> str: return build_file_signature(self.transcript_path) @dataclasses.dataclass(slots=True) class AudioMetadata: recorded_at: datetime recorded_at_source: str duration_seconds: float duration_human: str audio_size: int @dataclasses.dataclass(slots=True) class SummaryPayload: title: str summary_markdown: str tags: list[str] @dataclasses.dataclass(slots=True) class NoteTarget: source_id: str title: str note_path: Path existing_path: Path | None @dataclasses.dataclass(slots=True) class RawTranscriptPayload: title: str transcript_markdown: str @dataclasses.dataclass(slots=True) class UploadResult: remote_path: str status: str uploaded_at: datetime | None error: str | None = None def load_dotenv(path: Path) -> None: if not path.exists(): return for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue key, value = stripped.split("=", 1) key = key.strip() if not key or key in os.environ: continue value = value.strip().strip("'").strip('"') os.environ[key] = value def setup_logging(log_path: Path) -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", handlers=[ logging.FileHandler(log_path, encoding="utf-8"), logging.StreamHandler(sys.stdout), ], ) def build_file_signature(path: Path) -> str: stat = path.stat() return f"{stat.st_mtime_ns}:{stat.st_size}" def slugify_title(value: str) -> str: value = value.strip() value = re.sub(r"\s+", " ", value) value = re.sub(r"[/:]+", " ", value) value = re.sub(r"[^\w\s.-]", "", value, flags=re.UNICODE) value = re.sub(r"\s+", " ", value).strip(" .-_") return value or "Unbenannt" def format_note_date(recorded_at: datetime) -> str: return recorded_at.strftime("%y%m%d") def format_duration(duration_seconds: float) -> str: total_seconds = max(int(round(duration_seconds)), 0) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) if hours: return f"{hours}:{minutes:02d}:{seconds:02d}" return f"{minutes}:{seconds:02d}" def parse_iso_datetime(value: str) -> datetime: normalized = value.replace("Z", "+00:00") return datetime.fromisoformat(normalized) def extract_json_object(value: str) -> dict[str, Any]: candidate = value.strip() if candidate.startswith("```"): candidate = re.sub(r"^```[a-zA-Z0-9_-]*\n", "", candidate) candidate = re.sub(r"\n```$", "", candidate) candidate = candidate.strip() return json.loads(candidate) def split_text_into_chunks(text: str, target_chars: int = CHUNK_TARGET_CHARS) -> list[str]: paragraphs = re.split(r"\n\s*\n", text.strip()) chunks: list[str] = [] current: list[str] = [] current_len = 0 for paragraph in paragraphs: paragraph = paragraph.strip() if not paragraph: continue extra = len(paragraph) + 2 if current and current_len + extra > target_chars: chunks.append("\n\n".join(current)) current = [paragraph] current_len = len(paragraph) else: current.append(paragraph) current_len += extra if current: chunks.append("\n\n".join(current)) return chunks or [text] def short_hash(value: str) -> str: return hashlib.sha1(value.encode("utf-8")).hexdigest()[:12] def extract_transcript_section(body: str) -> str: pattern = re.compile(r"(?ms)^## (?:Rohtranskript|Transkript)\n(.*?)(?=^## |\Z)") match = pattern.search(body) if not match: return "" return match.group(1).strip() def upsert_transcript_section(body: str, transcript_text: str) -> str: transcript_section = "## Transkript\n" + transcript_text.strip() + "\n" if re.search(r"(?m)^## Rohtranskript\n", body): return replace_or_append_section(body, "Rohtranskript", transcript_section) return replace_or_append_section(body, "Transkript", transcript_section) def join_remote_path(root: str, *parts: str) -> str: normalized_root = root.rstrip("/") cleaned_parts = [part.strip("/") for part in parts if part.strip("/")] if not cleaned_parts: return normalized_root return normalized_root + "/" + "/".join(cleaned_parts) class StateStore: def __init__(self, db_path: Path) -> None: self.db_path = db_path self.connection = sqlite3.connect(db_path) self.connection.row_factory = sqlite3.Row self._initialize() def _initialize(self) -> None: self.connection.execute( """ CREATE TABLE IF NOT EXISTS source_state ( source_id TEXT PRIMARY KEY, basename TEXT NOT NULL, audio_signature TEXT NOT NULL, transcript_signature TEXT NOT NULL, note_path TEXT, raw_note_path TEXT, local_audio_path TEXT, remote_audio_path TEXT, remote_audio_status TEXT, remote_audio_uploaded_at TEXT, remote_audio_last_error TEXT, status TEXT NOT NULL, last_processed_at TEXT, updated_at TEXT, last_error TEXT ) """ ) columns = {row["name"] for row in self.connection.execute("PRAGMA table_info(source_state)")} expected_columns = { "raw_note_path": "TEXT", "local_audio_path": "TEXT", "remote_audio_path": "TEXT", "remote_audio_status": "TEXT", "remote_audio_uploaded_at": "TEXT", "remote_audio_last_error": "TEXT", } for column_name, column_type in expected_columns.items(): if column_name not in columns: self.connection.execute( f"ALTER TABLE source_state ADD COLUMN {column_name} {column_type}" ) self.connection.execute( "CREATE INDEX IF NOT EXISTS idx_source_state_basename ON source_state(basename)" ) self.connection.commit() def get_by_source_id(self, source_id: str) -> sqlite3.Row | None: row = self.connection.execute( "SELECT * FROM source_state WHERE source_id = ?", (source_id,), ).fetchone() return row def get_by_basename(self, basename: str) -> sqlite3.Row | None: row = self.connection.execute( """ SELECT * FROM source_state WHERE basename = ? ORDER BY COALESCE(updated_at, last_processed_at, '') DESC LIMIT 1 """, (basename,), ).fetchone() return row def upsert_processing( self, *, source_id: str, basename: str, audio_signature: str, transcript_signature: str, preserve_remote_state: bool, ) -> None: now = datetime.now(UTC).isoformat() self.connection.execute( """ INSERT INTO source_state ( source_id, basename, audio_signature, transcript_signature, note_path, raw_note_path, local_audio_path, remote_audio_path, remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error, status, last_processed_at, updated_at, last_error ) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'processing', NULL, ?, NULL) ON CONFLICT(source_id) DO UPDATE SET basename = excluded.basename, audio_signature = excluded.audio_signature, transcript_signature = excluded.transcript_signature, status = 'processing', updated_at = excluded.updated_at, last_error = NULL, remote_audio_status = CASE WHEN ? THEN source_state.remote_audio_status ELSE NULL END, remote_audio_uploaded_at = CASE WHEN ? THEN source_state.remote_audio_uploaded_at ELSE NULL END, remote_audio_last_error = CASE WHEN ? THEN source_state.remote_audio_last_error ELSE NULL END, remote_audio_path = CASE WHEN ? THEN source_state.remote_audio_path ELSE NULL END """, ( source_id, basename, audio_signature, transcript_signature, now, 1 if preserve_remote_state else 0, 1 if preserve_remote_state else 0, 1 if preserve_remote_state else 0, 1 if preserve_remote_state else 0, ), ) self.connection.commit() def mark_processed( self, *, source_id: str, basename: str, audio_signature: str, transcript_signature: str, note_path: Path, raw_note_path: Path, local_audio_path: Path | None, remote_audio_path: str | None, remote_audio_status: str, remote_audio_uploaded_at: datetime | None, remote_audio_last_error: str | None, processing_status: str, preserve_processed_at: bool, ) -> None: now = datetime.now(UTC).isoformat() last_processed_at = None if preserve_processed_at else now self.connection.execute( """ INSERT INTO source_state ( source_id, basename, audio_signature, transcript_signature, note_path, raw_note_path, local_audio_path, remote_audio_path, remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error, status, last_processed_at, updated_at, last_error ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL) ON CONFLICT(source_id) DO UPDATE SET basename = excluded.basename, audio_signature = excluded.audio_signature, transcript_signature = excluded.transcript_signature, note_path = excluded.note_path, raw_note_path = excluded.raw_note_path, local_audio_path = excluded.local_audio_path, remote_audio_path = excluded.remote_audio_path, remote_audio_status = excluded.remote_audio_status, remote_audio_uploaded_at = excluded.remote_audio_uploaded_at, remote_audio_last_error = excluded.remote_audio_last_error, status = excluded.status, last_processed_at = COALESCE(source_state.last_processed_at, excluded.last_processed_at), updated_at = excluded.updated_at, last_error = NULL """, ( source_id, basename, audio_signature, transcript_signature, str(note_path), str(raw_note_path), str(local_audio_path) if local_audio_path else None, remote_audio_path, remote_audio_status, remote_audio_uploaded_at.isoformat() if remote_audio_uploaded_at else None, remote_audio_last_error, processing_status, last_processed_at, now, ), ) self.connection.commit() def mark_failed( self, *, source_id: str, basename: str, audio_signature: str, transcript_signature: str, error_message: str, ) -> None: now = datetime.now(UTC).isoformat() self.connection.execute( """ INSERT INTO source_state ( source_id, basename, audio_signature, transcript_signature, note_path, raw_note_path, local_audio_path, remote_audio_path, remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error, status, last_processed_at, updated_at, last_error ) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'failed', NULL, ?, ?) ON CONFLICT(source_id) DO UPDATE SET basename = excluded.basename, audio_signature = excluded.audio_signature, transcript_signature = excluded.transcript_signature, status = 'failed', updated_at = excluded.updated_at, last_error = excluded.last_error """, (source_id, basename, audio_signature, transcript_signature, now, error_message), ) self.connection.commit() def iter_pending_uploads(self) -> list[sqlite3.Row]: return list( self.connection.execute( """ SELECT * FROM source_state WHERE local_audio_path IS NOT NULL AND COALESCE(remote_audio_status, '') != 'uploaded' ORDER BY COALESCE(updated_at, last_processed_at, '') ASC """ ).fetchall() ) def close(self) -> None: self.connection.close() class OpenAIClient: def __init__(self, api_key: str, model: str, timeout_seconds: int) -> None: self.api_key = api_key self.model = model self.timeout_seconds = timeout_seconds def _request(self, payload: dict[str, Any]) -> dict[str, Any]: request = urllib.request.Request( url="https://api.openai.com/v1/responses", data=json.dumps(payload).encode("utf-8"), headers={ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", }, method="POST", ) last_error: Exception | None = None for attempt in range(3): try: with urllib.request.urlopen(request, timeout=self.timeout_seconds) as response: return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace") raise OpenAIError(f"OpenAI API error {exc.code}: {body}") from exc except (urllib.error.URLError, TimeoutError) as exc: last_error = exc time.sleep(2**attempt) raise OpenAIError(f"OpenAI request failed after retries: {last_error}") def _extract_output_text(self, response_json: dict[str, Any]) -> str: chunks: list[str] = [] for item in response_json.get("output", []): if item.get("type") != "message": continue for content in item.get("content", []): if content.get("type") == "output_text": text = content.get("text", "") if text: chunks.append(text) elif content.get("type") == "refusal": raise OpenAIError(f"Model refusal: {content.get('refusal', '')}") if not chunks: raise OpenAIError(f"No text output in response: {json.dumps(response_json)[:1000]}") return "\n".join(chunks).strip() def _schema_payload(self, schema: dict[str, Any]) -> dict[str, Any]: return { "type": "json_schema", "name": "transcript_summary", "strict": True, "schema": schema, } def summarize_chunk( self, *, chunk_index: int, chunk_count: int, transcript_chunk: str, metadata: AudioMetadata, basename: str, ) -> str: prompt = ( "Du fasst einen Ausschnitt eines längeren deutschsprachigen Transkripts zusammen. " "Schreibe eine sachliche, kompakte Markdown-Zusammenfassung des Ausschnitts. " "Halte Themen, Entscheidungen, offene Fragen, Konflikte, Risiken, Aufgaben und " "nächste Schritte fest, sofern sie in diesem Ausschnitt vorkommen. " "Erfinde nichts hinzu.\n\n" f"Quellname: {basename}\n" f"Chunk: {chunk_index} von {chunk_count}\n" f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n" f"Dauer der Gesamtaufnahme: {metadata.duration_human}\n\n" "Transkript-Ausschnitt:\n" f"{transcript_chunk}" ) schema = { "type": "object", "additionalProperties": False, "properties": { "chunk_summary_markdown": {"type": "string"}, }, "required": ["chunk_summary_markdown"], } response_json = self._request( { "model": self.model, "input": [ { "role": "system", "content": [ { "type": "input_text", "text": "Produce JSON that matches the schema exactly.", } ], }, { "role": "user", "content": [{"type": "input_text", "text": prompt}], }, ], "text": {"format": self._schema_payload(schema)}, } ) parsed = extract_json_object(self._extract_output_text(response_json)) return parsed["chunk_summary_markdown"].strip() def summarize_transcript( self, *, prompt_template: str, transcript_text: str, metadata: AudioMetadata, basename: str, ) -> SummaryPayload: transcript_input = transcript_text if len(transcript_text) > DIRECT_TRANSCRIPT_CHAR_LIMIT: chunks = split_text_into_chunks(transcript_text, target_chars=CHUNK_TARGET_CHARS) chunk_summaries = [ self.summarize_chunk( chunk_index=index, chunk_count=len(chunks), transcript_chunk=chunk, metadata=metadata, basename=basename, ) for index, chunk in enumerate(chunks, start=1) ] transcript_input = ( "Das Originaltranskript ist sehr lang. Verwende deshalb diese " "Ausschnittszusammenfassungen als Arbeitsgrundlage für die endgültige " "Zusammenfassung und den Titel.\n\n" + "\n\n".join( f"### Chunk {index}\n{summary}" for index, summary in enumerate(chunk_summaries, start=1) ) ) schema = { "type": "object", "additionalProperties": False, "properties": { "title": { "type": "string", "description": "Kurzer präziser deutscher Titel ohne Datum.", }, "summary_markdown": { "type": "string", "description": "Vollständige strukturierte Zusammenfassung in Markdown.", }, "tags": { "type": "array", "items": {"type": "string"}, "minItems": 2, "maxItems": 8, }, }, "required": ["title", "summary_markdown", "tags"], } metadata_block = ( f"Quellname: {basename}\n" f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n" f"Aufnahmezeit-Quelle: {metadata.recorded_at_source}\n" f"Dauer in Sekunden: {metadata.duration_seconds:.3f}\n" f"Dauer formatiert: {metadata.duration_human}\n" ) if "[TRANSKRIPT EINFÜGEN]" in prompt_template: template_with_material = prompt_template.replace("[TRANSKRIPT EINFÜGEN]", transcript_input) else: template_with_material = f"{prompt_template}\n\nArbeitsmaterial:\n{transcript_input}" full_prompt = ( "Nutze die folgende Prompt-Vorlage als redaktionellen Standard. " "Die endgültige Antwort selbst muss aber JSON sein, das exakt dem Schema entspricht.\n\n" f"{template_with_material}\n\n" "Zusätzliche Anforderungen für diese Pipeline:\n" "- Gib einen knappen, brauchbaren Titel ohne Datum zurück.\n" "- Der Titel soll das Gesprächsthema treffen und als Dateiname taugen.\n" "- summary_markdown soll direkt als Obsidian-Notizinhalt verwendbar sein.\n" "- tags sollen kurze deutschsprachige Schlagworte sein.\n" "- Schreibe keine Vorbemerkung außerhalb des Schemas.\n\n" "Metadaten:\n" f"{metadata_block}\n" "Beachte die Metadaten zusätzlich bei Titel und Einordnung." ) response_json = self._request( { "model": self.model, "input": [ { "role": "system", "content": [ { "type": "input_text", "text": "Produce JSON that matches the schema exactly.", } ], }, { "role": "user", "content": [{"type": "input_text", "text": full_prompt}], }, ], "text": {"format": self._schema_payload(schema)}, } ) parsed = extract_json_object(self._extract_output_text(response_json)) tags = [slugify_title(tag).replace(" ", "-").lower() for tag in parsed["tags"] if tag.strip()] return SummaryPayload( title=parsed["title"].strip(), summary_markdown=parsed["summary_markdown"].strip(), tags=tags or ["transkript", "ki-zusammenfassung"], ) class NtfyClient: def __init__(self, base_url: str, topic: str, access_token: str, timeout_seconds: int) -> None: self.publish_url = base_url.rstrip("/") + "/" + urllib.parse.quote(topic, safe="") self.access_token = access_token self.timeout_seconds = timeout_seconds def publish_summary_ready( self, *, title: str, note_path: Path, source_basename: str, remote_upload: UploadResult, ) -> None: lines = [ f"Zusammenfassung fertig: {title}", f"Datei: {note_path.name}", f"Pfad: {note_path}", f"Quelle: {source_basename}", ] if remote_upload.status == "uploaded": lines.append(f"Audio-Archiv: {remote_upload.remote_path}") elif remote_upload.error: lines.append(f"Audio-Upload: {remote_upload.status} ({remote_upload.error})") request = urllib.request.Request( url=self.publish_url, data="\n".join(lines).encode("utf-8"), headers={ "Authorization": f"Bearer {self.access_token}", "Content-Type": "text/plain; charset=utf-8", "Title": f"Transkript fertig: {title}", "Tags": "memo", }, method="POST", ) with urllib.request.urlopen(request, timeout=self.timeout_seconds): return class MemosPublisher: PUBLIC_FRONTMATTER_KEYS = ( "title", "date", "recorded_at", "duration_human", "processed_at", "updated_at", "tags", ) def __init__(self, settings: Settings) -> None: self.settings = settings def export_all_notes(self) -> list[Path]: export_root = self.settings.memos_content_dir / "transkripte" export_root.mkdir(parents=True, exist_ok=True) exported_paths: list[Path] = [] for note_path in sorted(self.settings.obsidian_dir.glob("*.md")): exported_path = self.export_note(note_path) if exported_path is not None: exported_paths.append(exported_path) keep_names = {path.name for path in exported_paths} for stale_path in export_root.glob("*.md"): if stale_path.name not in keep_names: stale_path.unlink() self.write_index_pages(exported_paths) return exported_paths def export_note(self, note_path: Path) -> Path | None: frontmatter, body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8")) if not is_summary_note_frontmatter(frontmatter): return None public_frontmatter: dict[str, Any] = {} for key in self.PUBLIC_FRONTMATTER_KEYS: value = frontmatter.get(key) if value in (None, ""): continue public_frontmatter[key] = value title = str(frontmatter.get("title") or note_path.stem) public_frontmatter.setdefault("title", title) public_frontmatter.setdefault("tags", ["transkript", "memo"]) sanitized_body = remove_section(strip_leading_h1(body, title).strip(), "Quellen").strip() if not sanitized_body: sanitized_body = "## Hinweis\nDiese Notiz enthält derzeit keinen veröffentlichbaren Inhalt.\n" target_path = self.settings.memos_content_dir / "transkripte" / note_path.name target_path.parent.mkdir(parents=True, exist_ok=True) target_path.write_text( f"{build_frontmatter(public_frontmatter)}\n{sanitized_body.rstrip()}\n", encoding="utf-8", ) return target_path def write_index_pages(self, exported_paths: list[Path]) -> None: notes = sorted(exported_paths, key=lambda path: path.stem, reverse=True) stale_pagination_paths = list(self.settings.memos_content_dir.glob("seite-*.md")) for stale_path in stale_pagination_paths: stale_path.unlink() page_chunks = [ notes[index : index + MEMOS_INDEX_PAGE_SIZE] for index in range(0, len(notes), MEMOS_INDEX_PAGE_SIZE) ] or [[]] for page_number, page_notes in enumerate(page_chunks, start=1): note_links = "\n".join(f"- [[transkripte/{path.stem}]]" for path in page_notes) if not note_links: note_links = "- Noch keine veröffentlichten Memos vorhanden." navigation_links: list[str] = [] if page_number > 1: previous_target = "index" if page_number == 2 else f"seite-{page_number - 1}" navigation_links.append(f"[[{previous_target}|Neuere Memos]]") if page_number < len(page_chunks): navigation_links.append(f"[[seite-{page_number + 1}|Ältere Memos]]") navigation_block = "" if navigation_links: navigation_block = "## Navigation\n" + " | ".join(navigation_links) + "\n" title = "Memos" if page_number == 1 else f"Memos – Seite {page_number}" page_frontmatter = build_frontmatter( { "title": title, "tags": ["memos", "transkripte"], } ) page_body = ( "Diese Website enthält privat veröffentlichte Zusammenfassungen mit eingebettetem Transkript.\n\n" f"## Übersicht Seite {page_number}\n{note_links}\n\n" f"{navigation_block}" ) page_path = ( self.settings.memos_content_dir / "index.md" if page_number == 1 else self.settings.memos_content_dir / f"seite-{page_number}.md" ) page_path.write_text( f"{page_frontmatter}\n{page_body}", encoding="utf-8", ) def build_site(self) -> None: quartz_dir = self.settings.memos_quartz_dir if not quartz_dir.exists(): raise PipelineError(f"Quartz directory does not exist: {quartz_dir}") if self.settings.memos_build_command: subprocess.run( self.settings.memos_build_command, cwd=quartz_dir, shell=True, check=True, capture_output=True, text=True, ) return node_modules = quartz_dir / "node_modules" if not node_modules.exists(): subprocess.run( ["npm", "ci", "--no-fund", "--no-audit"], cwd=quartz_dir, check=True, capture_output=True, text=True, ) subprocess.run( [ "npm", "run", "quartz", "--", "build", "--directory", str(self.settings.memos_content_dir), "--output", str(self.settings.memos_output_dir), ], cwd=quartz_dir, check=True, capture_output=True, text=True, ) self.ensure_pretty_urls() def ensure_pretty_urls(self) -> None: for html_path in self.settings.memos_output_dir.rglob("*.html"): if html_path.name in {"index.html", "404.html"}: continue target_dir = html_path.with_suffix("") target_dir.mkdir(parents=True, exist_ok=True) shutil.copy2(html_path, target_dir / "index.html") def deploy_site(self) -> None: if not self.settings.memos_rclone_remote: return subprocess.run( [ self.settings.rclone_bin, "sync", "--delete-after", "--fast-list", *[arg for pattern in self.settings.memos_rclone_excludes for arg in ("--exclude", pattern)], str(self.settings.memos_output_dir), self.settings.memos_rclone_remote, ], check=True, capture_output=True, text=True, ) if not self.settings.memos_sync_htpasswd: return if not self.settings.memos_remote_htpasswd_path: raise PipelineError("MEMOS_REMOTE_HTPASSWD_PATH must be set when MEMOS_SYNC_HTPASSWD is enabled.") if not self.settings.memos_basic_auth_htpasswd_path.exists(): raise PipelineError( f"htpasswd file does not exist: {self.settings.memos_basic_auth_htpasswd_path}. Run `transcript memos-auth` first." ) subprocess.run( [ self.settings.rclone_bin, "copyto", str(self.settings.memos_basic_auth_htpasswd_path), self.settings.memos_remote_htpasswd_path, ], check=True, capture_output=True, text=True, ) def sync_and_build(self) -> int: exported_paths = self.export_all_notes() self.build_site() self.deploy_site() return len(exported_paths) class TranscriptPipeline: def __init__(self, settings: Settings) -> None: self.settings = settings self.state = StateStore(settings.state_db_path) self.last_scan_had_unstable = False self.openai_client = ( OpenAIClient( api_key=settings.openai_api_key, model=settings.openai_model, timeout_seconds=settings.request_timeout_seconds, ) if settings.openai_api_key else None ) self.ntfy_client = ( NtfyClient( base_url=settings.ntfy_base_url, topic=settings.ntfy_topic, access_token=settings.ntfy_access_token, timeout_seconds=settings.request_timeout_seconds, ) if settings.ntfy_access_token else None ) self.memos_publisher = MemosPublisher(settings) if settings.memos_enabled else None def close(self) -> None: self.state.close() def notify_summary_ready( self, *, title: str, note_path: Path, source_basename: str, remote_upload: UploadResult, ) -> None: if self.ntfy_client is None: return try: self.ntfy_client.publish_summary_ready( title=title, note_path=note_path, source_basename=source_basename, remote_upload=remote_upload, ) except Exception as exc: logging.warning("ntfy notification failed for %s: %s", source_basename, exc) def sync_memos_site(self) -> int: if self.memos_publisher is None: raise PipelineError("MEMOS_ENABLED is not enabled.") exported_count = self.memos_publisher.sync_and_build() destination = self.settings.memos_rclone_remote or str(self.settings.memos_output_dir) logging.info("Updated memos site with %s published notes -> %s", exported_count, destination) return exported_count def scan_pairs(self) -> list[SourcePair]: audio_files: dict[str, Path] = {} transcript_files: dict[str, Path] = {} for entry in self.settings.watch_dir.iterdir(): if not entry.is_file(): continue suffix = entry.suffix.lower() if suffix in AUDIO_EXTENSIONS: audio_files[entry.stem] = entry elif suffix == ".md" and entry != self.settings.prompt_path: transcript_files[entry.stem] = entry basenames = sorted(set(audio_files) & set(transcript_files)) return [ SourcePair( basename=basename, audio_path=audio_files[basename], transcript_path=transcript_files[basename], ) for basename in basenames ] def should_ignore_watch_path(self, path: Path) -> bool: resolved = path.resolve(strict=False) ignored_files = { self.settings.prompt_path.resolve(strict=False), self.settings.state_db_path.resolve(strict=False), self.settings.log_path.resolve(strict=False), (self.settings.base_dir / ".env").resolve(strict=False), (self.settings.base_dir / ".env.example").resolve(strict=False), } if resolved in ignored_files: return True archive_root = self.settings.archive_dir.resolve(strict=False) try: resolved.relative_to(archive_root) return True except ValueError: return False def files_stable(self, pair: SourcePair) -> bool: threshold = time.time() - self.settings.debounce_seconds return ( pair.audio_path.stat().st_mtime <= threshold and pair.transcript_path.stat().st_mtime <= threshold ) def build_source_id(self, pair: SourcePair, metadata: AudioMetadata) -> str: raw = f"{pair.basename}|{metadata.recorded_at.isoformat()}|{metadata.duration_seconds:.3f}" return short_hash(raw) def should_process(self, pair: SourcePair, source_id: str) -> bool: row = self.state.get_by_source_id(source_id) if row is None: return True return ( row["audio_signature"] != pair.audio_signature or row["transcript_signature"] != pair.transcript_signature or row["status"] not in {"processed", "processed_with_upload_error"} or not row["note_path"] or not row["remote_audio_status"] ) def process_available_pairs(self) -> int: processed = 0 self.last_scan_had_unstable = False for pair in self.scan_pairs(): if not self.files_stable(pair): self.last_scan_had_unstable = True logging.info("Skipping unstable pair %s", pair.basename) continue try: if self.process_pair(pair): processed += 1 except Exception: logging.exception("Processing failed for %s", pair.basename) return processed def extract_audio_metadata(self, audio_path: Path) -> AudioMetadata: result = subprocess.run( [ self.settings.ffprobe_bin, "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(audio_path), ], check=True, capture_output=True, text=True, ) payload = json.loads(result.stdout) format_data = payload.get("format", {}) stream_data = payload.get("streams", [{}])[0] creation_time = ( format_data.get("tags", {}).get("creation_time") or stream_data.get("tags", {}).get("creation_time") ) stat = audio_path.stat() if creation_time: recorded_at = parse_iso_datetime(creation_time) source = "ffprobe.creation_time" else: recorded_at = datetime.fromtimestamp(getattr(stat, "st_birthtime", stat.st_mtime), UTC) source = "filesystem.birthtime" local_recorded_at = recorded_at.astimezone(datetime.now().astimezone().tzinfo or ZoneInfo("Europe/Berlin")) duration_seconds = float(format_data.get("duration") or stream_data.get("duration") or 0.0) return AudioMetadata( recorded_at=local_recorded_at, recorded_at_source=source, duration_seconds=duration_seconds, duration_human=format_duration(duration_seconds), audio_size=stat.st_size, ) def upload_audio_to_remote(self, audio_path: Path, source_id: str) -> UploadResult: remote_path = join_remote_path(self.settings.rclone_remote, source_id, "audio", audio_path.name) try: subprocess.run( [ self.settings.rclone_bin, "copyto", str(audio_path), remote_path, ], check=True, capture_output=True, text=True, ) except subprocess.CalledProcessError as exc: error_message = exc.stderr.strip() or exc.stdout.strip() or str(exc) return UploadResult( remote_path=remote_path, status="upload_failed", uploaded_at=None, error=error_message, ) return UploadResult( remote_path=remote_path, status="uploaded", uploaded_at=datetime.now(UTC), error=None, ) def retry_pending_uploads(self) -> int: retried = 0 for row in self.state.iter_pending_uploads(): local_audio_path = row["local_audio_path"] if not local_audio_path: continue audio_path = Path(local_audio_path) if not audio_path.exists(): continue source_id = row["source_id"] upload_result = self.upload_audio_to_remote(audio_path, source_id) processing_status = "processed" if upload_result.status == "uploaded" else "processed_with_upload_error" self.state.mark_processed( source_id=source_id, basename=row["basename"], audio_signature=row["audio_signature"], transcript_signature=row["transcript_signature"], note_path=Path(row["note_path"]), raw_note_path=Path(row["raw_note_path"] or row["note_path"]), local_audio_path=audio_path, remote_audio_path=upload_result.remote_path, remote_audio_status=upload_result.status, remote_audio_uploaded_at=upload_result.uploaded_at, remote_audio_last_error=upload_result.error, processing_status=processing_status, preserve_processed_at=True, ) retried += 1 if upload_result.status == "uploaded": logging.info("Uploaded cached audio for %s to %s", row["basename"], upload_result.remote_path) else: logging.warning("Retry upload failed for %s: %s", row["basename"], upload_result.error) return retried def migrate_archive(self) -> int: migrated = 0 rows = list( self.state.connection.execute( "SELECT * FROM source_state ORDER BY COALESCE(updated_at, last_processed_at, '') ASC" ).fetchall() ) for row in rows: note_path_value = row["note_path"] if not note_path_value: continue note_path = Path(note_path_value) if not note_path.exists(): fallback_note = self.find_existing_note(row["source_id"], note_type="summary") if fallback_note is not None and fallback_note.exists(): note_path = fallback_note else: logging.warning("Skipping migration for missing note %s", note_path) continue existing_frontmatter, existing_body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8")) if extract_transcript_section(existing_body) and existing_frontmatter.get("remote_audio_status"): continue source_id = row["source_id"] basename = row["basename"] audio_path, transcript_path = self.find_archived_pair(source_id) normalized_audio_path = self.normalize_archived_audio(source_id, audio_path) metadata = None if normalized_audio_path and normalized_audio_path.exists(): metadata = self.extract_audio_metadata(normalized_audio_path) else: recorded_at_value = existing_frontmatter.get("recorded_at") if not recorded_at_value: logging.warning("Skipping migration for %s due to missing audio and recorded_at", basename) continue recorded_at = parse_iso_datetime(recorded_at_value) metadata = AudioMetadata( recorded_at=recorded_at, recorded_at_source=existing_frontmatter.get("recorded_at_source", "existing_note"), duration_seconds=float(existing_frontmatter.get("duration_seconds", 0.0)), duration_human=str(existing_frontmatter.get("duration_human", "0:00")), audio_size=0, ) transcript_text = transcript_path.read_text(encoding="utf-8") if transcript_path and transcript_path.exists() else "" if normalized_audio_path and normalized_audio_path.exists(): upload_result = self.upload_audio_to_remote(normalized_audio_path, source_id) else: upload_result = UploadResult( remote_path=join_remote_path(self.settings.rclone_remote, source_id, "audio"), status="upload_failed", uploaded_at=None, error="Local audio file missing during migration.", ) processed_at_value = existing_frontmatter.get("processed_at") processed_at = parse_iso_datetime(processed_at_value) if processed_at_value else datetime.now(UTC) self.update_existing_summary_note( note_path=note_path, metadata=metadata, source_id=source_id, source_basename=basename, transcript_text=transcript_text, local_audio_path=normalized_audio_path, remote_upload=upload_result, processed_at=processed_at, ) self.state.mark_processed( source_id=source_id, basename=basename, audio_signature=build_file_signature(normalized_audio_path) if normalized_audio_path and normalized_audio_path.exists() else row["audio_signature"], transcript_signature=build_file_signature(transcript_path) if transcript_path and transcript_path.exists() else row["transcript_signature"], note_path=note_path, raw_note_path=note_path, local_audio_path=normalized_audio_path, remote_audio_path=upload_result.remote_path, remote_audio_status=upload_result.status, remote_audio_uploaded_at=upload_result.uploaded_at, remote_audio_last_error=upload_result.error, processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error", preserve_processed_at=True, ) migrated += 1 logging.info("Migrated archived source %s", basename) if migrated and self.memos_publisher is not None: self.sync_memos_site() return migrated def build_note_target( self, *, source_id: str, title: str, recorded_at: datetime, note_type: str, ) -> NoteTarget: sanitized_title = slugify_title(title) suffix = " - Transkript" if note_type == "raw_transcript" else "" base_filename = f"{format_note_date(recorded_at)} {sanitized_title}{suffix}.md" existing_path = self.find_existing_note(source_id, note_type=note_type) if existing_path is not None and existing_path.exists(): desired = self.unique_note_path(base_filename, source_id, preferred_existing=existing_path, note_type=note_type) return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=existing_path) desired = self.unique_note_path(base_filename, source_id, preferred_existing=None, note_type=note_type) return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=None) def unique_note_path( self, filename: str, source_id: str, preferred_existing: Path | None, note_type: str, ) -> Path: candidate = self.settings.obsidian_dir / filename if preferred_existing is not None and preferred_existing == candidate: return candidate if not candidate.exists(): return candidate if self.note_belongs_to_source(candidate, source_id, note_type=note_type): return candidate stem = candidate.stem suffix = candidate.suffix counter = 2 while True: next_candidate = candidate.with_name(f"{stem}-{counter}{suffix}") if preferred_existing is not None and preferred_existing == next_candidate: return next_candidate if not next_candidate.exists() or self.note_belongs_to_source(next_candidate, source_id, note_type=note_type): return next_candidate counter += 1 def note_belongs_to_source(self, note_path: Path, source_id: str, note_type: str | None = None) -> bool: if not note_path.exists(): return False text = note_path.read_text(encoding="utf-8") if not re.search(rf'^source_id:\s+"?{re.escape(source_id)}"?$', text, re.MULTILINE): return False if note_type is None: return True type_match = re.search(r'^type: "([^"]+)"$', text, re.MULTILINE) if type_match: return type_match.group(1) == note_type return note_type == "summary" def find_existing_note(self, source_id: str, note_type: str) -> Path | None: row = self.state.get_by_source_id(source_id) key = "raw_note_path" if note_type == "raw_transcript" else "note_path" if row and key in row.keys() and row[key]: candidate = Path(row[key]) if candidate.exists() and self.note_belongs_to_source(candidate, source_id, note_type=note_type): return candidate for note_path in self.settings.obsidian_dir.glob("*.md"): if self.note_belongs_to_source(note_path, source_id, note_type=note_type): return note_path return None def cache_audio(self, audio_path: Path, source_id: str) -> Path: recorded_dir = self.settings.archive_dir / "processed" / source_id / "audio" recorded_dir.mkdir(parents=True, exist_ok=True) archived_audio = recorded_dir / audio_path.name if archived_audio.exists(): archived_audio.unlink() shutil.move(str(audio_path), str(archived_audio)) return archived_audio def remove_source_transcript(self, transcript_path: Path) -> None: if transcript_path.exists(): transcript_path.unlink() def find_archived_pair(self, source_id: str) -> tuple[Path | None, Path | None]: source_dir = self.settings.archive_dir / "processed" / source_id if not source_dir.exists(): return None, None transcript_path: Path | None = None for candidate in sorted(source_dir.glob("*.md")): transcript_path = candidate break audio_path: Path | None = None audio_dir = source_dir / "audio" search_roots = [audio_dir] if audio_dir.exists() else [] search_roots.append(source_dir) for root in search_roots: for candidate in sorted(root.iterdir()): if candidate.is_file() and candidate.suffix.lower() in AUDIO_EXTENSIONS: audio_path = candidate break if audio_path: break return audio_path, transcript_path def normalize_archived_audio(self, source_id: str, audio_path: Path | None) -> Path | None: if audio_path is None or not audio_path.exists(): return None if audio_path.parent.name == "audio": return audio_path return self.cache_audio(audio_path, source_id) def write_note( self, *, target: NoteTarget, payload: SummaryPayload, metadata: AudioMetadata, transcript_text: str, local_audio_path: Path | None, remote_upload: UploadResult, source_basename: str, processed_at: datetime, preserve_processed_at: bool, ) -> Path: target.note_path.parent.mkdir(parents=True, exist_ok=True) if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path: target.existing_path.replace(target.note_path) existing_processed_at = processed_at if preserve_processed_at and target.note_path.exists(): content = target.note_path.read_text(encoding="utf-8") match = re.search(r'^processed_at: "([^"]+)"$', content, re.MULTILINE) if match: existing_processed_at = parse_iso_datetime(match.group(1)) tags = ["transkript", "ki-zusammenfassung", *payload.tags] tags = list(dict.fromkeys(tags)) summary_markdown = strip_leading_h1(payload.summary_markdown, payload.title).strip() frontmatter = build_frontmatter( { "title": payload.title, "type": "summary", "date": metadata.recorded_at.date().isoformat(), "recorded_at": metadata.recorded_at.isoformat(), "recorded_at_source": metadata.recorded_at_source, "duration_seconds": round(metadata.duration_seconds, 3), "duration_human": metadata.duration_human, "source_id": target.source_id, "source_basename": source_basename, "source_audio_cache": str(local_audio_path) if local_audio_path else "", "remote_audio": remote_upload.remote_path, "remote_audio_status": remote_upload.status, "remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "", "remote_audio_last_error": remote_upload.error or "", "processed_at": existing_processed_at.isoformat(), "updated_at": processed_at.isoformat(), "pipeline_version": PIPELINE_VERSION, "llm_model": self.settings.openai_model, "status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error", "tags": tags, } ) body = ( "## Metadaten\n" f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n" f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n" f"- Dauer: {metadata.duration_human}\n" f"- Quelle: `{source_basename}`\n\n" f"{summary_markdown}\n\n" "## Transkript\n" f"{transcript_text.strip()}\n\n" "## Quellen\n" f"- Remote-Audio: `{remote_upload.remote_path}`\n" f"- Upload-Status: `{remote_upload.status}`\n" ) target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8") return target.note_path def write_raw_transcript_note( self, *, target: NoteTarget, metadata: AudioMetadata, transcript_text: str, source_basename: str, processed_at: datetime, ) -> Path: target.note_path.parent.mkdir(parents=True, exist_ok=True) if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path: target.existing_path.replace(target.note_path) frontmatter = build_frontmatter( { "title": f"{target.title} - Transkript", "type": "raw_transcript", "date": metadata.recorded_at.date().isoformat(), "recorded_at": metadata.recorded_at.isoformat(), "recorded_at_source": metadata.recorded_at_source, "duration_seconds": round(metadata.duration_seconds, 3), "duration_human": metadata.duration_human, "source_id": target.source_id, "source_basename": source_basename, "processed_at": processed_at.isoformat(), "updated_at": processed_at.isoformat(), "pipeline_version": PIPELINE_VERSION, "status": "processed", "tags": ["transkript", "rohtranskript", "whisper-segmente"], } ) body = ( "## Metadaten\n" f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n" f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n" f"- Dauer: {metadata.duration_human}\n" f"- Quelle: `{source_basename}`\n\n" "## Rohtranskript\n" f"{transcript_text.strip()}\n" ) target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8") return target.note_path def update_existing_summary_note( self, *, note_path: Path, metadata: AudioMetadata, source_id: str, source_basename: str, transcript_text: str, local_audio_path: Path | None, remote_upload: UploadResult, processed_at: datetime, ) -> None: existing_text = note_path.read_text(encoding="utf-8") frontmatter, body = parse_frontmatter_and_body(existing_text) title = frontmatter.get("title", note_path.stem) tags = frontmatter.get("tags", ["transkript", "ki-zusammenfassung"]) if not isinstance(tags, list): tags = ["transkript", "ki-zusammenfassung"] tags = list(dict.fromkeys([str(tag) for tag in tags])) processed_at_value = frontmatter.get("processed_at") or processed_at.isoformat() updated_frontmatter = { "title": title, "type": "summary", "date": metadata.recorded_at.date().isoformat(), "recorded_at": metadata.recorded_at.isoformat(), "recorded_at_source": metadata.recorded_at_source, "duration_seconds": round(metadata.duration_seconds, 3), "duration_human": metadata.duration_human, "source_id": source_id, "source_basename": source_basename, "source_audio_cache": str(local_audio_path) if local_audio_path else "", "remote_audio": remote_upload.remote_path, "remote_audio_status": remote_upload.status, "remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "", "remote_audio_last_error": remote_upload.error or "", "processed_at": processed_at_value, "updated_at": processed_at.isoformat(), "pipeline_version": PIPELINE_VERSION, "llm_model": frontmatter.get("llm_model", self.settings.openai_model), "status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error", "tags": tags, } body = strip_leading_h1(body, title).strip() transcript_value = transcript_text.strip() if transcript_value: body = upsert_transcript_section(body, transcript_value) sources_section = ( "## Quellen\n" f"- Remote-Audio: `{remote_upload.remote_path}`\n" f"- Upload-Status: `{remote_upload.status}`\n" ) body = replace_or_append_section(body, "Quellen", sources_section) note_path.write_text(remove_blank_lines(f"{build_frontmatter(updated_frontmatter)}\n{body}"), encoding="utf-8") def process_pair(self, pair: SourcePair) -> bool: if self.openai_client is None: raise PipelineError("OPENAI_API_KEY is not set.") audio_signature = pair.audio_signature transcript_signature = pair.transcript_signature transcript_text = pair.transcript_path.read_text(encoding="utf-8").strip() if not transcript_text: raise PipelineError(f"Transcript is empty: {pair.transcript_path}") metadata = self.extract_audio_metadata(pair.audio_path) source_id = self.build_source_id(pair, metadata) if not self.should_process(pair, source_id): logging.info("Skipping unchanged source %s", pair.basename) return False existing_row = self.state.get_by_source_id(source_id) self.state.upsert_processing( source_id=source_id, basename=pair.basename, audio_signature=audio_signature, transcript_signature=transcript_signature, preserve_remote_state=existing_row is not None, ) prompt_template = self.settings.prompt_path.read_text(encoding="utf-8") try: payload = self.openai_client.summarize_transcript( prompt_template=prompt_template, transcript_text=transcript_text, metadata=metadata, basename=pair.basename, ) note_target = self.build_note_target( source_id=source_id, title=payload.title, recorded_at=metadata.recorded_at, note_type="summary", ) archived_audio = self.cache_audio(pair.audio_path, source_id) processed_at = datetime.now(UTC) upload_result = self.upload_audio_to_remote(archived_audio, source_id) note_path = self.write_note( target=note_target, payload=payload, metadata=metadata, transcript_text=transcript_text, local_audio_path=archived_audio, remote_upload=upload_result, source_basename=pair.basename, processed_at=processed_at, preserve_processed_at=existing_row is not None, ) self.remove_source_transcript(pair.transcript_path) self.state.mark_processed( source_id=source_id, basename=pair.basename, audio_signature=audio_signature, transcript_signature=transcript_signature, note_path=note_path, raw_note_path=note_path, local_audio_path=archived_audio, remote_audio_path=upload_result.remote_path, remote_audio_status=upload_result.status, remote_audio_uploaded_at=upload_result.uploaded_at, remote_audio_last_error=upload_result.error, processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error", preserve_processed_at=existing_row is not None, ) if self.memos_publisher is not None: try: self.sync_memos_site() except Exception: logging.exception("Memos publish failed for %s", pair.basename) self.notify_summary_ready( title=payload.title, note_path=note_path, source_basename=pair.basename, remote_upload=upload_result, ) logging.info("Processed %s -> %s", pair.basename, note_path) if upload_result.status != "uploaded": logging.warning("Remote upload failed for %s: %s", pair.basename, upload_result.error) return True except Exception as exc: self.state.mark_failed( source_id=source_id, basename=pair.basename, audio_signature=audio_signature, transcript_signature=transcript_signature, error_message=str(exc), ) raise def cleanup_archive(self) -> int: cutoff = datetime.now(UTC) - timedelta(days=self.settings.retention_days) deleted = 0 processed_root = self.settings.archive_dir / "processed" if not processed_root.exists(): return 0 for source_dir in processed_root.iterdir(): if not source_dir.is_dir(): continue audio_dir = source_dir / "audio" if audio_dir.exists(): for audio_path in audio_dir.iterdir(): if not audio_path.is_file(): continue if datetime.fromtimestamp(audio_path.stat().st_mtime, UTC) < cutoff: audio_path.unlink() deleted += 1 for cleanup_dir in sorted(source_dir.rglob("*"), reverse=True): if cleanup_dir.is_dir() and not any(cleanup_dir.iterdir()): cleanup_dir.rmdir() if source_dir.exists() and not any(source_dir.iterdir()): source_dir.rmdir() return deleted def run_scan(self) -> int: self.retry_pending_uploads() deleted = self.cleanup_archive() if deleted: logging.info("Deleted %s archived audio files", deleted) return self.process_available_pairs() def run_watch(self) -> None: self.cleanup_archive() self.process_available_pairs() if self.settings.fswatch_bin and Path(self.settings.fswatch_bin).exists(): self._run_fswatch_loop() else: self._run_polling_loop() def _run_fswatch_loop(self) -> None: logging.info("Starting fswatch loop on %s", self.settings.watch_dir) process = subprocess.Popen( [ self.settings.fswatch_bin, "-0", str(self.settings.watch_dir), ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) assert process.stdout is not None next_scan_at: float | None = None buffer = b"" should_stop = False def stop_handler(signum: int, frame: Any) -> None: nonlocal should_stop should_stop = True signal.signal(signal.SIGINT, stop_handler) signal.signal(signal.SIGTERM, stop_handler) while not should_stop: readable, _, _ = select.select([process.stdout], [], [], 1.0) if readable: buffer += os.read(process.stdout.fileno(), 65536) if b"\0" in buffer: parts = buffer.split(b"\0") buffer = parts[-1] decoded = [ Path(part.decode("utf-8", errors="replace")) for part in parts[:-1] if part ] if any(not self.should_ignore_watch_path(path) for path in decoded): next_scan_at = time.monotonic() + self.settings.debounce_seconds if next_scan_at is not None and time.monotonic() >= next_scan_at: self.run_scan() next_scan_at = None process.terminate() process.wait(timeout=5) def _run_polling_loop(self) -> None: logging.info("fswatch not found; using polling loop on %s", self.settings.watch_dir) previous_snapshot: dict[str, str] = {} while True: current_snapshot = {} for path in self.settings.watch_dir.iterdir(): if path.is_file() and not self.should_ignore_watch_path(path): current_snapshot[str(path)] = build_file_signature(path) if current_snapshot != previous_snapshot: self.run_scan() if self.last_scan_had_unstable: previous_snapshot = {} else: previous_snapshot = current_snapshot time.sleep(max(self.settings.debounce_seconds, 3)) def build_frontmatter(values: dict[str, Any]) -> str: lines = ["---"] for key, value in values.items(): if isinstance(value, list): lines.append(f"{key}:") for item in value: lines.append(f' - "{str(item).replace(chr(34), chr(39))}"') elif isinstance(value, bool): lines.append(f"{key}: {'true' if value else 'false'}") elif isinstance(value, (int, float)): lines.append(f"{key}: {value}") else: escaped = str(value).replace('"', "'") lines.append(f'{key}: "{escaped}"') lines.append("---") return "\n".join(lines) def remove_blank_lines(text: str) -> str: cleaned_lines = [line.rstrip() for line in text.splitlines() if line.strip()] return "\n".join(cleaned_lines) + "\n" def strip_leading_h1(text: str, title: str | None = None) -> str: lines = text.splitlines() if not lines: return text first = lines[0].strip() if not first.startswith("# "): return text if title is None or first[2:].strip() == title.strip(): return "\n".join(lines[1:]).lstrip("\n") return text def parse_frontmatter_and_body(text: str) -> tuple[dict[str, Any], str]: lines = text.splitlines() if not lines or lines[0].strip() != "---": return {}, text frontmatter_lines: list[str] = [] body_start = 0 for index in range(1, len(lines)): if lines[index].strip() == "---": body_start = index + 1 break frontmatter_lines.append(lines[index]) data: dict[str, Any] = {} current_list_key: str | None = None for raw_line in frontmatter_lines: line = raw_line.rstrip() if not line: continue if current_list_key and line.startswith(" - "): item = line[4:].strip().strip('"') data.setdefault(current_list_key, []).append(item) continue current_list_key = None if line.endswith(":") and ": " not in line: current_list_key = line[:-1] data[current_list_key] = [] continue if ": " not in line: continue key, value = line.split(": ", 1) value = value.strip() if value.startswith('"') and value.endswith('"'): data[key] = value.strip('"') else: data[key] = value body = "\n".join(lines[body_start:]).lstrip("\n") return data, body def replace_or_append_section(body: str, heading: str, replacement: str) -> str: pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)") replacement_block = replacement.strip() if pattern.search(body): updated = pattern.sub(replacement_block + "\n", body).rstrip("\n") return updated return body.rstrip("\n") + "\n" + replacement_block def remove_section(body: str, heading: str) -> str: pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)") updated = pattern.sub("", body).strip() return updated + "\n" if updated else "" def is_summary_note_frontmatter(frontmatter: dict[str, Any]) -> bool: note_type = str(frontmatter.get("type", "summary")).strip() source_id = str(frontmatter.get("source_id", "")).strip() return note_type == "summary" and bool(source_id) def write_htpasswd(settings: Settings) -> Path: if not settings.memos_basic_auth_user or not settings.memos_basic_auth_password: raise PipelineError("MEMOS_BASIC_AUTH_USER and MEMOS_BASIC_AUTH_PASSWORD must be set.") result = subprocess.run( [ "openssl", "passwd", "-apr1", settings.memos_basic_auth_password, ], check=True, capture_output=True, text=True, ) hashed_password = result.stdout.strip() settings.memos_basic_auth_htpasswd_path.write_text( f"{settings.memos_basic_auth_user}:{hashed_password}\n", encoding="utf-8", ) return settings.memos_basic_auth_htpasswd_path def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Watch and process MacWhisper transcripts into Obsidian notes.") subparsers = parser.add_subparsers(dest="command", required=True) scan_parser = subparsers.add_parser("scan", help="Process available transcript/audio pairs once.") scan_parser.add_argument("--basename", help="Process only a specific basename if present.") subparsers.add_parser("watch", help="Watch the directory and process new files automatically.") subparsers.add_parser("cleanup", help="Delete archived originals older than the retention period.") subparsers.add_parser("memos-sync", help="Export published notes for Quartz and rebuild the memos site.") subparsers.add_parser("memos-auth", help="Write the Basic Auth htpasswd file for the memos site.") subparsers.add_parser("retry-uploads", help="Retry pending remote audio uploads from the local archive.") subparsers.add_parser("migrate-archive", help="Retroactively migrate archived sources to raw transcript notes and remote audio.") reprocess_parser = subparsers.add_parser("reprocess", help="Reprocess a basename from the watch folder.") reprocess_parser.add_argument("basename", help="Basename without extension, e.g. 'Neue Aufnahme 23'") return parser.parse_args(argv) def run_cli(argv: list[str]) -> int: base_dir = Path(__file__).resolve().parent load_dotenv(base_dir / ".env") settings = Settings.from_env(base_dir) settings.ensure_directories() setup_logging(settings.log_path) args = parse_args(argv) pipeline = TranscriptPipeline(settings) try: if args.command == "watch": pipeline.run_watch() return 0 if args.command == "cleanup": deleted = pipeline.cleanup_archive() logging.info("Deleted %s archived audio files", deleted) return 0 if args.command == "memos-auth": path = write_htpasswd(settings) logging.info("Wrote htpasswd file to %s", path) return 0 if args.command == "memos-sync": exported_count = pipeline.sync_memos_site() logging.info("Built memos site with %s notes", exported_count) return 0 if args.command == "retry-uploads": retried = pipeline.retry_pending_uploads() logging.info("Retried %s pending uploads", retried) return 0 if args.command == "migrate-archive": migrated = pipeline.migrate_archive() logging.info("Migrated %s archived sources", migrated) return 0 if args.command == "scan": if args.basename: pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None) if pair is None: raise PipelineError(f"No matching pair found for basename: {args.basename}") pipeline.process_pair(pair) return 0 pipeline.run_scan() return 0 if args.command == "reprocess": pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None) if pair is None: raise PipelineError(f"No matching pair found for basename: {args.basename}") pipeline.process_pair(pair) return 0 finally: pipeline.close() return 1 if __name__ == "__main__": try: raise SystemExit(run_cli(sys.argv[1:])) except PipelineError as exc: logging.error("%s", exc) raise SystemExit(1)