本文目录导读:
我来为你梳理一个Flet桌面全栈项目的实操指南,Flet是一个基于Flutter的Python框架,可以用Python代码构建跨平台(桌面、Web、移动)的UI。
Flet项目基础架构
项目结构示例
my_flet_app/
├── main.py                 # 入口文件
├── requirements.txt        # 依赖管理
├── config.py               # 配置文件
├── app/
│   ├── __init__.py
│   ├── views/              # 页面视图
│   │   ├── __init__.py
│   │   ├── login.py
│   │   ├── dashboard.py
│   │   └── settings.py
│   ├── components/         # 可复用组件
│   │   ├── __init__.py
│   │   ├── sidebar.py
│   │   ├── navbar.py
│   │   └── data_table.py
│   ├── models/             # 数据模型
│   │   ├── __init__.py
│   │   └── user.py
│   ├── services/           # 业务逻辑/API服务
│   │   ├── __init__.py
│   │   ├── api_service.py
│   │   ├── auth_service.py
│   │   └── database_service.py
│   └── utils/              # 工具函数
│       ├── __init__.py
│       ├── validators.py
│       └── helpers.py
├── assets/                 # 静态资源
│   ├── images/
│   ├── icons/
│   └── fonts/
└── tests/                  # 测试文件
    ├── __init__.py
    └── test_main.py
完整实战:企业管理后台
基础入口文件 (main.py)
import flet as ft
from app.views.login import LoginView
from app.views.dashboard import DashboardView
from app.components.sidebar import Sidebar
def main(page: ft.Page):
    """Set up the window, session defaults and routing of the admin app.

    Args:
        page: The Flet page this app instance renders into.
    """
    # Local import: the module-level imports of this file bring in LoginView,
    # DashboardView and Sidebar only, but the "/settings" route also needs
    # SettingsView (app/views/settings.py in the project layout).
    from app.views.settings import SettingsView

    # Basic page configuration.
    page.title = "企业管理后台"
    page.theme_mode = ft.ThemeMode.LIGHT
    page.window_width = 1200
    page.window_height = 800
    page.window_center()

    # Global state kept in the session.
    page.session.set("user", None)
    page.session.set("theme", "light")

    def as_control(view):
        """Return something Flet can render for a view/component object."""
        # LoginView and Sidebar are plain classes that expose build() rather
        # than Flet controls, so their instances cannot be placed in a view
        # directly. Presumably DashboardView/SettingsView follow the same
        # convention - real controls are passed through untouched.
        if isinstance(view, ft.Control):
            return view
        return view.build()

    def shell(content_view):
        """Wrap a content view in the sidebar / divider / content layout."""
        return ft.Row(
            [
                as_control(Sidebar(page)),
                ft.VerticalDivider(width=1),
                ft.Column(
                    [
                        ft.Container(
                            content=as_control(content_view),
                            expand=True,
                        )
                    ],
                    expand=True,
                ),
            ],
            expand=True,
        )

    def route_change(e):
        """Rebuild the view stack for the route the page just moved to."""
        troute = ft.TemplateRoute(page.route)
        if troute.match("/login"):
            route, protected_view = "/login", None
        elif troute.match("/"):
            route, protected_view = "/", DashboardView
        elif troute.match("/settings"):
            route, protected_view = "/settings", SettingsView
        else:
            # Unknown route: leave the current view in place.
            page.update()
            return

        # Pages that embed the sidebar need a logged-in user; send anonymous
        # visitors back to the login form instead of rendering them.
        if protected_view is not None and page.session.get("user") is None:
            page.go("/login")
            return

        if protected_view is None:
            content = as_control(LoginView(page))
        else:
            content = shell(protected_view(page))

        page.views.clear()
        page.views.append(ft.View(route, [content], padding=0))
        page.update()

    # Listen for route changes.
    page.on_route_change = route_change
    # Initial route.
    page.go("/login")


# Guarded so that importing this module (e.g. from tests) does not start the UI.
if __name__ == "__main__":
    ft.app(target=main)
