AI--RAG离线流程开发

AI–RAG离线流程开发

0.离线服务流程流程图

  离线流程的开发主要涉及两个脚本:
  • 文件上传服务功能app_file_upload.py
  • 向量知识库管理knowledge_base.py

  

1.文件上传服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
"""
基于streamlist完成web网页上传服务
pip install streamlit
"""

import streamlit as st

#添加网页标题
st.title("文件上传服务")

#file_uploader: 上传文件组件
uploaded_file = st.file_uploader(
"请上传文件",
type=["txt", "pdf", "docx"],
accept_multiple_files=False, # 是否支持上传多个文件
)

if uploaded_file is not None:
# 提取文件的信息
file_name = uploaded_file.name
file_size = uploaded_file.size / 1024 # 单位:KB
file_type = uploaded_file.type

# 显示文件信息 subheader的作用是添加一个二级标题
st.subheader("文件信息")
st.write(f"文件名:{file_name}")
st.write(f"文件大小:{file_size:.2f} KB")
st.write(f"文件类型:{file_type}")

#get_value -> bytes -> decode -> str
file_content = uploaded_file.getvalue().decode("utf-8")
st.subheader("文件内容")
st.write(file_content)

  在编写好代码后,开始启动页面

1
streamlit run .\Z_AI_RAG\app_file_uploader.py

  

2.md5工具函数开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
知识库
"""

import os
import config_data as config
import hashlib

def check_md5(md5_str: str):
"""
检查传入的md5字符串是否已经被处理过
return False表示未处理过
True表示已处理过

"""
if not os.path.exists(config.md5_file_path):
# if进入表示文件不存在,需要创建文件并直接返回False
open(config.md5_file_path, "w", encoding="utf-8").close()
return False
else:
for line in open(config.md5_file_path, "r", encoding="utf-8").readlines():
line = line.strip() # 去掉行尾的换行符
if line == md5_str:
return True # 表示已处理过
return False

def save_md5(md5_str: str):
""" 保存传入的md5字符串到文件中"""
with open(config.md5_file_path, "a", encoding="utf-8") as f:
f.write(md5_str + "\n")

def get_string_md5(input_str: str,encoding="utf-8"):
""" 将传入的字符串转换为md5字符串"""

# 将字符串转化为字节数组
str_bytes = input_str.encode(encoding = encoding)

md5_obj = hashlib.md5() # 创建md5对象
md5_obj.update(str_bytes) # 更新md5对象的状态
md5_str = md5_obj.hexdigest() # 获取md5字符串
return md5_str


if __name__ == "__main__":
print(check_md5("123456"))
save_md5("123456")
print(check_md5("123456"))

  

3.知识库更新服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
知识库
"""

from importlib import metadata
import os
import config_data as config
import hashlib
from langchain_chroma import Chroma
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from datetime import datetime

def check_md5(md5_str: str):
"""
检查传入的md5字符串是否已经被处理过
return False表示未处理过
True表示已处理过

"""
if not os.path.exists(config.md5_file_path):
# if进入表示文件不存在,需要创建文件并直接返回False
open(config.md5_file_path, "w", encoding="utf-8").close()
return False
else:
for line in open(config.md5_file_path, "r", encoding="utf-8").readlines():
line = line.strip() # 去掉行尾的换行符
if line == md5_str:
return True # 表示已处理过
return False

def save_md5(md5_str: str):
""" 保存传入的md5字符串到文件中"""
with open(config.md5_file_path, "a", encoding="utf-8") as f:
f.write(md5_str + "\n")

def get_string_md5(input_str: str,encoding="utf-8"):
""" 将传入的字符串转换为md5字符串"""

# 将字符串转化为字节数组
str_bytes = input_str.encode(encoding = encoding)

md5_obj = hashlib.md5() # 创建md5对象
md5_obj.update(str_bytes) # 更新md5对象的状态
md5_str = md5_obj.hexdigest() # 获取md5字符串
return md5_str



class KnowledgeBaseService(object):

def __init__(self):
#如果文件夹不存在,创建文件夹
os.makedirs(config.chroma_persist_directory, exist_ok=True)

self.chroma = Chroma(
collection_name=config.chroma_collection_name, # 向量库的名称 表名
embedding_function=DashScopeEmbeddings(model="text-embedding-v4"), # 嵌入函数
persist_directory=config.chroma_persist_directory, # 向量库的持久化目录
)
self.spliter = RecursiveCharacterTextSplitter( # 文本分割器的对象
chunk_size=config.chunk_size, # 每个文本块的最大字符数
chunk_overlap=config.chunk_overlap, # 连续文本块之间的重叠字符数
separators=config.separators, # 自然段落划分的符号
length_function=len, # 使用python自带的计算文本长度的函数
)

def upload_file(self, data, file_name) :
""" 将出入的字符串,进行向量化,存入向量数据库中"""

#先得到传入字符串的md5值
md5_str = get_string_md5(data)

if check_md5(md5_str):
# 如果md5值已经存在,说明文件已经上传过了
return "[跳过]内容已经存在知识库中了"

if len(data) > config.max_splist_chat_number:
# 如果字符串长度超过阈值,需要进行分割
knowledge_chunks : list[str] = self.spliter.split_text(data)
else:
# 如果字符串长度未超过阈值,直接作为一个文本块
knowledge_chunks = [data]

metadata = {
"source": file_name,
# 2026-08-01 12:00:00
"create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"operator": "onenullpointer",

}

self.chroma.add_texts( #内容就加载到向量库中了
# iterable ->? list \ tuple
knowledge_chunks,
metadatas=[metadata for _ in knowledge_chunks]
)

