基于阿里云函数计算的简单邮件发送服务之数据库访问中间件
基本要求
- 理解掌握基于pymysql使用原生SQL开展数据库表访问的基本操作;
- 理解掌握基于ORM框架SQLAlchemy开展数据库表访问的基本操作。
实验原理
专有网络VPC
专有网络VPC(Virtual Private Cloud)是阿里云提供的一种隔离的、私有的云上网络环境,允许用户在公共云上配置和管理一个逻辑隔离的网络区域。每个VPC都是逻辑上完全隔离的,确保了不同用户或业务间的数据和操作互不影响。若希望实现不同地域下的VPC互联互通,可以参考VPC对等连接(什么是VPC对等连接_专有网络VPC(VPC)-阿里云帮助中心)或云企业网CEN(什么是云企业网_云企业网(CEN)-阿里云帮助中心)。
[!NOTE]
VPC由虚拟路由器、交换机和私网网段三个关键部分构成:
- 虚拟路由器(vRouter)作为VPC的中枢,负责连接VPC内部的交换机,并作为通往外部网络的网关,通过路由表来指导数据包的转发路径。
- 交换机(vSwitch)则是VPC内部的网络基础组件,它们在VPC内划分出不同的子网,使得资源部署更为灵活,并且支持跨可用区部署,增强了应用的高可用性。
- 私网网段定义了VPC内部的IP地址分配范围,遵循预设的CIDR规范,确保地址的唯一性和管理的有效性。
需要注意的是,每一个地域下可以创建若干个VPC,每个VPC下有且仅有一个虚拟路由器,但可以有多个交换机(默认上限为150个)。
[!IMPORTANT]
由于同一VPC内不同交换机之间内网互通,即使云服务配置的交换机不同,但只要在同一VPC,可以通过该交换机访问在其他可用区的VPC内资源,但可能会多1~2ms左右的延时。
安全组
安全组是一种云平台提供的虚拟防火墙,旨在控制ECS(Elastic Compute Service)实例的入站和出站流量。安全组的作用类似于传统网络防火墙,但更灵活且更易于管理。用户可以通过设置不同的安全规则来实现对ECS实例网络访问的精细化控制,从而提升系统的安全性。每个安全组可以包含多个ECS实例,这些实例共享安全组的网络访问规则,确保不同实例间的网络隔离与安全性。
阿里云安全组分为入站规则和出站规则。入站规则用于控制外部对ECS实例的访问,例如限制特定IP地址或端口的访问权限,而出站规则则用来管理ECS实例对外部网络的访问权限。这种设计帮助用户在保持应用可访问性的同时,有效地防止潜在的网络攻击或未经授权的访问。安全组规则支持多种协议类型和端口范围的自定义,用户可以根据实际业务需求灵活配置,实现对流量的精细控制。
安全组的灵活性体现在其可以随时调整和应用到不同的ECS实例上,从而简化了网络管理和运维工作。当用户新增或修改安全组规则时,这些更改会立即生效,无需重新启动ECS实例。这种动态性确保了用户在快速扩展或变更业务需求时,不会影响系统的稳定性和安全性。
安全组配置最佳实践可参考(https://help.aliyun.com/zh/ecs/user-guide/overview-44)。
PyMySQL
[!NOTE]
PyMySQL是一个完全用Python编写的MySQL客户端库,它允许开发者在Python应用程序中轻松地与MySQL数据库进行连接和交互。这个库遵循Python数据库API规范,提供了一个全面的接口,使得用户能够执行各种数据库操作,如建立和终止数据库连接、执行SQL查询、插入和更新数据记录、删除数据条目以及管理事务等。
PyMySQL的一个显著特点是其对预编译SQL语句和参数化查询的支持。这种支持不仅增强了数据库操作的安全性,防止了SQL注入攻击,还提高了操作的效率。通过预编译语句,开发者可以重用SQL模板,同时为不同的查询传递不同的参数,这不仅减少了数据库的解析负担,也使得代码更加简洁和易于维护。此外,参数化查询确保了数据在传递过程中的安全性,防止了潜在的注入风险。
ORM框架及Python SQLAlchemy
对象关系映射(ORM)是一种在数据库和面向对象编程之间建立映射的技术,使开发者能够用面向对象的方法操作数据库。ORM通过将数据库表映射为程序中的类和对象,简化了数据库交互。现代ORM框架支持多种数据库,包括关系型和非关系型数据库。ORM的优势在于提高开发效率、代码可读性和移植性,但可能会影响性能,且在复杂操作时可能仍需要手动编写SQL。
ORM通过抽象适配层、跨数据库兼容的查询API、数据类型映射和统一事务管理实现数据库的无关性。这种无关性提高了应用程序的灵活性和代码的可移植性,使得开发者可以专注于面向对象的结构,而不必关心底层数据库的表结构和SQL方言。选择ORM框架时,应考虑技术栈兼容性、框架易用性、性能需求和事务并发控制。常见的ORM框架包括Python的SQLAlchemy和Django ORM,Java的MyBatis和Hibernate。这些框架提供了丰富的功能,如动态SQL、CRUD操作简化、事务和缓存机制。
- SQLAlchemy Core: 这是SQLAlchemy的基础部分,提供了与数据库进行交互的底层功能。它包括了创建数据库引擎、连接管理、执行原生SQL语句的能力,以及构建和执行动态SQL表达式的语言。
- SQLAlchemy ORM: 这是SQLAlchemy的对象关系映射层,它允许开发者使用面向对象的方法来操作数据库。通过将数据库中的表映射为Python类,行映射为类的实例,ORM层提供了一种更高级和抽象的方式来处理数据库操作。它包括了会话管理、对象关系映射、查询接口以及数据一致性维护等功能。
实验过程
实验环境搭建
构建PolarDB版环境
- 获取免费额度
进入PolarDB MySQL版产品详情页面(https://www.aliyun.com/product/polardb/mysql), 点击免费试用
按钮获取免费额度。根据页面提示配置集群参数,地域必须与FC所在地域保持一致,专有网络和交换机选择默认即可。
- 创建白名单及账号信息
进入PolarDB的控制台,查看当前的集群列表(https://polardb.console.aliyun.com/cn-hangzhou/clusters)。
进入集群详情页面进行数据库集群相关配置。点击 配置与管理 - 集群白名单 按钮进入白名单设置页面,点击 新增IP白名单分组 按钮配置允许所有IP访问(0.0.0.0/0)。
[!NOTE]
该配置仅用于实验,实际生产环境上应配置内网访问,即使需要配置公网访问,也应当配置具体的IP或特定网段。
点击 配置与管理 - 账号管理
按钮进入账号页面,点击 创建账号 按钮创建集群访高权限账号。
[!NOTE]
该配置仅用于实验,实际生产环境上应根据实际情况创建普通用户并配置所需的最小权限集合。
- 获取内网访问方式
点击左侧菜单栏中的 基本信息
按钮,进入集群基本信息页面,复制数据库连接中 集群地址
中的 私网地址
备用。主地址与集群地址比较,参见阿里云 PolarDB 连接地址介绍(https://help.aliyun.com/zh/polardb/polardb-for-mysql/user-guide/connection-address)。
- 查看当前专有网络VPC和交换机VSwitch信息
在基本信息中可以查看当前集群配置的VPC和交换机信息,后续的实验中需要配置VPC的地方将配置成该默认的VPC,实现不同阿里云服务间通信。可以进一步在专有网络控制台(https://vpc.console.aliyun.com/vpc/cn-hangzhou/vpcs)查看当前账号下的VPC实例信息。
构建FC自定义公共层,提供Python sqlalchemy依赖
sqlalchemy自定公共层构建方法,参见《基于阿里云函数计算FC的微服务部署体验》。
[!IMPORTANT]
Q:为什么本实验需要使用 pymysql 和 sqlalchemy 两个框架,只构建 sqlalchemy 的公共层?
A:因为创建 FC Web 函数时选择阿里云官方提供的 Python 3.10 运行时中已经预装了 pymysql,故无需额外构建新的公共层。运行时预装依赖清单(https://help.aliyun.com/zh/functioncompute/runtime-overview-2)。
Q:在构建阿里云函数计算(FC)的公共层时,应当遵循哪些指导原则?
A:阿里云官方并没有明确给出建议,结合公共层的特性可总结为以下几点:
原则一:依赖项应按关联性和使用频率进行组合。
将高关联性和经常一起使用的依赖项放在同一个层中,可以减少层的数量和部署复杂度;对于独立、复用性高的依赖项,应分离为单独的层,以便在不同场景下灵活使用。
原则二:控制层的大小,避免超过阿里云限制。
层的大小应控制在 50 MB 以内。如果依赖项导致层的大小超出限制,应拆分层,将相关性较高的部分保留在同一层,其余部分分离至其他层,以确保部署和更新的效率。
原则三:根据依赖项的重要性和冷启动性能优化层结构。
将关键和高频使用的依赖项整合到一个层中,以减少冷启动时间。其他低频使用或次要依赖项可分散到单独的层中,从而在提升性能的同时保持层的灵活性和独立性。
原则四:为每个层单独管理版本,确保更新的独立性和回滚的灵活性。
每个层应具备独立的版本控制机制,以便在更新单个依赖项时,不影响其他层的稳定性。同时,这样的管理方式便于在问题出现时快速回滚到上一版本,确保服务连续性。
原则五:避免将所有依赖项合并到一个层中,以减少复杂度和提高维护性。
虽然将依赖项集中到一个层中可以减少层的数量,但可能会导致管理复杂性和更新风险增加。应保持层的合理拆分,确保每个层在更新和维护时有明确的目标和边界,便于调试和问题排查。
配置现有FC允许访问VPC
从函数计算 FC 控制台 https://fcnext.console.aliyun.com 进入 fun-alarm-email-send
函数详情页面,进入配置
标签页,点击网络 - 编辑
按钮对网络进行配置。
选择与 PolarDB 一致的专有网络和交换机。
若该VPC网络下没有默认的安全组,则进入云服务器控制台 https://ecs.console.aliyun.com,点击`网络与安全 - 安全组进入配置页面,点击
创建安全组`按钮,选择上述的VPC网络,安全组默认内网互通,故无需额外配置规则。创建完成后再返回FC配置页面选择刚创建的安全组。
创建告警邮件数据库表
使用阿里云提供的云数据库统一控制台 https://dmslab.aliyun.com 对 PolarDB 进行操作。首次进入需要授权创建 DMS 服务关联角色方可使用。
开启自动接入功能,可以自动接入当前阿里云租户下的各种云数据库实例(可选)。
左侧的数据库实例中将展示出当前可登录的实例,点击“请先登录”按钮进行登录。首次登录需要输入数据的账号信息进行登录,后续再点击登录则无需自动再次输入。
点击确认
按钮进行登录后,DMS将默认打开 information_schema
表的 SQLConsole,表示登录成功。右键已登录的实例,在弹出菜单中点击数据库管理
按钮,进入管理页面。
点击创建库
按钮新建 db_message
数据库。字符集选择为 utf8mb4
。点击确认
按钮完成创建。
双击右侧已登录实例的db_message数据库,打开相应的SQL控制台。使用以下SQL创建tbl_config
表结构,点击执行按钮,并选择直接执行
完成创建。
CREATE TABLE `tbl_config` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '配置记录的唯一标识符',
`host` varchar(255) NOT NULL COMMENT '邮件服务器的主机地址',
`port` int(5) NOT NULL COMMENT '邮件服务器的端口号',
`username` varchar(100) NOT NULL COMMENT '登录邮件服务器的用户名,示例: synx@example.com',
`password` varchar(100) NOT NULL COMMENT '登录邮件服务器的授权码',
`sender` varchar(255) NOT NULL COMMENT '邮件发送人的地址,示例: synx@example.com',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='邮件配置表';
基于pymysql框架重构告警邮件发送功能
项目结构说明
.
├── app.py # Sanic 应用入口
├── custom_json_encoder.py # 自定义JSON编码器,用于处理数据库格式序列化问题
└── email_config_service.py # 邮件配置实现
邮件配置管理实现
本实验提供了一个基于PyMySQL框架的邮件配置CRUD(创建,读取,更新,删除)操作和自定义JSON编码器的示例代码,需要按照上面所示目录结构创建文件并将以下代码拷贝到文件中。
此代码可进一步优化,例如,可以添加按条件查询的功能,或者使用连接池来获取数据库连接,以避免每次操作都创建新的连接,从而提高程序的性能和效率。
# filename: email_config_service.py
import json
from typing import Any, Dict
import pymysql
from custom_json_encoder import DateTimeEncoder
def get_db_connection():
"""获取数据库连接,需要自行替换为前文中的数据库连接信息"""
return pymysql.connect(
host='xxx.aliyuncs.com',
port=3306,
user='synx',
password='******',
db='db_message',
charset='utf8mb4'
)
async def create_config(data) -> Dict[str, Any]:
"""创建配置"""
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("""
INSERT INTO tbl_config(host, port, username, password, sender)
VALUES (%s, %s, %s, %s, %s)
""", (data['host'], data['port'], data['username'], data['password'], data['sender']))
connection.commit()
return {"message": "Config created successfully"}
except Exception as e:
connection.rollback()
return {"error": str(e)}
async def read_config(data) -> Dict[str, Any]:
"""
读取配置,此处仅做最简单的查询,实际应用中需要根据业务需求进行查询
"""
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM tbl_config LIMIT 1")
result = cursor.fetchone()
if result:
config = {
"id": result[0],
"host": result[1],
"port": result[2],
"username": result[3],
"password": result[4],
"sender": result[5],
"create_time": result[6],
"update_time": result[7],
}
return json.loads(json.dumps(config, cls=DateTimeEncoder))
else:
return {"error": "No configuration found"}
except Exception as e:
return {"error": str(e)}
async def update_config(data) -> Dict[str, Any]:
"""修改邮件配置"""
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
if not data.get("id"):
raise ValueError("id is required")
sql = "UPDATE tbl_config SET "
params = []
for key in ['host', 'port', 'username', 'password', 'sender']:
if data.get(key):
sql += f"{key}=%s,"
params.append(data[key])
sql = sql.rstrip(",") + " WHERE id=%s"
params.append(data["id"])
if cursor.execute(sql, tuple(params)) == 0:
raise ValueError("Config not found")
connection.commit()
return {"message": "Config updated successfully"}
except Exception as e:
connection.rollback()
return {"error": str(e)}
async def delete_config(data) -> Dict[str, Any]:
"""删除邮件配置"""
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
if not data.get("id"):
raise ValueError("id is required")
if cursor.execute("DELETE FROM tbl_config WHERE id=%s", (data["id"],)) == 0:
raise ValueError("Config not found")
connection.commit()
return {"message": "Config deleted successfully"}
except Exception as e:
connection.rollback()
return {"error": str(e)}
# filename: custom_json_encoder.py
import json
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
"""自定义编码器"""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
告警邮件发送FC代码优化
修改app.py文件接口定义,通过参数传递action参数决定执行的方法。如action为”send_email”时则执行发送邮件相关逻辑,修改完成后点击代码部署
按钮。
# -*- coding: utf-8 -*-
import json as std_json
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP
from sanic import Sanic, response
from sanic.response import json
from email_config_service import (
create_config,
delete_config,
read_config,
update_config
)
app = Sanic("EmailSender")
async def send_email(data):
# 从数据库中读取邮件配置
email_config = await read_config(None)
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = email_config["sender"]
msg['To'] = data.get("recipient")
msg['Subject'] = data.get("subject")
# 添加邮件正文
msg.attach(MIMEText(data.get("body"), 'plain'))
# 连接SMTP服务器
server = SMTP(email_config["host"], email_config["port"])
server.starttls() # 启动TLS加密
server.login(email_config["username"], email_config["password"])
# 发送邮件
server.send_message(msg)
# 关闭连接
server.quit()
return {"message": "Email sent successfully"}
@app.route("/invoke", methods=["POST"])
async def send_email_route(request):
action = request.json.get("action")
data = request.json.get("data", {})
actions = {
"create_config": create_config,
"read_config": read_config,
"update_config": update_config,
"delete_config": delete_config,
"send_email": send_email
}
func = actions.get(action)
if func:
return json(await func(data))
else:
return response.json({"error": "Invalid action"}, status=400)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9000, dev=True)
测试告警邮件发送FC功能
本小节将利用Apifox对fun-alarm-email-send
函数进行测试,测试新增邮件配置及发送告警邮件功能。
- 新增邮件发送配置
传参中action字段应传入create_config
,完成请求参数如下:
{
"action": "create_config",
"data": {
"host": "smtp.qq.com",
"port": 587,
"username": "synx@qq.com",
"password": "xxxxxxxxxxxxxxxx",
"sender": "synx@qq.com"
}
}
登录PolarDB实例,右键db_message
库中tbl_config
表,点击打开表
按钮查看表中数据,可以发现数据已经存储在数据库中。
- 发送告警邮件
传参中action字段应传入seng_email,完成请求参数如下:
{
"action": "send_email",
"data": {
"recipient": "1404332425@qq.com",
"subject": "告警邮件",
"body": "告警邮件正文部分"
}
}
等待片刻,收件邮箱将受到告警邮件。
基于sqlalchemy框架重构邮件配置管理实现
目前已支持使用 pymysql 库直接与 MySQL 数据库进行交互。虽然这种方式能够有效地实现 CRUD 操作,并且对于初学者来说可以很好地理解数据库的基本操作,但直接使用 pymysql
也存在一些问题。比如,当应用程序的复杂度增加时,手动构建 SQL 语句和管理数据库连接将变得困难,并且容易导致代码重复和错误。此外,手动处理 SQL 语句也增加了代码维护的难度,特别是在涉及多个表关联查询或需要进行事务管理时,代码的可读性和扩展性都可能受到影响。
为了解决这些问题,本小节引入了 SQLAlchemy 框架。SQLAlchemy 是一款功能强大的 ORM(对象关系映射)工具,它可以将数据库表与 Python 类关联,从而通过类和对象来管理数据库中的数据,简化了与数据库的交互。
重构邮件配置管理实现
本实验提供了一个基于 SQLAlchemy 框架的邮件配置 CRUD(创建、读取、更新、删除)操作示例代码,需要将 email_config_service.py
文件内容进行替换。
import json
import urllib.parse
from typing import Any, Dict
from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from custom_json_encoder import DateTimeEncoder
Base = declarative_base()
class Config(Base):
__tablename__ = 'tbl_config'
id = Column(Integer, primary_key=True, autoincrement=True)
host = Column(String(255))
port = Column(Integer)
username = Column(String(255))
password = Column(String(255))
sender = Column(String(255))
create_time = Column(DateTime, server_default=func.now())
update_time = Column(DateTime, server_default=func.now(), onupdate=func.now())
def get_db_connection():
DB_HOST = 'xxx.aliyuncs.com'
DB_PORT = 3306
DB_USER = 'synx'
DB_PASSWORD = '******'
DB_NAME = 'db_message'
encoded_password = urllib.parse.quote_plus(DB_PASSWORD)
DATABASE_URL = f'mysql+pymysql://{DB_USER}:{encoded_password}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
engine = create_engine(DATABASE_URL, echo=True)
Session = sessionmaker(bind=engine)
return Session()
async def create_config(data) -> Dict[str, Any]:
"""创建配置"""
with get_db_connection() as session:
try:
config = Config(
host=data['host'],
port=data['port'],
username=data['username'],
password=data['password'],
sender=data['sender']
)
session.add(config)
session.commit()
return {"message": "Config created successfully"}
except SQLAlchemyError as e:
session.rollback()
return {"error": str(e)}
async def read_config(data) -> Dict[str, Any]:
"""
读取配置,此处仅做最简单的查询,实际应用中需要根据业务需求进行查询
"""
with get_db_connection() as session:
try:
config = session.query(Config).first()
if config:
config_dict = {
"id": config.id,
"host": config.host,
"port": config.port,
"username": config.username,
"password": config.password,
"sender": config.sender,
"create_time": config.create_time,
"update_time": config.update_time,
}
encoded_result = json.dumps(config_dict, cls=DateTimeEncoder)
return json.loads(encoded_result)
else:
return {"error": "No configuration found"}
except SQLAlchemyError as e:
return {"error": str(e)}
async def update_config(data) -> Dict[str, Any]:
"""
更新配置
"""
with get_db_connection() as session:
try:
config_id = data.get("id", None)
if not config_id:
raise ValueError("id is required")
config = session.query(Config).filter(Config.id == config_id).first()
if not config:
raise ValueError("Config not found")
for key in ['host', 'port', 'username', 'password', 'sender']:
if key in data:
setattr(config, key, data[key])
session.commit()
return {"message": "Config updated successfully"}
except SQLAlchemyError as e:
session.rollback()
return {"error": str(e)}
async def delete_config(data) -> Dict[str, Any]:
"""
删除配置
"""
with get_db_connection() as session:
try:
config_id = data.get("id", None)
if not config_id:
raise ValueError("id is required")
config = session.query(Config).filter(Config.id == config_id).first()
if not config:
raise ValueError("Config not found")
session.delete(config)
session.commit()
return {"message": "Config deleted successfully"}
except SQLAlchemyError as e:
session.rollback()
return {"error": str(e)}
测试告警邮件发送FC功能
本小节将利用 Apifox 对 fun-alarm-email-send
函数进行测试,测试新增邮件配置及发送告警邮件功能。
- 新增邮件发送配置
传参中 action
字段应传入 create_config
,完成请求参数如下:
{
"action": "create_config",
"data": {
"host": "smtp.example.com",
"port": 587,
"username": "synx@example.com",
"password": "******",
"sender": "synx@example.com"
}
}
[!WARNING]
遇到如下图所示问题:
应为没有增加之前设置好的自定义依赖层。如下图所示可以解决:
再次查看数据库表中数据,发现已经存在两条数据。
- 发送告警邮件
传参中 action
字段应传入 send_email
,完成请求参数如下:
{
"action": "send_email",
"data": {
"recipient": "smtp@example.com",
"subject": "告警邮件",
"body": "告警邮件正文部分"
}
}
等待片刻,收件邮件将收到告警邮件。
修改CloudFlow工作流的参数传递
该小节内容参考《基于阿里云函数计算的云工作流CloudFlow设计与体验》对工作流进行修改,主要修改fun-alarm-email-send
的InvokeFunction
参数传递,使其正常运行。