原教程在这里MAC破解QQ数据库并且导出多种格式数据脚本
这里只是对原教程中有更改的部分进行补充说明
在原教程的解密数据库位置变动到
/Users/{user}/Library/Containers/qq/Data/Library/Application Support/QQ/nt_qq_{MD5}/nt_db
md5的值会改变,但路径大致就是这样。
你可以手动将nt_msg.db,profile_info.db复制出来,再通过
tail -c +1025 nt_msg.db > nt_msg_1.clean.db
tail -c +1025 profile_info.db > profile_info_1.clean.db
移除无关文件头。
其余步骤跟着原教程走就好,在最后的python脚本中,由于某些数据库表名称的修改,现在已经没办法直接导出消息了。
于是我修改了一下,但是现在只写了导出json的代码
import ast
import json
import sqlite3
import time
import os
import blackboxprotobuf
# 获取内容的详细信息
def get_content_s(content):
s = ""
for c in content:
if c["45002"] == 1:
# 文本消息
if isinstance(c.get("45101"), bytes):
s += get_content(c["45101"]) # 解码文本消息内容
else:
s += str(c["45101"]) # 非字节数据,直接使用原始内容
elif c["45002"] == 2:
# 图片消息
s += f"[img:file={c['45402']},url=https://c2cpicdw.qpic.cn/{c['45802']}]"
elif c["45002"] == 3:
# 文件消息
s += f"[file:{c['45402']}]"
elif c["45002"] == 5:
# 视频消息
s += f"[video:file={c['45402']},url=......]"
elif c["45002"] == 6:
# 表情消息
if c["45003"] == 1:
s += f"[普通表情:{c['47601']}]"
elif c["45003"] in [2, 3]:
s += f"[大表情:{c['47602']}]"
elif c["45002"] == 7:
# 回复消息
s += f"[回复:{c['47423']['45101']}]"
elif c["45002"] == 10:
# 邮件消息
s += f"[mail:{c['47901']}]"
else:
s += f"[未知消息类型:{c['45002']}]"
return s
# 获取消息内容
def get_content(content):
if isinstance(content, dict):
return get_content_s([content])
elif isinstance(content, list):
return get_content_s(content)
elif isinstance(content, bytes):
for encoding in ['utf-8', 'gbk', 'utf-16']:
try:
return content.decode(encoding)
except UnicodeDecodeError:
continue
return str(content)
elif isinstance(content, str):
if content.startswith("b'") or content.startswith('b"'):
try:
byte_content = bytes.fromhex(content[2:-1].replace("\\x", ""))
return byte_content.decode('utf-8')
except Exception:
return content
return content
return str(content)
# 定义Person类
class Person:
def __init__(self, col):
self.id = col[0]
self.name = col[1]
self.remark = col[2] if col[2] != "" else col[1]
# 定义Chat类
class Chat:
def __init__(self, me, you, persons: [Person]):
self.me = get_name(me, persons)
self.you = get_name(you, persons)
self.p = {me, you}
self.msgs = []
def store_json(self, export_path: str):
if not os.path.exists(export_path + "JSON"):
os.makedirs(export_path + "JSON")
with open(export_path + "JSON/" + f"{self.you}_{self.me}_{self.p}.json".replace('/', ''), "w",
encoding="utf-8") as f:
json.dump({"count": len(self.msgs), "data": [msg.__dict__ for msg in self.msgs]}, f, ensure_ascii=False,
indent=4)
# 定义Msg类
class Msg:
def __init__(self, col):
self.id = col[2]
self.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(col[1]))
self.sender_id = col[3]
self.sender_name = ""
self.recipient_id = ME if col[4] == col[3] else col[4]
self.recipient_name = ""
self.content = get_content(blackboxprotobuf.decode_message(col[0])[0].get('40800', b""))
# 获取联系人名称
def get_name(id, persons: [Person]):
for person in persons:
if id == ME:
return ""
if person.id == id:
return person.remark if person.remark != "" else person.name
return ""
# 获取所有消息
def get_msg(cur) -> list[Msg]:
cur.execute(f"""SELECT "40800","40050","40001","40033","40030","40040" FROM c2c_msg_table order by "40050";""")
M = []
for col in cur:
try:
msg = Msg(col)
M.append(msg)
except Exception as e:
print(f"Error processing message: {e}")
continue
return M
# 获取联系人信息
def get_person(db_path: str) -> list[Person]:
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute(f"""SELECT "1002","20002","20009" FROM profile_info_v6 ORDER by "20009" DESC limit {LIMIT_LEN};""")
P = []
for col in cur:
person = Person(col)
P.append(person)
cur.close()
return P
# 设置备注
def set_remark(persons: [Person], msgs: [Msg]):
for msg in msgs:
for person in persons:
if msg.sender_id == person.id:
msg.sender_name = person.remark
elif msg.recipient_id == person.id:
msg.recipient_name = person.remark
if msg.sender_id == ME:
msg.sender_name = ME_NAME
elif msg.recipient_id == ME:
msg.recipient_name = ME_NAME
return msgs
# 存储所有聊天记录
def store_p(export_path: str, msgs: [Msg], persons: [Person], ft: list[str]):
store_all(export_path, msgs, ft)
chats = []
chat_ps = []
for msg in msgs:
if msg.sender_id == 0 or msg.recipient_id == 0:
continue
if {msg.sender_id, msg.recipient_id} not in chat_ps:
chat_ps.append({msg.sender_id, msg.recipient_id})
chat = Chat(msg.sender_id, msg.recipient_id, persons)
chat.msgs.append(msg)
chats.append(chat)
else:
for chat in chats:
if {msg.sender_id, msg.recipient_id} == chat.p:
chat.msgs.append(msg)
print("共有", len(chats), "个聊天")
for chat in chats:
if "json" in ft:
chat.store_json(export_path)
# Add more file formats as needed...
# 存储全部记录为JSON等格式
def store_all(export_path: str, msgs: [Msg], ft: list[str]):
if "json" in ft:
with open(export_path + "total_chat.json", "w", encoding="utf-8") as f:
json.dump([msg.__dict__ for msg in msgs], f, ensure_ascii=False, indent=4)
# 导出聊天记录
def export_chat(export_path: str, db_path: list[str], ft: list[str]):
conn = sqlite3.connect(db_path[0])
cur = conn.cursor()
msgs = get_msg(cur)
persons = get_person(db_path[1])
msgs = set_remark(persons, msgs)
store_p(export_path, msgs, persons, ft)
conn.close()
if __name__ == "__main__":
ME = 0 # 你的QQ号
ME_NAME = "0" # 你的QQ昵称
LIMIT_LEN = 2000 # 限制导出的人数
export_chat("data/", ["data/nt_msg_1.clean.db", "data/profile_info.clean_1.db"], ["json"])
请自行修改最后的部分,然后就可以正常导出聊天数据了