Flet桌面全栈项目实操?

访客 全栈框架 1

本文目录导读:

  1. Flet项目基础架构
  2. 完整实战:企业管理后台
  3. 高级特性实现
  4. 打包发布
  5. 优化建议

下面梳理一个Flet桌面全栈项目的实操指南。Flet是一个基于Flutter的Python框架,可以用Python代码构建跨平台(桌面、Web、移动端)的UI。

Flet项目基础架构

项目结构示例

my_flet_app/
├── main.py              # 入口文件
├── requirements.txt     # 依赖管理
├── config.py           # 配置文件
├── app/
│   ├── __init__.py
│   ├── views/          # 页面视图
│   │   ├── __init__.py
│   │   ├── login.py
│   │   ├── dashboard.py
│   │   └── settings.py
│   ├── components/     # 可复用组件
│   │   ├── __init__.py
│   │   ├── sidebar.py
│   │   ├── navbar.py
│   │   └── data_table.py
│   ├── models/         # 数据模型
│   │   ├── __init__.py
│   │   └── user.py
│   ├── services/       # 业务逻辑/API服务
│   │   ├── __init__.py
│   │   ├── api_service.py
│   │   ├── auth_service.py
│   │   └── database_service.py
│   └── utils/          # 工具函数
│       ├── __init__.py
│       ├── validators.py
│       └── helpers.py
├── assets/             # 静态资源
│   ├── images/
│   ├── icons/
│   └── fonts/
└── tests/              # 测试文件
    ├── __init__.py
    └── test_main.py

完整实战:企业管理后台

基础入口文件 (main.py)

import flet as ft
from app.views.login import LoginView
from app.views.dashboard import DashboardView
from app.components.sidebar import Sidebar
def main(page: ft.Page):
    """Configure the window, install the router and open the login screen.

    Args:
        page: The Flet page handed to the application entry point.
    """
    # Basic page configuration.
    page.title = "企业管理后台"
    page.theme_mode = ft.ThemeMode.LIGHT
    page.window_width = 1200
    page.window_height = 800
    page.window_center()

    # Session-wide state.
    page.session.set("user", None)
    page.session.set("theme", "light")

    def as_control(view):
        """Return something Flet can render for *view*."""
        # The view/sidebar classes are plain Python classes exposing build();
        # objects that already are controls are passed through untouched.
        if isinstance(view, ft.Control) or not hasattr(view, "build"):
            return view
        return view.build()

    def shell(route, content_view):
        """Wrap *content_view* in the shared sidebar + content layout."""
        return ft.View(
            route,
            [
                ft.Row([
                    as_control(Sidebar(page)),
                    ft.VerticalDivider(width=1),
                    ft.Column([
                        ft.Container(
                            content=as_control(content_view),
                            expand=True
                        )
                    ], expand=True)
                ], expand=True)
            ],
            padding=0
        )

    def route_change(e):
        """Rebuild the view stack for the route the page just moved to."""
        troute = ft.TemplateRoute(page.route)
        view = None
        if troute.match("/login"):
            view = ft.View("/login", [as_control(LoginView(page))], padding=0)
        elif troute.match("/"):
            view = shell("/", DashboardView(page))
        elif troute.match("/settings"):
            # The module header never imports SettingsView, so this branch
            # used to die with NameError; import it where it is needed.
            from app.views.settings import SettingsView
            view = shell("/settings", SettingsView(page))
        # Unknown routes keep whatever view stack is currently shown.
        if view is not None:
            page.views.clear()
            page.views.append(view)
        page.update()

    # React to route changes.
    page.on_route_change = route_change
    # Initial route.
    page.go("/login")
ft.app(target=main)

登录页面 (login.py)

