Supabase云同步架构:Flutter应用的数据同步策略
本文基于BeeCount(蜜蜂记账)项目的实际开发经验,深入探讨如何使用Supabase构建安全、高效的云端数据同步系统。
项目背景
BeeCount(蜜蜂记账)是一款开源、简洁、无广告的个人记账应用。所有财务数据完全由用户掌控,支持本地存储和可选的云端同步,确保数据绝对安全。
引言
在现代移动应用开发中,多设备数据同步已成为用户的基本需求。用户希望在手机、平板、不同设备间无缝切换,同时保持数据的一致性和安全性。BeeCount选择Supabase作为云端后台服务,不仅因为其开源特性和强大功能,更重要的是它提供了完整的数据安全保障。
Supabase架构优势
开源与自主可控
- 开源透明:完全开源的后台即服务(BaaS)解决方案
- 数据主权:支持自建部署,数据完全可控
- 标准技术:基于PostgreSQL,无厂商锁定风险
功能完整性
- 实时数据库:基于PostgreSQL的实时数据同步
- 用户认证:完整的身份验证和授权系统
- 文件存储:对象存储服务,支持大文件上传
- 边缘函数:服务端逻辑处理能力
同步架构设计
整体架构图
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Flutter App │ │ Supabase │ │ Other Device │
│ (Local SQLite) │◄──►│ (PostgreSQL) │◄──►│ (Local SQLite) │
│ │ │ (Auth + Storage) │ │ │
└─────────────────┘ └──────────────────┘ └─────────────────┘│ │ │└───── 加密备份文件 ─────┴───── 加密备份文件 ─────┘
核心设计原则
- 本地优先:所有操作优先在本地完成,确保响应速度
- 增量同步:只同步变更数据,降低网络开销
- 端到端加密:敏感数据在客户端加密后上传
- 冲突处理:合理的冲突解决策略
- 离线可用:网络异常时应用仍可正常使用
认证系统集成
Supabase认证配置
class SupabaseAuthService implements AuthService {final s.SupabaseClient client;SupabaseAuthService({required this.client});@overrideFuture<AuthResult> signInWithEmail({required String email,required String password,}) async {try {final response = await client.auth.signInWithPassword(email: email,password: password,);if (response.user != null) {return AuthResult.success(user: AppUser.fromSupabase(response.user!));} else {return AuthResult.failure(error: 'Login failed');}} catch (e) {return AuthResult.failure(error: e.toString());}}@overrideFuture<AuthResult> signUpWithEmail({required String email,required String password,}) async {try {final response = await client.auth.signUp(email: email,password: password,);return AuthResult.success(user: AppUser.fromSupabase(response.user!));} catch (e) {return AuthResult.failure(error: e.toString());}}@overrideStream<AuthState> get authStateChanges {return client.auth.onAuthStateChange.map((data) {if (data.session?.user != null) {return AuthState.authenticated(user: AppUser.fromSupabase(data.session!.user));}return AuthState.unauthenticated();});}
}
用户模型设计
class AppUser {final String id;final String email;final DateTime? lastSignInAt;const AppUser({required this.id,required this.email,this.lastSignInAt,});factory AppUser.fromSupabase(s.User user) {return AppUser(id: user.id,email: user.email ?? '',lastSignInAt: user.lastSignInAt,);}bool get isAnonymous => email.isEmpty;
}
数据同步策略
备份文件格式
BeeCount采用加密备份文件的方式进行数据同步:
class BackupData {final String version;final DateTime createdAt;final String deviceId;final Map<String, dynamic> ledgers;final Map<String, dynamic> accounts;final Map<String, dynamic> categories;final Map<String, dynamic> transactions;BackupData({required this.version,required this.createdAt,required this.deviceId,required this.ledgers,required this.accounts,required this.categories,required this.transactions,});Map<String, dynamic> toJson() => {'version': version,'createdAt': createdAt.toIso8601String(),'deviceId': deviceId,'ledgers': ledgers,'accounts': accounts,'categories': categories,'transactions': transactions,};factory BackupData.fromJson(Map<String, dynamic> json) {return BackupData(version: json['version'],createdAt: DateTime.parse(json['createdAt']),deviceId: json['deviceId'],ledgers: json['ledgers'],accounts: json['accounts'],categories: json['categories'],transactions: json['transactions'],);}
}
同步服务实现
class SupabaseSyncService implements SyncService {final s.SupabaseClient client;final BeeDatabase db;final BeeRepository repo;final AuthService auth;final String bucket;// 状态缓存和上传窗口管理final Map<int, SyncStatus> _statusCache = {};final Map<int, _RecentUpload> _recentUpload = {};final Map<int, DateTime> _recentLocalChangeAt = {};SupabaseSyncService({required this.client,required this.db,required this.repo,required this.auth,this.bucket = 'beecount-backups',});@overrideFuture<SyncStatus> getSyncStatus(int ledgerId) async {// 检查缓存if (_statusCache.containsKey(ledgerId)) {final cached = _statusCache[ledgerId]!;if (DateTime.now().difference(cached.lastCheck).inMinutes < 5) {return cached;}}try {final user = await auth.getCurrentUser();if (user == null) {return SyncStatus.notLoggedIn();}// 获取云端文件信息final fileName = 'ledger_${ledgerId}_backup.json';final cloudFile = await _getCloudFileInfo(fileName);// 计算本地数据指纹final localFingerprint = await _calculateLocalFingerprint(ledgerId);if (cloudFile == null) {final status = SyncStatus.localOnly(localFingerprint: localFingerprint,hasLocalChanges: true,);_statusCache[ledgerId] = status;return status;}// 比较指纹判断同步状态final isUpToDate = localFingerprint == cloudFile.fingerprint;final status = SyncStatus.synced(localFingerprint: localFingerprint,cloudFingerprint: cloudFile.fingerprint,isUpToDate: isUpToDate,lastSyncAt: cloudFile.lastModified,);_statusCache[ledgerId] = status;return status;} catch (e) {logger.error('Failed to get sync status', e);return SyncStatus.error(error: e.toString());}}@overrideFuture<SyncResult> uploadBackup(int ledgerId) async {try {final user = await auth.getCurrentUser();if (user == null) {return SyncResult.failure(error: 'Not logged in');}// 生成备份数据final backupData = await _generateBackup(ledgerId);final jsonString = json.encode(backupData.toJson());// 加密备份数据final encryptedData = await _encryptBackupData(jsonString);// 上传到Supabase Storagefinal fileName = 'ledger_${ledgerId}_backup.json';final uploadResult = await client.storage.from(bucket).uploadBinary(fileName, encryptedData);if (uploadResult.isNotEmpty) {// 记录上传成功final fingerprint = await _calculateLocalFingerprint(ledgerId);_recentUpload[ledgerId] = _RecentUpload(fingerprint: fingerprint,uploadedAt: DateTime.now(),);// 更新缓存_statusCache[ledgerId] = SyncStatus.synced(localFingerprint: fingerprint,cloudFingerprint: fingerprint,isUpToDate: true,lastSyncAt: DateTime.now(),);return SyncResult.success(syncedAt: DateTime.now(),message: 'Backup uploaded successfully',);}return SyncResult.failure(error: 'Upload failed');} catch (e) {logger.error('Failed to upload backup', e);return SyncResult.failure(error: e.toString());}}@overrideFuture<SyncResult> downloadRestore(int ledgerId) async {try {final user = await auth.getCurrentUser();if (user == null) {return SyncResult.failure(error: 'Not logged in');}// 下载备份文件final fileName = 'ledger_${ledgerId}_backup.json';final downloadData = await client.storage.from(bucket).download(fileName);if (downloadData.isEmpty) {return SyncResult.failure(error: 'No backup found');}// 解密备份数据final decryptedData = await _decryptBackupData(downloadData);final backupData = BackupData.fromJson(json.decode(decryptedData));// 执行数据恢复await _restoreFromBackup(backupData, ledgerId);// 更新状态final fingerprint = await _calculateLocalFingerprint(ledgerId);_statusCache[ledgerId] = SyncStatus.synced(localFingerprint: fingerprint,cloudFingerprint: fingerprint,isUpToDate: true,lastSyncAt: DateTime.now(),);return SyncResult.success(syncedAt: DateTime.now(),message: 'Data restored successfully',);} catch (e) {logger.error('Failed to download restore', e);return SyncResult.failure(error: e.toString());}}// 数据加密/解密Future<Uint8List> _encryptBackupData(String jsonData) async {final key = await _getDerivedKey();final cipher = AESCipher(key);return cipher.encrypt(utf8.encode(jsonData));}Future<String> _decryptBackupData(Uint8List encryptedData) async {final key = await _getDerivedKey();final cipher = AESCipher(key);final decrypted = cipher.decrypt(encryptedData);return utf8.decode(decrypted);}
}
数据安全保障
端到端加密
class AESCipher {final Uint8List key;AESCipher(this.key);Uint8List encrypt(List<int> plaintext) {final cipher = AESEngine()..init(true, KeyParameter(key));// 生成随机IVfinal iv = _generateRandomIV();final cbcCipher = CBCBlockCipher(cipher)..init(true, ParametersWithIV(KeyParameter(key), iv));// PKCS7填充final paddedPlaintext = _padPKCS7(Uint8List.fromList(plaintext));final ciphertext = Uint8List(paddedPlaintext.length);for (int i = 0; i < paddedPlaintext.length; i += 16) {cbcCipher.processBlock(paddedPlaintext, i, ciphertext, i);}// IV + 密文return Uint8List.fromList([...iv, ...ciphertext]);}Uint8List decrypt(Uint8List encrypted) {// 分离IV和密文final iv = encrypted.sublist(0, 16);final ciphertext = encrypted.sublist(16);final cipher = AESEngine()..init(false, KeyParameter(key));final cbcCipher = CBCBlockCipher(cipher)..init(false, ParametersWithIV(KeyParameter(key), iv));final decrypted = Uint8List(ciphertext.length);for (int i = 0; i < ciphertext.length; i += 16) {cbcCipher.processBlock(ciphertext, i, decrypted, i);}// 移除PKCS7填充return _removePKCS7Padding(decrypted);}Uint8List _generateRandomIV() {final random = Random.secure();return Uint8List.fromList(List.generate(16, (_) => random.nextInt(256)));}
}
密钥派生
Future<Uint8List> _getDerivedKey() async {final user = await auth.getCurrentUser();if (user == null) throw Exception('User not authenticated');// 使用用户ID和设备特征生成密钥final salt = utf8.encode('${user.id}_${await _getDeviceId()}');final password = utf8.encode(user.id);// PBKDF2密钥派生final pbkdf2 = PBKDF2KeyDerivator(HMac(SHA256Digest(), 64))..init(Pbkdf2Parameters(salt, 10000, 32));return pbkdf2.process(password);
}Future<String> _getDeviceId() async {final prefs = await SharedPreferences.getInstance();String? deviceId = prefs.getString('device_id');if (deviceId == null) {deviceId = const Uuid().v4();await prefs.setString('device_id', deviceId);}return deviceId;
}
冲突处理策略
冲突检测
class ConflictDetector {static ConflictResolution detectConflict({required BackupData localData,required BackupData cloudData,required DateTime lastSyncAt,}) {final localChanges = <String, dynamic>{};final cloudChanges = <String, dynamic>{};// 检测交易记录冲突_detectTransactionConflicts(localData.transactions,cloudData.transactions,lastSyncAt,localChanges,cloudChanges,);if (localChanges.isEmpty && cloudChanges.isEmpty) {return ConflictResolution.noConflict();}if (localChanges.isNotEmpty && cloudChanges.isEmpty) {return ConflictResolution.localWins(changes: localChanges);}if (localChanges.isEmpty && cloudChanges.isNotEmpty) {return ConflictResolution.cloudWins(changes: cloudChanges);}// 存在双向冲突,需要用户选择return ConflictResolution.needsResolution(localChanges: localChanges,cloudChanges: cloudChanges,);}static void _detectTransactionConflicts(Map<String, dynamic> localTxs,Map<String, dynamic> cloudTxs,DateTime lastSyncAt,Map<String, dynamic> localChanges,Map<String, dynamic> cloudChanges,) {// 检测本地新增/修改的交易localTxs.forEach((id, localTx) {final txUpdatedAt = DateTime.parse(localTx['updatedAt'] ?? localTx['createdAt']);if (txUpdatedAt.isAfter(lastSyncAt)) {localChanges[id] = localTx;}});// 检测云端新增/修改的交易cloudTxs.forEach((id, cloudTx) {final txUpdatedAt = DateTime.parse(cloudTx['updatedAt'] ?? cloudTx['createdAt']);if (txUpdatedAt.isAfter(lastSyncAt)) {cloudChanges[id] = cloudTx;}});}
}
冲突解决
class ConflictResolver {static Future<BackupData> resolveConflict({required BackupData localData,required BackupData cloudData,required ConflictResolution resolution,}) async {switch (resolution.type) {case ConflictType.noConflict:return localData;case ConflictType.localWins:return localData;case ConflictType.cloudWins:return cloudData;case ConflictType.needsResolution:return await _mergeData(localData, cloudData, resolution);}}static Future<BackupData> _mergeData(BackupData localData,BackupData cloudData,ConflictResolution resolution,) async {// 实现智能合并策略final mergedTransactions = <String, dynamic>{};// 优先保留较新的数据mergedTransactions.addAll(cloudData.transactions);resolution.localChanges.forEach((id, localTx) {final localUpdatedAt = DateTime.parse(localTx['updatedAt'] ?? localTx['createdAt']);if (resolution.cloudChanges.containsKey(id)) {final cloudTx = resolution.cloudChanges[id];final cloudUpdatedAt = DateTime.parse(cloudTx['updatedAt'] ?? cloudTx['createdAt']);// 保留时间戳较新的版本if (localUpdatedAt.isAfter(cloudUpdatedAt)) {mergedTransactions[id] = localTx;} else {mergedTransactions[id] = cloudTx;}} else {mergedTransactions[id] = localTx;}});return BackupData(version: localData.version,createdAt: DateTime.now(),deviceId: localData.deviceId,ledgers: localData.ledgers,accounts: localData.accounts,categories: localData.categories,transactions: mergedTransactions,);}
}
性能优化
增量同步优化
class IncrementalSync {static Future<BackupData> generateIncrementalBackup({required int ledgerId,required DateTime lastSyncAt,required BeeRepository repo,}) async {// 只获取自上次同步后的变更数据final changedTransactions = await repo.getTransactionsSince(ledgerId: ledgerId,since: lastSyncAt,);final changedAccounts = await repo.getAccountsSince(ledgerId: ledgerId,since: lastSyncAt,);// 构建增量备份数据return BackupData(version: '1.0',createdAt: DateTime.now(),deviceId: await _getDeviceId(),ledgers: {}, // 账本信息变化较少,可按需包含accounts: _mapAccountsToJson(changedAccounts),categories: {}, // 分类变化较少,可按需包含transactions: _mapTransactionsToJson(changedTransactions),);}static Map<String, dynamic> _mapTransactionsToJson(List<Transaction> transactions) {return Map.fromEntries(transactions.map((tx) => MapEntry(tx.id.toString(),{'id': tx.id,'ledgerId': tx.ledgerId,'type': tx.type,'amount': tx.amount,'categoryId': tx.categoryId,'accountId': tx.accountId,'toAccountId': tx.toAccountId,'happenedAt': tx.happenedAt.toIso8601String(),'note': tx.note,'updatedAt': DateTime.now().toIso8601String(),})));}
}
网络优化
class NetworkOptimizer {static const int maxRetries = 3;static const Duration retryDelay = Duration(seconds: 2);static Future<T> withRetry<T>(Future<T> Function() operation) async {int attempts = 0;while (attempts < maxRetries) {try {return await operation();} catch (e) {attempts++;if (attempts >= maxRetries) {rethrow;}// 指数退避await Future.delayed(retryDelay * (1 << attempts));}}throw Exception('Max retries exceeded');}static Future<bool> isNetworkAvailable() async {try {final result = await InternetAddress.lookup('supabase.co');return result.isNotEmpty && result[0].rawAddress.isNotEmpty;} catch (_) {return false;}}
}
用户体验设计
同步状态展示
class SyncStatusWidget extends ConsumerWidget {final int ledgerId;const SyncStatusWidget({Key? key, required this.ledgerId}) : super(key: key);@overrideWidget build(BuildContext context, WidgetRef ref) {final syncStatus = ref.watch(syncStatusProvider(ledgerId));return syncStatus.when(data: (status) => _buildStatusIndicator(status),loading: () => const SyncLoadingIndicator(),error: (error, _) => SyncErrorIndicator(error: error.toString()),);}Widget _buildStatusIndicator(SyncStatus status) {switch (status.type) {case SyncStatusType.synced:return Row(mainAxisSize: MainAxisSize.min,children: [Icon(Icons.cloud_done, color: Colors.green, size: 16),SizedBox(width: 4),Text('已同步', style: TextStyle(fontSize: 12, color: Colors.green)),],);case SyncStatusType.localOnly:return Row(mainAxisSize: MainAxisSize.min,children: [Icon(Icons.cloud_off, color: Colors.orange, size: 16),SizedBox(width: 4),Text('仅本地', style: TextStyle(fontSize: 12, color: Colors.orange)),],);case SyncStatusType.needsUpload:return Row(mainAxisSize: MainAxisSize.min,children: [Icon(Icons.cloud_upload, color: Colors.blue, size: 16),SizedBox(width: 4),Text('待上传', style: TextStyle(fontSize: 12, color: Colors.blue)),],);case SyncStatusType.needsDownload:return Row(mainAxisSize: MainAxisSize.min,children: [Icon(Icons.cloud_download, color: Colors.purple, size: 16),SizedBox(width: 4),Text('有更新', style: TextStyle(fontSize: 12, color: Colors.purple)),],);default:return SizedBox.shrink();}}
}
同步操作界面
class SyncActionsSheet extends ConsumerWidget {final int ledgerId;const SyncActionsSheet({Key? key, required this.ledgerId}) : super(key: key);@overrideWidget build(BuildContext context, WidgetRef ref) {final syncStatus = ref.watch(syncStatusProvider(ledgerId));return DraggableScrollableSheet(initialChildSize: 0.4,minChildSize: 0.2,maxChildSize: 0.8,builder: (context, scrollController) {return Container(decoration: BoxDecoration(color: Theme.of(context).scaffoldBackgroundColor,borderRadius: BorderRadius.vertical(top: Radius.circular(20)),),child: Column(children: [// 拖拽指示器Container(width: 40,height: 4,margin: EdgeInsets.symmetric(vertical: 12),decoration: BoxDecoration(color: Colors.grey[300],borderRadius: BorderRadius.circular(2),),),// 标题Text('云端同步',style: Theme.of(context).textTheme.headlineSmall,),Expanded(child: ListView(controller: scrollController,padding: EdgeInsets.all(16),children: [_buildSyncActions(context, ref, syncStatus),],),),],),);},);}Widget _buildSyncActions(BuildContext context, WidgetRef ref, AsyncValue<SyncStatus> syncStatus) {return syncStatus.when(data: (status) {switch (status.type) {case SyncStatusType.localOnly:return Column(children: [_buildActionCard(title: '上传备份',subtitle: '将本地数据上传到云端',icon: Icons.cloud_upload,color: Colors.blue,onTap: () => _uploadBackup(ref),),],);case SyncStatusType.needsDownload:return Column(children: [_buildActionCard(title: '下载恢复',subtitle: '从云端恢复数据(会覆盖本地数据)',icon: Icons.cloud_download,color: Colors.purple,onTap: () => _downloadRestore(context, ref),),SizedBox(height: 16),_buildActionCard(title: '强制上传',subtitle: '用本地数据覆盖云端备份',icon: Icons.upload,color: Colors.orange,onTap: () => _forceUpload(context, ref),),],);default:return _buildSyncInfo(status);}},loading: () => Center(child: CircularProgressIndicator()),error: (error, _) => Text('错误: $error'),);}Widget _buildActionCard({required String title,required String subtitle,required IconData icon,required Color color,required VoidCallback onTap,}) {return Card(child: ListTile(leading: CircleAvatar(backgroundColor: color.withOpacity(0.1),child: Icon(icon, color: color),),title: Text(title),subtitle: Text(subtitle),trailing: Icon(Icons.arrow_forward_ios, size: 16),onTap: onTap,),);}
}
最佳实践总结
1. 架构设计原则
- 本地优先:确保应用离线可用
- 渐进式同步:支持部分同步,不影响核心功能
- 状态透明:让用户清楚了解同步状态
2. 安全性考虑
- 端到端加密:敏感数据客户端加密
- 密钥管理:使用安全的密钥派生和存储
- 权限控制:确保用户只能访问自己的数据
3. 性能优化
- 增量同步:只传输变更数据
- 压缩上传:减少网络传输量
- 后台同步:不影响用户操作
4. 用户体验
- 状态可见:清晰的同步状态指示
- 操作简单:一键同步,自动处理
- 错误友好:明确的错误提示和恢复建议
实际应用效果
在BeeCount项目中,Supabase云同步系统带来了显著的价值:
- 用户满意度提升:多设备无缝切换,用户数据永不丢失
- 技术债务减少:基于成熟的BaaS服务,减少自建后台成本
- 安全性保障:端到端加密确保财务数据安全
- 开发效率:快速集成,专注业务逻辑开发
结语
Supabase作为开源的BaaS解决方案,为Flutter应用提供了完整的后端服务能力。通过合理的架构设计、安全的加密策略和良好的用户体验设计,我们可以构建出既安全又好用的云同步功能。
BeeCount的实践证明,选择合适的技术栈和设计模式,能够在保证数据安全的前提下,为用户提供便捷的多设备同步体验。这对于任何需要数据同步的应用都具有重要的参考价值。
关于BeeCount项目
项目特色
- 🎯 现代架构: 基于Riverpod + Drift + Supabase的现代技术栈
- 📱 跨平台支持: iOS、Android双平台原生体验
- 🔄 云端同步: 支持多设备数据实时同步
- 🎨 个性化定制: Material Design 3主题系统
- 📊 数据分析: 完整的财务数据可视化
- 🌍 国际化: 多语言本地化支持
技术栈一览
- 框架: Flutter 3.6.1+ / Dart 3.6.1+
- 状态管理: Flutter Riverpod 2.5.1
- 数据库: Drift (SQLite) 2.20.2
- 云服务: Supabase 2.5.6
- 图表: FL Chart 0.68.0
- CI/CD: GitHub Actions
开源信息
BeeCount是一个完全开源的项目,欢迎开发者参与贡献:
- 项目主页: https://github.com/TNT-Likely/BeeCount
- 开发者主页: https://github.com/TNT-Likely
- 发布下载: GitHub Releases
参考资源
官方文档
- Supabase官方文档 - Supabase完整使用指南
- Flutter加密指南 - Flutter安全开发实践
学习资源
- Supabase Flutter教程 - 官方Flutter集成教程
- 数据同步最佳实践 - 实时数据同步案例
本文是BeeCount技术文章系列的第3篇,后续将深入探讨主题系统、数据可视化等话题。如果你觉得这篇文章有帮助,欢迎关注项目并给个Star!