зеркало из
https://github.com/docxology/cognitive.git
synced 2025-10-30 12:46:04 +02:00
466 строки
19 KiB
Python
466 строки
19 KiB
Python
import json
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Dict, Set, List, Tuple
|
|
import re
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from tqdm import tqdm
|
|
import time
|
|
|
|
class LinkAnalyzer:
|
|
def __init__(self, output_dir: Path):
|
|
"""Initialize link analyzer with output directory.
|
|
|
|
Args:
|
|
output_dir (Path): Path to directory containing analysis files
|
|
"""
|
|
self.output_dir = output_dir
|
|
|
|
# Increase CSV field size limit
|
|
csv.field_size_limit(2**30) # Set to a large value
|
|
|
|
# Load data files
|
|
try:
|
|
self.structure = self._load_json('file_structure.json')
|
|
self.files = self._load_csv('files.csv')
|
|
self.wikilinks = self._load_csv('wikilinks.csv')
|
|
self.existing_files = self._get_existing_files()
|
|
self.link_graph = self._build_link_graph()
|
|
except Exception as e:
|
|
print(f"Error loading data files: {e}")
|
|
raise
|
|
|
|
def _load_json(self, filename: str) -> Dict:
|
|
"""Load JSON file from output directory."""
|
|
try:
|
|
with open(self.output_dir / filename, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
print(f"Error loading {filename}: {e}")
|
|
raise
|
|
|
|
def _load_csv(self, filename: str) -> List[Dict]:
|
|
"""Load CSV file from output directory."""
|
|
try:
|
|
with open(self.output_dir / filename, 'r', encoding='utf-8') as f:
|
|
return list(csv.DictReader(f))
|
|
except Exception as e:
|
|
print(f"Error loading {filename}: {e}")
|
|
raise
|
|
|
|
def _get_existing_files(self) -> Set[str]:
|
|
"""Get set of existing file paths."""
|
|
return {f['path'] for f in self.files}
|
|
|
|
def _build_link_graph(self) -> Dict[str, Set[str]]:
|
|
"""Build graph of file links."""
|
|
graph = defaultdict(set)
|
|
for link in self.wikilinks:
|
|
source = link['source']
|
|
target = link['target']
|
|
graph[source].add(target)
|
|
return dict(graph)
|
|
|
|
def _find_broken_links(self) -> Dict[str, Set[str]]:
|
|
"""Find broken links in the knowledge base."""
|
|
broken_links = defaultdict(set)
|
|
for source, targets in self.link_graph.items():
|
|
for target in targets:
|
|
target_path = f"knowledge_base/{target}.md"
|
|
if target_path not in self.existing_files:
|
|
broken_links[source].add(target)
|
|
return dict(broken_links)
|
|
|
|
def _find_ambiguous_links(self) -> Dict[str, List[str]]:
|
|
"""Find ambiguous links that could refer to multiple files."""
|
|
ambiguous = {}
|
|
for source, targets in self.link_graph.items():
|
|
for target in targets:
|
|
matches = []
|
|
pattern = f".*{target}.*\\.md$"
|
|
for file in self.existing_files:
|
|
if re.match(pattern, file, re.IGNORECASE):
|
|
matches.append(file)
|
|
if len(matches) > 1:
|
|
ambiguous[f"{source} -> {target}"] = matches
|
|
return ambiguous
|
|
|
|
def _find_missing_backlinks(self) -> Dict[str, Set[str]]:
|
|
"""Find files that should have reciprocal links."""
|
|
missing_backlinks = defaultdict(set)
|
|
for source, targets in self.link_graph.items():
|
|
for target in targets:
|
|
target_path = f"knowledge_base/{target}.md"
|
|
if target_path in self.existing_files:
|
|
if source not in self.link_graph.get(target_path, set()):
|
|
missing_backlinks[target_path].add(source)
|
|
return dict(missing_backlinks)
|
|
|
|
def _similarity_score(self, a: str, b: str) -> float:
|
|
"""Compute similarity score between two strings."""
|
|
a = a.lower()
|
|
b = b.lower()
|
|
if not a or not b:
|
|
return 0.0
|
|
intersection = len(set(a) & set(b))
|
|
union = len(set(a) | set(b))
|
|
return intersection / union if union > 0 else 0.0
|
|
|
|
def suggest_fixes(self) -> Dict[str, List[Dict]]:
|
|
"""Suggest fixes for link issues."""
|
|
fixes = defaultdict(list)
|
|
|
|
# Check broken links
|
|
broken = self._find_broken_links()
|
|
for source, targets in broken.items():
|
|
for target in targets:
|
|
# Find similar existing files
|
|
suggestions = []
|
|
for existing in self.existing_files:
|
|
score = self._similarity_score(target, Path(existing).stem)
|
|
if score > 0.5: # Threshold for similarity
|
|
suggestions.append((existing, score))
|
|
suggestions.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
fix = {
|
|
'type': 'broken_link',
|
|
'source': source,
|
|
'target': target,
|
|
'suggestions': [s[0] for s in suggestions[:3]]
|
|
}
|
|
fixes[source].append(fix)
|
|
|
|
# Check ambiguous links
|
|
ambiguous = self._find_ambiguous_links()
|
|
for link, matches in ambiguous.items():
|
|
source, target = link.split(' -> ')
|
|
fix = {
|
|
'type': 'ambiguous_link',
|
|
'source': source,
|
|
'target': target,
|
|
'matches': matches
|
|
}
|
|
fixes[source].append(fix)
|
|
|
|
# Check missing backlinks
|
|
missing = self._find_missing_backlinks()
|
|
for target, sources in missing.items():
|
|
for source in sources:
|
|
fix = {
|
|
'type': 'missing_backlink',
|
|
'source': target,
|
|
'target': source
|
|
}
|
|
fixes[target].append(fix)
|
|
|
|
return dict(fixes)
|
|
|
|
def _get_template_for_link(self, link_name: str) -> str:
|
|
"""Generate template content for a new file."""
|
|
return f"""---
|
|
title: {link_name.replace('_', ' ').title()}
|
|
type: knowledge_base
|
|
status: draft
|
|
created: {datetime.now().strftime('%Y-%m-%d')}
|
|
tags:
|
|
- todo
|
|
semantic_relations:
|
|
- type: related
|
|
links: []
|
|
---
|
|
|
|
# {link_name.replace('_', ' ').title()}
|
|
|
|
[TODO: Add content]
|
|
"""
|
|
|
|
def apply_fixes(self, fixes: Dict[str, List[Dict]], kb_root: Path) -> List[Dict]:
|
|
"""Apply suggested fixes to the knowledge base."""
|
|
changes = []
|
|
|
|
for source, file_fixes in fixes.items():
|
|
source_path = kb_root / source.lstrip('/')
|
|
if not source_path.exists():
|
|
continue
|
|
|
|
content = source_path.read_text(encoding='utf-8')
|
|
modified = False
|
|
|
|
for fix in file_fixes:
|
|
if fix['type'] == 'broken_link':
|
|
if fix['suggestions']:
|
|
# Replace broken link with first suggestion
|
|
old_link = f"[[{fix['target']}]]"
|
|
new_link = f"[[{Path(fix['suggestions'][0]).stem}]]"
|
|
if old_link in content:
|
|
content = content.replace(old_link, new_link)
|
|
modified = True
|
|
changes.append({
|
|
'type': 'fixed_link',
|
|
'file': str(source),
|
|
'old': old_link,
|
|
'new': new_link
|
|
})
|
|
else:
|
|
# Create new file for broken link
|
|
new_file = kb_root / f"{fix['target']}.md"
|
|
if not new_file.exists():
|
|
new_file.parent.mkdir(parents=True, exist_ok=True)
|
|
new_file.write_text(
|
|
self._get_template_for_link(fix['target']),
|
|
encoding='utf-8'
|
|
)
|
|
changes.append({
|
|
'type': 'created_file',
|
|
'file': str(new_file)
|
|
})
|
|
|
|
elif fix['type'] == 'missing_backlink':
|
|
target_path = kb_root / fix['target'].lstrip('/')
|
|
if target_path.exists():
|
|
target_content = target_path.read_text(encoding='utf-8')
|
|
# Add backlink in semantic_relations
|
|
if 'semantic_relations:' in target_content:
|
|
lines = target_content.splitlines()
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == 'semantic_relations:':
|
|
# Find the related section or create it
|
|
related_found = False
|
|
for j in range(i+1, len(lines)):
|
|
if ' - type: related' in lines[j]:
|
|
related_found = True
|
|
# Find or create links section
|
|
for k in range(j+1, len(lines)):
|
|
if 'links:' in lines[k]:
|
|
lines[k] = lines[k].rstrip() + f" [[{Path(source).stem}]]"
|
|
break
|
|
break
|
|
if not related_found:
|
|
lines.insert(i+1, ' - type: related\n links: []')
|
|
target_content = '\n'.join(lines)
|
|
target_path.write_text(target_content, encoding='utf-8')
|
|
changes.append({
|
|
'type': 'added_backlink',
|
|
'file': str(target_path),
|
|
'link': str(source)
|
|
})
|
|
break
|
|
|
|
if modified:
|
|
source_path.write_text(content, encoding='utf-8')
|
|
|
|
return changes
|
|
|
|
def generate_report(self, fixes: Dict[str, List[Dict]], changes: List[Dict]) -> str:
|
|
"""Generate a detailed report of link analysis and changes."""
|
|
report = ["# Link Analysis Report\n"]
|
|
|
|
# Summarize issues
|
|
total_issues = sum(len(fixes) for fixes in fixes.values())
|
|
report.append(f"## Summary\nTotal issues found: {total_issues}\n")
|
|
|
|
# Detail fixes by type
|
|
report.append("## Issues by Type\n")
|
|
|
|
# Collect all issues by type
|
|
broken_links = []
|
|
ambiguous_links = []
|
|
missing_backlinks = []
|
|
|
|
for source_file, file_fixes in fixes.items():
|
|
for fix in file_fixes:
|
|
if fix['type'] == 'broken_link':
|
|
broken_links.append({
|
|
'source': source_file,
|
|
'target': fix['target'],
|
|
'suggestions': fix.get('suggestions', [])
|
|
})
|
|
elif fix['type'] == 'ambiguous_link':
|
|
ambiguous_links.append({
|
|
'source': source_file,
|
|
'target': fix['target'],
|
|
'matches': fix.get('matches', [])
|
|
})
|
|
elif fix['type'] == 'missing_backlink':
|
|
missing_backlinks.append({
|
|
'source': fix['source'],
|
|
'target': fix['target']
|
|
})
|
|
|
|
# Report broken links
|
|
if broken_links:
|
|
report.append(f"### Broken Links ({len(broken_links)})\n")
|
|
report.append("Links that point to non-existent files:\n")
|
|
for issue in broken_links:
|
|
report.append(f"- In `{issue['source']}`: [[{issue['target']}]]")
|
|
if issue['suggestions']:
|
|
report.append(" Suggestions:")
|
|
for suggestion in issue['suggestions']:
|
|
report.append(f" - `{suggestion}`")
|
|
report.append("")
|
|
|
|
# Report ambiguous links
|
|
if ambiguous_links:
|
|
report.append(f"### Ambiguous Links ({len(ambiguous_links)})\n")
|
|
report.append("Links that could refer to multiple files:\n")
|
|
for issue in ambiguous_links:
|
|
report.append(f"- In `{issue['source']}`: [[{issue['target']}]]")
|
|
report.append(" Could refer to:")
|
|
for match in issue['matches']:
|
|
report.append(f" - `{match}`")
|
|
report.append("")
|
|
|
|
# Report missing backlinks
|
|
if missing_backlinks:
|
|
report.append(f"### Missing Backlinks ({len(missing_backlinks)})\n")
|
|
report.append("Files that should have reciprocal links:\n")
|
|
for issue in missing_backlinks:
|
|
report.append(f"- `{issue['source']}` should link back to `{issue['target']}`")
|
|
report.append("")
|
|
|
|
# List changes made
|
|
report.append("## Changes Applied")
|
|
change_types = defaultdict(list)
|
|
for change in changes:
|
|
change_types[change['type']].append(change)
|
|
|
|
for change_type, type_changes in change_types.items():
|
|
report.append(f"\n### {change_type.replace('_', ' ').title()}")
|
|
for change in type_changes:
|
|
if change_type == 'fixed_link':
|
|
report.append(f"- In {change['file']}: {change['old']} → {change['new']}")
|
|
elif change_type == 'created_file':
|
|
report.append(f"- Created {change['file']}")
|
|
elif change_type == 'added_backlink':
|
|
report.append(f"- Added backlink in {change['file']} to {change['link']}")
|
|
|
|
return '\n'.join(report)
|
|
|
|
def generate_summary_report(self, fixes: Dict[str, List[Dict]], changes: List[Dict]) -> str:
|
|
"""Generate a concise summary report with samples of each issue type."""
|
|
report = ["# Link Analysis Summary Report\n"]
|
|
|
|
# Overall statistics
|
|
total_issues = sum(len(fixes) for fixes in fixes.values())
|
|
report.append(f"## Overall Statistics\n")
|
|
report.append(f"- Total files analyzed: {len(self.files)}")
|
|
report.append(f"- Total issues found: {total_issues}")
|
|
report.append(f"- Total files with issues: {len(fixes)}\n")
|
|
|
|
# Collect issues by type
|
|
broken_links = []
|
|
ambiguous_links = []
|
|
missing_backlinks = []
|
|
|
|
for source_file, file_fixes in fixes.items():
|
|
for fix in file_fixes:
|
|
if fix['type'] == 'broken_link':
|
|
broken_links.append({
|
|
'source': source_file,
|
|
'target': fix['target'],
|
|
'suggestions': fix.get('suggestions', [])
|
|
})
|
|
elif fix['type'] == 'ambiguous_link':
|
|
ambiguous_links.append({
|
|
'source': source_file,
|
|
'target': fix['target'],
|
|
'matches': fix.get('matches', [])
|
|
})
|
|
elif fix['type'] == 'missing_backlink':
|
|
missing_backlinks.append({
|
|
'source': fix['source'],
|
|
'target': fix['target']
|
|
})
|
|
|
|
# Report issue type summaries with samples
|
|
report.append("## Issue Type Summary\n")
|
|
|
|
# Broken links summary
|
|
if broken_links:
|
|
report.append(f"### Broken Links: {len(broken_links)} issues\n")
|
|
report.append("Sample issues (up to 5):")
|
|
for issue in broken_links[:5]:
|
|
report.append(f"- In `{issue['source']}`: [[{issue['target']}]]")
|
|
if issue['suggestions']:
|
|
report.append(" Suggestions:")
|
|
for suggestion in issue['suggestions'][:3]:
|
|
report.append(f" - `{suggestion}`")
|
|
report.append("")
|
|
|
|
# Ambiguous links summary
|
|
if ambiguous_links:
|
|
report.append(f"### Ambiguous Links: {len(ambiguous_links)} issues\n")
|
|
report.append("Sample issues (up to 5):")
|
|
for issue in ambiguous_links[:5]:
|
|
report.append(f"- In `{issue['source']}`: [[{issue['target']}]]")
|
|
report.append(" Could refer to:")
|
|
for match in issue['matches'][:3]:
|
|
report.append(f" - `{match}`")
|
|
report.append("")
|
|
|
|
# Missing backlinks summary
|
|
if missing_backlinks:
|
|
report.append(f"### Missing Backlinks: {len(missing_backlinks)} issues\n")
|
|
report.append("Sample issues (up to 5):")
|
|
for issue in missing_backlinks[:5]:
|
|
report.append(f"- `{issue['source']}` should link back to `{issue['target']}`")
|
|
report.append("")
|
|
|
|
# Add recommendations
|
|
report.append("## Recommendations\n")
|
|
report.append("1. Start by fixing broken links, as they affect content accessibility")
|
|
report.append("2. Resolve ambiguous links to ensure correct references")
|
|
report.append("3. Add missing backlinks to improve navigation")
|
|
report.append("\nNote: Use the full report for complete issue details.")
|
|
|
|
return '\n'.join(report)
|
|
|
|
def main():
|
|
"""Main function to analyze and fix links."""
|
|
script_dir = Path(__file__).parent
|
|
output_dir = script_dir / 'output'
|
|
kb_root = script_dir.parent / 'knowledge_base'
|
|
|
|
start_time = time.time()
|
|
|
|
print("\n🔍 Knowledge Base Link Analysis")
|
|
print("=" * 40)
|
|
|
|
print("\n⚡ Initializing link analyzer...")
|
|
analyzer = LinkAnalyzer(output_dir)
|
|
|
|
print("\n📊 Analyzing link issues...")
|
|
with tqdm(total=3, desc="Progress", unit="step") as pbar:
|
|
fixes = analyzer.suggest_fixes()
|
|
pbar.update(1)
|
|
|
|
print("\n🛠️ Applying fixes...")
|
|
changes = analyzer.apply_fixes(fixes, kb_root)
|
|
pbar.update(1)
|
|
|
|
print("\n📝 Generating reports...")
|
|
report = analyzer.generate_report(fixes, changes)
|
|
summary = analyzer.generate_summary_report(fixes, changes)
|
|
pbar.update(1)
|
|
|
|
# Save reports
|
|
report_file = output_dir / 'link_analysis_report.md'
|
|
summary_file = output_dir / 'link_analysis_summary.md'
|
|
report_file.write_text(report, encoding='utf-8')
|
|
summary_file.write_text(summary, encoding='utf-8')
|
|
|
|
end_time = time.time()
|
|
duration = end_time - start_time
|
|
|
|
print(f"\n✨ Analysis complete in {duration:.2f}s")
|
|
print(f"📄 Full report saved to {report_file}")
|
|
print(f"📑 Summary report saved to {summary_file}")
|
|
print("\nSummary:")
|
|
print(f"🔗 Fixed links: {sum(1 for c in changes if c['type'] == 'fixed_link')}")
|
|
print(f"📁 Created files: {sum(1 for c in changes if c['type'] == 'created_file')}")
|
|
print(f"🔄 Added backlinks: {sum(1 for c in changes if c['type'] == 'added_backlink')}")
|
|
print("=" * 40 + "\n")
|
|
|
|
if __name__ == "__main__":
|
|
main() |