登录页面 (login.py)
import flet as ft
from app.services.auth_service import AuthService
class LoginView:
    """Login screen: a credential form plus the handler for the login button."""

    def __init__(self, page: ft.Page):
        self.auth_service = AuthService()
        self.page = page

    def build(self):
        """Create the form fields and return the centered login container."""

        def spacer(height):
            # Fixed-height empty container used as vertical spacing.
            return ft.Container(height=height)

        # The fields are kept on the instance so login_click can read them.
        self.username_field = ft.TextField(
            label="用户名",
            prefix_icon=ft.icons.PERSON,
            width=300,
            border_radius=10,
        )
        self.password_field = ft.TextField(
            label="密码",
            prefix_icon=ft.icons.LOCK,
            password=True,
            can_reveal_password=True,
            width=300,
            border_radius=10,
        )
        self.error_text = ft.Text("", color=ft.colors.RED, size=12)

        logo = ft.Icon(
            ft.icons.ADMIN_PANEL_SETTINGS,
            size=80,
            color=ft.colors.BLUE_500,
        )
        heading = ft.Text("企业管理后台", size=28, weight=ft.FontWeight.BOLD)
        login_button = ft.ElevatedButton(
            text="登录",
            width=300,
            height=45,
            style=ft.ButtonStyle(
                shape=ft.RoundedRectangleBorder(radius=10)
            ),
            on_click=self.login_click,
        )
        forgot_link = ft.TextButton(
            "忘记密码?",
            on_click=lambda e: self.page.go("/forgot-password"),
        )

        form = ft.Column(
            [
                spacer(50),
                logo,
                heading,
                spacer(30),
                self.username_field,
                spacer(10),
                self.password_field,
                spacer(10),
                self.error_text,
                spacer(20),
                login_button,
                spacer(10),
                forgot_link,
            ],
            alignment=ft.MainAxisAlignment.CENTER,
            horizontal_alignment=ft.CrossAxisAlignment.CENTER,
        )
        return ft.Container(
            content=form,
            padding=40,
            alignment=ft.alignment.center,
            expand=True,
        )

    def login_click(self, e):
        """Validate the form, call the auth service and navigate or show an error."""
        username, password = self.username_field.value, self.password_field.value

        # Basic validation: both fields must be filled in.
        if not (username and password):
            self.error_text.value = "请输入用户名和密码"
            self.page.update()
            return

        # Ask the authentication service.
        result = self.auth_service.login(username, password)
        if not result["success"]:
            self.error_text.value = result["message"]
            self.page.update()
            return

        self.page.session.set("user", result["user"])
        self.page.go("/")
侧边栏组件 (sidebar.py)
import flet as ft
class Sidebar:
    """Navigation sidebar: user summary, menu entries and a logout button."""

    def __init__(self, page: ft.Page):
        self.page = page

    def build(self):
        """Return the sidebar container for the current session and route.

        The "user" session value is read with ``.get`` lookups for the keys
        "avatar", "name" and "role"; when no user is stored (the session holds
        ``None`` before login and after logout) the defaults are shown instead
        of raising ``AttributeError``.
        """
        user = self.page.session.get("user") or {}
        current_route = self.page.route

        # (icon, label, route) of every menu entry, in display order.
        menu = [
            (ft.icons.DASHBOARD, "仪表盘", "/"),
            (ft.icons.PEOPLE, "用户管理", "/users"),
            (ft.icons.SHOPPING_CART, "订单管理", "/orders"),
            (ft.icons.ANALYTICS, "数据分析", "/analytics"),
            (ft.icons.SETTINGS, "系统设置", "/settings"),
        ]
        # Highlight the entry matching the page's current route rather than
        # always highlighting the dashboard.
        nav_items = [
            self.menu_item(icon, text, route, route == current_route)
            for icon, text, route in menu
        ]

        # User information area.
        user_header = ft.Container(
            content=ft.Column(
                [
                    ft.CircleAvatar(
                        foreground_image_url=user.get("avatar", ""),
                        radius=30,
                    ),
                    ft.Text(
                        user.get("name", "用户"),
                        size=16,
                        weight=ft.FontWeight.BOLD,
                    ),
                    ft.Text(
                        user.get("role", "普通用户"),
                        size=12,
                        color=ft.colors.GREY_500,
                    ),
                ],
                alignment=ft.MainAxisAlignment.CENTER,
                horizontal_alignment=ft.CrossAxisAlignment.CENTER,
            ),
            padding=ft.padding.only(top=20, bottom=20),
        )

        # Logout button.
        logout_button = ft.Container(
            content=ft.TextButton(
                "退出登录",
                icon=ft.icons.LOGOUT,
                on_click=self.logout,
            ),
            padding=ft.padding.only(left=10, right=10),
        )

        return ft.Container(
            content=ft.Column(
                [
                    user_header,
                    ft.Divider(height=1),
                    *nav_items,
                    ft.Divider(height=1),
                    logout_button,
                ],
                spacing=0,
                scroll=ft.ScrollMode.AUTO,
            ),
            width=250,
            bgcolor=ft.colors.SURFACE_VARIANT,
            padding=ft.padding.only(top=10),
        )

    def menu_item(self, icon, text, route, active=False):
        """Return one clickable menu row that navigates to ``route``.

        Args:
            icon: Icon shown before the label.
            text: Label of the entry.
            route: Route passed to ``navigate`` when the row is clicked.
            active: When true the row gets the highlighted background.
        """
        return ft.Container(
            content=ft.Row([
                ft.Icon(icon, size=20),
                ft.Text(text, size=14),
            ]),
            padding=ft.padding.symmetric(horizontal=20, vertical=12),
            bgcolor=ft.colors.PRIMARY_CONTAINER if active else None,
            on_click=lambda e: self.navigate(route),
        )

    def navigate(self, route):
        """Move the page to ``route``."""
        self.page.go(route)

    def logout(self, e):
        """Clear the session user and return to the login route."""
        self.page.session.set("user", None)
        self.page.go("/login")