import flet as ft
from app.services.auth_service import AuthService
class LoginView:
    """Login screen: a credentials form with an inline error line."""

    def __init__(self, page: ft.Page):
        self.page = page
        self.auth_service = AuthService()

    @staticmethod
    def _gap(height):
        """Return an empty container used purely as vertical spacing."""
        return ft.Container(height=height)

    def _show_error(self, message):
        """Put *message* on the error line and refresh the page."""
        self.error_text.value = message
        self.page.update()

    def build(self):
        """Create the form fields and return the root container of the screen."""
        shared_field_style = {"width": 300, "border_radius": 10}
        self.username_field = ft.TextField(
            label="用户名",
            prefix_icon=ft.icons.PERSON,
            **shared_field_style,
        )
        self.password_field = ft.TextField(
            label="密码",
            prefix_icon=ft.icons.LOCK,
            password=True,
            can_reveal_password=True,
            **shared_field_style,
        )
        self.error_text = ft.Text("", color=ft.colors.RED, size=12)

        logo = ft.Icon(
            ft.icons.ADMIN_PANEL_SETTINGS,
            size=80,
            color=ft.colors.BLUE_500,
        )
        heading = ft.Text("企业管理后台", size=28, weight=ft.FontWeight.BOLD)
        submit_button = ft.ElevatedButton(
            text="登录",
            width=300,
            height=45,
            style=ft.ButtonStyle(shape=ft.RoundedRectangleBorder(radius=10)),
            on_click=self.login_click,
        )
        forgot_link = ft.TextButton(
            "忘记密码?",
            on_click=lambda e: self.page.go("/forgot-password"),
        )

        form = ft.Column(
            [
                self._gap(50),
                logo,
                heading,
                self._gap(30),
                self.username_field,
                self._gap(10),
                self.password_field,
                self._gap(10),
                self.error_text,
                self._gap(20),
                submit_button,
                self._gap(10),
                forgot_link,
            ],
            alignment=ft.MainAxisAlignment.CENTER,
            horizontal_alignment=ft.CrossAxisAlignment.CENTER,
        )
        return ft.Container(
            content=form,
            padding=40,
            alignment=ft.alignment.center,
            expand=True,
        )

    def login_click(self, e):
        """Validate the form, call the auth service and route on success."""
        username = self.username_field.value
        password = self.password_field.value
        # Basic validation: both fields must be filled in.
        if not (username and password):
            self._show_error("请输入用户名和密码")
            return
        # Delegate to the authentication service.
        result = self.auth_service.login(username, password)
        if not result["success"]:
            self._show_error(result["message"])
            return
        self.page.session.set("user", result["user"])
        self.page.go("/")

侧边栏组件 (sidebar.py)

import flet as ft
class Sidebar:
    """Navigation sidebar: user summary, menu entries and a logout button."""

    def __init__(self, page: ft.Page):
        self.page = page

    def build(self):
        """Return the sidebar container for the current session and route."""
        # The session holds None before login and after logout; fall back to
        # an empty profile instead of crashing on None.get(...).
        user = self.page.session.get("user") or {}
        current_route = self.page.route
        menu_entries = [
            (ft.icons.DASHBOARD, "仪表盘", "/"),
            (ft.icons.PEOPLE, "用户管理", "/users"),
            (ft.icons.SHOPPING_CART, "订单管理", "/orders"),
            (ft.icons.ANALYTICS, "数据分析", "/analytics"),
            (ft.icons.SETTINGS, "系统设置", "/settings"),
        ]
        return ft.Container(
            content=ft.Column(
                [
                    # User summary.
                    ft.Container(
                        content=ft.Column([
                            ft.CircleAvatar(
                                # An empty URL is not a loadable image.
                                foreground_image_url=user.get("avatar") or None,
                                radius=30,
                            ),
                            ft.Text(
                                user.get("name", "用户"),
                                size=16,
                                weight=ft.FontWeight.BOLD
                            ),
                            ft.Text(
                                user.get("role", "普通用户"),
                                size=12,
                                color=ft.colors.GREY_500
                            ),
                        ], alignment=ft.MainAxisAlignment.CENTER,
                           horizontal_alignment=ft.CrossAxisAlignment.CENTER),
                        padding=ft.padding.only(top=20, bottom=20),
                    ),
                    ft.Divider(height=1),
                    # Menu entries; highlight the one matching the current
                    # route rather than always highlighting the dashboard.
                    *[
                        self.menu_item(icon, label, route, route == current_route)
                        for icon, label, route in menu_entries
                    ],
                    ft.Divider(height=1),
                    # Logout button.
                    ft.Container(
                        content=ft.TextButton(
                            "退出登录",
                            icon=ft.icons.LOGOUT,
                            on_click=self.logout
                        ),
                        padding=ft.padding.only(left=10, right=10)
                    )
                ],
                spacing=0,
                scroll=ft.ScrollMode.AUTO
            ),
            width=250,
            bgcolor=ft.colors.SURFACE_VARIANT,
            padding=ft.padding.only(top=10)
        )

    def menu_item(self, icon, text, route, active=False):
        """Return one clickable menu row.

        Args:
            icon: Icon shown to the left of the label.
            text: Label of the entry.
            route: Route navigated to when the row is clicked.
            active: Whether the row gets the highlighted background.
        """
        return ft.Container(
            content=ft.Row([
                ft.Icon(icon, size=20),
                ft.Text(text, size=14)
            ]),
            padding=ft.padding.symmetric(horizontal=20, vertical=12),
            bgcolor=ft.colors.PRIMARY_CONTAINER if active else None,
            on_click=lambda e: self.navigate(route)
        )

    def navigate(self, route):
        """Navigate the page to *route*."""
        self.page.go(route)

    def logout(self, e):
        """Clear the session user and return to the login screen."""
        self.page.session.set("user", None)
        self.page.go("/login")

