#!/usr/bin/env python3
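"""Transcript pipeline.

Watches a folder for matching audio/transcript pairs (e.g. exported by
MacWhisper), summarizes each transcript via the OpenAI Responses API, writes
summary notes (with the full transcript embedded) into an Obsidian vault,
archives the audio locally and via rclone, optionally publishes a
Quartz-based "memos" site, and sends ntfy notifications.

Commands: watch, scan [--basename NAME], cleanup, memos-sync, memos-auth,
retry-uploads, migrate-archive, reprocess BASENAME.
"""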
from __future__ import annotations
import argparse
import dataclasses
import hashlib
import json
import logging
import os
import re
import select
import shutil
import signal
import sqlite3
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
PIPELINE_VERSION = "1.0.0"
AUDIO_EXTENSIONS = {".m4a", ".mp3", ".wav", ".aiff", ".aif", ".caf", ".mp4", ".m4b"}
DEFAULT_OBSIDIAN_DIR = Path("/Users/maddin/Obsidian/Maddin/Transkripte")
DEFAULT_MODEL = "gpt-5.2"
DEFAULT_DEBOUNCE_SECONDS = 10
DEFAULT_RETENTION_DAYS = 7
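# Transcripts longer than DIRECT_TRANSCRIPT_CHAR_LIMIT are summarized in two
# passes: each ~CHUNK_TARGET_CHARS chunk is summarized on its own, then the
# chunk summaries are condensed into the final title and summary.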
DIRECT_TRANSCRIPT_CHAR_LIMIT = 120_000
CHUNK_TARGET_CHARS = 60_000
MEMOS_INDEX_PAGE_SIZE = 20
class PipelineError(RuntimeError):
pass
class OpenAIError(PipelineError):
pass
@dataclasses.dataclass(slots=True)
class Settings:
base_dir: Path
watch_dir: Path
obsidian_dir: Path
archive_dir: Path
memos_enabled: bool
memos_site_url: str
memos_content_dir: Path
memos_quartz_dir: Path
memos_output_dir: Path
memos_build_command: str | None
memos_rclone_remote: str | None
memos_rclone_excludes: tuple[str, ...]
memos_sync_htpasswd: bool
memos_remote_htpasswd_path: str | None
memos_basic_auth_user: str | None
memos_basic_auth_password: str | None
memos_basic_auth_htpasswd_path: Path
prompt_path: Path
state_db_path: Path
log_path: Path
openai_api_key: str | None
openai_model: str
debounce_seconds: int
retention_days: int
request_timeout_seconds: int
ffprobe_bin: str
fswatch_bin: str | None
rclone_bin: str
rclone_remote: str
ntfy_base_url: str
ntfy_topic: str
ntfy_access_token: str | None
@classmethod
def from_env(cls, base_dir: Path) -> "Settings":
watch_dir = Path(os.environ.get("WATCH_DIR", str(base_dir))).expanduser()
archive_dir = Path(os.environ.get("ARCHIVE_DIR", str(base_dir / "archive"))).expanduser()
memos_enabled = os.environ.get("MEMOS_ENABLED", "").strip().lower() in {"1", "true", "yes", "on"}
memos_content_dir = Path(
os.environ.get("MEMOS_CONTENT_DIR", str(base_dir / "memos-content"))
).expanduser()
memos_quartz_dir = Path(
os.environ.get("MEMOS_QUARTZ_DIR", str(base_dir / "memos-quartz"))
).expanduser()
memos_output_dir = Path(
os.environ.get("MEMOS_OUTPUT_DIR", str(base_dir / "memos-site"))
).expanduser()
prompt_path = Path(
os.environ.get("PROMPT_PATH", str(base_dir / "transkript-zusammenfassung-prompt.md"))
).expanduser()
obsidian_dir = Path(
os.environ.get("OBSIDIAN_DIR", str(DEFAULT_OBSIDIAN_DIR))
).expanduser()
state_db_path = Path(
os.environ.get("STATE_DB_PATH", str(base_dir / "pipeline_state.sqlite3"))
).expanduser()
log_path = Path(os.environ.get("LOG_PATH", str(base_dir / "pipeline.log"))).expanduser()
return cls(
base_dir=base_dir,
watch_dir=watch_dir,
obsidian_dir=obsidian_dir,
archive_dir=archive_dir,
memos_enabled=memos_enabled,
memos_site_url=os.environ.get("MEMOS_SITE_URL", "https://memos.maddin.app").strip(),
memos_content_dir=memos_content_dir,
memos_quartz_dir=memos_quartz_dir,
memos_output_dir=memos_output_dir,
memos_build_command=os.environ.get("MEMOS_BUILD_COMMAND", "").strip() or None,
memos_rclone_remote=os.environ.get("MEMOS_RCLONE_REMOTE", "").strip() or None,
memos_rclone_excludes=tuple(
part.strip()
for part in os.environ.get("MEMOS_RCLONE_EXCLUDES", "").split(",")
if part.strip()
),
memos_sync_htpasswd=os.environ.get("MEMOS_SYNC_HTPASSWD", "").strip().lower() in {"1", "true", "yes", "on"},
memos_remote_htpasswd_path=os.environ.get("MEMOS_REMOTE_HTPASSWD_PATH", "").strip() or None,
memos_basic_auth_user=os.environ.get("MEMOS_BASIC_AUTH_USER", "").strip() or None,
memos_basic_auth_password=os.environ.get("MEMOS_BASIC_AUTH_PASSWORD", "").strip() or None,
memos_basic_auth_htpasswd_path=Path(
os.environ.get("MEMOS_BASIC_AUTH_HTPASSWD_PATH", str(base_dir / "deploy/nginx/memos.htpasswd"))
).expanduser(),
prompt_path=prompt_path,
state_db_path=state_db_path,
log_path=log_path,
openai_api_key=os.environ.get("OPENAI_API_KEY"),
openai_model=os.environ.get("OPENAI_MODEL", DEFAULT_MODEL),
debounce_seconds=int(os.environ.get("DEBOUNCE_SECONDS", str(DEFAULT_DEBOUNCE_SECONDS))),
retention_days=int(os.environ.get("RETENTION_DAYS", str(DEFAULT_RETENTION_DAYS))),
request_timeout_seconds=int(os.environ.get("OPENAI_TIMEOUT_SECONDS", "600")),
ffprobe_bin=os.environ.get("FFPROBE_BIN", shutil.which("ffprobe") or "/opt/homebrew/bin/ffprobe"),
fswatch_bin=os.environ.get("FSWATCH_BIN", shutil.which("fswatch") or "/opt/homebrew/bin/fswatch"),
rclone_bin=os.environ.get("RCLONE_BIN", shutil.which("rclone") or "/opt/homebrew/bin/rclone"),
rclone_remote=os.environ.get("RCLONE_REMOTE", "transkripte:/"),
ntfy_base_url=os.environ.get("NTFY_BASE_URL", "https://ntfy.maddin.app").strip(),
ntfy_topic=os.environ.get("NTFY_TOPIC", "Transkript").strip(),
ntfy_access_token=os.environ.get("NTFY_ACCESS_TOKEN", "").strip() or None,
)
def ensure_directories(self) -> None:
self.watch_dir.mkdir(parents=True, exist_ok=True)
self.obsidian_dir.mkdir(parents=True, exist_ok=True)
(self.archive_dir / "processed").mkdir(parents=True, exist_ok=True)
(self.archive_dir / "failed").mkdir(parents=True, exist_ok=True)
if self.memos_enabled:
self.memos_content_dir.mkdir(parents=True, exist_ok=True)
self.memos_output_dir.mkdir(parents=True, exist_ok=True)
self.memos_basic_auth_htpasswd_path.parent.mkdir(parents=True, exist_ok=True)
self.state_db_path.parent.mkdir(parents=True, exist_ok=True)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
@dataclasses.dataclass(slots=True)
class SourcePair:
basename: str
audio_path: Path
transcript_path: Path
@property
def audio_signature(self) -> str:
return build_file_signature(self.audio_path)
@property
def transcript_signature(self) -> str:
return build_file_signature(self.transcript_path)
@dataclasses.dataclass(slots=True)
class AudioMetadata:
recorded_at: datetime
recorded_at_source: str
duration_seconds: float
duration_human: str
audio_size: int
@dataclasses.dataclass(slots=True)
class SummaryPayload:
title: str
summary_markdown: str
tags: list[str]
@dataclasses.dataclass(slots=True)
class NoteTarget:
source_id: str
title: str
note_path: Path
existing_path: Path | None
@dataclasses.dataclass(slots=True)
class RawTranscriptPayload:
title: str
transcript_markdown: str
@dataclasses.dataclass(slots=True)
class UploadResult:
remote_path: str
status: str
uploaded_at: datetime | None
error: str | None = None
def load_dotenv(path: Path) -> None:
if not path.exists():
return
for line in path.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
if not key or key in os.environ:
continue
value = value.strip().strip("'").strip('"')
os.environ[key] = value
def setup_logging(log_path: Path) -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
handlers=[
logging.FileHandler(log_path, encoding="utf-8"),
logging.StreamHandler(sys.stdout),
],
)
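# Cheap change-detection signature (nanosecond mtime plus size) so large
# audio files never have to be read or hashed to detect changes.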
def build_file_signature(path: Path) -> str:
stat = path.stat()
return f"{stat.st_mtime_ns}:{stat.st_size}"
def slugify_title(value: str) -> str:
value = value.strip()
value = re.sub(r"\s+", " ", value)
value = re.sub(r"[/:]+", " ", value)
value = re.sub(r"[^\w\s.-]", "", value, flags=re.UNICODE)
value = re.sub(r"\s+", " ", value).strip(" .-_")
return value or "Unbenannt"
def format_note_date(recorded_at: datetime) -> str:
return recorded_at.strftime("%y%m%d")
def format_duration(duration_seconds: float) -> str:
total_seconds = max(int(round(duration_seconds)), 0)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if hours:
return f"{hours}:{minutes:02d}:{seconds:02d}"
return f"{minutes}:{seconds:02d}"
def parse_iso_datetime(value: str) -> datetime:
normalized = value.replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
def extract_json_object(value: str) -> dict[str, Any]:
candidate = value.strip()
if candidate.startswith("```"):
candidate = re.sub(r"^```[a-zA-Z0-9_-]*\n", "", candidate)
candidate = re.sub(r"\n```$", "", candidate)
candidate = candidate.strip()
return json.loads(candidate)
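# Split on blank-line paragraph boundaries and greedily pack paragraphs into
# chunks of roughly target_chars characters; an oversized single paragraph is
# kept whole rather than split mid-sentence.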
def split_text_into_chunks(text: str, target_chars: int = CHUNK_TARGET_CHARS) -> list[str]:
paragraphs = re.split(r"\n\s*\n", text.strip())
chunks: list[str] = []
current: list[str] = []
current_len = 0
for paragraph in paragraphs:
paragraph = paragraph.strip()
if not paragraph:
continue
extra = len(paragraph) + 2
if current and current_len + extra > target_chars:
chunks.append("\n\n".join(current))
current = [paragraph]
current_len = len(paragraph)
else:
current.append(paragraph)
current_len += extra
if current:
chunks.append("\n\n".join(current))
return chunks or [text]
def short_hash(value: str) -> str:
return hashlib.sha1(value.encode("utf-8")).hexdigest()[:12]
def extract_transcript_section(body: str) -> str:
pattern = re.compile(r"(?ms)^## (?:Rohtranskript|Transkript)\n(.*?)(?=^## |\Z)")
match = pattern.search(body)
if not match:
return ""
return match.group(1).strip()
def upsert_transcript_section(body: str, transcript_text: str) -> str:
transcript_section = "## Transkript\n" + transcript_text.strip() + "\n"
if re.search(r"(?m)^## Rohtranskript\n", body):
return replace_or_append_section(body, "Rohtranskript", transcript_section)
return replace_or_append_section(body, "Transkript", transcript_section)
def join_remote_path(root: str, *parts: str) -> str:
normalized_root = root.rstrip("/")
cleaned_parts = [part.strip("/") for part in parts if part.strip("/")]
if not cleaned_parts:
return normalized_root
return normalized_root + "/" + "/".join(cleaned_parts)
class StateStore:
def __init__(self, db_path: Path) -> None:
self.db_path = db_path
self.connection = sqlite3.connect(db_path)
self.connection.row_factory = sqlite3.Row
self._initialize()
def _initialize(self) -> None:
self.connection.execute(
"""
CREATE TABLE IF NOT EXISTS source_state (
source_id TEXT PRIMARY KEY,
basename TEXT NOT NULL,
audio_signature TEXT NOT NULL,
transcript_signature TEXT NOT NULL,
note_path TEXT,
raw_note_path TEXT,
local_audio_path TEXT,
remote_audio_path TEXT,
remote_audio_status TEXT,
remote_audio_uploaded_at TEXT,
remote_audio_last_error TEXT,
status TEXT NOT NULL,
last_processed_at TEXT,
updated_at TEXT,
last_error TEXT
)
"""
)
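        # Lightweight in-place schema migration: add columns introduced by
        # newer pipeline versions to databases created by older ones.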
columns = {row["name"] for row in self.connection.execute("PRAGMA table_info(source_state)")}
expected_columns = {
"raw_note_path": "TEXT",
"local_audio_path": "TEXT",
"remote_audio_path": "TEXT",
"remote_audio_status": "TEXT",
"remote_audio_uploaded_at": "TEXT",
"remote_audio_last_error": "TEXT",
}
for column_name, column_type in expected_columns.items():
if column_name not in columns:
self.connection.execute(
f"ALTER TABLE source_state ADD COLUMN {column_name} {column_type}"
)
self.connection.execute(
"CREATE INDEX IF NOT EXISTS idx_source_state_basename ON source_state(basename)"
)
self.connection.commit()
def get_by_source_id(self, source_id: str) -> sqlite3.Row | None:
row = self.connection.execute(
"SELECT * FROM source_state WHERE source_id = ?",
(source_id,),
).fetchone()
return row
def get_by_basename(self, basename: str) -> sqlite3.Row | None:
row = self.connection.execute(
"""
SELECT * FROM source_state
WHERE basename = ?
ORDER BY COALESCE(updated_at, last_processed_at, '') DESC
LIMIT 1
""",
(basename,),
).fetchone()
return row
def upsert_processing(
self,
*,
source_id: str,
basename: str,
audio_signature: str,
transcript_signature: str,
preserve_remote_state: bool,
) -> None:
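        # The CASE expressions below keep the remote-upload bookkeeping when
        # preserve_remote_state is set (a known source being reprocessed) and
        # reset it otherwise so the audio is treated as never uploaded.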
now = datetime.now(UTC).isoformat()
self.connection.execute(
"""
INSERT INTO source_state (
source_id, basename, audio_signature, transcript_signature,
note_path, raw_note_path, local_audio_path, remote_audio_path,
remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
status, last_processed_at, updated_at, last_error
) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'processing', NULL, ?, NULL)
ON CONFLICT(source_id) DO UPDATE SET
basename = excluded.basename,
audio_signature = excluded.audio_signature,
transcript_signature = excluded.transcript_signature,
status = 'processing',
updated_at = excluded.updated_at,
last_error = NULL,
remote_audio_status = CASE
WHEN ? THEN source_state.remote_audio_status
ELSE NULL
END,
remote_audio_uploaded_at = CASE
WHEN ? THEN source_state.remote_audio_uploaded_at
ELSE NULL
END,
remote_audio_last_error = CASE
WHEN ? THEN source_state.remote_audio_last_error
ELSE NULL
END,
remote_audio_path = CASE
WHEN ? THEN source_state.remote_audio_path
ELSE NULL
END
""",
(
source_id,
basename,
audio_signature,
transcript_signature,
now,
1 if preserve_remote_state else 0,
1 if preserve_remote_state else 0,
1 if preserve_remote_state else 0,
1 if preserve_remote_state else 0,
),
)
self.connection.commit()
def mark_processed(
self,
*,
source_id: str,
basename: str,
audio_signature: str,
transcript_signature: str,
note_path: Path,
raw_note_path: Path,
local_audio_path: Path | None,
remote_audio_path: str | None,
remote_audio_status: str,
remote_audio_uploaded_at: datetime | None,
remote_audio_last_error: str | None,
processing_status: str,
preserve_processed_at: bool,
) -> None:
now = datetime.now(UTC).isoformat()
last_processed_at = None if preserve_processed_at else now
self.connection.execute(
"""
INSERT INTO source_state (
source_id, basename, audio_signature, transcript_signature,
note_path, raw_note_path, local_audio_path, remote_audio_path,
remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
status, last_processed_at, updated_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)
ON CONFLICT(source_id) DO UPDATE SET
basename = excluded.basename,
audio_signature = excluded.audio_signature,
transcript_signature = excluded.transcript_signature,
note_path = excluded.note_path,
raw_note_path = excluded.raw_note_path,
local_audio_path = excluded.local_audio_path,
remote_audio_path = excluded.remote_audio_path,
remote_audio_status = excluded.remote_audio_status,
remote_audio_uploaded_at = excluded.remote_audio_uploaded_at,
remote_audio_last_error = excluded.remote_audio_last_error,
status = excluded.status,
last_processed_at = COALESCE(source_state.last_processed_at, excluded.last_processed_at),
updated_at = excluded.updated_at,
last_error = NULL
""",
(
source_id,
basename,
audio_signature,
transcript_signature,
str(note_path),
str(raw_note_path),
str(local_audio_path) if local_audio_path else None,
remote_audio_path,
remote_audio_status,
remote_audio_uploaded_at.isoformat() if remote_audio_uploaded_at else None,
remote_audio_last_error,
processing_status,
last_processed_at,
now,
),
)
self.connection.commit()
def mark_failed(
self,
*,
source_id: str,
basename: str,
audio_signature: str,
transcript_signature: str,
error_message: str,
) -> None:
now = datetime.now(UTC).isoformat()
self.connection.execute(
"""
INSERT INTO source_state (
source_id, basename, audio_signature, transcript_signature,
note_path, raw_note_path, local_audio_path, remote_audio_path,
remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
status, last_processed_at, updated_at, last_error
) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'failed', NULL, ?, ?)
ON CONFLICT(source_id) DO UPDATE SET
basename = excluded.basename,
audio_signature = excluded.audio_signature,
transcript_signature = excluded.transcript_signature,
status = 'failed',
updated_at = excluded.updated_at,
last_error = excluded.last_error
""",
(source_id, basename, audio_signature, transcript_signature, now, error_message),
)
self.connection.commit()
def iter_pending_uploads(self) -> list[sqlite3.Row]:
return list(
self.connection.execute(
"""
SELECT *
FROM source_state
WHERE local_audio_path IS NOT NULL
AND COALESCE(remote_audio_status, '') != 'uploaded'
ORDER BY COALESCE(updated_at, last_processed_at, '') ASC
"""
).fetchall()
)
def close(self) -> None:
self.connection.close()
class OpenAIClient:
def __init__(self, api_key: str, model: str, timeout_seconds: int) -> None:
self.api_key = api_key
self.model = model
self.timeout_seconds = timeout_seconds
def _request(self, payload: dict[str, Any]) -> dict[str, Any]:
request = urllib.request.Request(
url="https://api.openai.com/v1/responses",
data=json.dumps(payload).encode("utf-8"),
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
method="POST",
)
last_error: Exception | None = None
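        # Retry transient network errors with exponential backoff (1s, 2s,
        # 4s); HTTP error responses are surfaced immediately since retrying
        # the same request is unlikely to help.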
for attempt in range(3):
try:
with urllib.request.urlopen(request, timeout=self.timeout_seconds) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise OpenAIError(f"OpenAI API error {exc.code}: {body}") from exc
except (urllib.error.URLError, TimeoutError) as exc:
last_error = exc
time.sleep(2**attempt)
raise OpenAIError(f"OpenAI request failed after retries: {last_error}")
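    # The Responses API returns a list of output items; collect every
    # output_text fragment from message items and treat an explicit refusal
    # as a hard error.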
def _extract_output_text(self, response_json: dict[str, Any]) -> str:
chunks: list[str] = []
for item in response_json.get("output", []):
if item.get("type") != "message":
continue
for content in item.get("content", []):
if content.get("type") == "output_text":
text = content.get("text", "")
if text:
chunks.append(text)
elif content.get("type") == "refusal":
raise OpenAIError(f"Model refusal: {content.get('refusal', '')}")
if not chunks:
raise OpenAIError(f"No text output in response: {json.dumps(response_json)[:1000]}")
return "\n".join(chunks).strip()
def _schema_payload(self, schema: dict[str, Any]) -> dict[str, Any]:
return {
"type": "json_schema",
"name": "transcript_summary",
"strict": True,
"schema": schema,
}
def summarize_chunk(
self,
*,
chunk_index: int,
chunk_count: int,
transcript_chunk: str,
metadata: AudioMetadata,
basename: str,
) -> str:
prompt = (
"Du fasst einen Ausschnitt eines längeren deutschsprachigen Transkripts zusammen. "
"Schreibe eine sachliche, kompakte Markdown-Zusammenfassung des Ausschnitts. "
"Halte Themen, Entscheidungen, offene Fragen, Konflikte, Risiken, Aufgaben und "
"nächste Schritte fest, sofern sie in diesem Ausschnitt vorkommen. "
"Erfinde nichts hinzu.\n\n"
f"Quellname: {basename}\n"
f"Chunk: {chunk_index} von {chunk_count}\n"
f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n"
f"Dauer der Gesamtaufnahme: {metadata.duration_human}\n\n"
"Transkript-Ausschnitt:\n"
f"{transcript_chunk}"
)
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"chunk_summary_markdown": {"type": "string"},
},
"required": ["chunk_summary_markdown"],
}
response_json = self._request(
{
"model": self.model,
"input": [
{
"role": "system",
"content": [
{
"type": "input_text",
"text": "Produce JSON that matches the schema exactly.",
}
],
},
{
"role": "user",
"content": [{"type": "input_text", "text": prompt}],
},
],
"text": {"format": self._schema_payload(schema)},
}
)
parsed = extract_json_object(self._extract_output_text(response_json))
return parsed["chunk_summary_markdown"].strip()
def summarize_transcript(
self,
*,
prompt_template: str,
transcript_text: str,
metadata: AudioMetadata,
basename: str,
) -> SummaryPayload:
transcript_input = transcript_text
if len(transcript_text) > DIRECT_TRANSCRIPT_CHAR_LIMIT:
chunks = split_text_into_chunks(transcript_text, target_chars=CHUNK_TARGET_CHARS)
chunk_summaries = [
self.summarize_chunk(
chunk_index=index,
chunk_count=len(chunks),
transcript_chunk=chunk,
metadata=metadata,
basename=basename,
)
for index, chunk in enumerate(chunks, start=1)
]
transcript_input = (
"Das Originaltranskript ist sehr lang. Verwende deshalb diese "
"Ausschnittszusammenfassungen als Arbeitsgrundlage für die endgültige "
"Zusammenfassung und den Titel.\n\n"
+ "\n\n".join(
f"### Chunk {index}\n{summary}" for index, summary in enumerate(chunk_summaries, start=1)
)
)
schema = {
"type": "object",
"additionalProperties": False,
"properties": {
"title": {
"type": "string",
"description": "Kurzer präziser deutscher Titel ohne Datum.",
},
"summary_markdown": {
"type": "string",
"description": "Vollständige strukturierte Zusammenfassung in Markdown.",
},
"tags": {
"type": "array",
"items": {"type": "string"},
"minItems": 2,
"maxItems": 8,
},
},
"required": ["title", "summary_markdown", "tags"],
}
metadata_block = (
f"Quellname: {basename}\n"
f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n"
f"Aufnahmezeit-Quelle: {metadata.recorded_at_source}\n"
f"Dauer in Sekunden: {metadata.duration_seconds:.3f}\n"
f"Dauer formatiert: {metadata.duration_human}\n"
)
if "[TRANSKRIPT EINFÜGEN]" in prompt_template:
template_with_material = prompt_template.replace("[TRANSKRIPT EINFÜGEN]", transcript_input)
else:
template_with_material = f"{prompt_template}\n\nArbeitsmaterial:\n{transcript_input}"
full_prompt = (
"Nutze die folgende Prompt-Vorlage als redaktionellen Standard. "
"Die endgültige Antwort selbst muss aber JSON sein, das exakt dem Schema entspricht.\n\n"
f"{template_with_material}\n\n"
"Zusätzliche Anforderungen für diese Pipeline:\n"
"- Gib einen knappen, brauchbaren Titel ohne Datum zurück.\n"
"- Der Titel soll das Gesprächsthema treffen und als Dateiname taugen.\n"
"- summary_markdown soll direkt als Obsidian-Notizinhalt verwendbar sein.\n"
"- tags sollen kurze deutschsprachige Schlagworte sein.\n"
"- Schreibe keine Vorbemerkung außerhalb des Schemas.\n\n"
"Metadaten:\n"
f"{metadata_block}\n"
"Beachte die Metadaten zusätzlich bei Titel und Einordnung."
)
response_json = self._request(
{
"model": self.model,
"input": [
{
"role": "system",
"content": [
{
"type": "input_text",
"text": "Produce JSON that matches the schema exactly.",
}
],
},
{
"role": "user",
"content": [{"type": "input_text", "text": full_prompt}],
},
],
"text": {"format": self._schema_payload(schema)},
}
)
parsed = extract_json_object(self._extract_output_text(response_json))
tags = [slugify_title(tag).replace(" ", "-").lower() for tag in parsed["tags"] if tag.strip()]
return SummaryPayload(
title=parsed["title"].strip(),
summary_markdown=parsed["summary_markdown"].strip(),
tags=tags or ["transkript", "ki-zusammenfassung"],
)
class NtfyClient:
def __init__(self, base_url: str, topic: str, access_token: str, timeout_seconds: int) -> None:
self.publish_url = base_url.rstrip("/") + "/" + urllib.parse.quote(topic, safe="")
self.access_token = access_token
self.timeout_seconds = timeout_seconds
def publish_summary_ready(
self,
*,
title: str,
note_path: Path,
source_basename: str,
remote_upload: UploadResult,
) -> None:
lines = [
f"Zusammenfassung fertig: {title}",
f"Datei: {note_path.name}",
f"Pfad: {note_path}",
f"Quelle: {source_basename}",
]
if remote_upload.status == "uploaded":
lines.append(f"Audio-Archiv: {remote_upload.remote_path}")
elif remote_upload.error:
lines.append(f"Audio-Upload: {remote_upload.status} ({remote_upload.error})")
request = urllib.request.Request(
url=self.publish_url,
data="\n".join(lines).encode("utf-8"),
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "text/plain; charset=utf-8",
"Title": f"Transkript fertig: {title}",
"Tags": "memo",
},
method="POST",
)
with urllib.request.urlopen(request, timeout=self.timeout_seconds):
return
class MemosPublisher:
PUBLIC_FRONTMATTER_KEYS = (
"title",
"date",
"recorded_at",
"duration_human",
"processed_at",
"updated_at",
"tags",
)
def __init__(self, settings: Settings) -> None:
self.settings = settings
def export_all_notes(self) -> list[Path]:
export_root = self.settings.memos_content_dir / "transkripte"
export_root.mkdir(parents=True, exist_ok=True)
exported_paths: list[Path] = []
for note_path in sorted(self.settings.obsidian_dir.glob("*.md")):
exported_path = self.export_note(note_path)
if exported_path is not None:
exported_paths.append(exported_path)
keep_names = {path.name for path in exported_paths}
for stale_path in export_root.glob("*.md"):
if stale_path.name not in keep_names:
stale_path.unlink()
self.write_index_pages(exported_paths)
return exported_paths
def export_note(self, note_path: Path) -> Path | None:
frontmatter, body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8"))
if not is_summary_note_frontmatter(frontmatter):
return None
public_frontmatter: dict[str, Any] = {}
for key in self.PUBLIC_FRONTMATTER_KEYS:
value = frontmatter.get(key)
if value in (None, ""):
continue
public_frontmatter[key] = value
title = str(frontmatter.get("title") or note_path.stem)
public_frontmatter.setdefault("title", title)
public_frontmatter.setdefault("tags", ["transkript", "memo"])
sanitized_body = remove_section(strip_leading_h1(body, title).strip(), "Quellen").strip()
if not sanitized_body:
sanitized_body = "## Hinweis\nDiese Notiz enthält derzeit keinen veröffentlichbaren Inhalt.\n"
target_path = self.settings.memos_content_dir / "transkripte" / note_path.name
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(
f"{build_frontmatter(public_frontmatter)}\n{sanitized_body.rstrip()}\n",
encoding="utf-8",
)
return target_path
def write_index_pages(self, exported_paths: list[Path]) -> None:
notes = sorted(exported_paths, key=lambda path: path.stem, reverse=True)
stale_pagination_paths = list(self.settings.memos_content_dir.glob("seite-*.md"))
for stale_path in stale_pagination_paths:
stale_path.unlink()
page_chunks = [
notes[index : index + MEMOS_INDEX_PAGE_SIZE]
for index in range(0, len(notes), MEMOS_INDEX_PAGE_SIZE)
] or [[]]
for page_number, page_notes in enumerate(page_chunks, start=1):
note_links = "\n".join(f"- [[transkripte/{path.stem}]]" for path in page_notes)
if not note_links:
note_links = "- Noch keine veröffentlichten Memos vorhanden."
navigation_links: list[str] = []
if page_number > 1:
previous_target = "index" if page_number == 2 else f"seite-{page_number - 1}"
navigation_links.append(f"[[{previous_target}|Neuere Memos]]")
if page_number < len(page_chunks):
navigation_links.append(f"[[seite-{page_number + 1}|Ältere Memos]]")
navigation_block = ""
if navigation_links:
navigation_block = "## Navigation\n" + " | ".join(navigation_links) + "\n"
title = "Memos" if page_number == 1 else f"Memos Seite {page_number}"
page_frontmatter = build_frontmatter(
{
"title": title,
"tags": ["memos", "transkripte"],
}
)
page_body = (
"Diese Website enthält privat veröffentlichte Zusammenfassungen mit eingebettetem Transkript.\n\n"
f"## Übersicht Seite {page_number}\n{note_links}\n\n"
f"{navigation_block}"
)
page_path = (
self.settings.memos_content_dir / "index.md"
if page_number == 1
else self.settings.memos_content_dir / f"seite-{page_number}.md"
)
page_path.write_text(
f"{page_frontmatter}\n{page_body}",
encoding="utf-8",
)
def build_site(self) -> None:
quartz_dir = self.settings.memos_quartz_dir
if not quartz_dir.exists():
raise PipelineError(f"Quartz directory does not exist: {quartz_dir}")
if self.settings.memos_build_command:
subprocess.run(
self.settings.memos_build_command,
cwd=quartz_dir,
shell=True,
check=True,
capture_output=True,
text=True,
)
return
node_modules = quartz_dir / "node_modules"
if not node_modules.exists():
subprocess.run(
["npm", "ci", "--no-fund", "--no-audit"],
cwd=quartz_dir,
check=True,
capture_output=True,
text=True,
)
subprocess.run(
[
"npm",
"run",
"quartz",
"--",
"build",
"--directory",
str(self.settings.memos_content_dir),
"--output",
str(self.settings.memos_output_dir),
],
cwd=quartz_dir,
check=True,
capture_output=True,
text=True,
)
self.ensure_pretty_urls()
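    # Mirror every page.html to page/index.html so links without the .html
    # suffix resolve on static hosts ("pretty URLs").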
def ensure_pretty_urls(self) -> None:
for html_path in self.settings.memos_output_dir.rglob("*.html"):
if html_path.name in {"index.html", "404.html"}:
continue
target_dir = html_path.with_suffix("")
target_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(html_path, target_dir / "index.html")
def deploy_site(self) -> None:
if not self.settings.memos_rclone_remote:
return
subprocess.run(
[
self.settings.rclone_bin,
"sync",
"--delete-after",
"--fast-list",
*[arg for pattern in self.settings.memos_rclone_excludes for arg in ("--exclude", pattern)],
str(self.settings.memos_output_dir),
self.settings.memos_rclone_remote,
],
check=True,
capture_output=True,
text=True,
)
if not self.settings.memos_sync_htpasswd:
return
if not self.settings.memos_remote_htpasswd_path:
raise PipelineError("MEMOS_REMOTE_HTPASSWD_PATH must be set when MEMOS_SYNC_HTPASSWD is enabled.")
if not self.settings.memos_basic_auth_htpasswd_path.exists():
raise PipelineError(
f"htpasswd file does not exist: {self.settings.memos_basic_auth_htpasswd_path}. Run `transcript memos-auth` first."
)
subprocess.run(
[
self.settings.rclone_bin,
"copyto",
str(self.settings.memos_basic_auth_htpasswd_path),
self.settings.memos_remote_htpasswd_path,
],
check=True,
capture_output=True,
text=True,
)
def sync_and_build(self) -> int:
exported_paths = self.export_all_notes()
self.build_site()
self.deploy_site()
return len(exported_paths)
class TranscriptPipeline:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.state = StateStore(settings.state_db_path)
self.last_scan_had_unstable = False
self.openai_client = (
OpenAIClient(
api_key=settings.openai_api_key,
model=settings.openai_model,
timeout_seconds=settings.request_timeout_seconds,
)
if settings.openai_api_key
else None
)
self.ntfy_client = (
NtfyClient(
base_url=settings.ntfy_base_url,
topic=settings.ntfy_topic,
access_token=settings.ntfy_access_token,
timeout_seconds=settings.request_timeout_seconds,
)
if settings.ntfy_access_token
else None
)
self.memos_publisher = MemosPublisher(settings) if settings.memos_enabled else None
def close(self) -> None:
self.state.close()
def notify_summary_ready(
self,
*,
title: str,
note_path: Path,
source_basename: str,
remote_upload: UploadResult,
) -> None:
if self.ntfy_client is None:
return
try:
self.ntfy_client.publish_summary_ready(
title=title,
note_path=note_path,
source_basename=source_basename,
remote_upload=remote_upload,
)
except Exception as exc:
logging.warning("ntfy notification failed for %s: %s", source_basename, exc)
def sync_memos_site(self) -> int:
if self.memos_publisher is None:
            raise PipelineError("MEMOS_ENABLED is not set.")
exported_count = self.memos_publisher.sync_and_build()
destination = self.settings.memos_rclone_remote or str(self.settings.memos_output_dir)
logging.info("Updated memos site with %s published notes -> %s", exported_count, destination)
return exported_count
def scan_pairs(self) -> list[SourcePair]:
audio_files: dict[str, Path] = {}
transcript_files: dict[str, Path] = {}
for entry in self.settings.watch_dir.iterdir():
if not entry.is_file():
continue
suffix = entry.suffix.lower()
if suffix in AUDIO_EXTENSIONS:
audio_files[entry.stem] = entry
elif suffix == ".md" and entry != self.settings.prompt_path:
transcript_files[entry.stem] = entry
basenames = sorted(set(audio_files) & set(transcript_files))
return [
SourcePair(
basename=basename,
audio_path=audio_files[basename],
transcript_path=transcript_files[basename],
)
for basename in basenames
]
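    # Ignore pipeline-owned files (prompt, state DB, log, dotenv files) and
    # everything under the archive tree so our own writes never retrigger a
    # scan.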
def should_ignore_watch_path(self, path: Path) -> bool:
resolved = path.resolve(strict=False)
ignored_files = {
self.settings.prompt_path.resolve(strict=False),
self.settings.state_db_path.resolve(strict=False),
self.settings.log_path.resolve(strict=False),
(self.settings.base_dir / ".env").resolve(strict=False),
(self.settings.base_dir / ".env.example").resolve(strict=False),
}
if resolved in ignored_files:
return True
archive_root = self.settings.archive_dir.resolve(strict=False)
try:
resolved.relative_to(archive_root)
return True
except ValueError:
return False
def files_stable(self, pair: SourcePair) -> bool:
threshold = time.time() - self.settings.debounce_seconds
return (
pair.audio_path.stat().st_mtime <= threshold
and pair.transcript_path.stat().st_mtime <= threshold
)
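    # A source is identified by basename, recording time, and duration, so
    # reprocessing the same recording updates the existing note and state row
    # instead of creating duplicates.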
def build_source_id(self, pair: SourcePair, metadata: AudioMetadata) -> str:
raw = f"{pair.basename}|{metadata.recorded_at.isoformat()}|{metadata.duration_seconds:.3f}"
return short_hash(raw)
def should_process(self, pair: SourcePair, source_id: str) -> bool:
row = self.state.get_by_source_id(source_id)
if row is None:
return True
return (
row["audio_signature"] != pair.audio_signature
or row["transcript_signature"] != pair.transcript_signature
or row["status"] not in {"processed", "processed_with_upload_error"}
or not row["note_path"]
or not row["remote_audio_status"]
)
def process_available_pairs(self) -> int:
processed = 0
self.last_scan_had_unstable = False
for pair in self.scan_pairs():
if not self.files_stable(pair):
self.last_scan_had_unstable = True
logging.info("Skipping unstable pair %s", pair.basename)
continue
try:
if self.process_pair(pair):
processed += 1
except Exception:
logging.exception("Processing failed for %s", pair.basename)
return processed
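    # Recording-time preference: ffprobe's creation_time tag, then the file's
    # birth time (st_birthtime where available, else mtime), converted to the
    # local timezone with Europe/Berlin as a fallback.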
def extract_audio_metadata(self, audio_path: Path) -> AudioMetadata:
result = subprocess.run(
[
self.settings.ffprobe_bin,
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
str(audio_path),
],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
format_data = payload.get("format", {})
        stream_data = (payload.get("streams") or [{}])[0]  # tolerate a missing or empty stream list
creation_time = (
format_data.get("tags", {}).get("creation_time")
or stream_data.get("tags", {}).get("creation_time")
)
stat = audio_path.stat()
if creation_time:
recorded_at = parse_iso_datetime(creation_time)
source = "ffprobe.creation_time"
else:
recorded_at = datetime.fromtimestamp(getattr(stat, "st_birthtime", stat.st_mtime), UTC)
source = "filesystem.birthtime"
local_recorded_at = recorded_at.astimezone(datetime.now().astimezone().tzinfo or ZoneInfo("Europe/Berlin"))
duration_seconds = float(format_data.get("duration") or stream_data.get("duration") or 0.0)
return AudioMetadata(
recorded_at=local_recorded_at,
recorded_at_source=source,
duration_seconds=duration_seconds,
duration_human=format_duration(duration_seconds),
audio_size=stat.st_size,
)
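    # Audio is archived remotely at <remote>/<source_id>/audio/<filename> via
    # `rclone copyto`; failures are recorded in the result instead of raised
    # so note writing proceeds and the upload can be retried later.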
def upload_audio_to_remote(self, audio_path: Path, source_id: str) -> UploadResult:
remote_path = join_remote_path(self.settings.rclone_remote, source_id, "audio", audio_path.name)
try:
subprocess.run(
[
self.settings.rclone_bin,
"copyto",
str(audio_path),
remote_path,
],
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError as exc:
error_message = exc.stderr.strip() or exc.stdout.strip() or str(exc)
return UploadResult(
remote_path=remote_path,
status="upload_failed",
uploaded_at=None,
error=error_message,
)
return UploadResult(
remote_path=remote_path,
status="uploaded",
uploaded_at=datetime.now(UTC),
error=None,
)
def retry_pending_uploads(self) -> int:
retried = 0
for row in self.state.iter_pending_uploads():
local_audio_path = row["local_audio_path"]
if not local_audio_path:
continue
audio_path = Path(local_audio_path)
if not audio_path.exists():
continue
source_id = row["source_id"]
upload_result = self.upload_audio_to_remote(audio_path, source_id)
processing_status = "processed" if upload_result.status == "uploaded" else "processed_with_upload_error"
self.state.mark_processed(
source_id=source_id,
basename=row["basename"],
audio_signature=row["audio_signature"],
transcript_signature=row["transcript_signature"],
note_path=Path(row["note_path"]),
raw_note_path=Path(row["raw_note_path"] or row["note_path"]),
local_audio_path=audio_path,
remote_audio_path=upload_result.remote_path,
remote_audio_status=upload_result.status,
remote_audio_uploaded_at=upload_result.uploaded_at,
remote_audio_last_error=upload_result.error,
processing_status=processing_status,
preserve_processed_at=True,
)
retried += 1
if upload_result.status == "uploaded":
logging.info("Uploaded cached audio for %s to %s", row["basename"], upload_result.remote_path)
else:
logging.warning("Retry upload failed for %s: %s", row["basename"], upload_result.error)
return retried
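    # Retroactive backfill for notes created by older pipeline versions:
    # re-embed the transcript section, upload archived audio, and bring the
    # note frontmatter and state rows up to the current schema.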
def migrate_archive(self) -> int:
migrated = 0
rows = list(
self.state.connection.execute(
"SELECT * FROM source_state ORDER BY COALESCE(updated_at, last_processed_at, '') ASC"
).fetchall()
)
for row in rows:
note_path_value = row["note_path"]
if not note_path_value:
continue
note_path = Path(note_path_value)
if not note_path.exists():
fallback_note = self.find_existing_note(row["source_id"], note_type="summary")
if fallback_note is not None and fallback_note.exists():
note_path = fallback_note
else:
logging.warning("Skipping migration for missing note %s", note_path)
continue
existing_frontmatter, existing_body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8"))
if extract_transcript_section(existing_body) and existing_frontmatter.get("remote_audio_status"):
continue
source_id = row["source_id"]
basename = row["basename"]
audio_path, transcript_path = self.find_archived_pair(source_id)
normalized_audio_path = self.normalize_archived_audio(source_id, audio_path)
metadata = None
if normalized_audio_path and normalized_audio_path.exists():
metadata = self.extract_audio_metadata(normalized_audio_path)
else:
recorded_at_value = existing_frontmatter.get("recorded_at")
if not recorded_at_value:
logging.warning("Skipping migration for %s due to missing audio and recorded_at", basename)
continue
recorded_at = parse_iso_datetime(recorded_at_value)
metadata = AudioMetadata(
recorded_at=recorded_at,
recorded_at_source=existing_frontmatter.get("recorded_at_source", "existing_note"),
duration_seconds=float(existing_frontmatter.get("duration_seconds", 0.0)),
duration_human=str(existing_frontmatter.get("duration_human", "0:00")),
audio_size=0,
)
transcript_text = transcript_path.read_text(encoding="utf-8") if transcript_path and transcript_path.exists() else ""
if normalized_audio_path and normalized_audio_path.exists():
upload_result = self.upload_audio_to_remote(normalized_audio_path, source_id)
else:
upload_result = UploadResult(
remote_path=join_remote_path(self.settings.rclone_remote, source_id, "audio"),
status="upload_failed",
uploaded_at=None,
error="Local audio file missing during migration.",
)
processed_at_value = existing_frontmatter.get("processed_at")
processed_at = parse_iso_datetime(processed_at_value) if processed_at_value else datetime.now(UTC)
self.update_existing_summary_note(
note_path=note_path,
metadata=metadata,
source_id=source_id,
source_basename=basename,
transcript_text=transcript_text,
local_audio_path=normalized_audio_path,
remote_upload=upload_result,
processed_at=processed_at,
)
self.state.mark_processed(
source_id=source_id,
basename=basename,
audio_signature=build_file_signature(normalized_audio_path) if normalized_audio_path and normalized_audio_path.exists() else row["audio_signature"],
transcript_signature=build_file_signature(transcript_path) if transcript_path and transcript_path.exists() else row["transcript_signature"],
note_path=note_path,
raw_note_path=note_path,
local_audio_path=normalized_audio_path,
remote_audio_path=upload_result.remote_path,
remote_audio_status=upload_result.status,
remote_audio_uploaded_at=upload_result.uploaded_at,
remote_audio_last_error=upload_result.error,
processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error",
preserve_processed_at=True,
)
migrated += 1
logging.info("Migrated archived source %s", basename)
if migrated and self.memos_publisher is not None:
self.sync_memos_site()
return migrated
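    # Note filenames follow "<YYMMDD> <Title>[ - Transkript].md"; name
    # collisions are resolved by checking the embedded source_id before
    # appending -2, -3, ... to the stem.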
def build_note_target(
self,
*,
source_id: str,
title: str,
recorded_at: datetime,
note_type: str,
) -> NoteTarget:
sanitized_title = slugify_title(title)
suffix = " - Transkript" if note_type == "raw_transcript" else ""
base_filename = f"{format_note_date(recorded_at)} {sanitized_title}{suffix}.md"
existing_path = self.find_existing_note(source_id, note_type=note_type)
if existing_path is not None and existing_path.exists():
desired = self.unique_note_path(base_filename, source_id, preferred_existing=existing_path, note_type=note_type)
return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=existing_path)
desired = self.unique_note_path(base_filename, source_id, preferred_existing=None, note_type=note_type)
return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=None)
def unique_note_path(
self,
filename: str,
source_id: str,
preferred_existing: Path | None,
note_type: str,
) -> Path:
candidate = self.settings.obsidian_dir / filename
if preferred_existing is not None and preferred_existing == candidate:
return candidate
if not candidate.exists():
return candidate
if self.note_belongs_to_source(candidate, source_id, note_type=note_type):
return candidate
stem = candidate.stem
suffix = candidate.suffix
counter = 2
while True:
next_candidate = candidate.with_name(f"{stem}-{counter}{suffix}")
if preferred_existing is not None and preferred_existing == next_candidate:
return next_candidate
if not next_candidate.exists() or self.note_belongs_to_source(next_candidate, source_id, note_type=note_type):
return next_candidate
counter += 1
def note_belongs_to_source(self, note_path: Path, source_id: str, note_type: str | None = None) -> bool:
if not note_path.exists():
return False
text = note_path.read_text(encoding="utf-8")
if not re.search(rf'^source_id:\s+"?{re.escape(source_id)}"?$', text, re.MULTILINE):
return False
if note_type is None:
return True
type_match = re.search(r'^type: "([^"]+)"$', text, re.MULTILINE)
if type_match:
return type_match.group(1) == note_type
return note_type == "summary"
def find_existing_note(self, source_id: str, note_type: str) -> Path | None:
row = self.state.get_by_source_id(source_id)
key = "raw_note_path" if note_type == "raw_transcript" else "note_path"
if row and key in row.keys() and row[key]:
candidate = Path(row[key])
if candidate.exists() and self.note_belongs_to_source(candidate, source_id, note_type=note_type):
return candidate
for note_path in self.settings.obsidian_dir.glob("*.md"):
if self.note_belongs_to_source(note_path, source_id, note_type=note_type):
return note_path
return None
def cache_audio(self, audio_path: Path, source_id: str) -> Path:
recorded_dir = self.settings.archive_dir / "processed" / source_id / "audio"
recorded_dir.mkdir(parents=True, exist_ok=True)
archived_audio = recorded_dir / audio_path.name
if archived_audio.exists():
archived_audio.unlink()
shutil.move(str(audio_path), str(archived_audio))
return archived_audio
def remove_source_transcript(self, transcript_path: Path) -> None:
if transcript_path.exists():
transcript_path.unlink()
def find_archived_pair(self, source_id: str) -> tuple[Path | None, Path | None]:
source_dir = self.settings.archive_dir / "processed" / source_id
if not source_dir.exists():
return None, None
transcript_path: Path | None = None
for candidate in sorted(source_dir.glob("*.md")):
transcript_path = candidate
break
audio_path: Path | None = None
audio_dir = source_dir / "audio"
search_roots = [audio_dir] if audio_dir.exists() else []
search_roots.append(source_dir)
for root in search_roots:
for candidate in sorted(root.iterdir()):
if candidate.is_file() and candidate.suffix.lower() in AUDIO_EXTENSIONS:
audio_path = candidate
break
if audio_path:
break
return audio_path, transcript_path
def normalize_archived_audio(self, source_id: str, audio_path: Path | None) -> Path | None:
if audio_path is None or not audio_path.exists():
return None
if audio_path.parent.name == "audio":
return audio_path
return self.cache_audio(audio_path, source_id)
def write_note(
self,
*,
target: NoteTarget,
payload: SummaryPayload,
metadata: AudioMetadata,
transcript_text: str,
local_audio_path: Path | None,
remote_upload: UploadResult,
source_basename: str,
processed_at: datetime,
preserve_processed_at: bool,
) -> Path:
target.note_path.parent.mkdir(parents=True, exist_ok=True)
if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path:
target.existing_path.replace(target.note_path)
existing_processed_at = processed_at
if preserve_processed_at and target.note_path.exists():
content = target.note_path.read_text(encoding="utf-8")
match = re.search(r'^processed_at: "([^"]+)"$', content, re.MULTILINE)
if match:
existing_processed_at = parse_iso_datetime(match.group(1))
tags = ["transkript", "ki-zusammenfassung", *payload.tags]
tags = list(dict.fromkeys(tags))
summary_markdown = strip_leading_h1(payload.summary_markdown, payload.title).strip()
frontmatter = build_frontmatter(
{
"title": payload.title,
"type": "summary",
"date": metadata.recorded_at.date().isoformat(),
"recorded_at": metadata.recorded_at.isoformat(),
"recorded_at_source": metadata.recorded_at_source,
"duration_seconds": round(metadata.duration_seconds, 3),
"duration_human": metadata.duration_human,
"source_id": target.source_id,
"source_basename": source_basename,
"source_audio_cache": str(local_audio_path) if local_audio_path else "",
"remote_audio": remote_upload.remote_path,
"remote_audio_status": remote_upload.status,
"remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "",
"remote_audio_last_error": remote_upload.error or "",
"processed_at": existing_processed_at.isoformat(),
"updated_at": processed_at.isoformat(),
"pipeline_version": PIPELINE_VERSION,
"llm_model": self.settings.openai_model,
"status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error",
"tags": tags,
}
)
body = (
"## Metadaten\n"
f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n"
f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n"
f"- Dauer: {metadata.duration_human}\n"
f"- Quelle: `{source_basename}`\n\n"
f"{summary_markdown}\n\n"
"## Transkript\n"
f"{transcript_text.strip()}\n\n"
"## Quellen\n"
f"- Remote-Audio: `{remote_upload.remote_path}`\n"
f"- Upload-Status: `{remote_upload.status}`\n"
)
target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8")
return target.note_path
def write_raw_transcript_note(
self,
*,
target: NoteTarget,
metadata: AudioMetadata,
transcript_text: str,
source_basename: str,
processed_at: datetime,
) -> Path:
target.note_path.parent.mkdir(parents=True, exist_ok=True)
if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path:
target.existing_path.replace(target.note_path)
frontmatter = build_frontmatter(
{
"title": f"{target.title} - Transkript",
"type": "raw_transcript",
"date": metadata.recorded_at.date().isoformat(),
"recorded_at": metadata.recorded_at.isoformat(),
"recorded_at_source": metadata.recorded_at_source,
"duration_seconds": round(metadata.duration_seconds, 3),
"duration_human": metadata.duration_human,
"source_id": target.source_id,
"source_basename": source_basename,
"processed_at": processed_at.isoformat(),
"updated_at": processed_at.isoformat(),
"pipeline_version": PIPELINE_VERSION,
"status": "processed",
"tags": ["transkript", "rohtranskript", "whisper-segmente"],
}
)
body = (
"## Metadaten\n"
f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n"
f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n"
f"- Dauer: {metadata.duration_human}\n"
f"- Quelle: `{source_basename}`\n\n"
"## Rohtranskript\n"
f"{transcript_text.strip()}\n"
)
target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8")
return target.note_path
def update_existing_summary_note(
self,
*,
note_path: Path,
metadata: AudioMetadata,
source_id: str,
source_basename: str,
transcript_text: str,
local_audio_path: Path | None,
remote_upload: UploadResult,
processed_at: datetime,
) -> None:
existing_text = note_path.read_text(encoding="utf-8")
frontmatter, body = parse_frontmatter_and_body(existing_text)
title = frontmatter.get("title", note_path.stem)
tags = frontmatter.get("tags", ["transkript", "ki-zusammenfassung"])
if not isinstance(tags, list):
tags = ["transkript", "ki-zusammenfassung"]
tags = list(dict.fromkeys([str(tag) for tag in tags]))
processed_at_value = frontmatter.get("processed_at") or processed_at.isoformat()
updated_frontmatter = {
"title": title,
"type": "summary",
"date": metadata.recorded_at.date().isoformat(),
"recorded_at": metadata.recorded_at.isoformat(),
"recorded_at_source": metadata.recorded_at_source,
"duration_seconds": round(metadata.duration_seconds, 3),
"duration_human": metadata.duration_human,
"source_id": source_id,
"source_basename": source_basename,
"source_audio_cache": str(local_audio_path) if local_audio_path else "",
"remote_audio": remote_upload.remote_path,
"remote_audio_status": remote_upload.status,
"remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "",
"remote_audio_last_error": remote_upload.error or "",
"processed_at": processed_at_value,
"updated_at": processed_at.isoformat(),
"pipeline_version": PIPELINE_VERSION,
"llm_model": frontmatter.get("llm_model", self.settings.openai_model),
"status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error",
"tags": tags,
}
body = strip_leading_h1(body, title).strip()
transcript_value = transcript_text.strip()
if transcript_value:
body = upsert_transcript_section(body, transcript_value)
sources_section = (
"## Quellen\n"
f"- Remote-Audio: `{remote_upload.remote_path}`\n"
f"- Upload-Status: `{remote_upload.status}`\n"
)
body = replace_or_append_section(body, "Quellen", sources_section)
note_path.write_text(remove_blank_lines(f"{build_frontmatter(updated_frontmatter)}\n{body}"), encoding="utf-8")
def process_pair(self, pair: SourcePair) -> bool:
if self.openai_client is None:
raise PipelineError("OPENAI_API_KEY is not set.")
audio_signature = pair.audio_signature
transcript_signature = pair.transcript_signature
transcript_text = pair.transcript_path.read_text(encoding="utf-8").strip()
if not transcript_text:
raise PipelineError(f"Transcript is empty: {pair.transcript_path}")
metadata = self.extract_audio_metadata(pair.audio_path)
source_id = self.build_source_id(pair, metadata)
if not self.should_process(pair, source_id):
logging.info("Skipping unchanged source %s", pair.basename)
return False
existing_row = self.state.get_by_source_id(source_id)
self.state.upsert_processing(
source_id=source_id,
basename=pair.basename,
audio_signature=audio_signature,
transcript_signature=transcript_signature,
preserve_remote_state=existing_row is not None,
)
prompt_template = self.settings.prompt_path.read_text(encoding="utf-8")
try:
payload = self.openai_client.summarize_transcript(
prompt_template=prompt_template,
transcript_text=transcript_text,
metadata=metadata,
basename=pair.basename,
)
note_target = self.build_note_target(
source_id=source_id,
title=payload.title,
recorded_at=metadata.recorded_at,
note_type="summary",
)
archived_audio = self.cache_audio(pair.audio_path, source_id)
processed_at = datetime.now(UTC)
upload_result = self.upload_audio_to_remote(archived_audio, source_id)
note_path = self.write_note(
target=note_target,
payload=payload,
metadata=metadata,
transcript_text=transcript_text,
local_audio_path=archived_audio,
remote_upload=upload_result,
source_basename=pair.basename,
processed_at=processed_at,
preserve_processed_at=existing_row is not None,
)
self.remove_source_transcript(pair.transcript_path)
self.state.mark_processed(
source_id=source_id,
basename=pair.basename,
audio_signature=audio_signature,
transcript_signature=transcript_signature,
note_path=note_path,
raw_note_path=note_path,
local_audio_path=archived_audio,
remote_audio_path=upload_result.remote_path,
remote_audio_status=upload_result.status,
remote_audio_uploaded_at=upload_result.uploaded_at,
remote_audio_last_error=upload_result.error,
processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error",
preserve_processed_at=existing_row is not None,
)
if self.memos_publisher is not None:
try:
self.sync_memos_site()
except Exception:
logging.exception("Memos publish failed for %s", pair.basename)
self.notify_summary_ready(
title=payload.title,
note_path=note_path,
source_basename=pair.basename,
remote_upload=upload_result,
)
logging.info("Processed %s -> %s", pair.basename, note_path)
if upload_result.status != "uploaded":
logging.warning("Remote upload failed for %s: %s", pair.basename, upload_result.error)
return True
except Exception as exc:
self.state.mark_failed(
source_id=source_id,
basename=pair.basename,
audio_signature=audio_signature,
transcript_signature=transcript_signature,
error_message=str(exc),
)
raise
def cleanup_archive(self) -> int:
cutoff = datetime.now(UTC) - timedelta(days=self.settings.retention_days)
deleted = 0
processed_root = self.settings.archive_dir / "processed"
if not processed_root.exists():
return 0
for source_dir in processed_root.iterdir():
if not source_dir.is_dir():
continue
audio_dir = source_dir / "audio"
if audio_dir.exists():
for audio_path in audio_dir.iterdir():
if not audio_path.is_file():
continue
if datetime.fromtimestamp(audio_path.stat().st_mtime, UTC) < cutoff:
audio_path.unlink()
deleted += 1
for cleanup_dir in sorted(source_dir.rglob("*"), reverse=True):
if cleanup_dir.is_dir() and not any(cleanup_dir.iterdir()):
cleanup_dir.rmdir()
if source_dir.exists() and not any(source_dir.iterdir()):
source_dir.rmdir()
return deleted
def run_scan(self) -> int:
self.retry_pending_uploads()
deleted = self.cleanup_archive()
if deleted:
logging.info("Deleted %s archived audio files", deleted)
return self.process_available_pairs()
def run_watch(self) -> None:
self.cleanup_archive()
self.process_available_pairs()
if self.settings.fswatch_bin and Path(self.settings.fswatch_bin).exists():
self._run_fswatch_loop()
else:
self._run_polling_loop()
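    # fswatch emits NUL-separated event paths (-0). Each relevant event
    # pushes the next scan debounce_seconds into the future, so a burst of
    # writes triggers a single scan once the files settle.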
def _run_fswatch_loop(self) -> None:
logging.info("Starting fswatch loop on %s", self.settings.watch_dir)
process = subprocess.Popen(
[
self.settings.fswatch_bin,
"-0",
str(self.settings.watch_dir),
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
assert process.stdout is not None
next_scan_at: float | None = None
buffer = b""
should_stop = False
def stop_handler(signum: int, frame: Any) -> None:
nonlocal should_stop
should_stop = True
signal.signal(signal.SIGINT, stop_handler)
signal.signal(signal.SIGTERM, stop_handler)
while not should_stop:
readable, _, _ = select.select([process.stdout], [], [], 1.0)
if readable:
buffer += os.read(process.stdout.fileno(), 65536)
if b"\0" in buffer:
parts = buffer.split(b"\0")
buffer = parts[-1]
decoded = [
Path(part.decode("utf-8", errors="replace"))
for part in parts[:-1]
if part
]
if any(not self.should_ignore_watch_path(path) for path in decoded):
next_scan_at = time.monotonic() + self.settings.debounce_seconds
if next_scan_at is not None and time.monotonic() >= next_scan_at:
self.run_scan()
next_scan_at = None
process.terminate()
process.wait(timeout=5)
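    # Polling fallback when fswatch is unavailable: rescan whenever the
    # mtime/size snapshot of the watch directory changes; unstable pairs
    # clear the snapshot so the next tick re-checks them.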
def _run_polling_loop(self) -> None:
logging.info("fswatch not found; using polling loop on %s", self.settings.watch_dir)
previous_snapshot: dict[str, str] = {}
while True:
current_snapshot = {}
for path in self.settings.watch_dir.iterdir():
if path.is_file() and not self.should_ignore_watch_path(path):
current_snapshot[str(path)] = build_file_signature(path)
if current_snapshot != previous_snapshot:
self.run_scan()
if self.last_scan_had_unstable:
previous_snapshot = {}
else:
previous_snapshot = current_snapshot
time.sleep(max(self.settings.debounce_seconds, 3))
def build_frontmatter(values: dict[str, Any]) -> str:
lines = ["---"]
for key, value in values.items():
if isinstance(value, list):
lines.append(f"{key}:")
for item in value:
                escaped_item = str(item).replace('"', "'")
                lines.append(f' - "{escaped_item}"')
elif isinstance(value, bool):
lines.append(f"{key}: {'true' if value else 'false'}")
elif isinstance(value, (int, float)):
lines.append(f"{key}: {value}")
else:
escaped = str(value).replace('"', "'")
lines.append(f'{key}: "{escaped}"')
lines.append("---")
return "\n".join(lines)
def remove_blank_lines(text: str) -> str:
cleaned_lines = [line.rstrip() for line in text.splitlines() if line.strip()]
return "\n".join(cleaned_lines) + "\n"
def strip_leading_h1(text: str, title: str | None = None) -> str:
lines = text.splitlines()
if not lines:
return text
first = lines[0].strip()
if not first.startswith("# "):
return text
if title is None or first[2:].strip() == title.strip():
return "\n".join(lines[1:]).lstrip("\n")
return text
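# Minimal frontmatter parser matching build_frontmatter's output: quoted
# scalar values and one level of '- "item"' lists. Not a general YAML parser.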
def parse_frontmatter_and_body(text: str) -> tuple[dict[str, Any], str]:
lines = text.splitlines()
if not lines or lines[0].strip() != "---":
return {}, text
frontmatter_lines: list[str] = []
body_start = 0
for index in range(1, len(lines)):
if lines[index].strip() == "---":
body_start = index + 1
break
frontmatter_lines.append(lines[index])
data: dict[str, Any] = {}
current_list_key: str | None = None
for raw_line in frontmatter_lines:
line = raw_line.rstrip()
if not line:
continue
if current_list_key and line.startswith(" - "):
item = line[4:].strip().strip('"')
data.setdefault(current_list_key, []).append(item)
continue
current_list_key = None
if line.endswith(":") and ": " not in line:
current_list_key = line[:-1]
data[current_list_key] = []
continue
if ": " not in line:
continue
key, value = line.split(": ", 1)
value = value.strip()
if value.startswith('"') and value.endswith('"'):
data[key] = value.strip('"')
else:
data[key] = value
body = "\n".join(lines[body_start:]).lstrip("\n")
return data, body
def replace_or_append_section(body: str, heading: str, replacement: str) -> str:
pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)")
replacement_block = replacement.strip()
    if pattern.search(body):
        # Replace via a callable so backslashes in the section text are taken
        # literally instead of as regex escape sequences by re.sub.
        return pattern.sub(lambda _match: replacement_block + "\n", body).rstrip("\n")
return body.rstrip("\n") + "\n" + replacement_block
def remove_section(body: str, heading: str) -> str:
pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)")
updated = pattern.sub("", body).strip()
return updated + "\n" if updated else ""
def is_summary_note_frontmatter(frontmatter: dict[str, Any]) -> bool:
note_type = str(frontmatter.get("type", "summary")).strip()
source_id = str(frontmatter.get("source_id", "")).strip()
return note_type == "summary" and bool(source_id)
def write_htpasswd(settings: Settings) -> Path:
if not settings.memos_basic_auth_user or not settings.memos_basic_auth_password:
raise PipelineError("MEMOS_BASIC_AUTH_USER and MEMOS_BASIC_AUTH_PASSWORD must be set.")
result = subprocess.run(
[
"openssl",
"passwd",
"-apr1",
settings.memos_basic_auth_password,
],
check=True,
capture_output=True,
text=True,
)
hashed_password = result.stdout.strip()
settings.memos_basic_auth_htpasswd_path.write_text(
f"{settings.memos_basic_auth_user}:{hashed_password}\n",
encoding="utf-8",
)
return settings.memos_basic_auth_htpasswd_path
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Watch and process MacWhisper transcripts into Obsidian notes.")
subparsers = parser.add_subparsers(dest="command", required=True)
scan_parser = subparsers.add_parser("scan", help="Process available transcript/audio pairs once.")
scan_parser.add_argument("--basename", help="Process only a specific basename if present.")
subparsers.add_parser("watch", help="Watch the directory and process new files automatically.")
subparsers.add_parser("cleanup", help="Delete archived originals older than the retention period.")
subparsers.add_parser("memos-sync", help="Export published notes for Quartz and rebuild the memos site.")
subparsers.add_parser("memos-auth", help="Write the Basic Auth htpasswd file for the memos site.")
subparsers.add_parser("retry-uploads", help="Retry pending remote audio uploads from the local archive.")
subparsers.add_parser("migrate-archive", help="Retroactively migrate archived sources to raw transcript notes and remote audio.")
reprocess_parser = subparsers.add_parser("reprocess", help="Reprocess a basename from the watch folder.")
reprocess_parser.add_argument("basename", help="Basename without extension, e.g. 'Neue Aufnahme 23'")
return parser.parse_args(argv)
def run_cli(argv: list[str]) -> int:
base_dir = Path(__file__).resolve().parent
load_dotenv(base_dir / ".env")
settings = Settings.from_env(base_dir)
settings.ensure_directories()
setup_logging(settings.log_path)
args = parse_args(argv)
pipeline = TranscriptPipeline(settings)
try:
if args.command == "watch":
pipeline.run_watch()
return 0
if args.command == "cleanup":
deleted = pipeline.cleanup_archive()
logging.info("Deleted %s archived audio files", deleted)
return 0
if args.command == "memos-auth":
path = write_htpasswd(settings)
logging.info("Wrote htpasswd file to %s", path)
return 0
if args.command == "memos-sync":
exported_count = pipeline.sync_memos_site()
logging.info("Built memos site with %s notes", exported_count)
return 0
if args.command == "retry-uploads":
retried = pipeline.retry_pending_uploads()
logging.info("Retried %s pending uploads", retried)
return 0
if args.command == "migrate-archive":
migrated = pipeline.migrate_archive()
logging.info("Migrated %s archived sources", migrated)
return 0
if args.command == "scan":
if args.basename:
pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None)
if pair is None:
raise PipelineError(f"No matching pair found for basename: {args.basename}")
pipeline.process_pair(pair)
return 0
pipeline.run_scan()
return 0
if args.command == "reprocess":
pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None)
if pair is None:
raise PipelineError(f"No matching pair found for basename: {args.basename}")
pipeline.process_pair(pair)
return 0
finally:
pipeline.close()
return 1
if __name__ == "__main__":
try:
raise SystemExit(run_cli(sys.argv[1:]))
except PipelineError as exc:
logging.error("%s", exc)
raise SystemExit(1)