最近在某群发现一个阿里云盘列表的网页,然后我也想搭建一个,在Github上找了好久都没发现一个云函数版本的阿里云盘列表,所以准备自己写一个,模版是通过AliPan修改的,后端是使用的Aligo的Python库,使用简单方便。demo,演示图如下:
云函数部署
云函数部署请参考:免费验证码识别API开源,新建Flask框架模版后在终端安装aligo、yagmail库,这里就不说怎么新建及安装库了。
app.py代码
import tempfile
import qrcode
import yagmail#发送邮件用
from pathlib import Path
import os,json,re,urllib
from aligo import Aligo, Auth#连接阿里云盘
from flask import Flask, jsonify, render_template, request, redirect
#登陆阿里云盘
def login_to_email(qr_link: str):
"""自定义显示二维码"""
# 1.将二维码链接转为图片
qr_img = qrcode.make(qr_link)
png_file = tempfile.mktemp('.png')
qr_img.save(png_file)
# 2.将二维码发送到自己的(手机)邮箱, 这里以QQ邮箱为例
# 使用 yagmail 库发送邮件, `pip install yagmail[all]`
yag = yagmail.SMTP('QQ邮箱', '授权码', 'smtp.qq.com', 465)
yag.send('收件箱', 'Alist扫码登录', qr_link, attachments=[png_file])
#初始化
ali = Aligo(name='xxx', show=login_to_email)
#设置主目录id
home_id = "625189562a2c0797f0f54a84aefcd201d8c51287"
'''
#阿里云登陆二维码发送到邮箱
#ali = Aligo(email=('2638710695@qq.com', 'Young'))
#使用refresh_token登陆
Ali_List_Refresh_Token = str(os.environ.get('refresh_token'))
Ali_List_Refresh_Token = "550c0d16466e4a05a76e3c2692a40b12"
ali = Aligo(refresh_token=Ali_List_Refresh_Token)
#使用网页扫码登陆
ali = Aligo(port=9000)
'''
#开启web服务器
app = Flask(__name__)
app.config.update(DEBUG=True)#False
#取字符串中间文本
def get_mid_str(s, start_str, stop_str):
# 查找左边文本的结束位置
start_pos = s.find(start_str)
if start_pos == -1:
return None
start_pos += len(start_str)
# 查找右边文本的起始位置
stop_pos = s.find(stop_str, start_pos)
if stop_pos == -1:
return None
# 通过切片取出中间的文本
return s[start_pos:stop_pos]
#判断是否登陆成功
def is_login():
global ali
try:
user = ali.get_user()
#print(user)
return True
except Exception as e:
return False
@app.route("/")
def index():
# 访问路径
try:
fullPath = urllib.parse.unquote(request.full_path)
#获取分享类型
share_type = get_mid_str(fullPath,"/?","/")
#print("share_type:"+share_type)
if "file" in share_type:#分享文件
share_id = fullPath.replace("/?file/",'')
elif "folder" in share_type:#分享文件夹
share_id = fullPath.replace("/?folder/",'')
else:
share_id = home_id
#print("share_id:"+share_id)
return render_template('index.html',fullFileId = share_id,share_type = share_type)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return render_template('index.html',fullFileId = "root",share_type = "folder")
else:
return render_template('404.html')
#通过id获取office文档信息
@app.route("/get_office/<path:path>")
def get_office_info(path):#获取文件列表
global ali
try:
if path == "root":
fileid = home_id
else:
fileid = path
#print("path=",fileid)
file_list = ali.get_office_info(fileid)
rsp_json = file_list.json()
# 遍历文件列表
list_json = {
"preview_url":rsp_json['preview_url'],
"access_token":rsp_json['access_token']
}
#print(list_json)
return jsonify(list_json)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return jsonify(msg="没有文件")
else:
return jsonify(msg="系统维护中...")
#通过id获取文件信息
@app.route("/get_file/<path:path>")
def get_file_info(path):#获取文件列表
global ali
try:
if path == "root":
fileid = home_id
else:
fileid = path
#print("path=",fileid)
file_list = str(ali.get_file(fileid))
dowload_url = "https://"+request.host+"/getDownloadUrl/"+fileid
#print("dowload_url",dowload_url)
# 遍历文件列表
list_json = {
"size":0,
"url":dowload_url,
"name":get_mid_str(file_list,"name='","')"),
"type":get_mid_str(file_list,"type='","', file_id='"),
"encrypted":False,#文件密码
"parentFileId":get_mid_str(file_list,"file_id='","', name='")
}
#print(list_json)
return jsonify(list_json)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return jsonify(msg="没有文件")
else:
return jsonify(msg="系统维护中...")
#验证密码
@app.route("/get_pass/<path:path>")
def get_pass(path):#获取文件列表
global ali
try:
path = path.split('|')
fileid = path[0]
filepass = path[1]
print(fileid,filepass)
list_json = []
file_list = ali.get_file_list(fileid)
# 遍历文件列表
for file in file_list:
file_size = file.size
if file_size == None:
file_size = 0
if ".young" in str(file.name):
file_pass = get_mid_str(str(file.name),"",".young")
else:
list_json.append({
"url":file.url,
"name":file.name,
"type":file.type,
"size":file_size,
"fileId":file.file_id,
"encrypted":False,#文件密码
"createdAt":file.created_at,
"parentFileId":file.parent_file_id,
"fileExtension":file.file_extension
})
print(file_pass)
for item_list in list_json:
if file_pass == filepass:
item_list["encrypted"] = False
else:
list_json = [{
"fileId":None,
"type":"file",
"name":"password",
"parentFileId":None,
"createdAt":None,
"fileExtension":None,
"size":0,
"url":None,
"encrypted":True
}]
break
return jsonify(list_json)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return jsonify(msg="密码错误")
else:
return jsonify(msg="系统维护中...")
#通过id获取文件 OR 文件夹列表
@app.route("/get_list/<path:path>")
def get_file_list(path):#获取文件列表
global ali
try:
if path == "root":
fileid = home_id
else:
fileid = path
list_json = []
file_pass = False
#print("path=",fileid)
file_list = ali.get_file_list(fileid)
# 遍历文件列表
for file in file_list:
file_size = file.size
if file_size == None:
file_size = 0
if ".young" in str(file.name):
file_pass = get_mid_str(str(file.name),"",".young")
print(file_pass)
list_json = [{
"fileId":None,
"type":"file",
"name":"password",
"parentFileId":None,
"createdAt":None,
"fileExtension":None,
"size":0,
"url":None,
"encrypted":True
}]
break
else:
list_json.append({
"url":file.url,
"name":file.name,
"type":file.type,
"size":file_size,
"fileId":file.file_id,
"encrypted":False,#文件密码
"createdAt":file.created_at,
"parentFileId":file.parent_file_id,
"fileExtension":file.file_extension
})
return jsonify(list_json)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return jsonify(msg="没有文件")
else:
return jsonify(msg="系统维护中...")
#通过id下载文件
@app.route("/getDownloadUrl/<id>")
def get_download(id):
global ali
try:
down = ali.get_download_url(id)
file_download = str(down.url)
#print(file_download)
return redirect(file_download, code=302)
except Exception as e:
print(f"\033[1;31m {e}\033[0m")
if is_login():
return jsonify(msg="没有该文件")
else:
return jsonify(msg="系统维护中...")
@app.errorhandler(400)
def error(e):
return jsonify({'status':0,'msg':str(e)})
@app.errorhandler(500)
def internal_server_error(e):
return "jsonify({'status':0,'msg':'系统维护中...'})"
if __name__ == '__main__':
app.config['JSON_AS_ASCII'] = False
# 启动服务,监听 9000 端口,监听地址为 0.0.0.0
app.run(port=9000, host='0.0.0.0')
flask静态模版文件下载
修改Aligo库的文件
如果不修改的话无法进行pdf、word等文件预览
修改 src/aligo/apis/Audio.py
函数:get_office_info
追加了:获取预览office信息
"""Audio class"""
from aligo.core import *
from aligo.request import *
from aligo.response import *
class Audio(Core):
"""音频相关"""
def get_audio_play_info(self, file_id: str, drive_id: str = None) -> GetAudioPlayInfoResponse:
"""
官方:获取音频播放信息
:param file_id: [str] 音频 file_id
:param drive_id: Optional[str] 音频文件的 drive_id
:return: [GetAudioPlayInfoResponse]
用法示例:
>>> from aligo import Aligo
>>> ali = Aligo()
>>> audio = ali.get_audio_play_info('<file_id>')
>>> print(audio)
"""
body = GetAudioPlayInfoRequest(file_id=file_id, drive_id=drive_id)
return self._core_get_audio_play_info(body)
#下面这个函数是追加的
def get_office_info(self, file_id: str, drive_id: str = None) -> GetAudioPlayInfoResponse:
"""
Young:获取预览office信息
:param file_id: [str] 文档 file_id
:param drive_id: Optional[str] 文档文件的 drive_id
:return: [GetAudioPlayInfoResponse]
用法示例:
>>> from aligo import Aligo
>>> ali = Aligo()
>>> audio = ali.get_audio_play_info('<file_id>')
>>> print(audio)
"""
body = GetAudioPlayInfoRequest(file_id=file_id, drive_id=drive_id)
return self._core_get_office_info(body)
修改 src/aligo/core/Audio.py
函数:_core_get_office_info
追加了:获取OFFICE信息
"""..."""
from aligo.core import *
from aligo.core.Config import *
from aligo.request import *
from aligo.response import *
class Audio(BaseAligo):
"""..."""
def _core_get_audio_play_info(self, body: GetAudioPlayInfoRequest) -> GetAudioPlayInfoResponse:
"""获取音频播放信息"""
response = self._post(V2_DATABOX_GET_AUDIO_PLAY_INFO, body=body)
return self._result(response, GetAudioPlayInfoResponse)
#下面这个函数是追加的
def _core_get_office_info(self, body: GetAudioPlayInfoRequest) -> GetAudioPlayInfoResponse:
"""获取OFFICE信息"""
response = self._post(V2_DATABOX_GET_OFFICE_PLAY_INFO, body=body)
print(response.text)
return self.Y_result(response)
修改 src/aligo/core/Auth.py
26行替换处追加:
paths = [“/tmp”,”.aligo”]
aligo_config_folder = Path.cwd().parent.joinpath(*paths)
因为腾讯云云函数的tmp目录没有写的权限,所以到这修改保存目录为自己的文件系统
#文件内容过长就只写这么点了25行-30行
#aligo_config_folder = Path.home().joinpath('.aligo')
paths = ["/mnt","aliyun"]
aligo_config_folder = Path.cwd().parent.joinpath(*paths)
aligo_config_folder.mkdir(parents=True, exist_ok=True)
修改 src/aligo/core/BaseAligo.py
120行追加:
函数:Y_result
用于出来回复json数据
#在_result函数上面追加
def Y_result(self, response: requests.Response,
status_code: Union[List, int] = 200) -> Union[Null, DataType]:
"""统一处理响应
:param response:
:param status_code:
:return:
"""
if isinstance(status_code, int):
status_code = [status_code]
if response.status_code in status_code:
text = response.text
self._auth.log.warning(f'{response.status_code} {response.text[:200]}')
return response
修改 src/aligo/core/Config.py
30行处追加:
# 基本信息
......
#获取office文档
V2_DATABOX_GET_OFFICE_PLAY_INFO = '/v2/file/get_office_preview_url'
# 文件操作
......
关于文件系统
云函数创建子网请参考:创建子网
云函数挂载请参考:腾讯云云函数挂载使用cfs
关于更多的文件系统挂载云函数的问题请联系腾讯云在线客服解决,我都是通过腾讯云客服解决的,上面的两个教程也是腾讯云客服发我的。