数据表格组件 (data_table.py)

import flet as ft
class DataTable:
    """Builds the user-management table from a list of user dicts."""

    def __init__(self, page: ft.Page):
        self.page = page
        self.selected_rows = []

    def create_user_table(self, users_data):
        """Return a six-column ``ft.DataTable`` with one row per user.

        Args:
            users_data: Iterable of dicts with the keys ``id``, ``username``,
                ``email``, ``role`` and ``active``.
        """
        return ft.DataTable(
            columns=[
                # The first column carries the select-all checkbox and the ID.
                ft.DataColumn(
                    ft.Row([
                        ft.Checkbox(on_change=self.select_all),
                        ft.Text("ID")
                    ])
                ),
                ft.DataColumn(ft.Text("用户名")),
                ft.DataColumn(ft.Text("邮箱")),
                ft.DataColumn(ft.Text("角色")),
                ft.DataColumn(ft.Text("状态")),
                ft.DataColumn(ft.Text("操作")),
            ],
            rows=[self.create_user_row(user) for user in users_data],
            sort_column_index=0,
            sort_ascending=True,
            show_bottom_border=True,
            column_spacing=50,
        )

    def create_user_row(self, user):
        """Return the ``ft.DataRow`` for a single user dict.

        The row has exactly six cells, one per column. The checkbox and the ID
        share the first cell, mirroring the header; giving them separate cells
        produced seven cells for six columns.
        """
        if user["active"]:
            status = ft.Text("活跃", color=ft.colors.GREEN)
        else:
            status = ft.Text("禁用", color=ft.colors.RED)
        return ft.DataRow(
            cells=[
                ft.DataCell(
                    ft.Row([
                        ft.Checkbox(value=False, on_change=self.row_selected),
                        ft.Text(str(user["id"]))
                    ])
                ),
                ft.DataCell(ft.Text(user["username"])),
                ft.DataCell(ft.Text(user["email"])),
                ft.DataCell(ft.Text(user["role"])),
                ft.DataCell(ft.Container(content=status)),
                ft.DataCell(
                    ft.Row([
                        ft.IconButton(
                            icon=ft.icons.EDIT,
                            tooltip="编辑",
                            on_click=lambda e: self.edit_user(user["id"])
                        ),
                        ft.IconButton(
                            icon=ft.icons.DELETE,
                            tooltip="删除",
                            on_click=lambda e: self.delete_user(user["id"])
                        )
                    ])
                )
            ]
        )

    def select_all(self, e):
        """Handle the header checkbox (not implemented yet)."""
        pass

    def row_selected(self, e):
        """Handle a row checkbox (not implemented yet)."""
        pass

    def edit_user(self, user_id):
        """Handle the edit button of a row (not implemented yet)."""
        pass

    def delete_user(self, user_id):
        """Handle the delete button of a row (not implemented yet)."""
        pass

API服务 (api_service.py)

