#!/usr/bin/env python3
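"""Voice-memo transcript pipeline.

Watches a directory for audio/transcript pairs, summarizes transcripts via the
OpenAI Responses API, writes summary notes for Obsidian, archives and uploads
audio with rclone, optionally publishes a Quartz site, and notifies via ntfy.
"""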
from __future__ import annotations

import argparse
import dataclasses
import hashlib
import json
import logging
import os
import re
import select
import shutil
import signal
import sqlite3
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo


PIPELINE_VERSION = "1.0.0"
AUDIO_EXTENSIONS = {".m4a", ".mp3", ".wav", ".aiff", ".aif", ".caf", ".mp4", ".m4b"}
DEFAULT_OBSIDIAN_DIR = Path("/Users/maddin/Obsidian/Maddin/Transkripte")
DEFAULT_MODEL = "gpt-5.2"
DEFAULT_DEBOUNCE_SECONDS = 10
DEFAULT_RETENTION_DAYS = 7
DIRECT_TRANSCRIPT_CHAR_LIMIT = 120_000
CHUNK_TARGET_CHARS = 60_000
MEMOS_INDEX_PAGE_SIZE = 20


class PipelineError(RuntimeError):
    pass


class OpenAIError(PipelineError):
    pass


@dataclasses.dataclass(slots=True)
class Settings:
    base_dir: Path
    watch_dir: Path
    obsidian_dir: Path
    archive_dir: Path
    memos_enabled: bool
    memos_site_url: str
    memos_content_dir: Path
    memos_quartz_dir: Path
    memos_output_dir: Path
    memos_build_command: str | None
    memos_rclone_remote: str | None
    memos_rclone_excludes: tuple[str, ...]
    memos_sync_htpasswd: bool
    memos_remote_htpasswd_path: str | None
    memos_basic_auth_user: str | None
    memos_basic_auth_password: str | None
    memos_basic_auth_htpasswd_path: Path
    prompt_path: Path
    state_db_path: Path
    log_path: Path
    openai_api_key: str | None
    openai_model: str
    debounce_seconds: int
    retention_days: int
    request_timeout_seconds: int
    ffprobe_bin: str
    fswatch_bin: str | None
    rclone_bin: str
    rclone_remote: str
    ntfy_base_url: str
    ntfy_topic: str
    ntfy_access_token: str | None

    @classmethod
    def from_env(cls, base_dir: Path) -> "Settings":
        watch_dir = Path(os.environ.get("WATCH_DIR", str(base_dir))).expanduser()
        archive_dir = Path(os.environ.get("ARCHIVE_DIR", str(base_dir / "archive"))).expanduser()
        memos_enabled = os.environ.get("MEMOS_ENABLED", "").strip().lower() in {"1", "true", "yes", "on"}
        memos_content_dir = Path(
            os.environ.get("MEMOS_CONTENT_DIR", str(base_dir / "memos-content"))
        ).expanduser()
        memos_quartz_dir = Path(
            os.environ.get("MEMOS_QUARTZ_DIR", str(base_dir / "memos-quartz"))
        ).expanduser()
        memos_output_dir = Path(
            os.environ.get("MEMOS_OUTPUT_DIR", str(base_dir / "memos-site"))
        ).expanduser()
        prompt_path = Path(
            os.environ.get("PROMPT_PATH", str(base_dir / "transkript-zusammenfassung-prompt.md"))
        ).expanduser()
        obsidian_dir = Path(
            os.environ.get("OBSIDIAN_DIR", str(DEFAULT_OBSIDIAN_DIR))
        ).expanduser()
        state_db_path = Path(
            os.environ.get("STATE_DB_PATH", str(base_dir / "pipeline_state.sqlite3"))
        ).expanduser()
        log_path = Path(os.environ.get("LOG_PATH", str(base_dir / "pipeline.log"))).expanduser()

        return cls(
            base_dir=base_dir,
            watch_dir=watch_dir,
            obsidian_dir=obsidian_dir,
            archive_dir=archive_dir,
            memos_enabled=memos_enabled,
            memos_site_url=os.environ.get("MEMOS_SITE_URL", "https://memos.maddin.app").strip(),
            memos_content_dir=memos_content_dir,
            memos_quartz_dir=memos_quartz_dir,
            memos_output_dir=memos_output_dir,
            memos_build_command=os.environ.get("MEMOS_BUILD_COMMAND", "").strip() or None,
            memos_rclone_remote=os.environ.get("MEMOS_RCLONE_REMOTE", "").strip() or None,
            memos_rclone_excludes=tuple(
                part.strip()
                for part in os.environ.get("MEMOS_RCLONE_EXCLUDES", "").split(",")
                if part.strip()
            ),
            memos_sync_htpasswd=os.environ.get("MEMOS_SYNC_HTPASSWD", "").strip().lower() in {"1", "true", "yes", "on"},
            memos_remote_htpasswd_path=os.environ.get("MEMOS_REMOTE_HTPASSWD_PATH", "").strip() or None,
            memos_basic_auth_user=os.environ.get("MEMOS_BASIC_AUTH_USER", "").strip() or None,
            memos_basic_auth_password=os.environ.get("MEMOS_BASIC_AUTH_PASSWORD", "").strip() or None,
            memos_basic_auth_htpasswd_path=Path(
                os.environ.get("MEMOS_BASIC_AUTH_HTPASSWD_PATH", str(base_dir / "deploy/nginx/memos.htpasswd"))
            ).expanduser(),
            prompt_path=prompt_path,
            state_db_path=state_db_path,
            log_path=log_path,
            openai_api_key=os.environ.get("OPENAI_API_KEY"),
            openai_model=os.environ.get("OPENAI_MODEL", DEFAULT_MODEL),
            debounce_seconds=int(os.environ.get("DEBOUNCE_SECONDS", str(DEFAULT_DEBOUNCE_SECONDS))),
            retention_days=int(os.environ.get("RETENTION_DAYS", str(DEFAULT_RETENTION_DAYS))),
            request_timeout_seconds=int(os.environ.get("OPENAI_TIMEOUT_SECONDS", "600")),
            ffprobe_bin=os.environ.get("FFPROBE_BIN", shutil.which("ffprobe") or "/opt/homebrew/bin/ffprobe"),
            fswatch_bin=os.environ.get("FSWATCH_BIN", shutil.which("fswatch") or "/opt/homebrew/bin/fswatch"),
            rclone_bin=os.environ.get("RCLONE_BIN", shutil.which("rclone") or "/opt/homebrew/bin/rclone"),
            rclone_remote=os.environ.get("RCLONE_REMOTE", "transkripte:/"),
            ntfy_base_url=os.environ.get("NTFY_BASE_URL", "https://ntfy.maddin.app").strip(),
            ntfy_topic=os.environ.get("NTFY_TOPIC", "Transkript").strip(),
            ntfy_access_token=os.environ.get("NTFY_ACCESS_TOKEN", "").strip() or None,
        )

    def ensure_directories(self) -> None:
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        self.obsidian_dir.mkdir(parents=True, exist_ok=True)
        (self.archive_dir / "processed").mkdir(parents=True, exist_ok=True)
        (self.archive_dir / "failed").mkdir(parents=True, exist_ok=True)
        if self.memos_enabled:
            self.memos_content_dir.mkdir(parents=True, exist_ok=True)
            self.memos_output_dir.mkdir(parents=True, exist_ok=True)
            self.memos_basic_auth_htpasswd_path.parent.mkdir(parents=True, exist_ok=True)
        self.state_db_path.parent.mkdir(parents=True, exist_ok=True)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)


@dataclasses.dataclass(slots=True)
class SourcePair:
    basename: str
    audio_path: Path
    transcript_path: Path

    @property
    def audio_signature(self) -> str:
        return build_file_signature(self.audio_path)

    @property
    def transcript_signature(self) -> str:
        return build_file_signature(self.transcript_path)


@dataclasses.dataclass(slots=True)
class AudioMetadata:
    recorded_at: datetime
    recorded_at_source: str
    duration_seconds: float
    duration_human: str
    audio_size: int


@dataclasses.dataclass(slots=True)
class SummaryPayload:
    title: str
    summary_markdown: str
    tags: list[str]


@dataclasses.dataclass(slots=True)
class NoteTarget:
    source_id: str
    title: str
    note_path: Path
    existing_path: Path | None


@dataclasses.dataclass(slots=True)
class RawTranscriptPayload:
    title: str
    transcript_markdown: str


@dataclasses.dataclass(slots=True)
class UploadResult:
    remote_path: str
    status: str
    uploaded_at: datetime | None
    error: str | None = None


def load_dotenv(path: Path) -> None:
    if not path.exists():
        return
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        if not key or key in os.environ:
            continue
        value = value.strip().strip("'").strip('"')
        os.environ[key] = value


def setup_logging(log_path: Path) -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[
            logging.FileHandler(log_path, encoding="utf-8"),
            logging.StreamHandler(sys.stdout),
        ],
    )


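# Cheap change detector: modification time in nanoseconds plus file size is
# enough to notice edits without hashing file contents.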
def build_file_signature(path: Path) -> str:
    stat = path.stat()
    return f"{stat.st_mtime_ns}:{stat.st_size}"


def slugify_title(value: str) -> str:
    value = value.strip()
    value = re.sub(r"\s+", " ", value)
    value = re.sub(r"[/:]+", " ", value)
    value = re.sub(r"[^\w\s.-]", "", value, flags=re.UNICODE)
    value = re.sub(r"\s+", " ", value).strip(" .-_")
    return value or "Unbenannt"


def format_note_date(recorded_at: datetime) -> str:
    return recorded_at.strftime("%y%m%d")


def format_duration(duration_seconds: float) -> str:
    total_seconds = max(int(round(duration_seconds)), 0)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"


def parse_iso_datetime(value: str) -> datetime:
    normalized = value.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized)


def extract_json_object(value: str) -> dict[str, Any]:
    candidate = value.strip()
    if candidate.startswith("```"):
        candidate = re.sub(r"^```[a-zA-Z0-9_-]*\n", "", candidate)
        candidate = re.sub(r"\n```$", "", candidate)
        candidate = candidate.strip()
    return json.loads(candidate)


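# Greedy paragraph packing: paragraphs are appended to the current chunk until
# the next one would push it past target_chars, then a new chunk starts. For
# example, with target_chars=10 the paragraphs ["aaaa", "bbbb", "cccc"] become
# ["aaaa", "bbbb\n\ncccc"].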
def split_text_into_chunks(text: str, target_chars: int = CHUNK_TARGET_CHARS) -> list[str]:
    paragraphs = re.split(r"\n\s*\n", text.strip())
    chunks: list[str] = []
    current: list[str] = []
    current_len = 0
    for paragraph in paragraphs:
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        extra = len(paragraph) + 2
        if current and current_len + extra > target_chars:
            chunks.append("\n\n".join(current))
            current = [paragraph]
            current_len = len(paragraph)
        else:
            current.append(paragraph)
            current_len += extra
    if current:
        chunks.append("\n\n".join(current))
    return chunks or [text]


def short_hash(value: str) -> str:
    return hashlib.sha1(value.encode("utf-8")).hexdigest()[:12]


def extract_transcript_section(body: str) -> str:
    pattern = re.compile(r"(?ms)^## (?:Rohtranskript|Transkript)\n(.*?)(?=^## |\Z)")
    match = pattern.search(body)
    if not match:
        return ""
    return match.group(1).strip()


def upsert_transcript_section(body: str, transcript_text: str) -> str:
    transcript_section = "## Transkript\n" + transcript_text.strip() + "\n"
    if re.search(r"(?m)^## Rohtranskript\n", body):
        return replace_or_append_section(body, "Rohtranskript", transcript_section)
    return replace_or_append_section(body, "Transkript", transcript_section)


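# Joins rclone-style remote paths, e.g. join_remote_path("transkripte:/",
# "abc", "audio", "a.m4a") -> "transkripte:/abc/audio/a.m4a".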
def join_remote_path(root: str, *parts: str) -> str:
    normalized_root = root.rstrip("/")
    cleaned_parts = [part.strip("/") for part in parts if part.strip("/")]
    if not cleaned_parts:
        return normalized_root
    return normalized_root + "/" + "/".join(cleaned_parts)


class StateStore:
    def __init__(self, db_path: Path) -> None:
        self.db_path = db_path
        self.connection = sqlite3.connect(db_path)
        self.connection.row_factory = sqlite3.Row
        self._initialize()

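    # Schema setup is idempotent: CREATE TABLE IF NOT EXISTS for fresh
    # databases, then additive ALTER TABLE migrations for columns introduced
    # later (PRAGMA table_info reports what already exists).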
    def _initialize(self) -> None:
        self.connection.execute(
            """
            CREATE TABLE IF NOT EXISTS source_state (
                source_id TEXT PRIMARY KEY,
                basename TEXT NOT NULL,
                audio_signature TEXT NOT NULL,
                transcript_signature TEXT NOT NULL,
                note_path TEXT,
                raw_note_path TEXT,
                local_audio_path TEXT,
                remote_audio_path TEXT,
                remote_audio_status TEXT,
                remote_audio_uploaded_at TEXT,
                remote_audio_last_error TEXT,
                status TEXT NOT NULL,
                last_processed_at TEXT,
                updated_at TEXT,
                last_error TEXT
            )
            """
        )
        columns = {row["name"] for row in self.connection.execute("PRAGMA table_info(source_state)")}
        expected_columns = {
            "raw_note_path": "TEXT",
            "local_audio_path": "TEXT",
            "remote_audio_path": "TEXT",
            "remote_audio_status": "TEXT",
            "remote_audio_uploaded_at": "TEXT",
            "remote_audio_last_error": "TEXT",
        }
        for column_name, column_type in expected_columns.items():
            if column_name not in columns:
                self.connection.execute(
                    f"ALTER TABLE source_state ADD COLUMN {column_name} {column_type}"
                )
        self.connection.execute(
            "CREATE INDEX IF NOT EXISTS idx_source_state_basename ON source_state(basename)"
        )
        self.connection.commit()

    def get_by_source_id(self, source_id: str) -> sqlite3.Row | None:
        row = self.connection.execute(
            "SELECT * FROM source_state WHERE source_id = ?",
            (source_id,),
        ).fetchone()
        return row

    def get_by_basename(self, basename: str) -> sqlite3.Row | None:
        row = self.connection.execute(
            """
            SELECT * FROM source_state
            WHERE basename = ?
            ORDER BY COALESCE(updated_at, last_processed_at, '') DESC
            LIMIT 1
            """,
            (basename,),
        ).fetchone()
        return row

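    # The four repeated CASE WHEN ? blocks all receive the same
    # preserve_remote_state flag: when a known source is re-processed, the
    # cached remote-upload bookkeeping survives; otherwise it is reset to NULL.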
    def upsert_processing(
        self,
        *,
        source_id: str,
        basename: str,
        audio_signature: str,
        transcript_signature: str,
        preserve_remote_state: bool,
    ) -> None:
        now = datetime.now(UTC).isoformat()
        self.connection.execute(
            """
            INSERT INTO source_state (
                source_id, basename, audio_signature, transcript_signature,
                note_path, raw_note_path, local_audio_path, remote_audio_path,
                remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
                status, last_processed_at, updated_at, last_error
            ) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'processing', NULL, ?, NULL)
            ON CONFLICT(source_id) DO UPDATE SET
                basename = excluded.basename,
                audio_signature = excluded.audio_signature,
                transcript_signature = excluded.transcript_signature,
                status = 'processing',
                updated_at = excluded.updated_at,
                last_error = NULL,
                remote_audio_status = CASE
                    WHEN ? THEN source_state.remote_audio_status
                    ELSE NULL
                END,
                remote_audio_uploaded_at = CASE
                    WHEN ? THEN source_state.remote_audio_uploaded_at
                    ELSE NULL
                END,
                remote_audio_last_error = CASE
                    WHEN ? THEN source_state.remote_audio_last_error
                    ELSE NULL
                END,
                remote_audio_path = CASE
                    WHEN ? THEN source_state.remote_audio_path
                    ELSE NULL
                END
            """,
            (
                source_id,
                basename,
                audio_signature,
                transcript_signature,
                now,
                1 if preserve_remote_state else 0,
                1 if preserve_remote_state else 0,
                1 if preserve_remote_state else 0,
                1 if preserve_remote_state else 0,
            ),
        )
        self.connection.commit()

    def mark_processed(
        self,
        *,
        source_id: str,
        basename: str,
        audio_signature: str,
        transcript_signature: str,
        note_path: Path,
        raw_note_path: Path,
        local_audio_path: Path | None,
        remote_audio_path: str | None,
        remote_audio_status: str,
        remote_audio_uploaded_at: datetime | None,
        remote_audio_last_error: str | None,
        processing_status: str,
        preserve_processed_at: bool,
    ) -> None:
        now = datetime.now(UTC).isoformat()
        last_processed_at = None if preserve_processed_at else now
        self.connection.execute(
            """
            INSERT INTO source_state (
                source_id, basename, audio_signature, transcript_signature,
                note_path, raw_note_path, local_audio_path, remote_audio_path,
                remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
                status, last_processed_at, updated_at, last_error
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)
            ON CONFLICT(source_id) DO UPDATE SET
                basename = excluded.basename,
                audio_signature = excluded.audio_signature,
                transcript_signature = excluded.transcript_signature,
                note_path = excluded.note_path,
                raw_note_path = excluded.raw_note_path,
                local_audio_path = excluded.local_audio_path,
                remote_audio_path = excluded.remote_audio_path,
                remote_audio_status = excluded.remote_audio_status,
                remote_audio_uploaded_at = excluded.remote_audio_uploaded_at,
                remote_audio_last_error = excluded.remote_audio_last_error,
                status = excluded.status,
                last_processed_at = COALESCE(source_state.last_processed_at, excluded.last_processed_at),
                updated_at = excluded.updated_at,
                last_error = NULL
            """,
            (
                source_id,
                basename,
                audio_signature,
                transcript_signature,
                str(note_path),
                str(raw_note_path),
                str(local_audio_path) if local_audio_path else None,
                remote_audio_path,
                remote_audio_status,
                remote_audio_uploaded_at.isoformat() if remote_audio_uploaded_at else None,
                remote_audio_last_error,
                processing_status,
                last_processed_at,
                now,
            ),
        )
        self.connection.commit()

    def mark_failed(
        self,
        *,
        source_id: str,
        basename: str,
        audio_signature: str,
        transcript_signature: str,
        error_message: str,
    ) -> None:
        now = datetime.now(UTC).isoformat()
        self.connection.execute(
            """
            INSERT INTO source_state (
                source_id, basename, audio_signature, transcript_signature,
                note_path, raw_note_path, local_audio_path, remote_audio_path,
                remote_audio_status, remote_audio_uploaded_at, remote_audio_last_error,
                status, last_processed_at, updated_at, last_error
            ) VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'failed', NULL, ?, ?)
            ON CONFLICT(source_id) DO UPDATE SET
                basename = excluded.basename,
                audio_signature = excluded.audio_signature,
                transcript_signature = excluded.transcript_signature,
                status = 'failed',
                updated_at = excluded.updated_at,
                last_error = excluded.last_error
            """,
            (source_id, basename, audio_signature, transcript_signature, now, error_message),
        )
        self.connection.commit()

    def iter_pending_uploads(self) -> list[sqlite3.Row]:
        return list(
            self.connection.execute(
                """
                SELECT *
                FROM source_state
                WHERE local_audio_path IS NOT NULL
                  AND COALESCE(remote_audio_status, '') != 'uploaded'
                ORDER BY COALESCE(updated_at, last_processed_at, '') ASC
                """
            ).fetchall()
        )

    def close(self) -> None:
        self.connection.close()


class OpenAIClient:
    def __init__(self, api_key: str, model: str, timeout_seconds: int) -> None:
        self.api_key = api_key
        self.model = model
        self.timeout_seconds = timeout_seconds

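    # Up to three attempts with exponential backoff (2**attempt seconds) for
    # network errors and timeouts; HTTP error responses from the API fail
    # immediately, since a 4xx rarely improves on retry.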
    def _request(self, payload: dict[str, Any]) -> dict[str, Any]:
        request = urllib.request.Request(
            url="https://api.openai.com/v1/responses",
            data=json.dumps(payload).encode("utf-8"),
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        last_error: Exception | None = None
        for attempt in range(3):
            try:
                with urllib.request.urlopen(request, timeout=self.timeout_seconds) as response:
                    return json.loads(response.read().decode("utf-8"))
            except urllib.error.HTTPError as exc:
                body = exc.read().decode("utf-8", errors="replace")
                raise OpenAIError(f"OpenAI API error {exc.code}: {body}") from exc
            except (urllib.error.URLError, TimeoutError) as exc:
                last_error = exc
                time.sleep(2**attempt)
        raise OpenAIError(f"OpenAI request failed after retries: {last_error}")

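    # A Responses API payload carries a list of output items; "message" items
    # hold content blocks, where "output_text" carries text and "refusal"
    # signals that the model declined to answer.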
    def _extract_output_text(self, response_json: dict[str, Any]) -> str:
        chunks: list[str] = []
        for item in response_json.get("output", []):
            if item.get("type") != "message":
                continue
            for content in item.get("content", []):
                if content.get("type") == "output_text":
                    text = content.get("text", "")
                    if text:
                        chunks.append(text)
                elif content.get("type") == "refusal":
                    raise OpenAIError(f"Model refusal: {content.get('refusal', '')}")
        if not chunks:
            raise OpenAIError(f"No text output in response: {json.dumps(response_json)[:1000]}")
        return "\n".join(chunks).strip()

    def _schema_payload(self, schema: dict[str, Any]) -> dict[str, Any]:
        return {
            "type": "json_schema",
            "name": "transcript_summary",
            "strict": True,
            "schema": schema,
        }

    def summarize_chunk(
        self,
        *,
        chunk_index: int,
        chunk_count: int,
        transcript_chunk: str,
        metadata: AudioMetadata,
        basename: str,
    ) -> str:
        prompt = (
            "Du fasst einen Ausschnitt eines längeren deutschsprachigen Transkripts zusammen. "
            "Schreibe eine sachliche, kompakte Markdown-Zusammenfassung des Ausschnitts. "
            "Halte Themen, Entscheidungen, offene Fragen, Konflikte, Risiken, Aufgaben und "
            "nächste Schritte fest, sofern sie in diesem Ausschnitt vorkommen. "
            "Erfinde nichts hinzu.\n\n"
            f"Quellname: {basename}\n"
            f"Chunk: {chunk_index} von {chunk_count}\n"
            f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n"
            f"Dauer der Gesamtaufnahme: {metadata.duration_human}\n\n"
            "Transkript-Ausschnitt:\n"
            f"{transcript_chunk}"
        )

        schema = {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "chunk_summary_markdown": {"type": "string"},
            },
            "required": ["chunk_summary_markdown"],
        }

        response_json = self._request(
            {
                "model": self.model,
                "input": [
                    {
                        "role": "system",
                        "content": [
                            {
                                "type": "input_text",
                                "text": "Produce JSON that matches the schema exactly.",
                            }
                        ],
                    },
                    {
                        "role": "user",
                        "content": [{"type": "input_text", "text": prompt}],
                    },
                ],
                "text": {"format": self._schema_payload(schema)},
            }
        )
        parsed = extract_json_object(self._extract_output_text(response_json))
        return parsed["chunk_summary_markdown"].strip()

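    # Map-reduce for long recordings: transcripts above the direct limit are
    # split into chunks, each chunk is summarized on its own, and the final
    # title/summary/tags are produced from those intermediate summaries.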
    def summarize_transcript(
        self,
        *,
        prompt_template: str,
        transcript_text: str,
        metadata: AudioMetadata,
        basename: str,
    ) -> SummaryPayload:
        transcript_input = transcript_text
        if len(transcript_text) > DIRECT_TRANSCRIPT_CHAR_LIMIT:
            chunks = split_text_into_chunks(transcript_text, target_chars=CHUNK_TARGET_CHARS)
            chunk_summaries = [
                self.summarize_chunk(
                    chunk_index=index,
                    chunk_count=len(chunks),
                    transcript_chunk=chunk,
                    metadata=metadata,
                    basename=basename,
                )
                for index, chunk in enumerate(chunks, start=1)
            ]
            transcript_input = (
                "Das Originaltranskript ist sehr lang. Verwende deshalb diese "
                "Ausschnittszusammenfassungen als Arbeitsgrundlage für die endgültige "
                "Zusammenfassung und den Titel.\n\n"
                + "\n\n".join(
                    f"### Chunk {index}\n{summary}" for index, summary in enumerate(chunk_summaries, start=1)
                )
            )

        schema = {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "title": {
                    "type": "string",
                    "description": "Kurzer präziser deutscher Titel ohne Datum.",
                },
                "summary_markdown": {
                    "type": "string",
                    "description": "Vollständige strukturierte Zusammenfassung in Markdown.",
                },
                "tags": {
                    "type": "array",
                    "items": {"type": "string"},
                    "minItems": 2,
                    "maxItems": 8,
                },
            },
            "required": ["title", "summary_markdown", "tags"],
        }

        metadata_block = (
            f"Quellname: {basename}\n"
            f"Aufnahmezeit: {metadata.recorded_at.isoformat()}\n"
            f"Aufnahmezeit-Quelle: {metadata.recorded_at_source}\n"
            f"Dauer in Sekunden: {metadata.duration_seconds:.3f}\n"
            f"Dauer formatiert: {metadata.duration_human}\n"
        )
        if "[TRANSKRIPT EINFÜGEN]" in prompt_template:
            template_with_material = prompt_template.replace("[TRANSKRIPT EINFÜGEN]", transcript_input)
        else:
            template_with_material = f"{prompt_template}\n\nArbeitsmaterial:\n{transcript_input}"
        full_prompt = (
            "Nutze die folgende Prompt-Vorlage als redaktionellen Standard. "
            "Die endgültige Antwort selbst muss aber JSON sein, das exakt dem Schema entspricht.\n\n"
            f"{template_with_material}\n\n"
            "Zusätzliche Anforderungen für diese Pipeline:\n"
            "- Gib einen knappen, brauchbaren Titel ohne Datum zurück.\n"
            "- Der Titel soll das Gesprächsthema treffen und als Dateiname taugen.\n"
            "- summary_markdown soll direkt als Obsidian-Notizinhalt verwendbar sein.\n"
            "- tags sollen kurze deutschsprachige Schlagworte sein.\n"
            "- Schreibe keine Vorbemerkung außerhalb des Schemas.\n\n"
            "Metadaten:\n"
            f"{metadata_block}\n"
            "Beachte die Metadaten zusätzlich bei Titel und Einordnung."
        )

        response_json = self._request(
            {
                "model": self.model,
                "input": [
                    {
                        "role": "system",
                        "content": [
                            {
                                "type": "input_text",
                                "text": "Produce JSON that matches the schema exactly.",
                            }
                        ],
                    },
                    {
                        "role": "user",
                        "content": [{"type": "input_text", "text": full_prompt}],
                    },
                ],
                "text": {"format": self._schema_payload(schema)},
            }
        )
        parsed = extract_json_object(self._extract_output_text(response_json))
        tags = [slugify_title(tag).replace(" ", "-").lower() for tag in parsed["tags"] if tag.strip()]
        return SummaryPayload(
            title=parsed["title"].strip(),
            summary_markdown=parsed["summary_markdown"].strip(),
            tags=tags or ["transkript", "ki-zusammenfassung"],
        )


class NtfyClient:
    def __init__(self, base_url: str, topic: str, access_token: str, timeout_seconds: int) -> None:
        self.publish_url = base_url.rstrip("/") + "/" + urllib.parse.quote(topic, safe="")
        self.access_token = access_token
        self.timeout_seconds = timeout_seconds

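    # ntfy publishing is a plain POST to the topic URL: the request body
    # becomes the message, while the display title and tags travel in the
    # Title/Tags headers.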
    def publish_summary_ready(
        self,
        *,
        title: str,
        note_path: Path,
        source_basename: str,
        remote_upload: UploadResult,
    ) -> None:
        lines = [
            f"Zusammenfassung fertig: {title}",
            f"Datei: {note_path.name}",
            f"Pfad: {note_path}",
            f"Quelle: {source_basename}",
        ]
        if remote_upload.status == "uploaded":
            lines.append(f"Audio-Archiv: {remote_upload.remote_path}")
        elif remote_upload.error:
            lines.append(f"Audio-Upload: {remote_upload.status} ({remote_upload.error})")

        request = urllib.request.Request(
            url=self.publish_url,
            data="\n".join(lines).encode("utf-8"),
            headers={
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "text/plain; charset=utf-8",
                "Title": f"Transkript fertig: {title}",
                "Tags": "memo",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=self.timeout_seconds):
            return


class MemosPublisher:
    PUBLIC_FRONTMATTER_KEYS = (
        "title",
        "date",
        "recorded_at",
        "duration_human",
        "processed_at",
        "updated_at",
        "tags",
    )

    def __init__(self, settings: Settings) -> None:
        self.settings = settings

    def export_all_notes(self) -> list[Path]:
        export_root = self.settings.memos_content_dir / "transkripte"
        export_root.mkdir(parents=True, exist_ok=True)
        exported_paths: list[Path] = []

        for note_path in sorted(self.settings.obsidian_dir.glob("*.md")):
            exported_path = self.export_note(note_path)
            if exported_path is not None:
                exported_paths.append(exported_path)

        keep_names = {path.name for path in exported_paths}
        for stale_path in export_root.glob("*.md"):
            if stale_path.name not in keep_names:
                stale_path.unlink()

        self.write_index_pages(exported_paths)
        return exported_paths

    def export_note(self, note_path: Path) -> Path | None:
        frontmatter, body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8"))
        if not is_summary_note_frontmatter(frontmatter):
            return None

        public_frontmatter: dict[str, Any] = {}
        for key in self.PUBLIC_FRONTMATTER_KEYS:
            value = frontmatter.get(key)
            if value in (None, ""):
                continue
            public_frontmatter[key] = value

        title = str(frontmatter.get("title") or note_path.stem)
        public_frontmatter.setdefault("title", title)
        public_frontmatter.setdefault("tags", ["transkript", "memo"])

        sanitized_body = remove_section(strip_leading_h1(body, title).strip(), "Quellen").strip()
        if not sanitized_body:
            sanitized_body = "## Hinweis\nDiese Notiz enthält derzeit keinen veröffentlichbaren Inhalt.\n"

        target_path = self.settings.memos_content_dir / "transkripte" / note_path.name
        target_path.parent.mkdir(parents=True, exist_ok=True)
        target_path.write_text(
            f"{build_frontmatter(public_frontmatter)}\n{sanitized_body.rstrip()}\n",
            encoding="utf-8",
        )
        return target_path

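    # Index pages paginate newest-first (note filenames start with a YYMMDD
    # date, so a reverse sort by stem is chronological): page 1 is index.md,
    # later pages are seite-2.md, seite-3.md, ...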
    def write_index_pages(self, exported_paths: list[Path]) -> None:
        notes = sorted(exported_paths, key=lambda path: path.stem, reverse=True)
        stale_pagination_paths = list(self.settings.memos_content_dir.glob("seite-*.md"))
        for stale_path in stale_pagination_paths:
            stale_path.unlink()

        page_chunks = [
            notes[index : index + MEMOS_INDEX_PAGE_SIZE]
            for index in range(0, len(notes), MEMOS_INDEX_PAGE_SIZE)
        ] or [[]]

        for page_number, page_notes in enumerate(page_chunks, start=1):
            note_links = "\n".join(f"- [[transkripte/{path.stem}]]" for path in page_notes)
            if not note_links:
                note_links = "- Noch keine veröffentlichten Memos vorhanden."

            navigation_links: list[str] = []
            if page_number > 1:
                previous_target = "index" if page_number == 2 else f"seite-{page_number - 1}"
                navigation_links.append(f"[[{previous_target}|Neuere Memos]]")
            if page_number < len(page_chunks):
                navigation_links.append(f"[[seite-{page_number + 1}|Ältere Memos]]")

            navigation_block = ""
            if navigation_links:
                navigation_block = "## Navigation\n" + " | ".join(navigation_links) + "\n"

            title = "Memos" if page_number == 1 else f"Memos – Seite {page_number}"
            page_frontmatter = build_frontmatter(
                {
                    "title": title,
                    "tags": ["memos", "transkripte"],
                }
            )
            page_body = (
                "Diese Website enthält privat veröffentlichte Zusammenfassungen mit eingebettetem Transkript.\n\n"
                f"## Übersicht Seite {page_number}\n{note_links}\n\n"
                f"{navigation_block}"
            )
            page_path = (
                self.settings.memos_content_dir / "index.md"
                if page_number == 1
                else self.settings.memos_content_dir / f"seite-{page_number}.md"
            )
            page_path.write_text(
                f"{page_frontmatter}\n{page_body}",
                encoding="utf-8",
            )

    def build_site(self) -> None:
        quartz_dir = self.settings.memos_quartz_dir
        if not quartz_dir.exists():
            raise PipelineError(f"Quartz directory does not exist: {quartz_dir}")

        if self.settings.memos_build_command:
            subprocess.run(
                self.settings.memos_build_command,
                cwd=quartz_dir,
                shell=True,
                check=True,
                capture_output=True,
                text=True,
            )
            return

        node_modules = quartz_dir / "node_modules"
        if not node_modules.exists():
            subprocess.run(
                ["npm", "ci", "--no-fund", "--no-audit"],
                cwd=quartz_dir,
                check=True,
                capture_output=True,
                text=True,
            )

        subprocess.run(
            [
                "npm",
                "run",
                "quartz",
                "--",
                "build",
                "--directory",
                str(self.settings.memos_content_dir),
                "--output",
                str(self.settings.memos_output_dir),
            ],
            cwd=quartz_dir,
            check=True,
            capture_output=True,
            text=True,
        )
        self.ensure_pretty_urls()

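    # Static hosts without rewrite rules serve /foo by looking for
    # /foo/index.html, so every page.html is also copied to page/index.html.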
    def ensure_pretty_urls(self) -> None:
        for html_path in self.settings.memos_output_dir.rglob("*.html"):
            if html_path.name in {"index.html", "404.html"}:
                continue
            target_dir = html_path.with_suffix("")
            target_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(html_path, target_dir / "index.html")

    def deploy_site(self) -> None:
        if not self.settings.memos_rclone_remote:
            return

        subprocess.run(
            [
                self.settings.rclone_bin,
                "sync",
                "--delete-after",
                "--fast-list",
                *[arg for pattern in self.settings.memos_rclone_excludes for arg in ("--exclude", pattern)],
                str(self.settings.memos_output_dir),
                self.settings.memos_rclone_remote,
            ],
            check=True,
            capture_output=True,
            text=True,
        )

        if not self.settings.memos_sync_htpasswd:
            return
        if not self.settings.memos_remote_htpasswd_path:
            raise PipelineError("MEMOS_REMOTE_HTPASSWD_PATH must be set when MEMOS_SYNC_HTPASSWD is enabled.")
        if not self.settings.memos_basic_auth_htpasswd_path.exists():
            raise PipelineError(
                f"htpasswd file does not exist: {self.settings.memos_basic_auth_htpasswd_path}. Run `transcript memos-auth` first."
            )

        subprocess.run(
            [
                self.settings.rclone_bin,
                "copyto",
                str(self.settings.memos_basic_auth_htpasswd_path),
                self.settings.memos_remote_htpasswd_path,
            ],
            check=True,
            capture_output=True,
            text=True,
        )

    def sync_and_build(self) -> int:
        exported_paths = self.export_all_notes()
        self.build_site()
        self.deploy_site()
        return len(exported_paths)


class TranscriptPipeline:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.state = StateStore(settings.state_db_path)
        self.last_scan_had_unstable = False
        self.openai_client = (
            OpenAIClient(
                api_key=settings.openai_api_key,
                model=settings.openai_model,
                timeout_seconds=settings.request_timeout_seconds,
            )
            if settings.openai_api_key
            else None
        )
        self.ntfy_client = (
            NtfyClient(
                base_url=settings.ntfy_base_url,
                topic=settings.ntfy_topic,
                access_token=settings.ntfy_access_token,
                timeout_seconds=settings.request_timeout_seconds,
            )
            if settings.ntfy_access_token
            else None
        )
        self.memos_publisher = MemosPublisher(settings) if settings.memos_enabled else None

    def close(self) -> None:
        self.state.close()

    def notify_summary_ready(
        self,
        *,
        title: str,
        note_path: Path,
        source_basename: str,
        remote_upload: UploadResult,
    ) -> None:
        if self.ntfy_client is None:
            return
        try:
            self.ntfy_client.publish_summary_ready(
                title=title,
                note_path=note_path,
                source_basename=source_basename,
                remote_upload=remote_upload,
            )
        except Exception as exc:
            logging.warning("ntfy notification failed for %s: %s", source_basename, exc)

    def sync_memos_site(self) -> int:
        if self.memos_publisher is None:
            raise PipelineError("MEMOS_ENABLED is not set.")
        exported_count = self.memos_publisher.sync_and_build()
        destination = self.settings.memos_rclone_remote or str(self.settings.memos_output_dir)
        logging.info("Updated memos site with %s published notes -> %s", exported_count, destination)
        return exported_count

    def scan_pairs(self) -> list[SourcePair]:
        audio_files: dict[str, Path] = {}
        transcript_files: dict[str, Path] = {}
        for entry in self.settings.watch_dir.iterdir():
            if not entry.is_file():
                continue
            suffix = entry.suffix.lower()
            if suffix in AUDIO_EXTENSIONS:
                audio_files[entry.stem] = entry
            elif suffix == ".md" and entry != self.settings.prompt_path:
                transcript_files[entry.stem] = entry
        basenames = sorted(set(audio_files) & set(transcript_files))
        return [
            SourcePair(
                basename=basename,
                audio_path=audio_files[basename],
                transcript_path=transcript_files[basename],
            )
            for basename in basenames
        ]

    def should_ignore_watch_path(self, path: Path) -> bool:
        resolved = path.resolve(strict=False)
        ignored_files = {
            self.settings.prompt_path.resolve(strict=False),
            self.settings.state_db_path.resolve(strict=False),
            self.settings.log_path.resolve(strict=False),
            (self.settings.base_dir / ".env").resolve(strict=False),
            (self.settings.base_dir / ".env.example").resolve(strict=False),
        }
        if resolved in ignored_files:
            return True
        archive_root = self.settings.archive_dir.resolve(strict=False)
        try:
            resolved.relative_to(archive_root)
            return True
        except ValueError:
            return False

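    # Debounce: a pair only counts as stable once neither file has been
    # modified within the last debounce_seconds, so half-written uploads are
    # not picked up mid-copy.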
    def files_stable(self, pair: SourcePair) -> bool:
        threshold = time.time() - self.settings.debounce_seconds
        return (
            pair.audio_path.stat().st_mtime <= threshold
            and pair.transcript_path.stat().st_mtime <= threshold
        )

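    # Stable identity for a recording: source basename, recording timestamp,
    # and duration hashed down to a 12-character id that keys state rows,
    # archive folders, and remote paths.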
    def build_source_id(self, pair: SourcePair, metadata: AudioMetadata) -> str:
        raw = f"{pair.basename}|{metadata.recorded_at.isoformat()}|{metadata.duration_seconds:.3f}"
        return short_hash(raw)

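    # Reprocess when either file changed, the last run did not finish cleanly,
    # or the bookkeeping (note path, upload status) is incomplete.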
    def should_process(self, pair: SourcePair, source_id: str) -> bool:
        row = self.state.get_by_source_id(source_id)
        if row is None:
            return True
        return (
            row["audio_signature"] != pair.audio_signature
            or row["transcript_signature"] != pair.transcript_signature
            or row["status"] not in {"processed", "processed_with_upload_error"}
            or not row["note_path"]
            or not row["remote_audio_status"]
        )

    def process_available_pairs(self) -> int:
        processed = 0
        self.last_scan_had_unstable = False
        for pair in self.scan_pairs():
            if not self.files_stable(pair):
                self.last_scan_had_unstable = True
                logging.info("Skipping unstable pair %s", pair.basename)
                continue
            try:
                if self.process_pair(pair):
                    processed += 1
            except Exception:
                logging.exception("Processing failed for %s", pair.basename)
        return processed

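    # Recording time preference: the container's creation_time tag from
    # ffprobe wins; otherwise fall back to the filesystem birth time
    # (st_birthtime on macOS, mtime elsewhere).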
    def extract_audio_metadata(self, audio_path: Path) -> AudioMetadata:
        result = subprocess.run(
            [
                self.settings.ffprobe_bin,
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_format",
                "-show_streams",
                str(audio_path),
            ],
            check=True,
            capture_output=True,
            text=True,
        )
        payload = json.loads(result.stdout)
        format_data = payload.get("format", {})
        # Guard against an empty "streams" list, which would make [0] raise.
        stream_data = (payload.get("streams") or [{}])[0]

        creation_time = (
            format_data.get("tags", {}).get("creation_time")
            or stream_data.get("tags", {}).get("creation_time")
        )
        stat = audio_path.stat()
        if creation_time:
            recorded_at = parse_iso_datetime(creation_time)
            source = "ffprobe.creation_time"
        else:
            recorded_at = datetime.fromtimestamp(getattr(stat, "st_birthtime", stat.st_mtime), UTC)
            source = "filesystem.birthtime"

        local_recorded_at = recorded_at.astimezone(datetime.now().astimezone().tzinfo or ZoneInfo("Europe/Berlin"))
        duration_seconds = float(format_data.get("duration") or stream_data.get("duration") or 0.0)
        return AudioMetadata(
            recorded_at=local_recorded_at,
            recorded_at_source=source,
            duration_seconds=duration_seconds,
            duration_human=format_duration(duration_seconds),
            audio_size=stat.st_size,
        )

    def upload_audio_to_remote(self, audio_path: Path, source_id: str) -> UploadResult:
        remote_path = join_remote_path(self.settings.rclone_remote, source_id, "audio", audio_path.name)
        try:
            subprocess.run(
                [
                    self.settings.rclone_bin,
                    "copyto",
                    str(audio_path),
                    remote_path,
                ],
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as exc:
            error_message = exc.stderr.strip() or exc.stdout.strip() or str(exc)
            return UploadResult(
                remote_path=remote_path,
                status="upload_failed",
                uploaded_at=None,
                error=error_message,
            )
        return UploadResult(
            remote_path=remote_path,
            status="uploaded",
            uploaded_at=datetime.now(UTC),
            error=None,
        )

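    # Failed uploads are not lost: rows whose remote_audio_status never
    # reached "uploaded" but that still have a cached local copy are retried
    # here on later runs.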
    def retry_pending_uploads(self) -> int:
        retried = 0
        for row in self.state.iter_pending_uploads():
            local_audio_path = row["local_audio_path"]
            if not local_audio_path:
                continue
            audio_path = Path(local_audio_path)
            if not audio_path.exists():
                continue
            source_id = row["source_id"]
            upload_result = self.upload_audio_to_remote(audio_path, source_id)
            processing_status = "processed" if upload_result.status == "uploaded" else "processed_with_upload_error"
            self.state.mark_processed(
                source_id=source_id,
                basename=row["basename"],
                audio_signature=row["audio_signature"],
                transcript_signature=row["transcript_signature"],
                note_path=Path(row["note_path"]),
                raw_note_path=Path(row["raw_note_path"] or row["note_path"]),
                local_audio_path=audio_path,
                remote_audio_path=upload_result.remote_path,
                remote_audio_status=upload_result.status,
                remote_audio_uploaded_at=upload_result.uploaded_at,
                remote_audio_last_error=upload_result.error,
                processing_status=processing_status,
                preserve_processed_at=True,
            )
            retried += 1
            if upload_result.status == "uploaded":
                logging.info("Uploaded cached audio for %s to %s", row["basename"], upload_result.remote_path)
            else:
                logging.warning("Retry upload failed for %s: %s", row["basename"], upload_result.error)
        return retried

    def migrate_archive(self) -> int:
        migrated = 0
        rows = list(
            self.state.connection.execute(
                "SELECT * FROM source_state ORDER BY COALESCE(updated_at, last_processed_at, '') ASC"
            ).fetchall()
        )
        for row in rows:
            note_path_value = row["note_path"]
            if not note_path_value:
                continue
            note_path = Path(note_path_value)
            if not note_path.exists():
                fallback_note = self.find_existing_note(row["source_id"], note_type="summary")
                if fallback_note is not None and fallback_note.exists():
                    note_path = fallback_note
                else:
                    logging.warning("Skipping migration for missing note %s", note_path)
                    continue

            existing_frontmatter, existing_body = parse_frontmatter_and_body(note_path.read_text(encoding="utf-8"))
            if extract_transcript_section(existing_body) and existing_frontmatter.get("remote_audio_status"):
                continue

            source_id = row["source_id"]
            basename = row["basename"]
            audio_path, transcript_path = self.find_archived_pair(source_id)
            normalized_audio_path = self.normalize_archived_audio(source_id, audio_path)

            metadata = None
            if normalized_audio_path and normalized_audio_path.exists():
                metadata = self.extract_audio_metadata(normalized_audio_path)
            else:
                recorded_at_value = existing_frontmatter.get("recorded_at")
                if not recorded_at_value:
                    logging.warning("Skipping migration for %s due to missing audio and recorded_at", basename)
                    continue
                recorded_at = parse_iso_datetime(recorded_at_value)
                metadata = AudioMetadata(
                    recorded_at=recorded_at,
                    recorded_at_source=existing_frontmatter.get("recorded_at_source", "existing_note"),
                    duration_seconds=float(existing_frontmatter.get("duration_seconds", 0.0)),
                    duration_human=str(existing_frontmatter.get("duration_human", "0:00")),
                    audio_size=0,
                )

            transcript_text = transcript_path.read_text(encoding="utf-8") if transcript_path and transcript_path.exists() else ""

            if normalized_audio_path and normalized_audio_path.exists():
                upload_result = self.upload_audio_to_remote(normalized_audio_path, source_id)
            else:
                upload_result = UploadResult(
                    remote_path=join_remote_path(self.settings.rclone_remote, source_id, "audio"),
                    status="upload_failed",
                    uploaded_at=None,
                    error="Local audio file missing during migration.",
                )

            processed_at_value = existing_frontmatter.get("processed_at")
            processed_at = parse_iso_datetime(processed_at_value) if processed_at_value else datetime.now(UTC)
            self.update_existing_summary_note(
                note_path=note_path,
                metadata=metadata,
                source_id=source_id,
                source_basename=basename,
                transcript_text=transcript_text,
                local_audio_path=normalized_audio_path,
                remote_upload=upload_result,
                processed_at=processed_at,
            )
            self.state.mark_processed(
                source_id=source_id,
                basename=basename,
                audio_signature=build_file_signature(normalized_audio_path) if normalized_audio_path and normalized_audio_path.exists() else row["audio_signature"],
                transcript_signature=build_file_signature(transcript_path) if transcript_path and transcript_path.exists() else row["transcript_signature"],
                note_path=note_path,
                raw_note_path=note_path,
                local_audio_path=normalized_audio_path,
                remote_audio_path=upload_result.remote_path,
                remote_audio_status=upload_result.status,
                remote_audio_uploaded_at=upload_result.uploaded_at,
                remote_audio_last_error=upload_result.error,
                processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error",
                preserve_processed_at=True,
            )
            migrated += 1
            logging.info("Migrated archived source %s", basename)
        if migrated and self.memos_publisher is not None:
            self.sync_memos_site()
        return migrated

    def build_note_target(
        self,
        *,
        source_id: str,
        title: str,
        recorded_at: datetime,
        note_type: str,
    ) -> NoteTarget:
        sanitized_title = slugify_title(title)
        suffix = " - Transkript" if note_type == "raw_transcript" else ""
        base_filename = f"{format_note_date(recorded_at)} {sanitized_title}{suffix}.md"
        existing_path = self.find_existing_note(source_id, note_type=note_type)

        if existing_path is not None and existing_path.exists():
            desired = self.unique_note_path(base_filename, source_id, preferred_existing=existing_path, note_type=note_type)
            return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=existing_path)

        desired = self.unique_note_path(base_filename, source_id, preferred_existing=None, note_type=note_type)
        return NoteTarget(source_id=source_id, title=sanitized_title, note_path=desired, existing_path=None)

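    # Filename collisions append "-2", "-3", ... unless the colliding note
    # already belongs to this source_id, in which case it is reused in place.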
    def unique_note_path(
        self,
        filename: str,
        source_id: str,
        preferred_existing: Path | None,
        note_type: str,
    ) -> Path:
        candidate = self.settings.obsidian_dir / filename
        if preferred_existing is not None and preferred_existing == candidate:
            return candidate
        if not candidate.exists():
            return candidate
        if self.note_belongs_to_source(candidate, source_id, note_type=note_type):
            return candidate
        stem = candidate.stem
        suffix = candidate.suffix
        counter = 2
        while True:
            next_candidate = candidate.with_name(f"{stem}-{counter}{suffix}")
            if preferred_existing is not None and preferred_existing == next_candidate:
                return next_candidate
            if not next_candidate.exists() or self.note_belongs_to_source(next_candidate, source_id, note_type=note_type):
                return next_candidate
            counter += 1

    def note_belongs_to_source(self, note_path: Path, source_id: str, note_type: str | None = None) -> bool:
        if not note_path.exists():
            return False
        text = note_path.read_text(encoding="utf-8")
        if not re.search(rf'^source_id:\s+"?{re.escape(source_id)}"?$', text, re.MULTILINE):
            return False
        if note_type is None:
            return True
        type_match = re.search(r'^type: "([^"]+)"$', text, re.MULTILINE)
        if type_match:
            return type_match.group(1) == note_type
        return note_type == "summary"

    def find_existing_note(self, source_id: str, note_type: str) -> Path | None:
        row = self.state.get_by_source_id(source_id)
        key = "raw_note_path" if note_type == "raw_transcript" else "note_path"
        if row and key in row.keys() and row[key]:
            candidate = Path(row[key])
            if candidate.exists() and self.note_belongs_to_source(candidate, source_id, note_type=note_type):
                return candidate
        for note_path in self.settings.obsidian_dir.glob("*.md"):
            if self.note_belongs_to_source(note_path, source_id, note_type=note_type):
                return note_path
        return None

    def cache_audio(self, audio_path: Path, source_id: str) -> Path:
        recorded_dir = self.settings.archive_dir / "processed" / source_id / "audio"
        recorded_dir.mkdir(parents=True, exist_ok=True)
        archived_audio = recorded_dir / audio_path.name
        if archived_audio.exists():
            archived_audio.unlink()
        shutil.move(str(audio_path), str(archived_audio))
        return archived_audio

    def remove_source_transcript(self, transcript_path: Path) -> None:
        if transcript_path.exists():
            transcript_path.unlink()

    def find_archived_pair(self, source_id: str) -> tuple[Path | None, Path | None]:
        source_dir = self.settings.archive_dir / "processed" / source_id
        if not source_dir.exists():
            return None, None

        transcript_path: Path | None = None
        for candidate in sorted(source_dir.glob("*.md")):
            transcript_path = candidate
            break

        audio_path: Path | None = None
        audio_dir = source_dir / "audio"
        search_roots = [audio_dir] if audio_dir.exists() else []
        search_roots.append(source_dir)
        for root in search_roots:
            for candidate in sorted(root.iterdir()):
                if candidate.is_file() and candidate.suffix.lower() in AUDIO_EXTENSIONS:
                    audio_path = candidate
                    break
            if audio_path:
                break
        return audio_path, transcript_path

    def normalize_archived_audio(self, source_id: str, audio_path: Path | None) -> Path | None:
        if audio_path is None or not audio_path.exists():
            return None
        if audio_path.parent.name == "audio":
            return audio_path
        return self.cache_audio(audio_path, source_id)

    def write_note(
        self,
        *,
        target: NoteTarget,
        payload: SummaryPayload,
        metadata: AudioMetadata,
        transcript_text: str,
        local_audio_path: Path | None,
        remote_upload: UploadResult,
        source_basename: str,
        processed_at: datetime,
        preserve_processed_at: bool,
    ) -> Path:
        target.note_path.parent.mkdir(parents=True, exist_ok=True)
        if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path:
            target.existing_path.replace(target.note_path)

        existing_processed_at = processed_at
        if preserve_processed_at and target.note_path.exists():
            content = target.note_path.read_text(encoding="utf-8")
            match = re.search(r'^processed_at: "([^"]+)"$', content, re.MULTILINE)
            if match:
                existing_processed_at = parse_iso_datetime(match.group(1))

        tags = ["transkript", "ki-zusammenfassung", *payload.tags]
        tags = list(dict.fromkeys(tags))
        summary_markdown = strip_leading_h1(payload.summary_markdown, payload.title).strip()

        frontmatter = build_frontmatter(
            {
                "title": payload.title,
                "type": "summary",
                "date": metadata.recorded_at.date().isoformat(),
                "recorded_at": metadata.recorded_at.isoformat(),
                "recorded_at_source": metadata.recorded_at_source,
                "duration_seconds": round(metadata.duration_seconds, 3),
                "duration_human": metadata.duration_human,
                "source_id": target.source_id,
                "source_basename": source_basename,
                "source_audio_cache": str(local_audio_path) if local_audio_path else "",
                "remote_audio": remote_upload.remote_path,
                "remote_audio_status": remote_upload.status,
                "remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "",
                "remote_audio_last_error": remote_upload.error or "",
                "processed_at": existing_processed_at.isoformat(),
                "updated_at": processed_at.isoformat(),
                "pipeline_version": PIPELINE_VERSION,
                "llm_model": self.settings.openai_model,
                "status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error",
                "tags": tags,
            }
        )
        body = (
            "## Metadaten\n"
            f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n"
            f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n"
            f"- Dauer: {metadata.duration_human}\n"
            f"- Quelle: `{source_basename}`\n\n"
            f"{summary_markdown}\n\n"
            "## Transkript\n"
            f"{transcript_text.strip()}\n\n"
            "## Quellen\n"
            f"- Remote-Audio: `{remote_upload.remote_path}`\n"
            f"- Upload-Status: `{remote_upload.status}`\n"
        )
        target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8")
        return target.note_path

    def write_raw_transcript_note(
        self,
        *,
        target: NoteTarget,
        metadata: AudioMetadata,
        transcript_text: str,
        source_basename: str,
        processed_at: datetime,
    ) -> Path:
        target.note_path.parent.mkdir(parents=True, exist_ok=True)
        if target.existing_path and target.existing_path.exists() and target.existing_path != target.note_path:
            target.existing_path.replace(target.note_path)

        frontmatter = build_frontmatter(
            {
                "title": f"{target.title} - Transkript",
                "type": "raw_transcript",
                "date": metadata.recorded_at.date().isoformat(),
                "recorded_at": metadata.recorded_at.isoformat(),
                "recorded_at_source": metadata.recorded_at_source,
                "duration_seconds": round(metadata.duration_seconds, 3),
                "duration_human": metadata.duration_human,
                "source_id": target.source_id,
                "source_basename": source_basename,
                "processed_at": processed_at.isoformat(),
                "updated_at": processed_at.isoformat(),
                "pipeline_version": PIPELINE_VERSION,
                "status": "processed",
                "tags": ["transkript", "rohtranskript", "whisper-segmente"],
            }
        )
        body = (
            "## Metadaten\n"
            f"- Aufnahmedatum: {metadata.recorded_at.date().isoformat()}\n"
            f"- Aufnahmezeit: {metadata.recorded_at.strftime('%H:%M:%S %Z')}\n"
            f"- Dauer: {metadata.duration_human}\n"
            f"- Quelle: `{source_basename}`\n\n"
            "## Rohtranskript\n"
            f"{transcript_text.strip()}\n"
        )
        target.note_path.write_text(remove_blank_lines(f"{frontmatter}\n{body}"), encoding="utf-8")
        return target.note_path

def update_existing_summary_note(
|
||
self,
|
||
*,
|
||
note_path: Path,
|
||
metadata: AudioMetadata,
|
||
source_id: str,
|
||
source_basename: str,
|
||
transcript_text: str,
|
||
local_audio_path: Path | None,
|
||
remote_upload: UploadResult,
|
||
processed_at: datetime,
|
||
) -> None:
|
||
existing_text = note_path.read_text(encoding="utf-8")
|
||
frontmatter, body = parse_frontmatter_and_body(existing_text)
|
||
title = frontmatter.get("title", note_path.stem)
|
||
tags = frontmatter.get("tags", ["transkript", "ki-zusammenfassung"])
|
||
if not isinstance(tags, list):
|
||
tags = ["transkript", "ki-zusammenfassung"]
|
||
tags = list(dict.fromkeys([str(tag) for tag in tags]))
|
||
|
||
processed_at_value = frontmatter.get("processed_at") or processed_at.isoformat()
|
||
updated_frontmatter = {
|
||
"title": title,
|
||
"type": "summary",
|
||
"date": metadata.recorded_at.date().isoformat(),
|
||
"recorded_at": metadata.recorded_at.isoformat(),
|
||
"recorded_at_source": metadata.recorded_at_source,
|
||
"duration_seconds": round(metadata.duration_seconds, 3),
|
||
"duration_human": metadata.duration_human,
|
||
"source_id": source_id,
|
||
"source_basename": source_basename,
|
||
"source_audio_cache": str(local_audio_path) if local_audio_path else "",
|
||
"remote_audio": remote_upload.remote_path,
|
||
"remote_audio_status": remote_upload.status,
|
||
"remote_audio_uploaded_at": remote_upload.uploaded_at.isoformat() if remote_upload.uploaded_at else "",
|
||
"remote_audio_last_error": remote_upload.error or "",
|
||
"processed_at": processed_at_value,
|
||
"updated_at": processed_at.isoformat(),
|
||
"pipeline_version": PIPELINE_VERSION,
|
||
"llm_model": frontmatter.get("llm_model", self.settings.openai_model),
|
||
"status": "processed" if remote_upload.status == "uploaded" else "processed_with_upload_error",
|
||
"tags": tags,
|
||
}
|
||
body = strip_leading_h1(body, title).strip()
|
||
transcript_value = transcript_text.strip()
|
||
if transcript_value:
|
||
body = upsert_transcript_section(body, transcript_value)
|
||
sources_section = (
|
||
"## Quellen\n"
|
||
f"- Remote-Audio: `{remote_upload.remote_path}`\n"
|
||
f"- Upload-Status: `{remote_upload.status}`\n"
|
||
)
|
||
body = replace_or_append_section(body, "Quellen", sources_section)
|
||
note_path.write_text(remove_blank_lines(f"{build_frontmatter(updated_frontmatter)}\n{body}"), encoding="utf-8")

    def process_pair(self, pair: SourcePair) -> bool:
        """Summarize one audio/transcript pair into an Obsidian note; returns True if work was done."""
        if self.openai_client is None:
            raise PipelineError("OPENAI_API_KEY is not set.")

        audio_signature = pair.audio_signature
        transcript_signature = pair.transcript_signature
        transcript_text = pair.transcript_path.read_text(encoding="utf-8").strip()
        if not transcript_text:
            raise PipelineError(f"Transcript is empty: {pair.transcript_path}")

        metadata = self.extract_audio_metadata(pair.audio_path)
        source_id = self.build_source_id(pair, metadata)
        if not self.should_process(pair, source_id):
            logging.info("Skipping unchanged source %s", pair.basename)
            return False

        existing_row = self.state.get_by_source_id(source_id)
        self.state.upsert_processing(
            source_id=source_id,
            basename=pair.basename,
            audio_signature=audio_signature,
            transcript_signature=transcript_signature,
            preserve_remote_state=existing_row is not None,
        )

        prompt_template = self.settings.prompt_path.read_text(encoding="utf-8")
        try:
            payload = self.openai_client.summarize_transcript(
                prompt_template=prompt_template,
                transcript_text=transcript_text,
                metadata=metadata,
                basename=pair.basename,
            )
            note_target = self.build_note_target(
                source_id=source_id,
                title=payload.title,
                recorded_at=metadata.recorded_at,
                note_type="summary",
            )
            archived_audio = self.cache_audio(pair.audio_path, source_id)
            processed_at = datetime.now(UTC)
            upload_result = self.upload_audio_to_remote(archived_audio, source_id)
            note_path = self.write_note(
                target=note_target,
                payload=payload,
                metadata=metadata,
                transcript_text=transcript_text,
                local_audio_path=archived_audio,
                remote_upload=upload_result,
                source_basename=pair.basename,
                processed_at=processed_at,
                preserve_processed_at=existing_row is not None,
            )
            self.remove_source_transcript(pair.transcript_path)
            self.state.mark_processed(
                source_id=source_id,
                basename=pair.basename,
                audio_signature=audio_signature,
                transcript_signature=transcript_signature,
                note_path=note_path,
                raw_note_path=note_path,
                local_audio_path=archived_audio,
                remote_audio_path=upload_result.remote_path,
                remote_audio_status=upload_result.status,
                remote_audio_uploaded_at=upload_result.uploaded_at,
                remote_audio_last_error=upload_result.error,
                processing_status="processed" if upload_result.status == "uploaded" else "processed_with_upload_error",
                preserve_processed_at=existing_row is not None,
            )
            if self.memos_publisher is not None:
                try:
                    self.sync_memos_site()
                except Exception:
                    logging.exception("Memos publish failed for %s", pair.basename)
            self.notify_summary_ready(
                title=payload.title,
                note_path=note_path,
                source_basename=pair.basename,
                remote_upload=upload_result,
            )
            logging.info("Processed %s -> %s", pair.basename, note_path)
            if upload_result.status != "uploaded":
                logging.warning("Remote upload failed for %s: %s", pair.basename, upload_result.error)
            return True
        except Exception as exc:
            self.state.mark_failed(
                source_id=source_id,
                basename=pair.basename,
                audio_signature=audio_signature,
                transcript_signature=transcript_signature,
                error_message=str(exc),
            )
            raise

    def cleanup_archive(self) -> int:
        """Delete archived audio older than the retention window and prune empty directories."""
        cutoff = datetime.now(UTC) - timedelta(days=self.settings.retention_days)
        deleted = 0
        processed_root = self.settings.archive_dir / "processed"
        if not processed_root.exists():
            return 0
        for source_dir in processed_root.iterdir():
            if not source_dir.is_dir():
                continue
            audio_dir = source_dir / "audio"
            if audio_dir.exists():
                for audio_path in audio_dir.iterdir():
                    if not audio_path.is_file():
                        continue
                    if datetime.fromtimestamp(audio_path.stat().st_mtime, UTC) < cutoff:
                        audio_path.unlink()
                        deleted += 1
            # Remove now-empty subdirectories depth-first (reverse sort visits children first).
            for cleanup_dir in sorted(source_dir.rglob("*"), reverse=True):
                if cleanup_dir.is_dir() and not any(cleanup_dir.iterdir()):
                    cleanup_dir.rmdir()
            if source_dir.exists() and not any(source_dir.iterdir()):
                source_dir.rmdir()
        return deleted
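
    # Retention sketch (illustrative values, not executed): with retention_days=7,
    # a cached file whose mtime is 8 days old falls before the cutoff and is
    # unlinked, while one modified 6 days ago survives until a later pass:
    #
    #   cutoff = datetime.now(UTC) - timedelta(days=7)
    #   datetime.fromtimestamp(path.stat().st_mtime, UTC) < cutoff  # True -> delete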

    def run_scan(self) -> int:
        self.retry_pending_uploads()
        deleted = self.cleanup_archive()
        if deleted:
            logging.info("Deleted %s archived audio files", deleted)
        return self.process_available_pairs()

    def run_watch(self) -> None:
        self.cleanup_archive()
        self.process_available_pairs()
        # shutil.which resolves both bare command names and absolute paths,
        # unlike Path.exists(), which fails for a plain "fswatch".
        if self.settings.fswatch_bin and shutil.which(self.settings.fswatch_bin):
            self._run_fswatch_loop()
        else:
            self._run_polling_loop()

    def _run_fswatch_loop(self) -> None:
        logging.info("Starting fswatch loop on %s", self.settings.watch_dir)
        process = subprocess.Popen(
            [
                self.settings.fswatch_bin,
                "-0",
                str(self.settings.watch_dir),
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        assert process.stdout is not None
        next_scan_at: float | None = None
        buffer = b""
        should_stop = False

        def stop_handler(signum: int, frame: Any) -> None:
            nonlocal should_stop
            should_stop = True

        signal.signal(signal.SIGINT, stop_handler)
        signal.signal(signal.SIGTERM, stop_handler)

        while not should_stop:
            readable, _, _ = select.select([process.stdout], [], [], 1.0)
            if readable:
                chunk = os.read(process.stdout.fileno(), 65536)
                if not chunk:
                    # fswatch exited; break instead of busy-looping on a closed pipe.
                    logging.warning("fswatch process ended unexpectedly; leaving watch loop")
                    break
                buffer += chunk
                if b"\0" in buffer:
                    parts = buffer.split(b"\0")
                    buffer = parts[-1]
                    decoded = [
                        Path(part.decode("utf-8", errors="replace"))
                        for part in parts[:-1]
                        if part
                    ]
                    if any(not self.should_ignore_watch_path(path) for path in decoded):
                        next_scan_at = time.monotonic() + self.settings.debounce_seconds
            if next_scan_at is not None and time.monotonic() >= next_scan_at:
                self.run_scan()
                next_scan_at = None

        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
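
    # fswatch -0 terminates each event path with NUL, so one read may carry a
    # batch such as b"/watch/a.m4a\x00/watch/a.txt\x00partial..." (paths here are
    # made up). The loop above splits on b"\0", keeps the trailing partial record
    # in `buffer`, and re-arms the debounce timer so a burst of filesystem events
    # collapses into a single run_scan() call.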

    def _run_polling_loop(self) -> None:
        logging.info("fswatch not found; using polling loop on %s", self.settings.watch_dir)
        previous_snapshot: dict[str, str] = {}
        while True:
            current_snapshot: dict[str, str] = {}
            for path in self.settings.watch_dir.iterdir():
                if path.is_file() and not self.should_ignore_watch_path(path):
                    current_snapshot[str(path)] = build_file_signature(path)
            if current_snapshot != previous_snapshot:
                self.run_scan()
                if self.last_scan_had_unstable:
                    # Files were still being written; clear the snapshot so the
                    # next poll re-scans even if signatures look unchanged.
                    previous_snapshot = {}
                else:
                    previous_snapshot = current_snapshot
            time.sleep(max(self.settings.debounce_seconds, 3))


def build_frontmatter(values: dict[str, Any]) -> str:
    lines = ["---"]
    for key, value in values.items():
        if isinstance(value, list):
            lines.append(f"{key}:")
            for item in value:
                lines.append(f' - "{str(item).replace(chr(34), chr(39))}"')
        elif isinstance(value, bool):
            lines.append(f"{key}: {'true' if value else 'false'}")
        elif isinstance(value, (int, float)):
            lines.append(f"{key}: {value}")
        else:
            escaped = str(value).replace('"', "'")
            lines.append(f'{key}: "{escaped}"')
    lines.append("---")
    return "\n".join(lines)


def remove_blank_lines(text: str) -> str:
    cleaned_lines = [line.rstrip() for line in text.splitlines() if line.strip()]
    return "\n".join(cleaned_lines) + "\n"


def strip_leading_h1(text: str, title: str | None = None) -> str:
    lines = text.splitlines()
    if not lines:
        return text
    first = lines[0].strip()
    if not first.startswith("# "):
        return text
    if title is None or first[2:].strip() == title.strip():
        return "\n".join(lines[1:]).lstrip("\n")
    return text
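
# E.g. strip_leading_h1("# Note\nBody", "Note") -> "Body", while a mismatched
# title, strip_leading_h1("# Other\nBody", "Note"), returns the text unchanged.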


def parse_frontmatter_and_body(text: str) -> tuple[dict[str, Any], str]:
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}, text
    frontmatter_lines: list[str] = []
    body_start = 0
    for index in range(1, len(lines)):
        if lines[index].strip() == "---":
            body_start = index + 1
            break
        frontmatter_lines.append(lines[index])
    else:
        # No closing "---": treat the whole document as body without frontmatter.
        return {}, text
    data: dict[str, Any] = {}
    current_list_key: str | None = None
    for raw_line in frontmatter_lines:
        line = raw_line.rstrip()
        if not line:
            continue
        # List items use the ' - "item"' layout emitted by build_frontmatter.
        if current_list_key and line.startswith(" - "):
            item = line[4:].strip().strip('"')
            data.setdefault(current_list_key, []).append(item)
            continue
        current_list_key = None
        if line.endswith(":") and ": " not in line:
            current_list_key = line[:-1]
            data[current_list_key] = []
            continue
        if ": " not in line:
            continue
        key, value = line.split(": ", 1)
        value = value.strip()
        if value.startswith('"') and value.endswith('"'):
            data[key] = value.strip('"')
        else:
            data[key] = value
    body = "\n".join(lines[body_start:]).lstrip("\n")
    return data, body
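
# Round-trip sketch: parse_frontmatter_and_body(build_frontmatter({"a": "1"}) + "\nBody")
# returns ({"a": "1"}, "Body"). All scalar values come back as strings; the
# parser does not restore ints, floats, or booleans.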


def replace_or_append_section(body: str, heading: str, replacement: str) -> str:
    pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)")
    replacement_block = replacement.strip()
    if pattern.search(body):
        # Use a callable replacement so backslashes in note content stay literal
        # instead of being interpreted as regex group references.
        updated = pattern.sub(lambda _match: replacement_block + "\n", body).rstrip("\n")
        return updated
    return body.rstrip("\n") + "\n" + replacement_block


def remove_section(body: str, heading: str) -> str:
    pattern = re.compile(rf"(?ms)^## {re.escape(heading)}\n.*?(?=^## |\Z)")
    updated = pattern.sub("", body).strip()
    return updated + "\n" if updated else ""


def is_summary_note_frontmatter(frontmatter: dict[str, Any]) -> bool:
    note_type = str(frontmatter.get("type", "summary")).strip()
    source_id = str(frontmatter.get("source_id", "")).strip()
    return note_type == "summary" and bool(source_id)


def write_htpasswd(settings: Settings) -> Path:
    if not settings.memos_basic_auth_user or not settings.memos_basic_auth_password:
        raise PipelineError("MEMOS_BASIC_AUTH_USER and MEMOS_BASIC_AUTH_PASSWORD must be set.")
    # `openssl passwd -apr1` produces the Apache MD5 ($apr1$) hash htpasswd expects.
    result = subprocess.run(
        [
            "openssl",
            "passwd",
            "-apr1",
            settings.memos_basic_auth_password,
        ],
        check=True,
        capture_output=True,
        text=True,
    )
    hashed_password = result.stdout.strip()
    settings.memos_basic_auth_htpasswd_path.write_text(
        f"{settings.memos_basic_auth_user}:{hashed_password}\n",
        encoding="utf-8",
    )
    return settings.memos_basic_auth_htpasswd_path
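
# The file ends up with a single "user:$apr1$salt$hash" line, for example
# (user and hash material made up):
#
#   maddin:$apr1$Xk3q9z2L$abcdefghijklmnopqrstu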


def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Watch and process MacWhisper transcripts into Obsidian notes.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    scan_parser = subparsers.add_parser("scan", help="Process available transcript/audio pairs once.")
    scan_parser.add_argument("--basename", help="Process only a specific basename if present.")

    subparsers.add_parser("watch", help="Watch the directory and process new files automatically.")
    subparsers.add_parser("cleanup", help="Delete archived originals older than the retention period.")
    subparsers.add_parser("memos-sync", help="Export published notes for Quartz and rebuild the memos site.")
    subparsers.add_parser("memos-auth", help="Write the Basic Auth htpasswd file for the memos site.")
    subparsers.add_parser("retry-uploads", help="Retry pending remote audio uploads from the local archive.")
    subparsers.add_parser("migrate-archive", help="Retroactively migrate archived sources to raw transcript notes and remote audio.")

    reprocess_parser = subparsers.add_parser("reprocess", help="Reprocess a basename from the watch folder.")
    reprocess_parser.add_argument("basename", help="Basename without extension, e.g. 'Neue Aufnahme 23'")

    return parser.parse_args(argv)
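
# Typical invocations, assuming this script is saved as pipeline.py (the
# basename is just the example from the help text above):
#
#   python3 pipeline.py watch
#   python3 pipeline.py scan --basename "Neue Aufnahme 23"
#   python3 pipeline.py reprocess "Neue Aufnahme 23"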


def run_cli(argv: list[str]) -> int:
    base_dir = Path(__file__).resolve().parent
    load_dotenv(base_dir / ".env")
    settings = Settings.from_env(base_dir)
    settings.ensure_directories()
    setup_logging(settings.log_path)

    args = parse_args(argv)
    pipeline = TranscriptPipeline(settings)
    try:
        if args.command == "watch":
            pipeline.run_watch()
            return 0
        if args.command == "cleanup":
            deleted = pipeline.cleanup_archive()
            logging.info("Deleted %s archived audio files", deleted)
            return 0
        if args.command == "memos-auth":
            path = write_htpasswd(settings)
            logging.info("Wrote htpasswd file to %s", path)
            return 0
        if args.command == "memos-sync":
            exported_count = pipeline.sync_memos_site()
            logging.info("Built memos site with %s notes", exported_count)
            return 0
        if args.command == "retry-uploads":
            retried = pipeline.retry_pending_uploads()
            logging.info("Retried %s pending uploads", retried)
            return 0
        if args.command == "migrate-archive":
            migrated = pipeline.migrate_archive()
            logging.info("Migrated %s archived sources", migrated)
            return 0
        if args.command == "scan":
            if args.basename:
                pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None)
                if pair is None:
                    raise PipelineError(f"No matching pair found for basename: {args.basename}")
                pipeline.process_pair(pair)
                return 0
            pipeline.run_scan()
            return 0
        if args.command == "reprocess":
            pair = next((p for p in pipeline.scan_pairs() if p.basename == args.basename), None)
            if pair is None:
                raise PipelineError(f"No matching pair found for basename: {args.basename}")
            pipeline.process_pair(pair)
            return 0
    finally:
        pipeline.close()
    # Unreachable with required subcommands; kept as a defensive non-zero exit.
    return 1


if __name__ == "__main__":
    try:
        raise SystemExit(run_cli(sys.argv[1:]))
    except PipelineError as exc:
        logging.error("%s", exc)
        raise SystemExit(1)