I used SiYuan a few years ago, but its unordered lists looked really ugly on mobile, so I kept using Logseq.
Two or three months ago I had AI tweak some CSS to make the unordered lists render the same way they do in Logseq.
Then I found an HTTP shortcut on the forum for quick sharing from a phone, which replaced Logseq's quick capture:
quickAdd V3 for Android: send text and files to SiYuan from anywhere
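
For reference, that shortcut boils down to a single HTTP call against SiYuan's local kernel API, the same /api/block/appendBlock endpoint the migration script below uses. A minimal sketch, where TOKEN and TARGET_ID are placeholders you would fill in yourself:

```python
# Minimal quick-capture sketch: append shared text to a SiYuan doc.
# TOKEN and TARGET_ID are placeholders (your API token and a target doc id).
import httpx

TOKEN = "your-api-token"
TARGET_ID = "20250909015058-xxxxxxx"  # placeholder doc id

def quick_capture(text: str) -> None:
    resp = httpx.post(
        "http://127.0.0.1:6806/api/block/appendBlock",
        json={"parentID": TARGET_ID, "dataType": "markdown", "data": text},
        headers={"Authorization": f"token {TOKEN}"},
        timeout=10,
    )
    resp.raise_for_status()
```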
So I wrote a Python script to migrate my Logseq data into SiYuan.

Notes:

This script only migrates [[wiki links]], #tags, and daily notes named like 2025_09_08.md into SiYuan.

Importing Logseq data into SiYuan via Markdown.zip does not create the pages behind bi-directional links. By default, SiYuan creates new pages underneath the daily note doc, but coding that logic is too much hassle and I don't like that layout anyway, so the structure stays the same as in Logseq:

- notebook folder
  - pages
  - daily note

Also, the Logseq journal filenames must look like 2025_09_08.md; if yours don't, have AI adjust the script.
See the comments inside the script for the specific variables.
````python
import os
import re
import random
import string
import datetime
from collections import defaultdict
from typing import List, Optional, Tuple

import httpx
"""
本脚本只完成了 双链[[]] #标签 日记形如2025_09_08.md 迁移至思源
设置-文档树
新建文档存放位置 填 /pages
块引新建文档存放位置 填 /pages
然后在daily note下随便一个页面中创建一个新页面,会在创建一个与daily note同级的pages,然后可以把pages下新建的页面删除
也可以把daily note下年份等都删除,此时文件夹结构为
- 文件夹名
- pages
- daily note
NOTEBOOK_ID
文件夹名 点击··· 设置 复制id
PAGES_PARENT_ID
pages 点击··· 复制 复制id
DAILY_PARENT_ID
daily note 点击··· 复制 复制id
AUTH_TOKEN
设置 关于 API token 复制
MARKDOWN_DIR
为logseq文件夹 建议单独把 journals 和 pages 这两文件夹复制出来
"""
# ---------------------- Global config ----------------------
API_URL = "http://127.0.0.1:6806/api/filetree/createDocWithMd"
API_APPEND = "http://127.0.0.1:6806/api/block/appendBlock"
API_SETATTRS = "http://127.0.0.1:6806/api/attr/setBlockAttrs"
AUTH_TOKEN = "usiv1cwdpsq2spw4"
NOTEBOOK_ID = "20250909015054-7hbszfv"
DAILY_PARENT_ID = "20250909015058-5boadzw"
PAGES_PARENT_ID = "20250909015122-xxm3oa7"
MARKDOWN_DIR = "./lsq"
CREATED_PATHS = set()  # paths already created, to avoid duplicate create calls
def extract_logseq_links_and_tags(text):
    # 1. Strip every region that should be ignored wholesale
    text = re.sub(r'#\+BEGIN_QUERY.*?#\+END_QUERY', '', text, flags=re.DOTALL)
    text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
    text = re.sub(r'`.*?`', '', text, flags=re.DOTALL)
    text = re.sub(r'\$\$.*?\$\$', '', text, flags=re.DOTALL)  # display math
    text = re.sub(r'https?://[^\s<>\]\)]+', '', text, flags=re.DOTALL)
    # 2. Strip the label part of Markdown links [label](url)
    text = re.sub(r'(?<!\!)\[([^\[\]]*)\]\([^\)]*\)', '', text)
    # 3. Extract [[...]]
    links = re.findall(r'\[\[(.*?)\]\]', text)
    # 4. Extract #tags, excluding ## headings
    tags = re.findall(r'(?:^|(?<=\s))#([^\s#]+)', text)
    return links, tags
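
# Illustrative example of what the extractor returns (sample input):
#   extract_logseq_links_and_tags("did [[Project X]] today #todo")
#   -> (['Project X'], ['todo'])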
# -------------------- Time handling ------------------------
E8 = datetime.timezone(datetime.timedelta(hours=8))
def parse_date_from_filename(fname):
    """Return an aware datetime parsed from names like 2025_09_08.md, or None."""
    m = re.fullmatch(r'(\d{4})_(\d{2})_(\d{2})\.md', fname)
    if m:
        return datetime.datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), tzinfo=E8)
    return None

def datetime_to_14(dt):
    """Format a datetime as 14 digits, e.g. 20250907171159."""
    return dt.strftime('%Y%m%d%H%M%S')

def file_creation_dt(path):
    """Return the file creation time as a datetime (UTC+8)."""
    stat = os.stat(path)
    # On Android/Unix the creation time is taken from st_mtime;
    # switch to st_birthtime if your platform has it
    t = stat.st_mtime
    return datetime.datetime.fromtimestamp(t, tz=E8)
# -------------------- API helpers ------------------------
def random_id(dt_prefix=None):
    """14-digit timestamp + 7 random characters."""
    prefix = dt_prefix or datetime.datetime.now(tz=E8).strftime('%Y%m%d%H%M%S')
    suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=7))
    return f'{prefix}-{suffix}'
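
# e.g. random_id('20250908120000') -> '20250908120000-a1b2c3d' (suffix is random)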
def setBlockAttrs(parent_id, date):
    payload = {
        "id": parent_id,
        "attrs": {f"custom-dailynote-{date}": date}
    }
    try:
        resp = httpx.post(API_SETATTRS, json=payload,
                          headers={"Authorization": f"token {AUTH_TOKEN}"},
                          timeout=30)
        resp.raise_for_status()
    except Exception as e:
        print('[API_SETATTRS_ERR]', e, payload)
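
# SiYuan marks daily notes with a custom-dailynote-YYYYMMDD attribute, so e.g.
# setBlockAttrs(did, '20250908') should make the migrated doc count as that
# day's daily note.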
def appendblock(markdown, parent_id, path, id_=None):
    """Append markdown to an existing document via the SiYuan API."""
    payload = {
        "parentID": parent_id,
        "dataType": "markdown",
        "data": markdown,
    }
    try:
        resp = httpx.post(API_APPEND, json=payload,
                          headers={"Authorization": f"token {AUTH_TOKEN}"},
                          timeout=30)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        # resp may be unbound on failure, so return None instead of resp.json()
        print('[API_APPEND_ERR]', e, payload)
        return None
def get_id_by_hpath(hpath: str) -> str | None:
    """
    Look up the document id for a notebook-relative path like /pages/<tag>.
    Returns None if the document does not exist.
    """
    url = "http://127.0.0.1:6806/api/filetree/getIDsByHPath"
    try:
        r = httpx.post(
            url,
            json={"notebook": NOTEBOOK_ID, "path": hpath},
            headers={"Authorization": f"token {AUTH_TOKEN}"},
            timeout=30
        )
        r.raise_for_status()
        data = r.json()
        if data.get("code") == 0 and data.get("data"):
            return data["data"][0]
    except Exception as e:
        print("[get_id_by_hpath error]", e, hpath)
    return None
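
# e.g. get_id_by_hpath("/pages/Project X") -> "20250909015122-abc1234"
# (illustrative id), or None when no such doc exists yet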
def create_doc(markdown, parent_id, path, id_=None):
    """Call the SiYuan API: append to the doc if path already exists, else create it."""
    # Normalize to an hpath with exactly one leading slash, since callers pass
    # both '/pages/x' and 'pages/x' style paths
    hpath = path if path.startswith('/') else f'/{path}'
    # Already cached
    if path in CREATED_PATHS:
        return get_id_by_hpath(hpath)
    # Query first
    did = get_id_by_hpath(hpath)
    if did:
        CREATED_PATHS.add(path)
        # Only append non-empty content
        if markdown and markdown.strip():
            appendblock(markdown, did, path)
            print("appendblock", path, id_)
        return did
    # Otherwise create the document
    id_ = id_ or random_id()
    payload = {
        "notebook": NOTEBOOK_ID,
        "parentID": parent_id,
        "path": path,
        "markdown": markdown,
        "id": id_
    }
    try:
        resp = httpx.post(API_URL,
                          json=payload,
                          headers={"Authorization": f"token {AUTH_TOKEN}"},
                          timeout=30)
        resp.raise_for_status()
        CREATED_PATHS.add(path)
    except Exception as e:
        print('[API_ERR]', e, payload)
    return id_
# ---------------- Directory scan & data collection ----------------
def scan_markdown_dir():
    """
    Return (name2ts, daily2file, tag2file)
    name2ts   : string (tag / wiki-link text) -> earliest timestamp seen
    daily2file: datetime.datetime -> absolute path of the daily file
    tag2file  : string (tag / wiki-link text) -> absolute path of the
                same-named local .md file, if any
    """
    name2ts = defaultdict(lambda: float('inf'))
    daily2file = {}
    tag2file = {}  # only records non-daily files
    # Collect daily paths first so they can be excluded below
    daily_paths = set()
    for root, _, files in os.walk(MARKDOWN_DIR):
        for fname in files:
            if not fname.endswith('.md'):
                continue
            full_path = os.path.join(root, fname)
            # Is it a daily note?
            date_obj = parse_date_from_filename(fname)
            if date_obj:
                ts = int(date_obj.timestamp())
                daily2file[date_obj] = full_path
                daily_paths.add(full_path)
            else:
                ts = int(file_creation_dt(full_path).timestamp())
            with open(full_path, encoding='utf-8') as f:
                content = f.read()
            links, tags = extract_logseq_links_and_tags(content)
            # Merge links and tags, keeping the earliest timestamp per name
            for raw in links + tags:
                name2ts[raw] = min(ts, name2ts[raw])
            # Record non-daily files
            if full_path not in daily_paths:
                name_without_ext = fname[:-3]  # strip .md
                tag2file[name_without_ext] = full_path
    name2ts = {k: v for k, v in name2ts.items() if v != float('inf')}
    # 1. Sort name2ts: oldest (smallest timestamp) first
    sorted_name2ts = dict(sorted(name2ts.items(), key=lambda kv: kv[1]))
    # 2. Sort daily2file: earliest date first
    sorted_daily2file = dict(sorted(daily2file.items(), key=lambda kv: kv[0]))
    return sorted_name2ts, sorted_daily2file, tag2file
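
# Example shapes of the returned mappings (illustrative values):
#   name2ts    = {'Project X': 1694131200}
#   daily2file = {datetime.datetime(2025, 9, 8, tzinfo=E8): './lsq/journals/2025_09_08.md'}
#   tag2file   = {'Project X': './lsq/pages/Project X.md'}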
# ---------------- Markdown processing ----------------------
def md_replace_tags(text: str, file_path: str = "") -> str:
    """
    Replace [[tag]] and #tag with SiYuan block references; if no matching
    document exists under /pages, an empty one is created first.
    Code / query / URL / math / link regions are skipped.
    """
    # 1. Patterns whose whole match must be skipped
    skip_patterns = [
        r'#\+BEGIN_QUERY[\s\S]*?#\+END_QUERY',
        r'```[\s\S]*?```',
        r'`[^`]*`',
        r'(?<!\!)\$\$[\s\S]*?\$\$(?!\$)',
        r'(?<!\!)\$[^$\n]+(?<!\$)\$(?!\$)',
        r'https?://[^\s<>\]\)]+',
        r'(?<!\!)\[([^\[\]]*)\]\([^\)]*\)',
    ]
    # 2. Collect all spans to skip
    spans: List[Tuple[int, int]] = []
    for pat in skip_patterns:
        spans.extend((m.start(), m.end()) for m in re.finditer(pat, text))
    spans.sort(key=lambda t: t[0])
    # Merge overlapping / adjacent spans
    merged: List[Tuple[int, int]] = []
    for s, e in spans:
        if merged and s <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], e))
        else:
            merged.append((s, e))
    # 3. Blank out the skipped regions with a placeholder, preserving length,
    #    so match offsets in `cleaned` are valid offsets into `text`
    Placeholder = "\ue000"
    cleaned_parts: List[str] = []
    last = 0
    for s, e in merged:
        cleaned_parts.append(text[last:s])
        cleaned_parts.append(Placeholder * (e - s))
        last = e
    cleaned_parts.append(text[last:])
    cleaned = "".join(cleaned_parts)
    # 4. Build the replacement for a single match
    TagSpan = Tuple[int, int, str]
    def make_replacement(tag: str, m: re.Match[str]) -> Optional[TagSpan]:
        did = get_id_by_hpath(f"/pages/{tag}")
        if not did:
            # Create an empty document as the reference target
            path = f"pages/{tag}"
            did = create_doc("", PAGES_PARENT_ID, path)
        html = f'<span data-type="block-ref" data-subtype="d" data-id="{did}">{tag}</span>'
        return (m.start(), m.end(), html)
    # 5. Collect [[...]] and #tag matches
    replacements: List[TagSpan] = []
    for m in re.finditer(r"\[\[([^\[\]\n]+)\]\]", cleaned):
        repl = make_replacement(m.group(1), m)
        if repl:
            replacements.append(repl)
    for m in re.finditer(r"(?:^|(?<=\s))#([^\s#]+)", cleaned):
        repl = make_replacement(m.group(1), m)
        if repl:
            replacements.append(repl)
    # 6. Apply replacements back to front so earlier offsets stay valid
    for start, end, repl in sorted(replacements, key=lambda x: x[0], reverse=True):
        text = text[:start] + repl + text[end:]
    return text
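
# Illustrative before/after (note: this hits the API, creating /pages/Project X
# if it does not exist yet; the id below is made up):
#   md_replace_tags("see [[Project X]]")
#   -> 'see <span data-type="block-ref" data-subtype="d" data-id="20250909015122-a1b2c3d">Project X</span>'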
# -------------------- Migration tasks ------------------------
def create_tag_pages(name2ts, tag2file):
    """
    Create a page for every entry in name2ts.
    Page content:
      - the same-named .md file from tag2file, if one exists
      - otherwise empty
    """
    for name, ts in name2ts.items():
        path = f'/pages/{name}'
        # Read the content, if any
        file_path = tag2file.get(name)
        md = ''
        if file_path and os.path.exists(file_path):
            with open(file_path, encoding='utf-8') as f:
                md = md_replace_tags(f.read(), file_path)
        id_prefix = datetime.datetime.fromtimestamp(ts, tz=E8).strftime('%Y%m%d%H%M%S')
        create_doc(md, PAGES_PARENT_ID, path, random_id(id_prefix))
def create_daily_notes(daily2file):
    """Then create the daily notes."""
    for date_obj, file_path in daily2file.items():
        with open(file_path, encoding='utf-8') as f:
            md = md_replace_tags(f.read(), file_path)
        path = f'/daily note/{date_obj.year}/{date_obj.month:02d}/{date_obj.strftime("%Y-%m-%d")}'
        id_prefix = datetime_to_14(date_obj)
        did = create_doc(md, DAILY_PARENT_ID, path, random_id(id_prefix))
        if not did:
            did = get_id_by_hpath(path)
        if did:
            # Stamp the doc so SiYuan recognizes it as that day's daily note
            setBlockAttrs(did, str(id_prefix[:8]))
# ------------------------ Main flow ----------------------
def main():
    name2ts, daily2file, tag2file = scan_markdown_dir()
    print(f'{len(name2ts)} unique tags/wiki-links, {len(daily2file)} daily notes, '
          f'{len([v for v in tag2file.values() if v])} same-named local files.')
    create_tag_pages(name2ts, tag2file)
    create_daily_notes(daily2file)
    print('Migration finished.')
if __name__ == '__main__':
    main()
````
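
To run it: fill in AUTH_TOKEN, NOTEBOOK_ID, DAILY_PARENT_ID, and PAGES_PARENT_ID at the top, point MARKDOWN_DIR at the copied journals/pages folders, make sure SiYuan is running locally (the script talks to the kernel on 127.0.0.1:6806), install httpx, and run it with Python 3. Trying it against a throwaway notebook first is a good idea, since the script writes straight through the API.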