import httpx
from typing import Dict, Any
import json
class APIService:
    """Small async JSON client for the backend REST API.

    Every request method returns ``{"success": True, "data": ...}`` on success
    and ``{"success": False, "error": "..."}`` on failure; none of them raise.
    """

    def __init__(self, base_url: str = "http://localhost:8000/api"):
        self.base_url = base_url
        # The request methods are coroutines, so the client must be the async
        # one: awaiting the result of a synchronous httpx.Client call raises
        # TypeError, which was then reported as a "network error".
        self.client = httpx.AsyncClient(timeout=30.0)
        self.token = None

    def set_token(self, token: str):
        """Remember *token* and send it as a Bearer token on later requests."""
        self.token = token
        self.client.headers.update({
            "Authorization": f"Bearer {token}"
        })

    async def _request(self, method: str, endpoint: str, **kwargs) -> Dict:
        """Send one request and fold the outcome into the result dict."""
        try:
            response = await self.client.request(
                method,
                f"{self.base_url}{endpoint}",
                **kwargs
            )
            response.raise_for_status()
            # Bodiless successes (e.g. 204 after DELETE) carry no JSON.
            data = response.json() if response.content else None
            return {"success": True, "data": data}
        except httpx.HTTPStatusError as e:
            return {"success": False, "error": str(e)}
        except Exception as e:
            return {"success": False, "error": f"网络错误: {str(e)}"}

    async def get(self, endpoint: str, params: Dict = None) -> Dict:
        """GET *endpoint* with optional query *params*."""
        return await self._request("GET", endpoint, params=params)

    async def post(self, endpoint: str, data: Dict = None) -> Dict:
        """POST *data* as JSON to *endpoint*."""
        return await self._request("POST", endpoint, json=data)

    async def update(self, endpoint: str, data: Dict = None) -> Dict:
        """PUT *data* as JSON to *endpoint*."""
        return await self._request("PUT", endpoint, json=data)

    async def delete(self, endpoint: str) -> Dict:
        """DELETE *endpoint*."""
        return await self._request("DELETE", endpoint)

    async def close(self):
        """Close the underlying HTTP client and its connections."""
        await self.client.aclose()

数据模型 (user.py)

from dataclasses import dataclass
from typing import Optional
from datetime import datetime
@dataclass
class User:
    """User record with dict conversion helpers."""

    id: int
    username: str
    email: str
    role: str
    active: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    avatar: Optional[str] = None
    phone: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the exported fields as a dict (timestamps are left out)."""
        exported = ("id", "username", "email", "role", "active", "avatar", "phone")
        return {name: getattr(self, name) for name in exported}

    @classmethod
    def from_dict(cls, data: dict):
        """Build a user from *data*; missing keys become None (active: True)."""
        fallbacks = {"active": True}
        field_names = (
            "id",
            "username",
            "email",
            "role",
            "active",
            "created_at",
            "updated_at",
            "avatar",
            "phone",
        )
        values = {
            name: data.get(name, fallbacks.get(name)) for name in field_names
        }
        return cls(**values)
# Typed container for the fields of a login result. Only `success` is
# required; the remaining fields default to None.
# NOTE(review): LoginView.login_click subscripts the login result like a dict
# (result["success"]); confirm which shape AuthService.login really returns.
@dataclass
class LoginResponse:
    success: bool
    # presumably the auth token issued on success — TODO confirm
    token: Optional[str] = None
    user: Optional[User] = None
    # presumably a human-readable failure reason — TODO confirm
    message: Optional[str] = None

数据库服务 (database_service.py)

import sqlite3
from typing import List, Dict, Any
from contextlib import contextmanager
import os
class DatabaseService:
    """Thin SQLite wrapper that opens one short-lived connection per call."""

    def __init__(self, db_path: str = "data/app.db"):
        self.db_path = db_path
        self.ensure_database()

    def ensure_database(self):
        """Create the parent directory (if any) and the tables."""
        parent_dir = os.path.dirname(self.db_path)
        # A bare filename such as "app.db" has no directory part, and
        # os.makedirs("") raises FileNotFoundError.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.create_tables()

    @contextmanager
    def get_connection(self):
        """Yield a connection; commit on success, roll back on error, always close."""
        conn = sqlite3.connect(self.db_path)
        # Rows support access by column name, so dict(row) works in query().
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def create_tables(self):
        """Create the ``users`` and ``orders`` tables if they do not exist."""
        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    active INTEGER DEFAULT 1,
                    avatar TEXT,
                    phone TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    total_amount REAL,
                    status TEXT DEFAULT 'pending',
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            """)

    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """Run a parameterized SELECT and return every row as a dict."""
        with self.get_connection() as conn:
            cursor = conn.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]

    def execute(self, sql: str, params: tuple = ()) -> int:
        """Run a parameterized statement and return the cursor's ``lastrowid``."""
        with self.get_connection() as conn:
            cursor = conn.execute(sql, params)
            return cursor.lastrowid

    def insert_user(self, user_data: Dict) -> int:
        """Insert a user and return its new row id.

        Args:
            user_data: Needs ``username``, ``email`` and ``password_hash``;
                ``role`` is optional and defaults to ``"user"``.

        Raises:
            KeyError: If a required key is missing.
            sqlite3.IntegrityError: If the username or email already exists.
        """
        sql = """
            INSERT INTO users (username, email, password_hash, role)
            VALUES (?, ?, ?, ?)
        """
        return self.execute(sql, (
            user_data["username"],
            user_data["email"],
            user_data["password_hash"],
            user_data.get("role", "user")
        ))

    def get_user_by_username(self, username: str) -> "Dict | None":
        """Return the user row for *username* as a dict, or None if absent."""
        result = self.query(
            "SELECT * FROM users WHERE username = ?",
            (username,)
        )
        return result[0] if result else None