数据表格组件 (data_table.py)
import flet as ft
class DataTable:
    """Builds the user-management table and hosts its event handlers."""

    def __init__(self, page: ft.Page):
        self.page = page
        self.selected_rows = []

    def create_user_table(self, users_data):
        """Return an ``ft.DataTable`` with one row per entry of ``users_data``.

        Args:
            users_data: Iterable of mappings; each is passed to
                ``create_user_row``.
        """
        return ft.DataTable(
            columns=[
                # The first column combines the select-all checkbox and the ID.
                ft.DataColumn(
                    ft.Row([
                        ft.Checkbox(on_change=self.select_all),
                        ft.Text("ID"),
                    ])
                ),
                ft.DataColumn(ft.Text("用户名")),
                ft.DataColumn(ft.Text("邮箱")),
                ft.DataColumn(ft.Text("角色")),
                ft.DataColumn(ft.Text("状态")),
                ft.DataColumn(ft.Text("操作")),
            ],
            rows=[self.create_user_row(user) for user in users_data],
            sort_column_index=0,
            sort_ascending=True,
            show_bottom_border=True,
            column_spacing=50,
        )

    def create_user_row(self, user):
        """Return the table row for one user.

        Args:
            user: Mapping read with the keys "id", "username", "email",
                "role" and "active".
        """
        if user["active"]:
            status = ft.Text("活跃", color=ft.colors.GREEN)
        else:
            status = ft.Text("禁用", color=ft.colors.RED)

        return ft.DataRow(
            cells=[
                # Checkbox and ID share one cell, mirroring the header: a row
                # must have exactly as many cells as the table has columns (6).
                ft.DataCell(
                    ft.Row([
                        ft.Checkbox(value=False, on_change=self.row_selected),
                        ft.Text(str(user["id"])),
                    ])
                ),
                ft.DataCell(ft.Text(user["username"])),
                ft.DataCell(ft.Text(user["email"])),
                ft.DataCell(ft.Text(user["role"])),
                ft.DataCell(ft.Container(content=status)),
                ft.DataCell(
                    ft.Row([
                        ft.IconButton(
                            icon=ft.icons.EDIT,
                            tooltip="编辑",
                            on_click=lambda e: self.edit_user(user["id"]),
                        ),
                        ft.IconButton(
                            icon=ft.icons.DELETE,
                            tooltip="删除",
                            on_click=lambda e: self.delete_user(user["id"]),
                        ),
                    ])
                ),
            ]
        )

    def select_all(self, e):
        """Handle the header checkbox (select-all logic; not implemented)."""
        pass

    def row_selected(self, e):
        """Handle a row checkbox (row-selection logic; not implemented)."""
        pass

    def edit_user(self, user_id):
        """Edit the given user (not implemented)."""
        pass

    def delete_user(self, user_id):
        """Delete the given user (not implemented)."""
        pass