# 保存md5值到文件中
save_md5(md5_str)

return "[成功]内容已加载到知识库中"

if __name__ == "__main__":
kb_service = KnowledgeBaseService()
kb_service.upload_file("周杰伦", "test.md")

  

4.合并代码块

  • app_file_uploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""
基于streamlist完成web网页上传服务
pip install streamlit

Streamlist:当web刷新一次,会重新执行所有的代码!!!
"""

import streamlit as st
from knowledge_base import KnowledgeBaseService
import time

#添加网页标题
st.title("知识库更新服务")

#file_uploader: 上传文件组件
uploaded_file = st.file_uploader(
"请上传文件",
type=["txt"],
accept_multiple_files=False, # 是否支持上传多个文件
)



# session_state本身是一个字典,由Streamlist提供,用来解决刷新会重新加载代码问题
if "knowledge_base_service" not in st.session_state:
st.session_state["knowledge_base_service"] = KnowledgeBaseService()

if uploaded_file is not None:
# 提取文件的信息
file_name = uploaded_file.name
file_size = uploaded_file.size / 1024 # 单位:KB
file_type = uploaded_file.type

# 显示文件信息 subheader的作用是添加一个二级标题
st.subheader("文件信息")
st.write(f"文件名:{file_name}")
st.write(f"文件大小:{file_size:.2f} KB")
st.write(f"文件类型:{file_type}")

#get_value -> bytes -> decode -> str
file_content = uploaded_file.getvalue().decode("utf-8")

with st.spinner("正在上传知识库中..."): #在执行的过程中,spinner会显示一个旋转的图标
time.sleep(1)
result = st.session_state["knowledge_base_service"].upload_file(file_content, file_name)
st.write(result)
  • config_data.py
1
2
3
4
5
6
7
8
9
10
11
12
md5_file_path = "./md5.txt"


#Chroma向量库的配置
chroma_collection_name = "RAG"
chroma_persist_directory = "./chroma_db"

#spliter
chunk_size = 1000 # 每个文本块的最大字符数
chunk_overlap = 100 # 连续文本块之间的重叠字符数
separators = ["\n\n", "\n", "。", "!", "?", "!", "?", " "] # 自然段落划分的符号
max_splist_chat_number = 1000 # 文本分割的阈值,超过才进行分割
  • knowledge_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
知识库
"""

from importlib import metadata
import os
import config_data as config
import hashlib
from langchain_chroma import Chroma
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from datetime import datetime

def check_md5(md5_str: str):
"""
检查传入的md5字符串是否已经被处理过
return False表示未处理过
True表示已处理过

"""
if not os.path.exists(config.md5_file_path):
# if进入表示文件不存在,需要创建文件并直接返回False
open(config.md5_file_path, "w", encoding="utf-8").close()
return False
else:
for line in open(config.md5_file_path, "r", encoding="utf-8").readlines():
line = line.strip() # 去掉行尾的换行符
if line == md5_str:
return True # 表示已处理过
return False

def save_md5(md5_str: str):
""" 保存传入的md5字符串到文件中"""
with open(config.md5_file_path, "a", encoding="utf-8") as f:
f.write(md5_str + "\n")

def get_string_md5(input_str: str,encoding="utf-8"):
""" 将传入的字符串转换为md5字符串"""

# 将字符串转化为字节数组
str_bytes = input_str.encode(encoding = encoding)

md5_obj = hashlib.md5() # 创建md5对象
md5_obj.update(str_bytes) # 更新md5对象的状态
md5_str = md5_obj.hexdigest() # 获取md5字符串
return md5_str



class KnowledgeBaseService(object):

def __init__(self):
#如果文件夹不存在,创建文件夹
os.makedirs(config.chroma_persist_directory, exist_ok=True)

self.chroma = Chroma(
collection_name=config.chroma_collection_name, # 向量库的名称 表名
embedding_function=DashScopeEmbeddings(model="text-embedding-v4"), # 嵌入函数
persist_directory=config.chroma_persist_directory, # 向量库的持久化目录
)
self.spliter = RecursiveCharacterTextSplitter( # 文本分割器的对象
chunk_size=config.chunk_size, # 每个文本块的最大字符数
chunk_overlap=config.chunk_overlap, # 连续文本块之间的重叠字符数
separators=config.separators, # 自然段落划分的符号
length_function=len, # 使用python自带的计算文本长度的函数
)

def upload_file(self, data, file_name) :
""" 将出入的字符串,进行向量化,存入向量数据库中"""

#先得到传入字符串的md5值
md5_str = get_string_md5(data)

if check_md5(md5_str):
# 如果md5值已经存在,说明文件已经上传过了
return "[跳过]内容已经存在知识库中了"

if len(data) > config.max_splist_chat_number:
# 如果字符串长度超过阈值,需要进行分割
knowledge_chunks : list[str] = self.spliter.split_text(data)
else:
# 如果字符串长度未超过阈值,直接作为一个文本块
knowledge_chunks = [data]

metadata = {
"source": file_name,
# 2026-08-01 12:00:00
"create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"operator": "onenullpointer",

}

self.chroma.add_texts( #内容就加载到向量库中了
# iterable ->? list \ tuple
knowledge_chunks,
metadatas=[metadata for _ in knowledge_chunks]
)

# 保存md5值到文件中
save_md5(md5_str)

return "[成功]内容已加载到知识库中"

  至此,离线流程的文件上传更新模块已完成


AI--RAG离线流程开发
https://one-null-pointer.github.io/2026/02/03/AI--RAG离线流程开发/
Author
liaoyue
Posted on
February 3, 2026
传送口