工具函数 (helpers.py)

import re
import hashlib
import json
from typing import Any
def validate_email(email: str) -> bool:
    """Return True if *email* looks like ``local@domain.tld``.

    Non-string input (e.g. None from an empty form field) is invalid rather
    than an error.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch, not match + "$": "$" also matches before a trailing newline,
    # which let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
def validate_phone(phone: str) -> bool:
    """Return True if *phone* is 11 ASCII digits starting with 1 then 3-9.

    Non-string input is invalid rather than an error.
    """
    if not isinstance(phone, str):
        return False
    # [0-9] rather than \d: \d also matches non-ASCII digits such as
    # full-width ones. fullmatch rejects a trailing newline that "$" allowed.
    pattern = r'1[3-9][0-9]{9}'
    return re.fullmatch(pattern, phone) is not None
def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of *password* (encoded with str.encode())."""
    # NOTE(review): a single unsalted SHA-256 pass is weak for password
    # storage. Moving to a salted KDF changes the stored format, so it has to
    # be coordinated with whatever verifies these hashes.
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def format_datetime(dt_str: str, format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Reformat an ISO-8601 string with ``strftime``.

    Args:
        dt_str: Date/time text accepted by ``datetime.fromisoformat``.
        format: ``strftime`` pattern for the result.

    Returns:
        The formatted text, or *dt_str* unchanged when it cannot be parsed.
    """
    from datetime import datetime
    try:
        dt = datetime.fromisoformat(dt_str)
        return dt.strftime(format)
    # Only parse/format failures fall back to the input; a bare "except:"
    # also swallowed KeyboardInterrupt and SystemExit.
    except (ValueError, TypeError):
        return dt_str
def create_notification(page, message: str, type: str = "info"):
    """Show *message* in a snack bar coloured by notification type.

    Args:
        page: The Flet page that displays the snack bar.
        message: Text to show.
        type: One of "info", "success", "warning" or "error"; any other value
            uses the "info" colour.
    """
    # This module never imports flet at the top, so bring it in here;
    # otherwise every call fails with NameError on ``ft``.
    import flet as ft

    colors = {
        "info": ft.colors.BLUE,
        "success": ft.colors.GREEN,
        "warning": ft.colors.ORANGE,
        "error": ft.colors.RED
    }
    snack_bar = ft.SnackBar(
        content=ft.Text(message),
        bgcolor=colors.get(type, ft.colors.BLUE),
        duration=3000
    )
    page.show_snack_bar(snack_bar)

高级特性实现

主题切换功能

def toggle_theme(page: ft.Page):
    """Switch light to dark, and anything else to light; mirror it in the session."""
    switch_to_dark = page.theme_mode == ft.ThemeMode.LIGHT
    page.theme_mode = ft.ThemeMode.DARK if switch_to_dark else ft.ThemeMode.LIGHT
    page.session.set("theme", "dark" if switch_to_dark else "light")
    page.update()

文件上传处理

def upload_file(page: ft.Page, file_picker: ft.FilePicker):
    """Attach *file_picker* to *page* and open a single-file dialog.

    Args:
        page: Page whose overlay hosts the picker.
        file_picker: Picker to configure and open.
    """
    def on_file_selected(e: ft.FilePickerResultEvent):
        """Handle the dialog result; ``e.files`` is empty on cancel."""
        if not e.files:
            return
        # Upload handling goes here; with allow_multiple=False the selected
        # file is e.files[0].

    file_picker.on_result = on_file_selected
    # Register the picker only once; repeated calls used to stack duplicates
    # in the overlay.
    if file_picker not in page.overlay:
        page.overlay.append(file_picker)
    # The picker has to be on the page before a dialog can be opened, so
    # flush the overlay change first.
    page.update()
    file_picker.pick_files(allow_multiple=False)

图表集成

import plotly.graph_objects as go
import plotly.io as pio
def create_chart(data: "List[Dict]") -> "ft.Image":
    """Render monthly sales as a bar chart and return it as a Flet image.

    Args:
        data: Dicts with a ``month`` (x axis) and a ``sales`` (y axis) key.

    Returns:
        A 600x400 ``ft.Image`` holding the chart as a base64-encoded PNG.
    """
    # Local imports: this snippet only imports plotly at module level. The
    # annotations are quoted for the same reason (List/Dict/ft may be absent).
    import base64
    import flet as ft

    # Build the Plotly figure.
    fig = go.Figure(data=[
        go.Bar(name='销售额', x=[d['month'] for d in data],
               y=[d['sales'] for d in data])
    ])
    # Render to PNG bytes.
    img_bytes = pio.to_image(fig, format='png', width=600, height=400)
    # src_base64 expects base64 text. The PNG is raw binary, so decoding it
    # as UTF-8 raised UnicodeDecodeError.
    return ft.Image(
        src_base64=base64.b64encode(img_bytes).decode('ascii'),
        width=600,
        height=400
    )

打包发布

使用PyInstaller打包

# build.py
import subprocess
import sys
def build_exe():
    """Bundle ``main.py`` into a single windowed executable with PyInstaller.

    Raises:
        subprocess.CalledProcessError: If PyInstaller exits with a non-zero
            status.
    """
    import os

    cmd = [
        # Use the PyInstaller installed for the interpreter running this
        # script, not whichever "pyinstaller" happens to be first on PATH.
        sys.executable, "-m", "PyInstaller",
        "--onefile",  # single-file mode
        "--windowed",  # no console window
        "--name", "MyApp",
        "--icon", "assets/icon.ico",
        # SRC/DEST separator is os.pathsep: ";" on Windows, ":" elsewhere.
        "--add-data", f"assets{os.pathsep}assets",  # bundle the assets folder
        "--hidden-import", "flet",
        "main.py"
    ]
    # check=True: a failed build must not look like a successful one.
    subprocess.run(cmd, check=True)
if __name__ == "__main__":
    build_exe()

使用Flet打包

# 命令行打包
# flet pack main.py --name MyApp --icon assets/icon.ico
import flet as ft
def main(page: ft.Page):
    """Placeholder entry point for the packaging example; does nothing."""
if __name__ == "__main__":
    ft.app(target=main)

优化建议

性能优化

  • 使用async模式处理I/O操作
  • 实现虚拟滚动加载大数据集
  • 使用缓存机制减少重复查询
  • 合理使用状态管理

安全优化

  • 前端验证与后端验证结合
  • 使用HTTPS进行数据传输
  • 实现JWT或OAuth2认证
  • 防止XSS和SQL注入

开发效率

  • 使用类型注解提高代码可读性
  • 实现自动化测试
  • 使用版本控制
  • 建立组件库文档

这个实战项目涵盖了Flet桌面应用开发的主要方面,从基础架构到高级特性都有涉及。建议你根据实际需求逐步扩展功能,并持续优化代码质量。

标签: 全栈项目

抱歉,评论功能暂时关闭!