API服务 (api_service.py)
import httpx
from typing import Dict, Any
import json
class APIService:
    """Small async JSON client for the backend REST API.

    Every request method returns a dict and does not raise:
    ``{"success": True, "data": ...}`` on success, or
    ``{"success": False, "error": "<message>"}`` on failure.
    """

    def __init__(self, base_url: str = "http://localhost:8000/api"):
        self.base_url = base_url
        # The request methods are coroutines and await the client, so the
        # client has to be the async variant; awaiting the result of a
        # synchronous httpx.Client call raises TypeError.
        self.client = httpx.AsyncClient(timeout=30.0)
        self.token = None

    def set_token(self, token: str):
        """Remember ``token`` and send it as a Bearer header from now on."""
        self.token = token
        self.client.headers.update({
            "Authorization": f"Bearer {token}"
        })

    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Send one request and convert the outcome into the result dict.

        Args:
            method: HTTP verb, e.g. "GET".
            endpoint: Path appended verbatim to ``base_url``.
            **kwargs: Passed through to ``client.request`` (``params``/``json``).
        """
        try:
            response = await self.client.request(
                method,
                f"{self.base_url}{endpoint}",
                **kwargs
            )
            response.raise_for_status()
            # A successful response may carry no body (e.g. after a delete);
            # decoding an empty body as JSON would fail.
            data = response.json() if response.content else None
            return {"success": True, "data": data}
        except httpx.HTTPStatusError as e:
            return {"success": False, "error": str(e)}
        except Exception as e:
            # Deliberately broad: callers rely on getting a result dict back
            # instead of an exception.
            return {"success": False, "error": f"网络错误: {str(e)}"}

    async def get(self, endpoint: str, params: Dict = None) -> Dict:
        """GET ``endpoint`` with optional query ``params``."""
        return await self._request("GET", endpoint, params=params)

    async def post(self, endpoint: str, data: Dict = None) -> Dict:
        """POST ``data`` as JSON to ``endpoint``."""
        return await self._request("POST", endpoint, json=data)

    async def update(self, endpoint: str, data: Dict = None) -> Dict:
        """PUT ``data`` as JSON to ``endpoint``."""
        return await self._request("PUT", endpoint, json=data)

    async def delete(self, endpoint: str) -> Dict:
        """DELETE ``endpoint``."""
        return await self._request("DELETE", endpoint)

    async def close(self):
        """Close the underlying HTTP client and its connections."""
        await self.client.aclose()
数据模型 (user.py)
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
@dataclass
class User:
    """A user record with conversions to and from plain dicts."""

    id: int
    username: str
    email: str
    role: str
    active: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    avatar: Optional[str] = None
    phone: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the user as a dict; the two timestamp fields are left out."""
        exported = ("id", "username", "email", "role", "active", "avatar", "phone")
        return {name: getattr(self, name) for name in exported}

    @classmethod
    def from_dict(cls, data: dict):
        """Build a user from ``data``.

        Keys that are absent become ``None``, except "active", which
        defaults to ``True``.
        """
        plain_keys = (
            "id",
            "username",
            "email",
            "role",
            "created_at",
            "updated_at",
            "avatar",
            "phone",
        )
        kwargs = {key: data.get(key) for key in plain_keys}
        kwargs["active"] = data.get("active", True)
        return cls(**kwargs)
@dataclass
class LoginResponse:
    """Result of a login attempt; only ``success`` is required."""

    # Whether the login succeeded.
    success: bool
    # Optional token string; defaults to None.
    token: Optional[str] = None
    # Optional User instance; defaults to None.
    user: Optional[User] = None
    # Optional message string; defaults to None.
    message: Optional[str] = None
