部署基于腾讯云-云函数的阿里云盘list


最近在某群发现一个阿里云盘列表的网页,然后我也想搭建一个,在Github上找了好久都没发现一个云函数版本的阿里云盘列表,所以准备自己写一个,模版是通过AliPan修改的,后端是使用的Aligo的Python库,使用简单方便。demo,演示图如下:
演示图片

云函数部署

云函数部署请参考:免费验证码识别API开源,新建Flask框架模版后在终端安装aligo、yagmail库,这里就不说怎么新建及安装库了。

app.py代码

import tempfile
import qrcode
import yagmail#发送邮件用

from pathlib import Path
import os,json,re,urllib
from aligo import Aligo, Auth#连接阿里云盘
from flask import Flask, jsonify, render_template, request, redirect

#登陆阿里云盘
def login_to_email(qr_link: str):
    """自定义显示二维码"""
    # 1.将二维码链接转为图片
    qr_img = qrcode.make(qr_link)
    png_file = tempfile.mktemp('.png')
    qr_img.save(png_file)

    # 2.将二维码发送到自己的(手机)邮箱, 这里以QQ邮箱为例
    # 使用 yagmail 库发送邮件, `pip install yagmail[all]`
    yag = yagmail.SMTP('QQ邮箱', '授权码', 'smtp.qq.com', 465)
    yag.send('收件箱', 'Alist扫码登录', qr_link, attachments=[png_file])

#初始化
ali = Aligo(name='xxx', show=login_to_email)
#设置主目录id
home_id = "625189562a2c0797f0f54a84aefcd201d8c51287"
'''
#阿里云登陆二维码发送到邮箱
#ali = Aligo(email=('2638710695@qq.com', 'Young'))
#使用refresh_token登陆
Ali_List_Refresh_Token = str(os.environ.get('refresh_token'))
Ali_List_Refresh_Token = "550c0d16466e4a05a76e3c2692a40b12"
ali = Aligo(refresh_token=Ali_List_Refresh_Token)
#使用网页扫码登陆
ali = Aligo(port=9000)
'''

#开启web服务器
app = Flask(__name__)
app.config.update(DEBUG=True)#False

#取字符串中间文本
def get_mid_str(s, start_str, stop_str):
    # 查找左边文本的结束位置
    start_pos = s.find(start_str)
    if start_pos == -1:
        return None
    start_pos += len(start_str)
    # 查找右边文本的起始位置
    stop_pos = s.find(stop_str, start_pos)
    if stop_pos == -1:
        return None
    # 通过切片取出中间的文本
    return s[start_pos:stop_pos]

#判断是否登陆成功
def is_login():
    global ali
    try:
        user = ali.get_user()
        #print(user)
        return True
    except Exception as e:
        return False

@app.route("/")
def index():
    # 访问路径
    try:
        fullPath = urllib.parse.unquote(request.full_path)
        #获取分享类型
        share_type = get_mid_str(fullPath,"/?","/")
        #print("share_type:"+share_type)
        if "file" in share_type:#分享文件
            share_id = fullPath.replace("/?file/",'')
        elif "folder" in share_type:#分享文件夹
            share_id = fullPath.replace("/?folder/",'')
        else:
            share_id = home_id
        #print("share_id:"+share_id)
        return render_template('index.html',fullFileId = share_id,share_type = share_type)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return render_template('index.html',fullFileId = "root",share_type = "folder")
        else:
            return render_template('404.html')

#通过id获取office文档信息
@app.route("/get_office/<path:path>")
def get_office_info(path):#获取文件列表
    global ali
    try:
        if path == "root":
            fileid = home_id
        else:
            fileid = path
        #print("path=",fileid)
        file_list = ali.get_office_info(fileid)
        rsp_json = file_list.json()
        # 遍历文件列表
        list_json = {
            "preview_url":rsp_json['preview_url'],
            "access_token":rsp_json['access_token']
        }
        #print(list_json)
        return jsonify(list_json)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return jsonify(msg="没有文件")
        else:
            return jsonify(msg="系统维护中...")

