原教程在这里MAC破解QQ数据库并且导出多种格式数据脚本

这里只是对原教程中有更改的部分进行补充说明

在原教程的解密数据库位置变动到

/Users/{user}/Library/Containers/qq/Data/Library/Application Support/QQ/nt_qq_{MD5}/nt_db

md5的值会改变,但路径大致就是这样。

你可以手动将nt_msg.db,profile_info.db复制出来,再通过

tail -c +1025 nt_msg.db > nt_msg_1.clean.db
tail -c +1025 profile_info.db > profile_info_1.clean.db

移除无关文件头。
其余步骤跟着原教程走就好,在最后的python脚本中,由于某些数据库表名称的修改,现在已经没办法直接导出消息了。
于是我修改了一下,但是现在只写了导出json的代码

import ast
import json
import sqlite3
import time
import os
import blackboxprotobuf


# 获取内容的详细信息
def get_content_s(content):
    s = ""
    for c in content:
        if c["45002"] == 1:
            # 文本消息
            if isinstance(c.get("45101"), bytes):
                s += get_content(c["45101"])  # 解码文本消息内容
            else:
                s += str(c["45101"])  # 非字节数据,直接使用原始内容
        elif c["45002"] == 2:
            # 图片消息
            s += f"[img:file={c['45402']},url=https://c2cpicdw.qpic.cn/{c['45802']}]"
        elif c["45002"] == 3:
            # 文件消息
            s += f"[file:{c['45402']}]"
        elif c["45002"] == 5:
            # 视频消息
            s += f"[video:file={c['45402']},url=......]"
        elif c["45002"] == 6:
            # 表情消息
            if c["45003"] == 1:
                s += f"[普通表情:{c['47601']}]"
            elif c["45003"] in [2, 3]:
                s += f"[大表情:{c['47602']}]"
        elif c["45002"] == 7:
            # 回复消息
            s += f"[回复:{c['47423']['45101']}]"
        elif c["45002"] == 10:
            # 邮件消息
            s += f"[mail:{c['47901']}]"
        else:
            s += f"[未知消息类型:{c['45002']}]"
    return s


# 获取消息内容
def get_content(content):
    if isinstance(content, dict):
        return get_content_s([content])
    elif isinstance(content, list):
        return get_content_s(content)
    elif isinstance(content, bytes):
        for encoding in ['utf-8', 'gbk', 'utf-16']:
            try:
                return content.decode(encoding)
            except UnicodeDecodeError:
                continue
        return str(content)
    elif isinstance(content, str):
        if content.startswith("b'") or content.startswith('b"'):
            try:
                byte_content = bytes.fromhex(content[2:-1].replace("\\x", ""))
                return byte_content.decode('utf-8')
            except Exception:
                return content
        return content
    return str(content)


# 定义Person类
class Person:
    def __init__(self, col):
        self.id = col[0]
        self.name = col[1]
        self.remark = col[2] if col[2] != "" else col[1]


# 定义Chat类
class Chat:
    def __init__(self, me, you, persons: [Person]):
        self.me = get_name(me, persons)
        self.you = get_name(you, persons)
        self.p = {me, you}
        self.msgs = []

    def store_json(self, export_path: str):
        if not os.path.exists(export_path + "JSON"):
            os.makedirs(export_path + "JSON")
        with open(export_path + "JSON/" + f"{self.you}_{self.me}_{self.p}.json".replace('/', ''), "w",
                  encoding="utf-8") as f:
            json.dump({"count": len(self.msgs), "data": [msg.__dict__ for msg in self.msgs]}, f, ensure_ascii=False,
                      indent=4)


# 定义Msg类
class Msg:
    def __init__(self, col):
        self.id = col[2]
        self.time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(col[1]))
        self.sender_id = col[3]
        self.sender_name = ""
        self.recipient_id = ME if col[4] == col[3] else col[4]
        self.recipient_name = ""
        self.content = get_content(blackboxprotobuf.decode_message(col[0])[0].get('40800', b""))


# 获取联系人名称
def get_name(id, persons: [Person]):
    for person in persons:
        if id == ME:
            return ""
        if person.id == id:
            return person.remark if person.remark != "" else person.name
    return ""


# 获取所有消息
def get_msg(cur) -> list[Msg]:
    cur.execute(f"""SELECT "40800","40050","40001","40033","40030","40040" FROM c2c_msg_table order by "40050";""")
    M = []
    for col in cur:
        try:
            msg = Msg(col)
            M.append(msg)
        except Exception as e:
            print(f"Error processing message: {e}")
            continue
    return M


# 获取联系人信息
def get_person(db_path: str) -> list[Person]:
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute(f"""SELECT "1002","20002","20009" FROM profile_info_v6 ORDER by "20009" DESC limit {LIMIT_LEN};""")
    P = []
    for col in cur:
        person = Person(col)
        P.append(person)
    cur.close()
    return P


# 设置备注
def set_remark(persons: [Person], msgs: [Msg]):
    for msg in msgs:
        for person in persons:
            if msg.sender_id == person.id:
                msg.sender_name = person.remark
            elif msg.recipient_id == person.id:
                msg.recipient_name = person.remark
            if msg.sender_id == ME:
                msg.sender_name = ME_NAME
            elif msg.recipient_id == ME:
                msg.recipient_name = ME_NAME
    return msgs


# 存储所有聊天记录
def store_p(export_path: str, msgs: [Msg], persons: [Person], ft: list[str]):
    store_all(export_path, msgs, ft)
    chats = []
    chat_ps = []
    for msg in msgs:
        if msg.sender_id == 0 or msg.recipient_id == 0:
            continue
        if {msg.sender_id, msg.recipient_id} not in chat_ps:
            chat_ps.append({msg.sender_id, msg.recipient_id})
            chat = Chat(msg.sender_id, msg.recipient_id, persons)
            chat.msgs.append(msg)
            chats.append(chat)
        else:
            for chat in chats:
                if {msg.sender_id, msg.recipient_id} == chat.p:
                    chat.msgs.append(msg)

    print("共有", len(chats), "个聊天")
    for chat in chats:
        if "json" in ft:
            chat.store_json(export_path)
        # Add more file formats as needed...


# 存储全部记录为JSON等格式
def store_all(export_path: str, msgs: [Msg], ft: list[str]):
    if "json" in ft:
        with open(export_path + "total_chat.json", "w", encoding="utf-8") as f:
            json.dump([msg.__dict__ for msg in msgs], f, ensure_ascii=False, indent=4)


# 导出聊天记录
def export_chat(export_path: str, db_path: list[str], ft: list[str]):
    conn = sqlite3.connect(db_path[0])
    cur = conn.cursor()
    msgs = get_msg(cur)
    persons = get_person(db_path[1])
    msgs = set_remark(persons, msgs)
    store_p(export_path, msgs, persons, ft)
    conn.close()


if __name__ == "__main__":
    ME = 0  # 你的QQ号
    ME_NAME = "0"  # 你的QQ昵称
    LIMIT_LEN = 2000  # 限制导出的人数
    export_chat("data/", ["data/nt_msg_1.clean.db", "data/profile_info.clean_1.db"], ["json"])

请自行修改最后的部分,然后就可以正常导出聊天数据了

最后修改:2024 年 12 月 06 日
如果觉得我的文章对你有用,请随意赞赏