数据库服务 (database_service.py)
import sqlite3
from typing import List, Dict, Any
from contextlib import contextmanager
import os
class DatabaseService:
    """Thin SQLite wrapper that creates the schema and runs queries.

    Each operation opens its own connection, commits on success, rolls back
    on error and closes the connection afterwards.
    """

    def __init__(self, db_path: str = "data/app.db"):
        self.db_path = db_path
        self.ensure_database()

    def ensure_database(self):
        """Create the database's parent directory (if any) and the tables."""
        directory = os.path.dirname(self.db_path)
        # A bare filename such as "app.db" has no directory component, and
        # os.makedirs("") raises FileNotFoundError.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.create_tables()

    @contextmanager
    def get_connection(self):
        """Yield a connection whose rows are ``sqlite3.Row`` objects.

        Commits when the ``with`` body finishes normally; rolls back and
        re-raises when it raises. The connection is always closed.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def create_tables(self):
        """Create the ``users`` and ``orders`` tables if they do not exist."""
        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    active INTEGER DEFAULT 1,
                    avatar TEXT,
                    phone TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    total_amount REAL,
                    status TEXT DEFAULT 'pending',
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            """)

    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """Run ``sql`` with ``params`` and return every row as a dict."""
        with self.get_connection() as conn:
            cursor = conn.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]

    def execute(self, sql: str, params: tuple = ()) -> int:
        """Run ``sql`` with ``params`` and return the cursor's ``lastrowid``."""
        with self.get_connection() as conn:
            cursor = conn.execute(sql, params)
            return cursor.lastrowid

    def insert_user(self, user_data: Dict) -> int:
        """Insert a user and return the new row id.

        Args:
            user_data: Must contain "username", "email" and "password_hash";
                "role" is optional and defaults to "user".

        Raises:
            KeyError: If a required key is missing.
            sqlite3.IntegrityError: If the username or email already exists.
        """
        sql = """
            INSERT INTO users (username, email, password_hash, role)
            VALUES (?, ?, ?, ?)
        """
        return self.execute(sql, (
            user_data["username"],
            user_data["email"],
            user_data["password_hash"],
            user_data.get("role", "user"),
        ))

    def get_user_by_username(self, username: str) -> "Dict | None":
        """Return the user row for ``username`` as a dict, or ``None``."""
        result = self.query(
            "SELECT * FROM users WHERE username = ?",
            (username,)
        )
        return result[0] if result else None