#通过id获取文件信息
@app.route("/get_file/<path:path>")
def get_file_info(path):#获取文件列表
    global ali
    try:
        if path == "root":
            fileid = home_id
        else:
            fileid = path
        #print("path=",fileid)
        file_list = str(ali.get_file(fileid))
        dowload_url = "https://"+request.host+"/getDownloadUrl/"+fileid
        #print("dowload_url",dowload_url)
        # 遍历文件列表
        list_json = {
            "size":0,
            "url":dowload_url,
            "name":get_mid_str(file_list,"name='","')"),
            "type":get_mid_str(file_list,"type='","', file_id='"),
            "encrypted":False,#文件密码
            "parentFileId":get_mid_str(file_list,"file_id='","', name='")
        }
        #print(list_json)
        return jsonify(list_json)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return jsonify(msg="没有文件")
        else:
            return jsonify(msg="系统维护中...")

#验证密码
@app.route("/get_pass/<path:path>")
def get_pass(path):#获取文件列表
    global ali
    try:
        path = path.split('|')
        fileid = path[0]
        filepass = path[1]
        print(fileid,filepass)
        list_json = []
        file_list = ali.get_file_list(fileid)
        # 遍历文件列表
        for file in file_list:
            file_size = file.size
            if file_size == None:
                file_size = 0
            if ".young" in str(file.name):
                file_pass = get_mid_str(str(file.name),"",".young")
            else:
                list_json.append({
                    "url":file.url,
                    "name":file.name,
                    "type":file.type,
                    "size":file_size,
                    "fileId":file.file_id,
                    "encrypted":False,#文件密码
                    "createdAt":file.created_at,
                    "parentFileId":file.parent_file_id,
                    "fileExtension":file.file_extension
                })
        print(file_pass)
        for item_list in list_json:
            if file_pass == filepass:
                item_list["encrypted"] = False
            else:
                list_json = [{
                    "fileId":None,
                    "type":"file",
                    "name":"password",
                    "parentFileId":None,
                    "createdAt":None,
                    "fileExtension":None,
                    "size":0,
                    "url":None,
                    "encrypted":True
                }]
                break
        return jsonify(list_json)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return jsonify(msg="密码错误")
        else:
            return jsonify(msg="系统维护中...")

#通过id获取文件 OR 文件夹列表
@app.route("/get_list/<path:path>")
def get_file_list(path):#获取文件列表
    global ali
    try:
        if path == "root":
            fileid = home_id
        else:
            fileid = path
        list_json = []
        file_pass = False
        #print("path=",fileid)
        file_list = ali.get_file_list(fileid)
        # 遍历文件列表
        for file in file_list:
            file_size = file.size
            if file_size == None:
                file_size = 0
            if ".young" in str(file.name):
                file_pass = get_mid_str(str(file.name),"",".young")
                print(file_pass)
                list_json = [{
                    "fileId":None,
                    "type":"file",
                    "name":"password",
                    "parentFileId":None,
                    "createdAt":None,
                    "fileExtension":None,
                    "size":0,
                    "url":None,
                    "encrypted":True
                }]
                break
            else:
                list_json.append({
                    "url":file.url,
                    "name":file.name,
                    "type":file.type,
                    "size":file_size,
                    "fileId":file.file_id,
                    "encrypted":False,#文件密码
                    "createdAt":file.created_at,
                    "parentFileId":file.parent_file_id,
                    "fileExtension":file.file_extension
                })
        return jsonify(list_json)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return jsonify(msg="没有文件")
        else:
            return jsonify(msg="系统维护中...")

#通过id下载文件
@app.route("/getDownloadUrl/<id>")
def get_download(id):
    global ali
    try:
        down = ali.get_download_url(id)
        file_download = str(down.url)
        #print(file_download)
        return redirect(file_download, code=302)
    except Exception as e:
        print(f"\033[1;31m {e}\033[0m")
        if is_login():
            return jsonify(msg="没有该文件")
        else:
            return jsonify(msg="系统维护中...")

@app.errorhandler(400)
def error(e):
    return jsonify({'status':0,'msg':str(e)})

@app.errorhandler(500)
def internal_server_error(e):
    return "jsonify({'status':0,'msg':'系统维护中...'})"

if __name__ == '__main__':
    app.config['JSON_AS_ASCII'] = False
    # 启动服务,监听 9000 端口,监听地址为 0.0.0.0
    app.run(port=9000, host='0.0.0.0')

flask静态模版文件下载

点击下载