工具函数 (helpers.py)
import re
import hashlib
import json
from typing import Any
def validate_email(email: str) -> bool:
    """Return True when ``email`` has the shape ``local@domain.tld``.

    The whole string must match: ``re.fullmatch`` is used because ``$`` with
    ``re.match`` also matches just before a trailing newline, which would let
    ``"user@example.com\\n"`` through.
    """
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return re.fullmatch(pattern, email) is not None
def validate_phone(phone: str) -> bool:
    """Return True when ``phone`` is 11 ASCII digits starting with 13-19.

    The whole string must match (no trailing newline), and ``re.ASCII``
    keeps ``\\d`` from accepting non-ASCII digit characters.
    """
    pattern = r'^1[3-9]\d{9}$'
    return re.fullmatch(pattern, phone, flags=re.ASCII) is not None
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): this is a single unsalted SHA-256 pass, which is weak for
    password storage. The output is kept unchanged here because callers may
    compare it against stored values; consider a salted key-derivation
    function as a separate, coordinated change.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def format_datetime(dt_str: str, format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Reformat an ISO-8601 date/time string.

    Args:
        dt_str: Value parsed with ``datetime.fromisoformat``.
        format: ``strftime`` pattern for the result.

    Returns:
        The formatted string, or ``dt_str`` unchanged when it cannot be
        parsed (not ISO formatted, or not a string at all).
    """
    from datetime import datetime
    try:
        dt = datetime.fromisoformat(dt_str)
    except (ValueError, TypeError):
        # Only parse failures fall back to the input; a bare ``except`` would
        # also swallow KeyboardInterrupt and SystemExit.
        return dt_str
    return dt.strftime(format)
def create_notification(page, message: str, type: str = "info"):
    """Show ``message`` in a snack bar colored according to ``type``.

    Args:
        page: Flet page on which the snack bar is shown.
        message: Text to display.
        type: One of "info", "success", "warning" or "error"; any other
            value falls back to the "info" color.
    """
    # Imported locally: this helpers module's own imports do not bring
    # ``ft`` into scope, so the function would otherwise raise NameError.
    import flet as ft

    colors = {
        "info": ft.colors.BLUE,
        "success": ft.colors.GREEN,
        "warning": ft.colors.ORANGE,
        "error": ft.colors.RED,
    }
    snack_bar = ft.SnackBar(
        content=ft.Text(message),
        bgcolor=colors.get(type, ft.colors.BLUE),
        duration=3000,
    )
    page.show_snack_bar(snack_bar)
高级特性实现
主题切换功能
def toggle_theme(page: ft.Page):
    """Switch the page between light and dark mode and store the choice.

    A page currently in light mode becomes dark; any other mode becomes
    light. The session key "theme" is set to "dark" or "light" to match.
    """
    switching_to_dark = page.theme_mode == ft.ThemeMode.LIGHT
    page.theme_mode = ft.ThemeMode.DARK if switching_to_dark else ft.ThemeMode.LIGHT
    page.session.set("theme", "dark" if switching_to_dark else "light")
    page.update()
文件上传处理
def upload_file(page: ft.Page, file_picker: ft.FilePicker):
    """Open a single-file picker dialog and register the result handler.

    Args:
        page: Page whose overlay hosts the picker.
        file_picker: Picker control to show.
    """
    def on_file_selected(e: ft.FilePickerResultEvent):
        if e.files:
            file = e.files[0]
            # TODO: handle the upload of ``file`` here.
            pass

    file_picker.on_result = on_file_selected
    # Register the picker only once; appending on every call would pile up
    # duplicates of the same control in the overlay.
    if file_picker not in page.overlay:
        page.overlay.append(file_picker)
    # Push the overlay change to the client so the picker is part of the
    # page before its dialog is opened.
    page.update()
    file_picker.pick_files(allow_multiple=False)
图表集成
import plotly.graph_objects as go
import plotly.io as pio
def create_chart(data: "List[Dict]") -> "ft.Image":
    """Render a bar chart of sales per month as a 600x400 Flet image.

    Args:
        data: Items read with the keys "month" (x axis) and "sales" (y axis).

    Returns:
        An ``ft.Image`` whose ``src_base64`` holds the PNG rendering.
    """
    # Local imports and quoted annotations: this snippet imports plotly only,
    # so base64, ``ft``, ``List`` and ``Dict`` are not guaranteed to be in scope.
    import base64

    import flet as ft

    # Build the Plotly figure.
    fig = go.Figure(data=[
        go.Bar(
            name='销售额',
            x=[d['month'] for d in data],
            y=[d['sales'] for d in data],
        )
    ])
    # Render to PNG bytes.
    img_bytes = pio.to_image(fig, format='png', width=600, height=400)
    # PNG data is binary, not UTF-8 text: it has to be base64-encoded before
    # it can be passed as ``src_base64`` (decoding it directly raises
    # UnicodeDecodeError).
    encoded = base64.b64encode(img_bytes).decode('ascii')
    return ft.Image(
        src_base64=encoded,
        width=600,
        height=400,
    )
打包发布
使用PyInstaller打包
# build.py
import subprocess
import sys
def build_exe():
    """Package main.py into a single windowed executable with PyInstaller.

    Raises:
        subprocess.CalledProcessError: If PyInstaller exits with a non-zero
            status, so a failed build is not mistaken for a successful one.
    """
    import os

    cmd = [
        # Run PyInstaller through the current interpreter so the build uses
        # the same environment (and installed packages) as this script.
        sys.executable, "-m", "PyInstaller",
        "--onefile",  # single-file mode
        "--windowed",  # no console window
        "--name", "MyApp",
        "--icon", "assets/icon.ico",
        # Bundle the resource files. The source/destination separator is
        # platform dependent (";" on Windows, ":" elsewhere).
        "--add-data", f"assets{os.pathsep}assets",
        "--hidden-import", "flet",
        "main.py",
    ]
    subprocess.run(cmd, check=True)


if __name__ == "__main__":
    build_exe()
使用Flet打包
# 命令行打包
# flet pack main.py --name MyApp --icon assets/icon.ico
import flet as ft
def main(page: ft.Page):
    """Placeholder app entry point; it intentionally does nothing yet."""
    return None


# Start the app only when this file is run as a script.
if __name__ == "__main__":
    ft.app(target=main)
优化建议
性能优化
- 使用 async 模式处理I/O操作
- 实现虚拟滚动加载大数据集
- 使用缓存机制减少重复查询
- 合理使用状态管理
安全优化
- 前端验证与后端验证结合
- 使用HTTPS进行数据传输
- 实现JWT或OAuth2认证
- 防止XSS和SQL注入
开发效率
- 使用类型注解提高代码可读性
- 实现自动化测试
- 使用版本控制
- 建立组件库文档
这个实战项目涵盖了Flet桌面应用开发的主要方面,从基础架构到高级特性都有涉及,建议你根据实际需求逐步扩展功能,并持续优化代码质量。
标签: 全栈项目