修改Aligo库的文件

如果不修改的话无法进行pdf、word等文件预览

修改 src/aligo/apis/Audio.py

函数:get_office_info
追加了:获取预览office信息

"""Audio class"""

from aligo.core import *
from aligo.request import *
from aligo.response import *


class Audio(Core):
    """音频相关"""

    def get_audio_play_info(self, file_id: str, drive_id: str = None) -> GetAudioPlayInfoResponse:
        """
        官方:获取音频播放信息
        :param file_id: [str] 音频 file_id
        :param drive_id: Optional[str] 音频文件的 drive_id
        :return: [GetAudioPlayInfoResponse]

        用法示例:
        >>> from aligo import Aligo
        >>> ali = Aligo()
        >>> audio = ali.get_audio_play_info('<file_id>')
        >>> print(audio)
        """
        body = GetAudioPlayInfoRequest(file_id=file_id, drive_id=drive_id)
        return self._core_get_audio_play_info(body)

	#下面这个函数是追加的
    def get_office_info(self, file_id: str, drive_id: str = None) -> GetAudioPlayInfoResponse:
        """
        Young:获取预览office信息
        :param file_id: [str] 文档 file_id
        :param drive_id: Optional[str] 文档文件的 drive_id
        :return: [GetAudioPlayInfoResponse]

        用法示例:
        >>> from aligo import Aligo
        >>> ali = Aligo()
        >>> audio = ali.get_audio_play_info('<file_id>')
        >>> print(audio)
        """
        body = GetAudioPlayInfoRequest(file_id=file_id, drive_id=drive_id)
        return self._core_get_office_info(body)

修改 src/aligo/core/Audio.py

函数:_core_get_office_info
追加了:获取OFFICE信息

"""..."""

from aligo.core import *
from aligo.core.Config import *
from aligo.request import *
from aligo.response import *


class Audio(BaseAligo):
    """..."""

    def _core_get_audio_play_info(self, body: GetAudioPlayInfoRequest) -> GetAudioPlayInfoResponse:
        """获取音频播放信息"""
        response = self._post(V2_DATABOX_GET_AUDIO_PLAY_INFO, body=body)
        return self._result(response, GetAudioPlayInfoResponse)

	#下面这个函数是追加的
    def _core_get_office_info(self, body: GetAudioPlayInfoRequest) -> GetAudioPlayInfoResponse:
        """获取OFFICE信息"""
        response = self._post(V2_DATABOX_GET_OFFICE_PLAY_INFO, body=body)
        print(response.text)
        return self.Y_result(response)

修改 src/aligo/core/Auth.py

26行替换处追加:
paths = [“/tmp”,”.aligo”]
aligo_config_folder = Path.cwd().parent.joinpath(*paths)
因为腾讯云云函数的tmp目录没有写的权限,所以到这修改保存目录为自己的文件系统

#文件内容过长就只写这么点了25行-30行
#aligo_config_folder = Path.home().joinpath('.aligo')
paths = ["/mnt","aliyun"]
aligo_config_folder = Path.cwd().parent.joinpath(*paths)
aligo_config_folder.mkdir(parents=True, exist_ok=True)

修改 src/aligo/core/BaseAligo.py

120行追加:
函数:Y_result
用于出来回复json数据

#在_result函数上面追加
def Y_result(self, response: requests.Response,
                status_code: Union[List, int] = 200) -> Union[Null, DataType]:
        """统一处理响应

        :param response:
        :param status_code:
        :return:
        """
        if isinstance(status_code, int):
            status_code = [status_code]
        if response.status_code in status_code:
            text = response.text
        self._auth.log.warning(f'{response.status_code} {response.text[:200]}')
        return response

修改 src/aligo/core/Config.py

30行处追加:

# 基本信息
......
#获取office文档
V2_DATABOX_GET_OFFICE_PLAY_INFO = '/v2/file/get_office_preview_url'
#  文件操作
......

关于文件系统

云函数创建子网请参考:创建子网
云函数挂载请参考:腾讯云云函数挂载使用cfs
关于更多的文件系统挂载云函数的问题请联系腾讯云在线客服解决,我都是通过腾讯云客服解决的,上面的两个教程也是腾讯云客服发我的。


文章作者: Young
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Young !
评论
  目录