首页
关于
Search
1
微服务
34 阅读
2
同步本地Markdown至Typecho站点
28 阅读
3
JavaWeb——后端
18 阅读
4
苍穹外卖
14 阅读
5
智能协同云图库
13 阅读
后端学习
项目
杂项
科研
论文
默认分类
登录
找到
6
篇与
项目
相关的结果
2025-07-30
草稿
核心比较如下: 核心点 Apache HttpClient Retrofit 编程模型 细粒度调用,手动构造 HttpGet/HttpPost 注解驱动接口方法,声明式调用 请求定义 手动拼接 URL、参数 用 @GET/@POST、@Path、@Query、@Body 注解 序列化/反序列化 手动调用 ObjectMapper/Gson 自动通过 ConverterFactory(Jackson/Gson 等) 同步/异步 以同步为主,异步需自行管理线程和回调 同一个 Call<T> 即可 execute()(同步)或 enqueue()(异步) 扩展性与拦截器 可配置拦截器,但需手动集成 底层基于 OkHttp,天然支持拦截器、连接池、缓存、重试和取消
项目
zy123
3天前
0
0
0
2025-06-20
拼团交易系统
拼团交易系统 系统备忘录 本系统涉及微信和支付宝的回调。 1.微信扫码登录,*https://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index*平台上配置了扫描通知地址,如果是本地测试,需要打开frp内网穿透,然后填的地址是frp建立通道的服务器端的ip:端口 2.支付宝,用户付款成功回调,也是同理,本地测试就要开frp。注意frp中的通道,默认是本地端口=远程端口,但是如果在服务器上部署了一套,那么远程的端口就会与frp的端口冲突!!!导致本地测试的时候失效。 流程: 用户锁单-》支付宝付款-》成功后return_url设置了用户支付完毕后跳转回哪个地址是给前端用户看的; alipay_notify_url设置了支付成功后alipay调用你的后端哪个接口。 这里有小商城和拼团系统,notify_url指拼团系统中拼团达到指定人数后,通知小商城的地址,这里用rabbitmq。然后小商场将订单中相应拼团的status都设置为deal_done。然后小商场内部也再发一个'支付成功'消息,主要用于通知这些拼团对应的订单进入下一环节:发货(感觉'支付成功'取名不够直观)。 系统设计 功能流程 库表设计 首先,站在运营的角度,要为这次拼团配置对应的拼团活动。那么就会涉及到;给哪个渠道的什么商品ID配置拼团,这样用户在进入商品页就可以看到带有拼团商品的信息了。之后要考虑,这个拼团的商品所提供的规则信息,包括:折扣、起止时间、人数等。还要拿到折扣的一个试算金额。这个试算出来的金额,就是告诉用户,通过拼团可以拿到的最低价格。 之后,站在用户的角度,是参与拼团。首次发起一个拼团或者参与已存在的拼团进行数据的记录,达成拼团约定拼团人数后,开始进行通知。这个通知的设计站在平台角度可以提供回调,那么任何的系统也就都可以接入了。 另外,为了支持拼团库表,需要先根据业务规则把符合条件的用户 ID 写入 Redis,并为这批用户打上可配置的人群标签。创建拼团活动时,只需关联对应标签,即可让活动自动面向这部分用户生效,实现精准运营和差异化折扣。 那么,拼团活动表,为什么会把折扣拆分出来呢。因为这里的折扣可能有多种迭代到一个拼团上。比如,给一个商品添加了直减10元的优惠,又对符合的人群id的用户,额外打9折,这样就有了2个折扣迭代。所以拆分出来会更好维护。这是对常变的元素和稳定的元素进行设计的思考。 (一)拼团配置表 group_buy_activity 拼团活动 字段名 说明 id 自增ID activity_id 活动ID source 来源 channel 渠道 goods_id 商品ID discount_id 折扣ID group_type 成团方式【0自动成团(到时间后自动成团)、1达成目标成团】 take_limit_count 拼团次数限制 target 达成目标(3人单、5人单) valid_time 拼单时长(20分钟),未完成拼团则=》自动成功or失败 status 活动状态 (活动是否有效,运营可临时设置为失效) start_time 活动开始时间 end_time 活动结束时间 tag_id 人群标签规则标识 tag_scope 人群标签规则范围【多选;可见、参与】 create_time 创建时间 update_time 更新时间 group_buy_discount 折扣配置 字段名 说明 id 自增ID discount_id 折扣ID discount_name 折扣标题 discount_desc 折扣描述 discount_type 类型【base、tag】 market_plan 营销优惠计划【直减、满减、N元购】 market_expr 营销优惠表达式 tag_id 人群标签,特定优惠限定 create_time 创建时间 update_time 更新时间 crowd_tags 人群标签 字段名 说明 id 自增ID tag_id 标签ID tag_name 标签名称 tag_desc 标签描述 statistics 人群标签统计量 200\10万\100万 create_time 创建时间 update_time 更新时间 crowd_tags_detail 人群标签明细(写入缓存) 字段名 说明 id 自增ID tag_id 标签ID user_id 用户ID create_time 创建时间 update_time 更新时间 crowd_tags_job 人群标签任务 字段名 说明 id 自增ID tag_id 标签ID batch_id 批次ID tag_type 标签类型【参与量、消费金额】 tag_rule 标签规则【限定参与N次】 stat_start_time 统计开始时间 stat_end_time 统计结束时间 status 计划、重置、完成 create_time 创建时间 update_time 更新时间 拼团活动表:设定了拼团的成团规则,人群标签的使用可以限定哪些人可见,哪些人可参与。 折扣配置表:拆分出拼团优惠到一个新的表进行多条配置。如果折扣还有更多的复杂规则,则可以配置新的折扣规则表进行处理。 人群标签表:专门来做人群设计记录的,这3张表就是为了把符合规则的人群ID,也就是用户ID,全部跑任务到一个记录下进行使用。比如黑玫瑰人群、高净值人群、拼团履约率90%以上的人群等。 (二)参与拼团表 group_buy_account 拼团账户 字段名 说明 id 自增ID user_id 用户ID activity_id 活动ID take_limit_count 拼团次数限制 take_limit_count_used 拼团次数消耗 create_time 创建时间 update_time 更新时间 group_buy_order 用户拼单 一条记录 = 一个拼团团队(team_id 唯一) 字段名 说明 id 自增ID team_id 拼单组队ID activity_id 活动ID source 渠道 channel 来源 original_price 原始价格 deduction_price 折扣金额 pay_price 支付价格 target_count 目标数量 complete_count 完成数量 status 状态(0-拼单中、1-完成、2-失败) create_time 创建时间 update_time 更新时间 group_buy_order_list 用户拼单明细 一条记录 = 某用户在该团队里锁的一笔单 字段名 说明 id 自增ID user_id 用户ID team_id 拼单组队ID order_id 订单ID activity_id 活动ID start_time 活动开始时间 end_time 活动结束时间 goods_id 商品ID source 渠道 channel 来源 original_price 原始价格 deduction_price 折扣金额 status 状态;0 初始锁定、1 消费完成 out_trade_no 外部交易单号(确保外部调用唯一幂等) create_time 创建时间 update_time 更新时间 notify_task 回调任务 字段名 说明 id 自增ID activity_id 活动ID order_id 拼单ID notify_url 回调接口 notify_count 回调次数(3-5次) notify_status 回调状态【初始、完成、重试、失败】 parameter_json 参数对象 create_time 创建时间 update_time 更新时间 拼团账户表:记录用户的拼团参与数据,一个是为了限制用户的参与拼团次数,另外是为了人群标签任务统计数据。 用户拼单表:当有用户发起首次拼单的时候,产生拼单id,并记录所需成团的拼单记录,另外是写上拼团的状态、唯一索引、回调接口等。这样拼团完成就可以回调对接的平台,通知完成了。【微信支付也是这样的设计,回调支付结果,这样的设计可以方便平台化对接】当再有用户参与后,则写入用户拼单明细表。直至达成拼团。 回调任务表:当拼团完成后,要做回调处理。但可能会有失败,所以加入任务的方式进行补偿。如果仍然失败,则需要对接的平台,自己查询拼团结果。 架构设计 MVC架构: DDD架构: 价格试算 @Service @RequiredArgsConstructor public class IndexGroupBuyMarketServiceImpl implements IIndexGroupBuyMarketService { private final DefaultActivityStrategyFactory defaultActivityStrategyFactory; @Override public TrialBalanceEntity indexMarketTrial(MarketProductEntity marketProductEntity) throws Exception { StrategyHandler<MarketProductEntity, DefaultActivityStrategyFactory.DynamicContext, TrialBalanceEntity> strategyHandler = defaultActivityStrategyFactory.strategyHandler(); TrialBalanceEntity trialBalanceEntity = strategyHandler.apply(marketProductEntity, new DefaultActivityStrategyFactory.DynamicContext()); return trialBalanceEntity; } } IndexGroupBuyMarketService │ │ indexMarketTrial() ▼ DefaultActivityStrategyFactory │ (return rootNode) ▼ RootNode.apply() │ doApply() (执行) │ router() (路由到下一node) ▼ SwitchNode.apply() │ ... ▼ ... (可能还有其他节点) ▼ EndNode.apply() → 组装结果并返回 TrialBalanceEntity ▲ └────────── 最终一路向上 return IndexGroupBuyMarketService 是领域服务,整个价格试算的入口 DefaultActivityStrategyFactory 帮你拿到 根节点,真正的“工厂”工作(多线程预处理、分支路由)都在各 Node 里完成。 DynamicContext 是一次性创建的共享上下文:谁需要谁就往里放 人群标签数据采集 步骤 目的 说明 1. 记录日志 标明本次批次任务的开始 方便后续排查、链路追踪 2. 读取批次配置 拿到该批次统计范围、规则、时间窗等 若返回 null 通常代表批次号错误或已被清理 3. 采集候选用户 从业务数仓/模型结果里拉取符合条件的用户 ID 列表 真实场景中会:• 调 REST / RPC 拿画像• 或扫离线结果表• 或读 Kafka 流 4. 双写标签明细 将每个用户与标签的关系永久化 & 提供实时校验能力 方法内部两件事:• 插入 crowd_tags_detail 表• 在 Redis BitMap 中把该用户对应位设为 1(幂等处理冲突) 5. 更新统计量 维护标签当前命中人数,用于运营看板 这里简单按“新增条数”累加,也可改为重新 count(*) 全量回填 6. 结束 方法返回 void 如果过程抛异常,调度系统可重试/报警 一句话总结 这是一个被定时器或消息触发的离线批量打标签任务: 拉取任务规则 → (离线)筛出符合条件的用户 → 写库 + 写 Redis 位图 → 更新命中人数。 之后业务系统就能用位图做到毫秒级 isUserInTag(userId, tagId) 判断,实现精准运营投放。 Bitmap(位图) 概念 Bitmap 又称 Bitset,是一种用位(bit)来表示状态的数据结构。 它把一个大的“布尔数组”压缩到最小空间:每个元素只占 1 位,要么 0(False)、要么 1(True)。 为什么用 Bitmap? 超高空间效率:1000 万个用户,只需要约 10 MB(1000 万 / 8)。 超快操作:检查某个索引位是否为 1、计数所有“1”的个数(BITCOUNT)、找出第一个“1”的位置(BITPOS)等,都是 O(1) 或者极快的位运算。 典型场景 用户标签 / 权限判断:把符合某个条件的用户的索引位置设置为 1,以后实时判断“用户 X 是否在标签 A 中?”就只需读一个 bit。 海量去重 / 布隆过滤器:在超大流量场景下判断“URL 是否已访问过”、“手机号是否已注册”等。 统计分析:快速统计某个条件下有多少个用户/对象符合(BITCOUNT)。 拼团交易锁单 下单到支付中间有一个流程,即锁单,比如淘宝京东中,在这个环节(限定时间内)选择使用优惠券、京豆等,可以得到优惠价,再进行支付;拼团场景同理,先加入拼团,进行锁单,然后优惠试算,最后才付款。 拼团结算 对接商城和拼团系统 上面左侧,小型支付商城,用户下单过程。增加一个营销锁单,从营销锁单中获取拼团营销拿到的优惠价格。之后小型商城继续往下走,创建支付订单。右侧,拼团交易平台,提供营销锁单流程,锁单目标、优惠试算,规则过滤,最终落库和返回结果(订单ID、原始价格、折扣金额、支付金额、订单状态)。 下面小型支付商城在用户完成支付后,调用拼团组队结算,更新当前xx拼团完成人数,当拼团完成后接收回调消息执行后续交易结算发货(暂时模拟的)。 注意两个回调函数不要搞混:1:alipay_notify_url,用户支付成功后支付宝的回调,为后端服务设定的回调地址,支付宝告诉pay-mall当前用户支付完毕,可以调用拼团组队结算。return_rul,用户付款后自动跳转到的地址,即跳转回首页,和前端跳转有关。gateway-url,支付宝提供的本商家的用户付款地址。 2:group-buy-market_notify-url,由pay-mall商城设置的回调,某teamId拼团达到目标人数时,拼团成功会触发该回调,告诉pay-mall可以进行下一步动作,比如给该teamId下的所有人发货。 本地对接 在 group-buying-sys 项目中,对 group-buying-api 模块执行 mvn clean install(或在 IDE 中运行 install)。这会将该模块的 jar 安装到本地 Maven 仓库(~/.m2/repository)。然后在 pay-mall 项目的 pom.xml 中添加依赖,使用相同的 groupId、artifactId 和 version 即可引用该模块,如下所示: <dependency> <groupId>edu.whut</groupId> <artifactId>group-buying-api</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> 发包 仅适用于本地,共用一个本地Maven仓库,一旦换台电脑或者在云服务器上部署,无法就这样引入,因此可以进行发包。这里使用阿里云效发包https://packages.aliyun.com/ 1)点击制品仓库->生产库 2)下载settings-aliyun.xml文件并保存至本地的Maven的conf文件夹中。 3) 配置项目的Maven仓库为阿里云提供的这个,而不是自己的本地仓库。 4)发包,打开Idea中的Maven,双击deploy 5)验证 6)使用 将公共镜像仓库的settings文件和阿里云效的settings文件合并,可以同时拉取公有依赖和私有包。 逆向工程:退单 逆向的流程,要分析用户是在哪个流程节点下进行中断行为。包括3个场景; 已锁单、未支付 用户行为:完成锁单后未发起支付。 结果:订单超时自动关单。 补偿 若用户在临界时刻支付,则需执行“逆向退款”流程——退还支付金额并告知“优惠已过期,请重新参与”。 否则该订单自动失效,释放拼团名额给后续用户。 已锁单、已支付,但拼团未成团 用户行为:完成支付,组团人数不足暂未成团。 补偿策略 (可配置优先级): 先退拼团,再退款, 先退款,再退拼团 具体执行哪种方式,可由拼团活动策略决定——“优先保障个人”或“优先保障成团”。 已锁单、已支付,且拼团已成团 用户行为:支付成功,且组团人数已凑齐。 补偿流程 先退还用户支付金额; 再撤销对应的拼团完成量。 注意:已成团订单视为“已完成含退单”,仍然成团、不再开放新用户参与,确保团队成团状态一致。 策略模板应用 根据订单状态和拼团状态动态选择退单策略。 收获 实体对象 实体是指具有唯一标识的业务对象。 在 DDD 分层里,Domain Entity ≠ 数据库 PO。 在 edu.whut.domain.*.model.entity 包下放的是纯粹的业务对象,它们只表达业务语义(团队 ID、活动时间、优惠金额……),对「数据持久化细节」保持无感知。因此它们看起来“字段不全”是正常的: 它们不会带 @TableName / @TableId 等 MyBatis-Plus 注解; 也不会出现数据库的技术字段(id、create_time、update_time、status 等); 只保留聚合根真正需要的业务属性与行为。 @Data @Builder @AllArgsConstructor @NoArgsConstructor public class PayActivityEntity { /** 拼单组队ID */ private String teamId; /** 活动ID */ private Long activityId; /** 活动名称 */ private String activityName; /** 拼团开始时间 */ private Date startTime; /** 拼团结束时间 */ private Date endTime; /** 目标数量 */ private Integer targetCount; } 这个也是实体对象,因为多个字段的组合:teamId和activityId能唯一标识这个实体。 模板方法 核心思想: 在抽象父类中定义算法骨架(固定执行顺序),把某些可变步骤留给子类重写;调用方只用模板方法,保证流程一致。 Client ───▶ AbstractClass ├─ templateMethod() ←—— 固定流程 │ step1() │ step2() ←—— 抽象,可变 │ step3() └─ hookMethod() ←—— 可选覆盖 ▲ │ extends ┌──────────┴──────────┐ │ ConcreteClassA/B… │ 示例: // 1. 抽象模板 public abstract class AbstractDialog { // 模板方法:固定调用顺序,设为 final 防止子类改流程 public final void show() { initLayout(); bindEvent(); beforeDisplay(); // 钩子,可选 display(); afterDisplay(); // 钩子,可选 } // 具体公共步骤 private void initLayout() { System.out.println("加载通用布局文件"); } // 需要子类实现的抽象步骤 protected abstract void bindEvent(); // 钩子方法,默认空实现 protected void beforeDisplay() {} protected void afterDisplay() {} private void display() { System.out.println("弹出对话框"); } } // 2. 子类:登录对话框 public class LoginDialog extends AbstractDialog { @Override protected void bindEvent() { System.out.println("绑定登录按钮事件"); } @Override protected void afterDisplay() { System.out.println("focus 到用户名输入框"); } } // 3. 调用 public class Demo { public static void main(String[] args) { AbstractDialog dialog = new LoginDialog(); dialog.show(); /* 输出: 加载通用布局文件 绑定登录按钮事件 弹出对话框 focus 到用户名输入框 */ } } 要点 复用公共流程:initLayout()、display() 写一次即可。 限制流程顺序:show() 定为 final,防止子类乱改步骤。 钩子方法:子类可选择性覆盖(如 beforeDisplay)。 责任链 应用场景:日志系统、审批流程、权限校验——任何需要将请求按阶段传递、并由某一环节决定是否继续或终止处理的地方,都非常适合职责链模式。 单例链 典型的责任链模式要点: 解耦请求发送者和处理者:调用者只持有链头,不关心中间环节。 动态组装:通过 appendNext 可以灵活地增加、删除或重排链上的节点。 可扩展:新增处理逻辑只需继承 AbstractLogicLink 并实现 apply,不用改动已有代码。 接口定义:ILogicChainArmory<T, D, R> 提供添加节点方法和获取节点 //定义了责任链的组装接口: public interface ILogicChainArmory<T, D, R> { ILogicLink<T, D, R> next(); //在当前节点中获取下一个节点 ILogicLink<T, D, R> appendNext(ILogicLink<T, D, R> next); //把下一个处理节点挂接上来 } ILogicLink<T, D, R> 继承自 ILogicChainArmory<T, D, R>,并额外声明了核心方法 apply public interface ILogicLink<T, D, R> extends ILogicChainArmory<T, D, R> { R apply(T requestParameter, D dynamicContext) throws Exception; //处理请求 } 抽象基类:AbstractLogicLink public abstract class AbstractLogicLink<T, D, R> implements ILogicLink<T, D, R> { private ILogicLink<T, D, R> next; @Override public ILogicLink<T, D, R> next() { return next; } @Override public ILogicLink<T, D, R> appendNext(ILogicLink<T, D, R> next) { this.next = next; return next; } protected R next(T requestParameter, D dynamicContext) throws Exception { return next.apply(requestParameter, dynamicContext); //交给下一节点处理 } } 子类只需继承它,重写 apply(...),在合适的条件下要么直接处理并返回,要么调用 next(requestParameter, dynamicContext) 继续传递。 使用示例: public class AuthLink extends AbstractLogicLink<Request, Context, Response> { @Override public Response apply(Request req, Context ctx) throws Exception { if (!ctx.isAuthenticated()) { throw new UnauthorizedException(); } // 认证通过,继续下一个环节 return next(req, ctx); } } public class LoggingLink extends AbstractLogicLink<Request, Context, Response> { @Override public Response apply(Request req, Context ctx) throws Exception { System.out.println("Request received: " + req); Response resp = next(req, ctx); System.out.println("Response sent: " + resp); return resp; } } // 组装责任链 放工厂类factory中实现 ILogicLink<Request, Context, Response> chain = new AuthLink() .appendNext(new LoggingLink()) .appendNext(new BusinessLogicLink()); //客户端使用 Request req = new Request(...); Context ctx = new Context(...); Response resp = chain.apply(req, ctx); 示例图: AuthLink.apply └─▶ LoggingLink.apply └─▶ BusinessLogicLink.apply └─▶ 返回 Response 这种模式链上的每个节点都手动 next()到下一节点。 多例链 /** * 通用逻辑处理器接口 —— 责任链中的「节点」要实现的核心契约。 */ public interface ILogicHandler<T, D, R> { /** * 默认的 next占位实现,方便节点若不需要向后传递时直接返回 null。 */ default R next(T requestParameter, D dynamicContext) { return null; } /** * 节点的核心处理方法。 */ R apply(T requestParameter, D dynamicContext) throws Exception; } /** * 业务链路容器 —— 双向链表实现,同时实现 ILogicHandler,从而可以被当作单个节点使用。 */ public class BusinessLinkedList<T, D, R> extends LinkedList<ILogicHandler<T, D, R>> implements ILogicHandler<T, D, R>{ public BusinessLinkedList(String name) { super(name); } /** * BusinessLinkedList是头节点,它的apply方法就是循环调用后面的节点,直至返回。 * 遍历并执行链路。 */ @Override public R apply(T requestParameter, D dynamicContext) throws Exception { Node<ILogicHandler<T, D, R>> current = this.first; // 顺序执行,直到链尾或返回结果 while (current != null) { ILogicHandler<T, D, R> handler = current.item; R result = handler.apply(requestParameter, dynamicContext); if (result != null) { // 节点命中,立即返回 return result; } //result==null,则交给那一节点继续处理 current = current.next; } // 全链未命中 return null; } } /** * 链路装配工厂 —— 负责把一组 ILogicHandler 顺序注册到 BusinessLinkedList 中。 */ public class LinkArmory<T, D, R> { private final BusinessLinkedList<T, D, R> logicLink; /** * @param linkName 链路名称,便于日志排查 * @param logicHandlers 节点列表,按传入顺序链接 */ @SafeVarargs public LinkArmory(String linkName, ILogicHandler<T, D, R>... logicHandlers) { logicLink = new BusinessLinkedList<>(linkName); for (ILogicHandler<T, D, R> logicHandler: logicHandlers){ logicLink.add(logicHandler); } } /** 返回组装完成的链路 */ public BusinessLinkedList<T, D, R> getLogicLink() { return logicLink; } } //工厂类,可以定义多条责任链,每条有自己的Bean名称区分。 @Bean("tradeRuleFilter") public BusinessLinkedList<TradeRuleCommandEntity, DynamicContext, TradeRuleFilterBackEntity> tradeRuleFilter(ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter) { // 1. 组装链 LinkArmory<TradeRuleCommandEntity, DynamicContext, TradeRuleFilterBackEntity> linkArmory = new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter); // 2. 返回链容器(即可作为责任链使用) return linkArmory.getLogicLink(); } 示例图: BusinessLinkedList.apply ←─ 只有这一层在栈里 while 循环: ├─▶ 调用 ActivityUsability.apply → 返回 null → 继续 ├─▶ 调用 UserTakeLimit.apply → 返回 null → 继续 └─▶ 调用 ... → 返回 Result → break 链头拿着“游标”一个个跑,节点只告诉“命中 / 未命中”。 规则树流程 ! 整体分层思路 分层 作用 关键对象 通用模板层 抽象出与具体业务无关的「规则树」骨架,解决 如何找到并执行策略 的共性问题 StrategyMapper、StrategyHandler、AbstractStrategyRouter<T,D,R> 业务装配层 基于模板,自由拼装出 一棵 贴合业务流程的策略树 RootNode / SwitchRoot / MarketNode / EndNode … 对外暴露层 通过 工厂 + 服务支持类 将整棵树封装成一个可直接调用的 StrategyHandler,并交给 Spring 整体托管 DefaultActivityStrategyFactory、AbstractGroupBuyMarketSupport 通用模板层:规则树的“骨架” 角色 职责 关系 StrategyMapper 映射器:依据 requestParameter + dynamicContext 选出 下一个 策略节点 被 AbstractStrategyRouter 调用 StrategyHandler 处理器:真正执行业务逻辑;apply 结束后可返回结果或继续路由 节点本身 / 路由器本身都是它的实现 AbstractStrategyRouter<T,D,R> 路由模板:① 调用 get(...) 找到合适的 StrategyHandler;② 调用该 handler 的 apply(...);③ 若未命中则走 defaultStrategyHandler 同时实现 StrategyMapper 与 StrategyHandler,但自身保持 抽象,把细节延迟到子类 业务装配层:一棵可编排的策略树 RootNode -> SwitchRoot -> MarketNode -> EndNode ↘︎ OtherNode ... 每个节点 继承 AbstractStrategyRouter 实现 get(...):决定当前节点的下一跳是哪一个节点 实现 apply(...):实现节点自身应做的业务动作(或继续下钻) 组合方式 比责任链更灵活: 一个节点既可以“继续路由”也可以“自己处理完直接返回” 可以随时插拔 / 替换子节点,形成多分支、循环、早停等复杂流转 对外暴露层:工厂 + 服务支持类 组件 主要职责 DefaultActivityStrategyFactory (@Service) 工厂:1. 在 Spring 启动时注入根节点 RootNode;2. 暴露统一入口 strategyHandler() → 返回整个策略树顶点(一个 StrategyHandler 实例) AbstractGroupBuyMarketSupport 业务服务基类:封装拼团场景下共用的查询、工具方法;供每个节点继承使用 这样,调用方只需 TrialBalanceEntity result = factory.strategyHandler().apply(product, new DynamicContext(vo1, vo2)); 就能驱动整棵策略树,而完全不用关心节点搭建、依赖注入等细节。 策略模式 核心思想: 把可互换的算法/行为抽成独立策略类,运行时由“上下文”对象选择合适的策略;对调用方来说,只关心统一接口,而非具体实现。 ┌───────────────┐ │ Client │ └─────▲─────────┘ │ has-a ┌─────┴─────────┐ implements │ Context │────────────┐ ┌──────────────┐ │ (使用者) │ strategy └─▶│ Strategy A │ └───────────────┘ ├──────────────┤ │ Strategy B │ └──────────────┘ 集合自动注入 常见于策略/工厂/插件场景。 @Autowired private Map<String, IDiscountCalculateService> discountCalculateServiceMap; 字段类型:Map<String, IDiscountCalculateService> key—— Bean 的名字 默认是类名首字母小写 (mjCalculateService) 或者你在实现类上显式写的 @Service("MJ") value —— 那个实现类对应的实例 Spring 机制: 启动时扫描所有实现 IDiscountCalculateService 的 Bean。 把它们按 “BeanName → Bean 实例” 的映射注入到这张 Map 里。 你一次性就拿到了“策略字典”。 示例: @Service("MJ") // ★ 关键:Bean 名即策略键 public class MJCalculateService extends IDiscountCalculateService { @Override protected BigDecimal Calculate(String userId, BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) { //忽略实现细节 } @Component @RequiredArgsConstructor // 构造器注入更推荐 public class DiscountContext { private final Map<String, IDiscountCalculateService> discountServiceMap; public BigDecimal calc(String strategyKey, String userId, BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount plan) { //strategyKey可以是"MJ" .. IDiscountCalculateService strategy = discountServiceMap.get(strategyKey); if (strategy == null) { throw new IllegalArgumentException("无匹配折扣类型: " + strategyKey); } return strategy.calculate(userId, originalPrice, plan); } } 多线程异步调用 如果某任务比较耗时(如加载大量数据),可以考虑开多线程异步调用。 // Runnable ➞ 只能 run(),没有返回值 public interface Runnable { void run(); } // Callable<V> ➞ call() 能返回 V,也能抛检查型异常 public interface Callable<V> { V call() throws Exception; } public class MyTask implements Callable<String> { private final String name; public MyTask(String name) { this.name = name; } @Override public String call() throws Exception { // 模拟耗时操作 TimeUnit.MILLISECONDS.sleep(300); return "任务[" + name + "]的执行结果"; } } public class SimpleAsyncDemo { public static void main(String[] args) { // 创建大小为 2 的线程池 ExecutorService pool = Executors.newFixedThreadPool(2); try { // 构造两个任务 MyTask task1 = new MyTask("A"); MyTask task2 = new MyTask("B"); // 用 FutureTask 包装 Callable FutureTask<String> future1 = new FutureTask<>(task1); FutureTask<String> future2 = new FutureTask<>(task2); // 提交给线程池异步执行 pool.execute(future1); pool.execute(future2); // 主线程可以先做别的事… System.out.println("主线程正在做其他事情…"); // 在需要的时候再获取结果(可加超时) String result1 = future1.get(1, TimeUnit.SECONDS); //设置超时时间1秒 String result2 = future2.get(); //无超时时间 System.out.println("拿到结果1 → " + result1); System.out.println("拿到结果2 → " + result2); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { System.err.println("任务执行中出错: " + e.getCause()); } catch (TimeoutException e) { System.err.println("等待结果超时"); } finally { pool.shutdown(); } } } 动态配置(热更新) 原理:借助 Redis 的发布/订阅(Pub/Sub)能力,在程序跑起来以后,动态地往某个频道推送一条消息,然后所有订阅了该频道的 Bean 都会收到通知,进而反射更新它们身上的对应字段。 启动时 ────────────────────────────────────▶ BeanPostProcessor │ 扫描 @DCCValue 写入默认 / 读取 Redis │ 注入字段值 缓存 key→Bean ───────────────────────────────────────────────────────────────── 运行时 管理后台调用 ───▶ publish("myKey,newVal") ───▶ Redis Pub/Sub │ │ │ ▼ │ RTopic listener 收到消息 │ └─ ▸ 写回 Redis │ └─ ▸ 从 Map 找到 Bean │ └─ ▸ 反射注入新值到字段 ▼ Bean 字段热更新完成 实现步骤 注解标记 用 @DCCValue("key:default") 标注需要动态注入的字段,指定对应的 Redis Key(带前缀)及默认值。 // 标记要动态注入的字段 @Retention(RUNTIME) @Target(FIELD) public @interface DCCValue { String value(); // "key:default" } // 业务使用示例 @Service public class MyFeature { @DCCValue("myFlag:0") private String myFlag; public boolean enabled() { return "1".equals(myFlag); } } 启动时注入 实现一个 BeanPostProcessor,在每个 Spring Bean 初始化后: 扫描带 @DCCValue 的字段; 拼出完整 Redis Key(如 dcc_prefix_key),若不存在则写入默认值,否则读最新值; 反射把值注入到该 Bean 的私有字段; 将 (redisKey → Bean 实例) 记录到内存映射,用于后续热更新。 @Override public Object postProcessAfterInitialization(Object bean, String name) { // 确定真实的目标类:处理代理 Bean 或普通 Bean Class<?> cls = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); // 遍历所有字段,寻找标注了 @DCCValue 的配置字段 for (Field f : cls.getDeclaredFields()) { DCCValue dv = f.getAnnotation(DCCValue.class); if (dv == null) { continue; // 如果该字段未被 @DCCValue 注解标注,则跳过 } // 注解值格式为 "key:default",拆分获取配置项的 key 和默认值 String[] parts = dv.value().split(":"); String key = PREFIX + parts[0]; // Redis 中存储该配置的完整 Key String defaultValue = parts[1]; // 默认值 // 从 Redis 获取配置,如果不存在则使用默认值,并同步写入 Redis RBucket<String> bucket = redis.getBucket(key); String val = bucket.isExists() ? bucket.get() : defaultValue; bucket.trySet(defaultValue); // 如果 Redis 中没有该 Key,则写入默认值 // 反射方式将值注入到 Bean 的字段上(即动态替换该字段的值) injectField(bean, f, val); // 将该 Bean 注册到映射表,以便后续热更新时找到实例并更新字段 beans.put(key, bean); } return bean; // 返回处理后的 Bean } 运行时热更新 在同一个组件里,订阅一个 Redis Topic(频道),比如 "dcc_update"; 外部调用发布接口 PUBLISH dcc_update "key,newValue"; //更新配置 @GetMapping("/dcc/update") public void update(@RequestParam String key, @RequestParam String value) { dccTopic().publish(key + "," + value); } 订阅者收到后: 同步把新值写回 Redis; 从映射里取出对应 Bean,反射更新它的字段。 // 发布/订阅频道,用于接收 DCC 配置的热更新消息 @Bean public RTopic dccTopic() { // 1. 从 RedissonClient 中获取名为 "dcc_update" 的主题(Topic),后续会订阅这个频道 RTopic t = redis.getTopic("dcc_update"); // 2. 为该主题添加监听器,消息格式为 String t.addListener(String.class, (channel, msg) -> { // 3. msg 约定格式:"configKey,newValue",先按逗号分割出 key 和 value String[] a = msg.split(","); String key = PREFIX + a[0]; // 拼出完整的 Redis Key String val = a[1]; // 新的配置值 // 4. 检查 Redis 中是否已存在该 Key(只对已注册的配置生效) RBucket<String> bucket = redis.getBucket(key); if (!bucket.isExists()) { return; // 如果不是我们关心的配置,跳过 } // 5. 把新值同步写回 Redis,保证持久化 bucket.set(val); // 6. 从内存缓存中取出当初注入该 key 的 Bean 实例 Object bean = beans.get(key); if (bean != null) { // 7. 通过反射把新的配置值重新注入到 Bean 的字段上,完成热更新 injectField(bean, a[0], val); } }); // 8. 返回这个 RTopic Bean,让 Spring 容器管理 return t; } OkHttpClient 引入依赖 <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp-sse</artifactId> </dependency> 让Spring 管理 Http客户端 写配置类 @Configuration public class OKHttpClientConfig { @Bean public OkHttpClient httpClient() { return new OkHttpClient(); } } 在需要使用的地方注入 @Slf4j @Service @RequiredArgsConstructor public class HttpService { private final OkHttpClient okHttpClient; /** * 发送 JSON POST 请求并返回响应内容 * * @param apiUrl 接口地址 * @param jsonPayload 请求体 JSON 字符串 */ public String postJson(String apiUrl, String jsonPayload) throws IOException { //1.构建参数 MediaType mediaType = MediaType.get("application/json; charset=utf-8"); RequestBody body = RequestBody.create(jsonPayload, mediaType); Request request = new Request.Builder() .url(apiUrl) .post(body) .addHeader("Content-Type", "application/json") .build(); //2.调用接口 try (Response response = okHttpClient.newCall(request).execute()) { if (!response.isSuccessful()) { log.error("HTTP 请求失败,URL:{},状态码:{}", apiUrl, response.code()); throw new IOException("Unexpected HTTP code " + response.code()); } ResponseBody responseBody = response.body(); return responseBody != null ? responseBody.string() : ""; } catch (IOException e) { log.error("调用 HTTP 接口异常:{}", apiUrl, e); throw e; } } } 优点: 单例复用,性能更优 Spring 默认将 Bean 作为单例管理,整个应用只创建一次 OkHttpClient。 内部的连接池、线程池、缓存等资源可以被复用,避免频繁创建、销毁带来的开销。 统一配置,易于维护 超时、拦截器、连接池、SSL、日志等配置集中在一个地方,改动一次全局生效。 避免在代码各处手动 new OkHttpClient()、重复配置。 Retrofit 微信登录时,需要调用微信提供的接口做验证。 快速入门 // 1. 定义 DTO public class User { private String id; private String name; // … 省略 getters/setters … } // 2. 定义 Retrofit 接口 public interface ApiService { @GET("users/{id}") Call<User> getUser(@Path("id") String id); } // 3. 配置 Retrofit 并注册为 Spring Bean @Configuration public class RetrofitConfig { private static final String BASE_URL = "https://api.example.com/"; @Bean public Retrofit retrofit() { return new Retrofit.Builder() .baseUrl(BASE_URL) // 公共前缀 .addConverterFactory(JacksonConverterFactory.create()) // 自动 JSON ↔ DTO .build(); } @Bean public ApiService apiService(Retrofit retrofit) { // 动态生成 ApiService 实现 return retrofit.create(ApiService.class); } } // 4. 在业务层注入并调用 @Service public class UserService { private final ApiService apiService; public UserService(ApiService apiService) { this.apiService = apiService; } /** * 同步方式获取用户信息 */ public User getUserById(String userId) { try { Call<User> call = apiService.getUser(userId); Response<User> resp = call.execute(); if (resp.isSuccessful()) { return resp.body(); } else { // 根据业务需要抛出异常或返回 null throw new RuntimeException("请求失败,HTTP " + resp.code()); } } catch (Exception e) { throw new RuntimeException("调用用户服务出错", e); } } /** * 异步方式获取用户信息 */ public void getUserAsync(String userId) { apiService.getUser(userId).enqueue(new retrofit2.Callback<User>() { @Override public void onResponse(Call<User> call, Response<User> response) { if (response.isSuccessful()) { User user = response.body(); // TODO: 处理 user } } @Override public void onFailure(Call<User> call, Throwable t) { // TODO: 处理异常 } }); } } Retrofit 在运行时会生成这个接口的实现类,帮你完成: 拼 URL(把 {id} 换成具体值) 发起 GET 请求 拿到响应的 JSON 并自动反序列化成 User 对象 核心点 Apache HttpClient Retrofit 编程模型 细粒度调用,手动构造 HttpGet/HttpPost 注解驱动接口方法,声明式调用 请求定义 手动拼接 URL、参数 用 @GET/@POST、@Path、@Query、@Body 注解 序列化/反序列化 手动调用 ObjectMapper/Gson 自动通过 ConverterFactory(Jackson/Gson 等) 同步/异步 以同步为主,异步需自行管理线程和回调 同一个 Call<T> 即可 execute()(同步)或 enqueue()(异步) 扩展性与拦截器 可配置拦截器,但需手动集成 底层基于 OkHttp,天然支持拦截器、连接池、缓存、重试和取消 公众号扫码登录流程 场景:用微信的能力来替你的网站做“扫码登录”或“社交登录”,代替自己写一整套帐号/密码体系。后台只需要基于 openid 做一次性关联(比如把某个微信号和你系统的用户记录挂钩),后续再次扫码就当作同一用户; 1.前端请求二维码凭证 用户点击“扫码登录”,前端向后端发 GET /api/v1/login/weixin_qrcode_ticket。 后端获取 access_token 1.先尝试从本地缓存(如 Guava Cache)读取 access_token; 2.若无或已过期,则请求微信接口: GET https://api.weixin.qq.com/cgi-bin/token ?grant_type=client_credential &appid={你的 AppID} &secret={你的 AppSecret} 微信返回 { "access_token":"ACCESS_TOKEN_VALUE", "expires_in":7200 },后端缓存这个值(有效期约 2 小时)。 后端利用 access_token 创建二维码 ticket,返给前端。(每次调用微信会返回不同的ticket) 2.前端展示二维码 前端根据 ticket 生成二维码链接:https://mp.weixin.qq.com/cgi-bin/showqrcode?ticket={ticket} 3.微信回调后端 用户确认扫描后,微信服务器向你预先配置的回调 URL(如 POST /api/v1/weixin/portal/receive)推送包含 ticket 和 openid 的消息。 后端:将 ticket → openid 存入缓存(openidToken.put(ticket, openid));调用 sendLoginTemplate(openid) 给用户推送“登录成功”模板消息(手机公众号上推送,非网页) 4.前端获知登录结果 轮询方式:生成二维码后,前端每隔几秒向后端 check_login 接口发送 ticket来验证登录状态,后端查缓存来判断 ticket 对应用户是否成功登录。 推送方式:前端通过 WebSocket/SSE 建立长连接,后端回调处理完成后直接往该连接推送登录成功及 JWT。 浏览器指纹获取登录ticket 在扫码登录流程的基础上改进!!! 目的:把「这张二维码/ticket」严格绑在发起请求的那台浏览器上,防止别的设备或会话拿到同一个 ticket 就能登录。 1.生成指纹 前端在用户打开「扫码登录页」时,先用 JS/浏览器 API(比如 User-Agent、屏幕分辨率、插件列表、Canvas 指纹等)算出一个唯一的浏览器指纹 fp。 2.获取 ticket 时携带指纹 前端发起请求: GET /api/v1/login/weixin_qrcode_ticket_scene?sceneStr=<fp> 后端执行: String ticket = loginPort.createQrCodeTicket(sceneStr); sceneTicketCache.put(sceneStr, ticket); // 把 fp→ticket 映射进缓存 3.扫码后轮询校验 前端轮询:传入 ticket 和 sceneStr 指纹 GET /api/v1/login/check_login_scene?ticket=<ticket>&sceneStr=<fp> 后端逻辑(简化): // 1) 验证拿到的 sceneStr(fp) 对应的 ticket 是否一致 String cachedTicket = sceneTicketCache.getIfPresent(sceneStr); if (!ticket.equals(cachedTicket)) { // fp 不匹配,拒绝 return NO_LOGIN; } // 2) 再看 ticket→openid 有没有被写入(扫码并回调后,saveLoginState 会写入) String openid = ticketOpenidCache.getIfPresent(ticket); if (openid != null) { // 同一浏览器,且已扫码确认,返回 openid(或 JWT) return SUCCESS(openid); } return NO_LOGIN; 4.回调时保存登录状态 当用户扫描二维码,微信会回调你预定的接口地址,拿到 ticket、openid 后,调用: ticketOpenidCache.put(ticket, openid); // 保存 ticket→openid 注意 ticketOpenidCache 和 sceneTicketCache 一般是一个Cache Bean,这里只是为了更清晰。 安全性提升 防止“票据劫持”:别人就算截获了这个 ticket,想拿去自己那台机器上轮询也不行,因为指纹对不上。 防止多人共用:多个人在不同设备上同时扫同一个码,只有最先发起获取 ticket 的那台浏览器能完成登录。 无痕登录 “无痕登录”(又称“免扫码登录”或“静默登录”)的核心思想,是在用户首次通过二维码/授权完成登录后,给这台设备发放一份长期信任凭证,以后再访问就能悄无声息地登录,不再需要人为地再扫码或输入密码。 典型流程 1.初次登录(扫码授权) 即前面**"浏览器指纹获取登录ticket"**的流程 2.后续“无痕”自动登录 1)前端再次打开页面,重新生成指纹 2)前端调用“免扫码”接口,仅传递指纹 3)后端校验 fingerprint → openid String openid = sceneLoginCache.getIfPresent(sceneStr); if (openid != null) { // 直接返回登录态(Session / JWT) return SUCCESS(openid); } else { // 指纹过期或未绑定,返回未登录,前端再走扫码流程 return NO_LOGIN; } 4)成功后,前端拿到 openid/JWT,直接进入应用,无需用户任何操作。 独占锁和无锁化场景(防超卖) 独占锁 适用场景 定时任务互备 多机部署时,确保每天只有一台机器在某个时间点执行同一份任务(如数据清理、报表生成、邮件推送等)。 @Scheduled(cron = "0 0 0 * * ?") public void exec() { // 获取锁句柄,并未真正获取锁 RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec"); try { //尝试获取锁 waitTime = 3:如果当前锁已经被别人持有,调用线程最多等待 3 秒去重试获取;leaseTime = 0:不设过期时间,看门狗机制 boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS); if (!isLocked) return; Map<String, Integer> result = tradeSettlementOrderService.execSettlementNotifyJob(); log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result)); } catch (Exception e) { log.error("定时任务,回调通知拼团完结任务失败", e); } finally { if (lock.isLocked() && lock.isHeldByCurrentThread()) { lock.unlock(); } } } 无锁化场景 “无锁化”设计 的核心思路是不在整个逻辑上加一把全局互斥锁,而是用 Redis 原子操作 + 后置校验/补偿 来完成并发控制。 原子计数(Atomic Counter) 用 Redis 的 INCR(或 Redisson 的 RAtomicLong.incrementAndGet())来保证并发环境下每次调用都能拿到一个唯一、自增的数字。这个数字可以看作“第 N 个占位请求”。 边界校验+补偿回滚(Validation & Compensation) 拿到新数字后,马上与允许的最大值(target + 已回滚补偿数)做比较: 如果在范围内,视为占位成功; 如果超出范围,则把 Redis 里的计数器重置回 target(即“丢弃”这次多余的自增),并返回失败。 极端兜底锁(Fallback Lock) 虽然 INCR 本身已经原子,但在极端运维或网络抖动下仍有极小几率两次自增同时返回相同值。 因此,针对每个“序号”再做一次最轻量的 SETNX(key:occupySeq): 成功 SETNX → 序号唯一,真正拿到名额; 失败 SETNX → 重复抢号,拒绝这次占位。 典型适用场景 电商秒杀 & 拼团抢购 万级甚至十万级并发下不适合所有请求都排队,必须让绝大多数请求用原子计数并行处理。 抢票系统 票务分配、座位预占,都讲究“先到先得”+“补偿回退”,不能用一把大锁。 @Override public boolean tryOccupy(String counterKey, String recoveryKey, int target, int ttlMinutes) { // 1) 读取“补偿”次数(退款/回滚补偿) Long recovery = redisService.getAtomicLong(recoveryKey); int recovered = (recovery == null ? 0 : recovery.intValue()); // 2) 原子自增,拿到当前序号 long seq = redisService.incr(counterKey); long occupySeq = seq; // 3) 超出“目标 + 补偿池” → 回滚主计数器,失败 if (occupySeq > target + recovered) { redisService.setAtomicLong(counterKey, target); return false; } // 4) 如果用到了补偿名额(序号已经 > target),就从补偿池里减掉一个 //if (occupySeq > target) { // redisService.decr(recoveryKey); //} // 5) 兜底锁:针对每个序号做一次 SETNX,防止极端重复 String lockKey = counterKey + ":lock:" + occupySeq; boolean locked = redisService.setNx(lockKey, ttlMinutes, TimeUnit.MINUTES); if (!locked) { return false; } // 6) 成功占位 return true; } Supplier<T> Supplier<T> 是 Java 8 提供的一个函数式接口 @FunctionalInterface public interface Supplier<T> { /** * 返回一个 T 类型的结果,参数为空 */ T get(); } 任何“无参返回一个 T 类型对象”的代码片段(方法引用或 lambda)都可以当成 Supplier<T> 来用。 作用 1.延迟执行 把“取数据库数据”这类开销大的操作,包装成 Supplier<T> 传进去;只有真正需要时(缓存未命中),才调用 supplier.get() 去跑查询。 2.解耦逻辑 缓存逻辑和查询逻辑分离,缓存组件不用知道“怎么查库”,只负责“啥时候要查”,调用方通过 Supplier 把查库方法交给它。 3.重用性高 同一个缓存-回源模板方法可以服务于任何返回 T 的场景,既可以查 User,也可以查 Order、List<Product>…… // 服务方法:它只关心“缓存优先,否则回源” // dbFallback 是一段延迟执行的查库代码 protected <T> T getFromCacheOrDb(String cacheKey, Supplier<T> dbFallback) { // 1) 先从缓存拿 T v = cache.get(cacheKey); if (v != null) return v; // 2) 缓存没命中,调用 dbFallback.get() 去“回源”拿数据 T fromDb = dbFallback.get(); if (fromDb != null) { cache.put(cacheKey, fromDb); } return fromDb; } // 调用时这么写: User user = getFromCacheOrDb( "user:42", () -> userRepository.findById(42) // 这里的 () -> ... 就是一个 Supplier<User> ); List<Product> list = getFromCacheOrDb( "hot:products", () -> productService.queryHotProducts() // Supplier<List<Product>> ); 动态限流+黑名单 令牌桶算法(Token Bucket) 按固定速率往桶里放“令牌”(tokens),比如每秒放 N 个; 每次请求来临时“取一个令牌”才能通过,取不到就拒绝或降级; 可以做到“流量平滑释放”、“突发流量吸纳”(桶里最多能积攒 M 个令牌)。 核心限流思路 注解驱动拦截:对标记了 @RateLimiterAccessInterceptor 的方法统一进行限流。 分布式限流:基于 Redisson 的 RRateLimiter,可在多实例环境下共享令牌桶。 黑名单机制:对超限用户计数,达到阈值后加入黑名单(24 h 后自动解禁)。 动态开关:通过 DCC 配置中心开关(rateLimiterSwitch)可随时启用或关闭限流。 降级回调:限流或黑名单命中时,通过注解指定的方法反射调用,返回自定义响应。 请求到达 ↓ 检查限流开关(DCC) ↓ 解析限流维度(key,如 userId) ↓ 黑名单校验(RAtomicLong 计数,24h 过期) ↓ 分布式令牌桶限流(RRateLimiter.tryAcquire) ↓ ├─ 通过 → 执行目标方法 └─ 拒绝 → 调用 fallback 方法,记录黑名单次数 对比维度 本地限流 分布式限流 实现复杂度 低:直接用 Guava RateLimiter,几行代码即可接入 中高:依赖 Redis/Redisson,需要注入客户端并管理限流器 性能开销 极低:全程内存操作,纳秒级延迟 中等:每次获取令牌需网络往返,存在 RTT 延迟 限流范围 单实例:仅对当前 JVM 有效,多实例互不影响 全局:多实例共享同一套令牌桶,合计速率可控 状态持久化 & 容错 无:服务重启后状态丢失;实例宕机只影响自身 有:Redis 存储限流器与黑名单,可持久化;需保证 Redis 可用性 监控 & 可观测 弱:需额外上报或埋点才能集中监控 强:可直接查看 Redis Key、TTL、计数等,易做报警与可视化 运维依赖 无:不依赖外部组件 有:需维护高可用的 Redis 集群,增加运维成本 目前本项目使用的是分布式限流,用Redisson 日志系统 输出流向一览 输出到3个地方:控制台、本地文件、ELK日志(服务器上内存不足无法部署!) 日志级别 控制台 本地文件(异步) Logstash (TCP) TRACE/DEBUG — — — INFO ✔ log_info.log ✔ WARN ✔ log_info.log``log_error.log ✔ ERROR/FATAL ✔ log_info.log``log_error.log ✔ 注意:实际写文件时,都是通过 ASYNC_FILE_INFO/ERROR 两个异步 Appender 执行,以免日志写盘阻塞业务线程。 ELK日志系统 本地文件每台机器都会在自己 /data/log/... 目录下滚动输出自己的日志,互相之间不会合并。 如果你希望跨多台服务器统一管理,就需要把日志推到中央端——ELK日志系统 ELK=Elasticsearch(存储&检索)+ Logstash(采集&处理)+ Kibana(可视化) docker-compose.yml: version: '3' services: elasticsearch: image: elasticsearch:7.17.28 ports: ['9201:9200','9300:9300'] environment: - discovery.type=single-node - ES_JAVA_OPTS=-Xms512m -Xmx512m volumes: - ./data:/usr/share/elasticsearch/data logstash: image: logstash:7.17.28 ports: ['4560:4560','9600:9600'] volumes: - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf environment: - LS_JAVA_OPTS=-Xms1g -Xmx1g kibana: image: kibana:7.17.28 ports: ['5601:5601'] environment: - elasticsearch.hosts=http://elasticsearch:9200 networks: default: driver: bridge kibana配置: # # ** THIS IS AN AUTO-GENERATED FILE ** # # Default Kibana configuration for docker target server.host: "0" server.shutdownTimeout: "5s" elasticsearch.hosts: [ "http://elasticsearch:9200" ] # 记得修改ip monitoring.ui.container.elasticsearch.enabled: true i18n.locale: "zh-CN" logstash配置: input { tcp { mode => "server" host => "0.0.0.0" port => 4560 codec => json_lines type => "info" } } filter {} output { elasticsearch { action => "index" hosts => "es:9200" index => "group-buy-market-log-%{+YYYY.MM.dd}" } } 自己的项目: <!-- 上报日志;ELK --> <springProperty name="LOG_STASH_HOST" scope="context" source="logstash.host" defaultValue="127.0.0.1"/> <!--输出到logstash的appender--> <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <!--可以访问的logstash日志收集端口--> <destination>${LOG_STASH_HOST}:4560</destination> <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>7.3</version> </dependency> 使用 检查索引:curl http://localhost:9201/_cat/indices?v3 打开 Kibana:浏览器访问 http://localhost:5601,新建 索引模式(如 app-log-*),即可在 Discover/Visualize 中查看与分析日志。 防止重复下单 外部交易单号设计 统一跟踪:对接小商城时,将外部交易单号(out_trade_no)与小商城下单时生成的 order_id 保持一致,方便全链路追踪。 内部独立:拼团系统内部仍保留自己的 order_id,互不冲突。 在高并发支付场景中,确保同一用户对同一商品/活动只生成一条待支付订单,常用以下两种思路: 业务维度复合唯一索引 + 冲突捕获重试 查询未支付订单 在创建订单时,先根据业务维度(如 userId + goodId + activityId)查询“已下单但未支付”的订单; 若存在,则直接返回该订单,避免二次创建。 复合唯一索引约束 在订单表中对业务维度字段(userId、goodId、activityId 等)添加复合唯一索引; 高并发下若出现并行插入,后续请求因违反唯一约束抛出异常; 捕获异常后,再次查询并返回已创建的订单,实现幂等。 分布式锁保障(可选) 针对同一用户加分布式锁(例如 lock:userId:{userId}),确保只有首个请求能获取锁并创建订单; 后续请求等待锁释放或直接返回“订单处理中”,随后再次查询订单状态。 幂等 Key 模式 生成幂等 Key 前端进入支付流程时调用接口(GET /api/idempotency-key),后端生成全局唯一 ID(UUID 或雪花 ID)返回给前端; 或者外部系统(如小商城)传来唯一的外部交易单号(out_trade_no),天生作为幂等Key。 前端将该 Key 存入内存、LocalStorage 或隐藏表单字段,直至支付完成或过期。 请求携带幂等 Key 用户点击“下单”时,调用 /create_pay_order 接口,需在请求体中附带 idempotencyKey; 服务端根据该 Key 判断:若数据库中已有相同 idempotency_key,直接返回该订单,否则创建新订单。 数据库持久化 & 唯一约束 在订单表中新增 idempotency_key 列,并对其增加唯一索引; 双重保障:前端重复发送同一 Key,也仅能插入一条记录,彻底避免重复下单。
项目
zy123
6月20日
0
4
0
2025-06-07
智能协同云图库
智能协同云图库 待完善功能: 用户模块扩展功能: 2.JWT校验,可能要同时改前端,把userId保存到ThreadLocal中 3.目前这些标签写死了,可以用redis、数据库进行动态设置。(根据点击次数) @GetMapping("/tag_category") public BaseResponse<PictureTagCategory> listPictureTagCategory() { PictureTagCategory pictureTagCategory = new PictureTagCategory(); List<String> tagList = Arrays.asList("热门", "搞笑", "生活", "高清", "艺术", "校园", "背景", "简历", "创意"); List<String> categoryList = Arrays.asList("模板", "电商", "表情包", "素材", "海报"); pictureTagCategory.setTagList(tagList); pictureTagCategory.setCategoryList(categoryList); return ResultUtils.success(pictureTagCategory); } 4.图片审核扩展 5.爬图扩展 2)记录从哪里爬的 4)bing直接搜可能也是缩略图,可能模拟手点一次图片,再爬会清晰一点 6.缓存扩展 图片压缩 文件秒传,md5校验,如果已有,直接返回url,不用重新上传(图片场景不必使用) 分片上传和断点续传:对象存储 上传对象_腾讯云 CDN内容分发,后期项目上线之后搞一下。 浏览器缓存 是服务器(或 CDN/静态文件服务器)在返回资源时下发给浏览器的。 用户空间扩展: 图片编辑 AI扩图 创建图片的业务流程 创建图片主要是包括两个过程:第一个过程是上传图片文件本身,第二个过程是将图片信息上传到数据库。 有两种常见的处理方式: 1.先上传再提交数据(大多数的处理方式):用户直接上传图片,系统自动生成图片的url存储地址;然后在用户填写其它相关信息并提交后才将图片记录保存到数据库中。 2.上传图片时直接记录图片信息:云图库平台中图片作为核心资源,只要用户将图片上传成功就应该把这个图片上传到数据库中(即用户上传图片后系统应该立即生成图片的完整数据记录和其它元信息,这里元信息指的是图片的一些基础信息,这些信息应该是在图片上传成功后就能够解析出来),无需等待用户上传提交图片信息就会立即存入数据库中,这样会使整个交互过程更加轻量。这样的话用户只需要再上传图片的其它信息即可,这样就相当于用户对已有的图片信息进行编辑。 当然我们也可以对用户进行一些限制,比如说当用户上传过多的图片资源时就禁止该用户继续上传图片资源。 优化 协同编辑: 扩展 1、为防止消息丢失,可以使用 Redis 等高性能存储保存执行的操作记录。 目前如果图片已经被编辑了,新用户加入编辑时没办法查看到已编辑的状态,这一点也可以利用 Redis 保存操作记录来解决,新用户加入编辑时读取 Redis 的操作记录即可。 2、每种类型的消息处理可以封装为独立的 Handler 处理器类,也就是采用策略模式。 3、支持分布式 WebSocket。实现思路很简单,只需要保证要编辑同一图片的用户连接的是相同的服务器即可,和游戏分服务器大区、聊天室分房间是类似的原理。 4、一些小问题的优化:比如 WebSocket 连接建立之后,如果用户退出了登录,这时 WebSocket 的连接是没有断开的。不过影响并不大,可以思考下怎么处理。 收获 MybatisX插件简化开发 下载MybatisX插件,可以从数据表直接生成Bean、Mapper、Service,选项设置如下: 注意,勾选Actual Column生成的Bean和表中字段一模一样,取消勾选会进行驼峰转换,即user_name->userName 下载GenerateSerailVersionUID插件,可以右键->generate->生成序列ID: private static final long serialVersionUID = -1321880859645675653L; 胡图工具类hutool 引入依赖 <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.26</version> </dependency> ObjUtil.isNotNull(Object obj),仅判断对象是否 不为 null,不关心对象内容是否为空,比如空字符串 ""、空集合 []、数字 0 等都算是“非 null”。 ObjUtil.isNotEmpty(Object obj) 判断对象是否 不为 null 且非“空” 对不同类型的对象判断逻辑不同: CharSequence(String):长度大于 0 Collection:size > 0 Map:非空 Array:长度 > 0 其它对象:只判断是否为 null(默认不认为“空”) StrUtil.isNotEmpty(String str) 只要不是 null 且长度大于 0 就算“非空”。 StrUtil.isNotBlank(String str) 不仅要非 null,还要不能只包含空格、换行、Tab 等空白字符 StrUtil.hasBlank(CharSequence... strs)只要 **至少一个字符串是 blank(空或纯空格)**就返回 true,底层其实就是对每个参数调用 StrUtil.isBlank(...) CollUtil.isNotEmpty(Collection<?> coll)用于判断 集合(Collection)是否非空,功能类似于 ObjUtil.isNotEmpty(...) BeanUtil.toBean :用来把一个 Map、JSONObject 或者另一个对象快速转换成你的目标 JavaBean public class BeanUtilExample { public static class User { private String name; private Integer age; // 省略 getter/setter } public static void main(String[] args) { // 1. 从 Map 转 Bean Map<String, Object> data = new HashMap<>(); data.put("name", "Alice"); data.put("age", 30); User user1 = BeanUtil.toBean(data, User.class); System.out.println(user1.getName()); // Alice // 2. 从另一个对象转 Bean class Temp { public String name = "Bob"; public int age = 25; } Temp temp = new Temp(); User user2 = BeanUtil.toBean(temp, User.class); System.out.println(user2.getAge()); // 25 } } 多级缓存 Redis+Session 之前我们每次重启服务器都要重新登陆,既然已经整合了 Redis,不妨使用 Redis 管理 Session,更好地维护登录态。 1)先在 Maven 中引入 spring-session-data-redis 库: <!-- Spring Session + Redis --> <dependency> <groupId>org.springframework.session</groupId> <artifactId>spring-session-data-redis</artifactId> </dependency> 2)修改 application.yml 配置文件,更改Session的存储方式和过期时间: 既要设置redis能存30天,发给前端的cookie也要30天有效期。 spring: # session 配置 session: store-type: redis # session 30 天过期 timeout: 2592000 server: port: 8123 servlet: context-path: /api # cookie 30 天过期 session: cookie: max-age: 2592000 为什么用 ConcurrentHashMap<Long,Object> 管理锁更优? 避免污染常量池 String.intern() 会把每一个不同的 userId 字符串都放到 JVM 的字符串常量池里,随着用户量增长,常量池里的内容会越来越多,可能导致元空间(MetaSpace)/永久代(PermGen)压力过大。 显式可控的锁生命周期 用 ConcurrentHashMap 明确地管理——「只要 map 里有这个 key,就有对应的锁对象;不需要时可以删掉。」 相比之下,intern() 后的字符串对象由 JVM 常量池管理,代码里很难清理,存在内存泄漏风险。 高并发性能更好 ConcurrentHashMap 内部采用分段锁或 Node 锁定(取决于 JDK 版本),即便高并发下往 map 里 computeIfAbsent 也能保持较高吞吐。 synchronized (lock) 本身只锁定单个用户对应的那把锁,不影响其他用户;结合 ConcurrentHashMap 的高并发特性,整体性能比直接在一个全局 HashMap + synchronized 好得多。 锁+事务可能出现的问题 @Transactional(声明式) 事务在方法入口打开,很可能在拿锁前就占用连接/数据库资源,导致“空跑事务”+“资源耗尽”。 依赖代理,存在自调用失效的坑。 transactionTemplate.execute()(编程式) 锁先行→事务后发,确保高并发下只有一个连接/事务进数据库,极大降低资源竞争。 全程显式,放到哪儿就是哪儿,杜绝自调用/代理链带来的隐患。 锁+事务@Transactional一起可能出现问题: 线程 A 进入方法,Spring AOP 拦截,立即开启事务 走到 synchronized(lock),拿到锁 在锁里执行 exists → save(但真正的 “提交” 要等到方法返回后才做) 退出 synchronized 块,方法继续执行(其实已经没别的逻辑了) 方法返回,事务拦截器这时才 提交 线程 B(并发进来) 等待 AOP 代理,进入同一个方法,也会马上开启自己的事务 在入口就拿到一个新的连接/事务上下文 然后遇到 synchronized(lock),在这里阻塞 等 A 释放锁 A 一旦走出 synchronized,B 立刻拿到锁——但此时 A 还没真正提交(提交在方法尾被拦截器做) B 在锁里执行 exists:因为 A 的改动还在 A 的未提交事务里,默认隔离级别(READ_COMMITTED)下看不到,所以 exists 会返回 false B 就继续 save,结果就可能插入重复记录,或者引发唯一索引冲突 团队空间 空间和用户是多对多的关系,还要同时记录用户在某空间的角色,所以需要新建关联表 -- 空间成员表 create table if not exists space_user ( id bigint auto_increment comment 'id' primary key, spaceId bigint not null comment '空间 id', userId bigint not null comment '用户 id', spaceRole varchar(128) default 'viewer' null comment '空间角色:viewer/editor/admin', createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间', updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间', -- 索引设计 UNIQUE KEY uk_spaceId_userId (spaceId, userId), -- 唯一索引,用户在一个空间中只能有一个角色 INDEX idx_spaceId (spaceId), -- 提升按空间查询的性能 INDEX idx_userId (userId) -- 提升按用户查询的性能 ) comment '空间用户关联' collate = utf8mb4_unicode_ci; RBAC模型 团队空间: 一般来说,标准的 RBAC 实现需要 5 张表:用户表、角色表、权限表、用户角色关联表、角色权限关联表,还是有一定开发成本的。由于我们的项目中,团队空间不需要那么多角色,可以简化RBAC 的实现方式,比如将角色和权限直接定义到配置文件中。 本项目角色: 角色 描述 浏览者 仅可查看空间中的图片内容 编辑者 可查看、上传和编辑图片内容 管理员 拥有管理空间和成员的所有权限 本项目权限: 权限键 功能名称 描述 spaceUsername 成员管理 管理空间成员,添加或移除成员 picture:view 查看图片 查看空间中的图片内容 picture:upload 上传图片 上传图片到空间中 picture:edit 修改图片 编辑已上传的图片信息 picture:delete 删除图片 删除空间中的图片 角色权限映射: 角色 对应权限键 可执行功能 浏览者 picture:view 查看图片 编辑者 picture:view, picture:upload, picture:edit, picture:delete 查看图片、上传图片、修改图片、删除图片 管理员 spaceUsername, picture:view, picture:upload, picture:edit, picture:delete 成员管理、查看图片、上传图片、修改图片、删除图片 RBAC 只是一种权限设计模型,我们在 Java 代码中如何实现权限校验呢? 1)最直接的方案是像之前校验私有空间权限一样,封装个团队空间的权限校验方法;或者类似用户权限校验一样,写个注解 + AOP 切面。 2)对于复杂的角色和权限管理,可以选用现成的第三方权限校验框架来实现,编写一套权限校验规则代码后,就能整体管理系统的权限校验逻辑了。( Sa-Token) Sa-Token 快速入门 1)引入: <!-- Sa-Token 权限认证 --> <dependency> <groupId>cn.dev33</groupId> <artifactId>sa-token-spring-boot-starter</artifactId> <version>1.39.0</version> </dependency> 2)让 Sa-Token 整合 Redis,将用户的登录态等内容保存在 Redis 中。 <!-- Sa-Token 整合 Redis (使用 jackson 序列化方式) --> <dependency> <groupId>cn.dev33</groupId> <artifactId>sa-token-redis-jackson</artifactId> <version>1.39.0</version> </dependency> <!-- 提供Redis连接池 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> 3)基本用法 StpUtil 是 Sa-Token 提供的全局静态工具。 用户登录时调用 login 方法,产生一个新的会话: StpUtil.login(10001); 还可以给会话保存一些信息,比如登录用户的信息: StpUtil.getSession().set("user", user) 接下来就可以判断用户是否登录、获取用户信息了,可以通过代码进行判断: // 检验当前会话是否已经登录, 如果未登录,则抛出异常:`NotLoginException` StpUtil.checkLogin(); // 获取用户信息 StpUtil.getSession().get("user"); 也可以参考 官方文档,使用注解进行鉴权: // 登录校验:只有登录之后才能进入该方法 @SaCheckLogin @RequestMapping("info") public String info() { return "查询用户信息"; } 多账号体系 若项目中存在两套权限校验体系。一套是 user 表的,分为普通用户和管理员;另一套是对团队空间的权限进行校验。 为了更轻松地扩展项目,减少对原有代码的改动,我们原有的 user 表权限校验依然使用自定义注解 + AOP 的方式实现。而团队空间权限校验,采用 Sa-Token 来管理。 这种同一项目有多账号体系的情况下,不建议使用 Sa-Token 默认的账号体系,而是使用 Sa-Token 提供的多账号认证特性,可以将多套账号的授权给区分开,让它们互不干扰。 使用 Kit 模式 实现多账号认证 /** * StpLogic 门面类,管理项目中所有的 StpLogic 账号体系 * 添加 @Component 注解的目的是确保静态属性 DEFAULT 和 SPACE 被初始化 */ @Component public class StpKit { public static final String SPACE_TYPE = "space"; /** * 默认原生会话对象,项目中目前没使用到 */ public static final StpLogic DEFAULT = StpUtil.stpLogic; /** * Space 会话对象,管理 Space 表所有账号的登录、权限认证 */ public static final StpLogic SPACE = new StpLogic(SPACE_TYPE); } 修改用户服务的 userLogin 方法,用户登录成功后,保存登录态到 Sa-Token 的空间账号体系中: //记录用户的登录态 request.getSession().setAttribute(USER_LOGIN_STATE, user); //记录用户登录态到 Sa-token,便于空间鉴权时使用,注意保证该用户信息与 SpringSession 中的信息过期时间一致 StpKit.SPACE.login(user.getId()); StpKit.SPACE.getSession().set(USER_LOGIN_STATE, user); return this.getLoginUserVO(user); 之后就可以在代码中使用账号体系 // 检测当前会话是否以 Space 账号登录,并具有 picture:edit 权限 StpKit.SPACE.checkPermission("picture:edit"); // 获取当前 Space 会话的 Session 对象,并进行写值操作 StpKit.SPACE.getSession().set("user", "zy123"); 权限认证逻辑 Sa-Token 开发的核心是编写权限认证类,我们需要在该类中实现 “如何根据登录用户 id 获取到用户已有的角色和权限列表” 方法。当要判断某用户是否有某个角色或权限时,Sa-Token 会先执行我们编写的方法,得到该用户的角色或权限列表,然后跟需要的角色权限进行比对。 参考 官方文档,示例权限认证类如下: /** * 自定义权限加载接口实现类 */ @Component // 保证此类被 SpringBoot 扫描,完成 Sa-Token 的自定义权限验证扩展 public class StpInterfaceImpl implements StpInterface { /** * 返回一个账号所拥有的权限码集合 */ @Override public List<String> getPermissionList(Object loginId, String loginType) { // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询权限 List<String> list = new ArrayList<String>(); list.add("user.add"); list.add("user.update"); list.add("user.get"); list.add("art.*"); return list; } /** * 返回一个账号所拥有的角色标识集合 (权限与角色可分开校验) */ @Override public List<String> getRoleList(Object loginId, String loginType) { // 本 list 仅做模拟,实际项目中要根据具体业务逻辑来查询权限 List<String> list = new ArrayList<String>(); list.add("admin"); list.add("super-admin"); return list; } } Sa-Token 支持按照角色和权限校验,对于权限不多的项目,基于角色校验即可;对于权限较多的项目,建议根据权限校验。二选一即可,最好不要混用! 关键问题:如何在 Sa-Token 中获取当前请求操作的参数? 使用 Sa-Token 有 2 种方式 —— 注解式和编程式 ,但都要实现上面的StpInterface接口。 如果使用注解式,那么在接口被调用时就会立刻触发 Sa-Token 的权限校验,此时参数只能通过 Servlet 的请求对象传递,必须具有指定权限才能进入该方法! 使用 注解合并 简化代码。 @SaSpaceCheckPermission(value = SpaceUserPermissionConstant.PICTURE_UPLOAD) public BaseResponse<PictureVO> uploadPicture() { } 如果使用编程式,可以在函数内的任意位置执行权限校验,只要在执行前将参数放到当前线程的上下文 ThreadLocal 对象中,就能在鉴权时获取到了。 **注意,只要加上了 Sa-Token 注解,框架就会强制要求用户登录,未登录会抛出异常。**所以针对未登录也可以调用的接口,需要改为编程式权限校验 @GetMapping("/get/vo") public BaseResponse<PictureVO> getPictureVOById(long id, HttpServletRequest request) { ThrowUtils.throwIf(id <= 0, ErrorCode.PARAMS_ERROR); // 查询数据库 Picture picture = pictureService.getById(id); ThrowUtils.throwIf(picture == null, ErrorCode.NOT_FOUND_ERROR); // 空间的图片,需要校验权限 Space space = null; Long spaceId = picture.getSpaceId(); if (spaceId != null) { boolean hasPermission = StpKit.SPACE.hasPermission(SpaceUserPermissionConstant.PICTURE_VIEW); ThrowUtils.throwIf(!hasPermission, ErrorCode.NO_AUTH_ERROR); } PictureVO pictureVO = pictureService.getPictureVO(picture, request); // 获取封装类 return ResultUtils.success(pictureVO); } 循环依赖问题 PictureController ↓ 注入 PictureServiceImpl PictureServiceImpl ↓ 注入 SpaceServiceImpl SpaceServiceImpl ↓ 注入 SpaceUserServiceImpl SpaceUserServiceImpl ↓ 注入 SpaceServiceImpl ←—— 又回到 SpaceServiceImpl 解决办法:将一方改成 setter 注入并加上 @Lazy注解 如在SpaceUserServiceImpl中 import org.springframework.context.annotation.Lazy; @Resource @Lazy private SpaceService spaceService; @Lazy为懒加载,直到真正第一次使用它时才去创建或注入。且这里不能用构造器注入的方式!!! 这里有个坑: import groovy.lang.Lazy; 导入这个包的@lazy注解就无效! 分库分表 如果某团队空间的图片数量比较多,可以对其数据进行单独的管理。 1、图片信息数据 可以给每个团队空间单独创建一张图片表 picture_{spaceId},也就是分库分表中的分表,而不是和公共图库、私有空间的图片混在一起。这样不仅查询空间内的图片效率更高,还便于整体管理和清理空间。但是要注意,仅对旗舰版空间生效,否则分表的数量会特别多,反而可能影响性能。 要实现的是会随着新增空间不断增加分表数量的动态分表,会使用分库分表框架 Apache ShardingSphere 带大家实现。 2、图片文件数据 已经实现隔离,存到COS上的不同桶内。 思路主要是基于业务需求设计数据分片规则,将数据按一定策略(如取模、哈希、范围或时间)分散存储到多个库或表中,同时开发路由逻辑来决定查询或写入操作的目标库表。 ShardingSphere 分库分表 <!-- 分库分表 --> <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId> <version>5.2.0</version> </dependency> 分库分表的策略总体分为 2 类:静态分表和动态分表 分库分表策略 - 静态分表 静态分表:在设计阶段,分表的数量和规则就是固定的,不会根据业务增长动态调整,比如 picture_0、picture_1。 分片规则通常基于某一字段(如图片 id)通过简单规则(如取模、范围)来决定数据存储在哪个表或库中。 这种方式的优点是简单、好理解;缺点是不利于扩展,随着数据量增长,可能需要手动调整分表数量并迁移数据。 举个例子,图片表按图片 id 对 3 取模拆分: String tableName = "picture_" + (picture_id % 3) // picture_0 ~ picture_2 静态分表的实现很简单,直接在 application.yml 中编写 ShardingSphere 的配置就能完成分库分表,比如: rules: sharding: tables: picture: actualDataNodes: ds0.picture_${0..2} # 3张分表:picture_0, picture_1, picture_2 tableStrategy: standard: shardingColumn: picture_id # 按 pictureId 分片 shardingAlgorithmName: pictureIdMod shardingAlgorithms: pictureIdMod: type: INLINE #内置实现,直接在配置类中写规则,即下面的algorithm-expression props: algorithm-expression: picture_${pictureId % 3} # 分片表达式 甚至不需要修改任何业务代码,在查询picture表(一般叫逻辑表)时,框架会自动帮你修改 SQL,根据 pictureId 将查询请求路由到不同的表中。 分库分表策略 - 动态分表 动态分表是指分表的数量可以根据业务需求或数据量动态增加,表的结构和规则是运行时动态生成的。举个例子,根据时间动态创建 picture_2025_03、picture_2025_04。 String tableName = "picture_" + LocalDate.now().format( DateTimeFormatter.ofPattern("yyyy_MM") ); spring: shardingsphere: datasource: names: smile-picture smile-picture: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/smile-picture username: root password: 123456 rules: sharding: tables: picture: #逻辑表名(业务层永远只写 picture) actual-data-nodes: smile-picture.picture # 逻辑表对应的真实节点 table-strategy: standard: sharding-column: space_id #分片列(字段) sharding-algorithm-name: picture_sharding_algorithm # 使用自定义分片算法 sharding-algorithms: picture_sharding_algorithm: type: CLASS_BASED props: strategy: standard algorithmClassName: edu.whut.smilepicturebackend.manager.sharding.PictureShardingAlgorithm props: sql-show: true 需要实现自定义算法类: public class PictureShardingAlgorithm implements StandardShardingAlgorithm<Long> { @Override public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> preciseShardingValue) { // 编写分表逻辑,返回实际要查询的表名 // picture_0 物理表,picture 逻辑表 } @Override public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<Long> rangeShardingValue) { return new ArrayList<>(); } @Override public Properties getProps() { return null; } @Override public void init(Properties properties) { } } 本项目分表总体思路: 对 picture 进行分表 一张 逻辑表 picture 业务代码永远只写 picture,不用关心落到哪张真实表。 两类真实表 类型 存谁的数据 例子 公共表 普通 / 进阶 / 专业版空间 picture 分片表 旗舰版 空间(每个空间一张) picture_<spaceId>,如 picture_30001 自定义分片算法: 传入 space_id 时 如果是旗舰,会自动路由到 picture_<spaceId>;否则回落到公共表 picture。 没有 space_id 时 (例如后台批量报表): 广播到 所有 picture_<spaceId> + picture 并做汇聚。 操作 必须带分片键? 若缺少分片键会发生什么 INSERT 是 - 中间件不知道该落到哪张实际表- 直接抛异常:Could not determine actual data nodes / Table xxx route result is empty UPDATE 强烈建议 - ShardingSphere 会把 SQL 广播到所有分表 ,再分别执行- 表越多、数据越大,锁持有时间越长,性能急剧下降- 若所有表都无匹配行,会返回 0,但成本已付出 DELETE 同上 同 UPDATE,且更危险:一次误写可能删光全部分表的数据 SELECT 同上 没分片键就会全表扫描后聚合,数据量大时查询极慢、内存占用高 因此,项目中的业务代码中,对Picture表进行增删查改时,必须确保space_id非空。 协同编辑 相比于生产者直接调用消费者,事件驱动模型的主要优点在于解耦和异步性。在事件驱动模型中,生产者和消费者不需要直接依赖于彼此的实现,生产者只需触发事件并将其发送到事件分发器,消费者则根据事件类型处理逻辑。此外,事件驱动还可以提升系统的 并发性 和 实时性,可以理解为多引入了一个中介来帮忙,通过异步消息传递,减少了阻塞和等待,能够更高效地处理多个并发任务。 如何解决协同冲突? 法一:约定 同一时刻只允许一位用户进入编辑图片的状态,此时其他用户只能实时浏览到修改效果,但不能参与编辑;进入编辑状态的用户可以退出编辑,其他用户才可以进入编辑状态。 事件触发者(用户 A 的动作) 事件类型(发送消息) 事件消费者(其他用户的处理) 用户 A 建立连接,加入编辑 INFO 显示"用户 A 加入编辑"的通知 用户 A 进入编辑状态 ENTER_EDIT 其他用户界面显示"用户 A 开始编辑图片",锁定编辑状态 用户 A 执行编辑操作 EDIT_ACTION 放大/缩小/左旋/右旋当前图片 用户 A 退出编辑状态 EXIT_EDIT 解锁编辑状态,提示其他用户可以进入编辑状态 用户 A 断开连接,离开编辑 INFO 显示"用户 A 离开编辑"的通知,并释放编辑状态 用户 A 发送了错误的消息 ERROR 显示错误消息的通知 法二:实时协同 OT 算法(Operational Transformation),广泛应用于在线文档协作等场景。 操作 (Operation):表示用户对协作内容的修改,比如插入字符、删除字符等。 转化 (Transformation):当多个用户同时编辑内容时,OT 会根据操作的上下文将它们转化,使得这些操作可以按照不同的顺序应用而结果保持一致。 因果一致性:OT 算法确保操作按照用户看到的顺序被正确执行,即每个用户的操作基于最新的内容状态。 举一个简单的例子,假设初始内容是 "abc",用户 A 和 B 同时进行编辑: 用户 A 在位置 1 插入 "x" 用户 B 在位置 2 删除 "b" 如果不使用 OT 算法,结果是: 用户 A 操作后,内容变为 "axbc" 用户 B 操作后,内容变为 "ac" 如果直接应用 B 的操作到 A 的结果,得到的是 "ac",对于 A 来说,相当于删除了 "b",A 会感到一脸懵逼。 如果使用 OT 算法,结果是: 用户 A 的操作,应用后内容为 "axbc" 用户 B 的操作经过 OT 转化为删除 "b" 在 "axbc" 中的新位置 最终用户 A 和 B 的内容都一致为 "axc",符合预期。OT 算法确保无论用户编辑的顺序如何,最终内容是一致的。 OT 算法的难点在于设计如何转化各个用户的操作。 业务流程图 // key: pictureId,value: 这张图下所有活跃的 Session(即各个用户的连接) Map<Long, Set<WebSocketSession>> pictureSessions; 当用户 A 在浏览器里打开了 pictureId=123 的编辑页面,就产生了一个 Session; 如果同一个浏览器又开了一个标签页编辑同一张图,或者不同的浏览器/设备打开,同样又会分别产生新的 Session。 假设有两张图,ID 是 100 和 200: pictureId pictureSessions.get(pictureId) 100 { sessionA, sessionB } (用户 A、B 的连接) 200 { sessionC } (只有用户 C 的连接) Disruptor 优化 调用 Spring MVC 的某个接口时,如果该接口内部的耗时较长,请求线程就会一直阻塞,最终导致 Tomcat 请求连接数耗尽(默认值 200)。 大多数请求是快请求,毫秒级别,直接在请求线程里完成;若有个慢请求,执行一次需要几秒,那么必须将它放入异步线程中执行。 Disruptor 是一种高性能的并发框架,它是一种 无锁的环形队列 数据结构,用于解决高吞吐量和低延迟场景中的并发问题。 Disruptor 的工作流程: 1)环形队列初始化:创建一个固定大小为 8 的 RingBuffer(索引范围 0-7),每个格子存储一个可复用的事件对象,序号初始为 0。 2)生产者写入数据:生产者申请索引 0(序号 0),将数据 "A" 写入事件对象,提交后序号递增为 1,下一个写入索引变为 1。 3)消费者读取数据:消费者检查索引 0(序号 0),读取数据 "A",处理后提交,序号递增为 1,下一个读取索引变为 1。 4)环形队列循环使用:当生产者写入到索引 7(序号 7)后,索引回到 0(序号 8),形成循环存储,但序号会持续自增以区分数据的先后顺序。 5)防止数据覆盖:如果生产者追上消费者,消费者尚未处理完数据,生产者会等待,确保数据不被覆盖。 基于 Disruptor 的异步消息处理机制,可以将原有的同步消息分发逻辑改造为高效解耦的异步处理模型。因为websockt接收到请求,直接往队列里面提交任务,Disruptor的消费者来负责按顺序进行处理。
项目
zy123
6月7日
0
13
0
2025-03-22
同步本地Markdown至Typecho站点
同步本地Markdown至Typecho站点 本项目基于https://github.com/Sundae97/typecho-markdown-file-publisher 实现效果: 将markdown发布到typecho 发布前将markdown中的公式块和代码块进行格式化,确保能适配md解析器。 发布前将markdown的图片资源上传到自己搭建的图床Easyimage中(自行替换成阿里云OSS等), 并替换markdown中的图片链接。 将md所在的文件夹名称作为post的category(mysql发布可以插入category, xmlrpc接口暂时不支持category操作)。 以title和category作为文章的唯一标识,如果数据库中已有该数据,将会更新现有文章,否则新增文章。 环境:Typecho1.2.1 php8.1.0 项目目录 typecho_markdown_upload/main.py是上传md文件到站点的核心脚本 transfer_md/transfer.py是对md文件进行预处理的脚本。 核心思路 一、预先准备 图床服务器 本人自己在服务器上搭建了私人图床——easyimage,该服务能够实现图片上传并返回公网 URL,这对于在博客中正常显示 Markdown 文件中的图片至关重要。 当然,也可以选择使用公共图床服务,如阿里云 OSS,但这里不做详细介绍。 需手动修改transfer_md/upload_img.py,配置url、token等信息。 参考博客:【好玩儿的Docker项目】10分钟搭建一个简单图床——Easyimage-我不是咕咕鸽 github地址:icret/EasyImages2.0: 简单图床 - 一款功能强大无数据库的图床 2.0版 picgo安装: 使用 Typora + PicGo + Easyimage 组合,可以实现将本地图片直接粘贴到 Markdown 文件中,并自动上传至图床。 下载地址:Releases · Molunerfinn/PicGo 操作步骤如下: 打开 PicGo,点击右下角的小窗口。 进入插件设置,搜索并安装 web-uploader 1.1.1 插件(注意:旧版本可能无法搜索到,建议直接安装最新版本)。 配置插件:在设置中填写 API 地址,该地址可在 Easyimage 的“设置-API设置”中获取。 配置完成后,即可实现图片自动上传,提升 Markdown 编辑体验。 Typora 设置 为了确保在博客中图片能正常显示,编辑 Markdown 文档时必须将图片上传至图床,而不是保存在本地。请按以下步骤进行配置: 在 Typora 中,打开 文件 → 偏好设置 → 图像 选项。 在 “插入图片时” 选项中,选择 上传图片。 在 “上传服务设定” 中选择 PicGo,并指定 PicGo 的安装路径。 文件结构统一: md_files ├── category1 │ ├── file1.md │ └── file2.md ├── category2 │ ├── file3.md │ └── file4.md └── output ├── assets_type ├── pics └── updated_files 注意:category对应上传到typecho中的文章所属的分类。 如果你现有的图片分散在系统中,可以使用 transfer_md/transfer.py 脚本来统一处理。该脚本需要传入三个参数: input_path: 指定包含 Markdown 文件的根目录(例如上例中的 md_files)。 output_path: 输出文件夹(例如上例中的 output)。 type_value: 1:扫描 input_path 下所有 Markdown 文件,将其中引用的本地图片复制到 output_path 中,同时更新 Markdown 文件中的图片 URL 为 output_path 内的路径; 2:为每个 Markdown 文件建立单独的文件夹(以文件名命名),图片存放在文件夹下的 assets 子目录中,整体存入assets_type文件夹中,; 3:扫描 Markdown 文件中的本地图片,将其上传到图床(获取公网 URL),并将 Markdown 文件中对应的图片 URL 替换为公网地址。 4:预处理Markdown 文件,将公式块和代码块格式化,以便于Markdown解析器解析(本地typora编辑器对于md格式比较宽容,但博客中使用的md解析器插件不一定能正确渲染!) 二、使用Git进行版本控制 假设你在服务器上已经搭建了 Gitea (Github、Gitee都行)并创建了一个名为 md_files 的仓库,那么你可以在 md_files 文件夹下通过 Git Bash 执行以下步骤将本地文件提交到远程仓库: 初始化本地仓库: git init 添加远程仓库: 将远程仓库地址添加为 origin(请将 http://xxx 替换为你的实际仓库地址): git remote add origin http://xxx 添加文件并提交: 注意,写一个.gitignore文件将output排除版本控制 git add . git commit -m "Initial commit" 推送到远程仓库: git push -u origin master 后续更新(可写个.bat批量执行): git add . git commit -m "更新了xxx内容" git push 三、在服务器上部署该脚本 1. 确保脚本能够连接到 Typecho 使用的数据库 本博客使用 docker-compose 部署 Typecho(参考:【好玩儿的Docker项目】10分钟搭建一个Typecho博客|太破口!念念不忘,必有回响!-我不是咕咕鸽)。为了让脚本能访问 Typecho 的数据库,我将 Python 应用pyapp也通过 docker-compose 部署,这样所有服务均在同一网络中,互相之间可以直接通信。 参考docker-compose.yml如下: services: nginx: image: nginx ports: - "4000:80" # 左边可以改成任意没使用的端口 restart: always environment: - TZ=Asia/Shanghai volumes: - ./typecho:/var/www/html - ./nginx:/etc/nginx/conf.d - ./logs:/var/log/nginx depends_on: - php networks: - web php: build: php restart: always expose: - "9000" # 不暴露公网,故没有写9000:9000 volumes: - ./typecho:/var/www/html environment: - TZ=Asia/Shanghai depends_on: - mysql networks: - web pyapp: build: ./markdown_operation # Dockerfile所在的目录 restart: "no" volumes: - /home/zy123/md_files:/markdown_operation/md_files networks: - web env_file: - .env depends_on: - mysql mysql: image: mysql:5.7 restart: always environment: - TZ=Asia/Shanghai expose: - "3306" # 不暴露公网,故没有写3306:3306 volumes: - ./mysql/data:/var/lib/mysql - ./mysql/logs:/var/log/mysql - ./mysql/conf:/etc/mysql/conf.d env_file: - mysql.env networks: - web networks: web: 注意:如果你不是用docker部署的typecho,只要保证脚本能连上typecho所使用的数据库并操纵里面的表就行! 2. 将 md_files 挂载到容器中,保持最新内容同步 这样有几个优势: 不需要每次构建镜像或进入容器手动拉取; 本地更新 md_files 后,容器内自动同步,无需额外操作; 保持了宿主机上的 Git 版本控制和容器内的数据一致性。 3.仅针对 pyapp 服务进行重构和启动,不影响其他服务的运行: pyapp是本Python应用在容器内的名称。 1.构建镜像: docker-compose build pyapp 2.启动容器并进入 Bash: docker-compose run --rm -it pyapp /bin/bash 3.在容器内运行脚本: python typecho_markdown_upload/main.py 2、3两步可合并为: docker-compose run --rm pyapp python typecho_markdown_upload/main.py 此时可以打开博客验证一下是否成功发布文章了! 如果失败,可以验证mysql数据库: 1️⃣ 进入 MySQL 容器: docker-compose exec mysql mysql -uroot -p # 输入你的 root 密码 2️⃣ 切换到 Typecho 数据库并列出表: USE typecho; SHOW TABLES; 3️⃣ 查看 typecho_contents 表结构(文章表): DESCRIBE typecho_contents; mysql> DESCRIBE typecho_contents; +--------------+------------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +--------------+------------------+------+-----+---------+----------------+ | cid | int(10) unsigned | NO | PRI | NULL | auto_increment | | title | varchar(150) | YES | | NULL | | | slug | varchar(150) | YES | UNI | NULL | | | created | int(10) unsigned | YES | MUL | 0 | | | modified | int(10) unsigned | YES | | 0 | | | text | longtext | YES | | NULL | | | order | int(10) unsigned | YES | | 0 | | | authorId | int(10) unsigned | YES | | 0 | | | template | varchar(32) | YES | | NULL | | | type | varchar(16) | YES | | post | | | status | varchar(16) | YES | | publish | | | password | varchar(32) | YES | | NULL | | | commentsNum | int(10) unsigned | YES | | 0 | | | allowComment | char(1) | YES | | 0 | | | allowPing | char(1) | YES | | 0 | | | allowFeed | char(1) | YES | | 0 | | | parent | int(10) unsigned | YES | | 0 | | | views | int(11) | YES | | 0 | | | agree | int(11) | YES | | 0 | | +--------------+------------------+------+-----+---------+----------------+ 4️⃣ 查询当前文章数量(确认执行前后有无变化): SELECT COUNT(*) AS cnt FROM typecho_contents; 自动化 1.windows下写脚本自动/手动提交每日更新 2.在 Linux 服务器上配置一个定时任务,定时执行 git pull 命令和启动脚本更新博客的命令。 创建脚本/home/zy123/typecho/deploy.sh #!/bin/bash cd /home/zy123/md_files || exit git pull cd /home/zy123/typecho || exit docker compose run --rm pyapp python typecho_markdown_upload/main.py 赋予可执行权限chmod +x /home/zy123/deploy.sh 编辑 Crontab 安排任务(每天0点10分执行) 打开 crontab 编辑器:$crontab -e$ 10 0 * * * /home/zy123/typecho/deploy.sh >> /home/zy123/typecho/deploy.log 2>&1 3.注意md_files文件夹所属者和deploy.sh的所属者需要保持一致。否则会报: fatal: detected dubious ownership in repository at '/home/zy123/md_files' To add an exception for this directory, call: git config --global --add safe.directory /home/zy123/md_files TODO typecho_contents表中的slug字段代表链接中的日志缩略名,如wordpress风格 /archives/{slug}.html,目前是默认int自增,有需要的话可以在插入文章时手动设置该字段。
项目
zy123
3月22日
0
28
0
2025-03-21
招标文件解析
产品官网:智标领航 - 招投标AI解决方案 产品后台:https://intellibid.cn:9091/login?redirect=%2Findex 项目地址:zy123/zbparse - zbparse - 智标领航代码仓库 git clone地址:http://47.98.59.178:3000/zy123/zbparse.git 选择develop分支,develop-xx 后面的xx越近越新。 正式环境:121.41.119.164:5000 测试环境:47.98.58.178:5000 大解析:指从招标文件解析入口进去,upload.py 小解析:从投标文件生成入口进去,little_zbparse 和get_deviation,两个接口后端一起调 项目启动与维护: .env存放一些密钥(大模型、textin等),它是gitignore忽略了,因此在服务器上git pull项目的时候,这个文件不会更新(因为密钥比较重要),需要手动维护服务器相应位置的.env。 如何更新服务器上的版本: 步骤 进入项目文件夹 **注意:**需要确认.env是否存在在服务器,默认是隐藏的 输入cat .env 如果不存在,在项目文件夹下sudo vim .env 将密钥粘贴进去!!! git pull sudo docker-compose up --build -d 更新并重启 或者 sudo docker-compose build 先构建镜像 sudo docker-compose up -d 等空间时再重启 sudo docker-compose logs flask_app --since 1h 查看最近1h的日志(如果重启后报错也能查看,推荐重启后都运行一下这个) requirements.txt一般无需变动,除非代码中使用了新的库,也要手动在该文件中添加包名及对应的版本 docker基础知识 docker-compose: 本项目为单服务项目,只有flask_app(服务名) build context(context: .): 这是在构建镜像时提供给 Docker 的文件集,指明哪些文件可以被 Dockerfile 中的 COPY 或 ADD 指令使用。它是构建过程中的“资源包”。 对于多服务,build下就要针对不同的服务,指定所需的“资源包”和对应的Dockerfile dockerfile: COPY . .(在 Dockerfile 中): 这条指令会将构建上下文中的所有内容复制到镜像中的当前工作目录(这里是 /flask_project)。 docker exec -it zbparse-flask_app-1 sh 这个命令会直接进入到flask_project目录内部ls之后可以看到: Dockerfile README.md docker-compose.yml flask_app md_files requirements.txt 如果这个基础上再cd /会切换到这个容器的根目录,可以看到flask_project文件夹以及其他基础系统环境。如: bin boot dev etc flask_project home lib lib64 media mnt opt proc root run sbin srv sys tmp usr var 数据卷挂载: volumes: -/home/Z/zbparse_output_dev:/flask_project/flask_app/static/output # 额外的数据卷挂载 本地路径:容器内路径 都从根目录找起。 完整的容器名 <项目名>-<服务名>-<序号> 项目名:默认是当前目录的名称(这里是 zbparse),或者你在启动 Docker Compose 时通过 -p 参数指定的项目名称。 服务名:在 docker-compose.yml 文件中定义的服务名称(这里是 flask_app)。 序号:如果同一个服务启动了多个容器,会有数字序号来区分(这里是 1)。 docker-compose exec flask_app sh docker exec -it zbparse-flask_app-1 sh 这两个是等价的,因为docker-compose 会自动找到对应的完整容器名并执行命令。 删除所有悬空镜像(无容器引用的 <none> 镜像) docker image prune 如何本地启动本项目: Pycharm启动 requirements.txt里的环境要配好 conda create -n zbparse python=3.8 conda activate zbparse pip install -r requirements.txt .env环境配好 (一般不需要在电脑环境变量中额外配置了,但是要在Pycharm中安装插件,使得项目在启动时能将env中的环境变量自动配置到系统环境变量中!!!) 点击下拉框,Edit configurations 设置run_serve.py为启动脚本 注意这里的working directory要设置到最外层文件夹,而不是flask_app!!! 命令行启动 1.编写ps1脚本 # 切换到指定目录 cd D:\PycharmProjects\zbparse # 激活 Conda 环境 conda activate zbparse # 检查是否存在 .env 文件 if (Test-Path .env) { # 读取 .env 文件并设置环境变量 Get-Content .env | ForEach-Object { if ($_ -match '^\s*([^=]+)=(.*)') { $name = $matches[1].Trim() $value = $matches[2].Trim() [System.Environment]::SetEnvironmentVariable($name, $value) } } } else { Write-Host ".env not find" } # 设置 PYTHONPATH 环境变量 $env:PYTHONPATH = "D:\flask_project" # 运行 Python 脚本 python flask_app\run_serve.py $env:PYTHONPATH = "D:\flask_project",告诉 Python 去 D:\flask_project 查找模块,这样就能让 Python 找到你的 flask_app 包。 2.确保conda已添加到系统环境变量 打开 Anaconda Prompt,然后输入 where conda 来查看 conda 的路径。 打开系统环境变量Path,添加一条:C:\ProgramData\anaconda3\condabin 或者 CMD 中 set PATH=%PATH%;新添加的路径 重启终端可以刷新环境变量 3.如果你尚未在 PowerShell 中初始化 conda,可以在 Anaconda Prompt 中运行: conda init powershell 4.进入到存放run.ps1文件的目录,在搜索栏中输入powershell 5.默认情况下,PowerShell 可能会阻止运行脚本。你可以调整执行策略: Set-ExecutionPolicy RemoteSigned -Scope CurrentUser 6.运行脚本 .\run.ps1 注意!!! Windows 控制台存在QuickEdit 模式,在 QuickEdit 模式下,当你在终端窗口中点击(尤其是拖动或选中内容)时,控制台会进入文本选择状态,从而暂停正在运行的程序!! 禁用 QuickEdit 模式 在 PowerShell 窗口标题栏上点击右键,选择“属性”。 在“选项”选项卡中,取消勾选“快速编辑模式”。 点击“确定”,重启 PowerShell 窗口后再试。 模拟用户请求 postman打post请求测试: http://127.0.0.1:5000/upload body: { "file_url":"xxxx", "zb_type":2 } file_url如何获取:OSS管理控制台 bid-assistance/test 里面找个文件的url,推荐'094定稿-湖北工业大学xxx' 注意这里的url地址有时效性,要经常重新获取新的url 清理服务器上的文件夹 1.编写shell文件,sudo vim clean_dir.sh 清理/home/Z/zbparse_output_dev下的output1这些二级目录下的c8d2140d-9e9a-4a49-9a30-b53ba565db56这种uuid的三级目录(只保留最近7天)。 #!/bin/bash # 需要清理的 output 目录路径 ROOT_DIR="/home/Z/zbparse_output_dev" # 检查目标目录是否存在 if [ ! -d "$ROOT_DIR" ]; then echo "目录 $ROOT_DIR 不存在!" exit 1 fi echo "开始清理 $ROOT_DIR 下超过 7 天的目录..." echo "以下目录将被删除:" # -mindepth 2 表示从第二层目录开始查找,防止删除 output 下的直接子目录(如 output1、output2) # -depth 采用深度优先遍历,确保先处理子目录再处理父目录 find "$ROOT_DIR" -mindepth 2 -depth -type d -mtime +7 -print -exec rm -rf {} \; echo "清理完成。" 2.添加权限。 sudo chmod +x ./clean_dir.sh 3.执行 sudo ./clean_dir.sh 以 root 用户的身份编辑 crontab 文件,从而设置或修改系统定时任务(cron jobs)。每天零点10分清理 sudo crontab -e 在里面添加: 10 0 * * * /home/Z/clean_dir.sh 目前测试服务器和正式服务器都写上了!无需变动 内存泄漏问题 问题定位 查看容器运行时占用的文件FD套接字FD等(排查内存泄漏,长期运行这三个值不会很大) [Z@iZbp13rxxvm0y7yz7l02hbZ zbparse]$ docker exec -it zbparse-flask_app-1 sh ls -l /proc/1/fd | awk ' BEGIN { file=0; socket=0; pipe=0; other=0 } { if(/socket:/) socket++ else if(/pipe:/) pipe++ else if(/\/|tmp/) file++ # 识别文件路径特征 else other++ } END { print "文件FD:", file print "套接字FD:", socket print "管道FD:", pipe print "其他FD:", other }' 可以发现文件FD很大,基本上发送一个请求文件FD就加一,且不会衰减: 经排查,@validate_and_setup_logger注解会为每次请求都创建一个logger,需要在@app.teardown_request中获取与本次请求有关的logger并释放。 def create_logger(app, subfolder): """ 创建一个唯一的 logger 和对应的输出文件夹。 参数: subfolder (str): 子文件夹名称,如 'output1', 'output2', 'output3' """ unique_id = str(uuid.uuid4()) g.unique_id = unique_id output_folder = os.path.join("flask_app", "static", "output", subfolder, unique_id) os.makedirs(output_folder, exist_ok=True) log_filename = "log.txt" log_path = os.path.join(output_folder, log_filename) logger = logging.getLogger(unique_id) if not logger.handlers: file_handler = logging.FileHandler(log_path) file_formatter = CSTFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) stream_handler = logging.StreamHandler() stream_handler.setFormatter(logging.Formatter('%(message)s')) logger.addHandler(stream_handler) logger.setLevel(logging.INFO) logger.propagate = False g.logger = logger g.output_folder = output_folder #输出文件夹路径 handler:每当 logger 生成一条日志信息时,这条信息会被传递给所有关联的 handler,由 handler 决定如何输出这条日志。例如,FileHandler 会把日志写入文件,而 StreamHandler 会将日志输出到控制台。 logger.setLevel(logging.INFO) :它设置了 logger 的日志级别阈值。Logger 只会处理大于或等于 INFO 级别的日志消息(例如 INFO、WARNING、ERROR、CRITICAL),而 DEBUG 级别的消息会被忽略。 解决这个文件句柄问题后内存泄漏仍未解决,考虑分模块排查。 本项目结构大致是**1.**预处理(文件读取切分) **2.**并发调用5个函数分别调用大模型获取结果。 因此排查思路: 先将预处理模块单独拎出来作为接口,上传文件测试。 文件一般几MB,首先会读到内存,再处理,必然会占用很多内存,且它是调用每个接口都会经历的环节(little_zbparse/upload等) 内存泄漏排查工具 pip install memory_profiler from memory_profiler import memory_usage import time @profile def my_function(): a = [i for i in range(100000)] time.sleep(1) # 模拟耗时操作 b = {i: i*i for i in range(100000)} time.sleep(1) return a, b # 监控函数“运行前”和“运行后”的内存快照 mem_before = memory_usage()[0] result=my_function() mem_after = memory_usage()[0] print(f"Memory before: {mem_before} MiB, Memory after: {mem_after} MiB") @profile注解加在函数上,可以逐行分析内存增减情况。 memory_usage()[0] 可以获取当前程序所占内存的快照 产生的数据都存到result变量-》内存中,这是正常的,因此my_function没有内存泄漏问题。 但是 @profile def extract_text_by_page(file_path): result = "" with open(file_path, 'rb') as file: reader =PdfReader(file) num_pages = len(reader.pages) # print(f"Total pages: {num_pages}") for page_num in range(num_pages): page = reader.pages[page_num] text = page.extract_text() return "" 可以发现尽管我返回"",内存仍然没有释放!因为就是读取pdf这块发生了内存泄漏! tracemalloc def extract_text_by_page(file_path): result = "" with open(file_path, 'rb') as file: reader =PdfReader(file) num_pages = len(reader.pages) # print(f"Total pages: {num_pages}") for page_num in range(num_pages): page = reader.pages[page_num] text = page.extract_text() return result # 开始跟踪内存分配 tracemalloc.start() # 捕捉函数调用前的内存快照 snapshot_before = tracemalloc.take_snapshot() # 调用函数 file_path=r'C:\Users\Administrator\Desktop\fsdownload\00550cfc-fd33-469e-8272-9215291b175c\ztbfile.pdf' result = extract_text_by_page(file_path) # 捕捉函数调用后的内存快照 snapshot_after = tracemalloc.take_snapshot() # 比较两个快照,获取内存分配差异信息 stats = snapshot_after.compare_to(snapshot_before, 'lineno') print("[ Top 10 内存变化 ]") for stat in stats[:10]: print(stat) # 停止内存分配跟踪 tracemalloc.stop() tracemalloc能更深入的分析,不仅是自己写的代码,调用的库函数产生的内存也能分析出来。在这个例子中就是PyPDF2中的各个函数占用了大部分内存。 综上,定位到问题,就是读取PDF,使用PyPDF2库的地方 如何解决: 首先尝试用with open打开文件,代替直接使用 reader =PdfReader(file_path) 能够确保文件正常关闭。但是没有效果。 考虑为每次请求开子进程处理,有效隔离内存泄漏导致的资源占用,这样子进程运行结束后会释放资源。 但是解析流程是流式/分段返回的,因此还需处理: _child_target 是一个“桥梁”: 它在子进程内调用 goods_bid_main(...) (你的生成器) 并把每一次 yield 得到的数据放进队列。 结束时放一个 None 表示没有更多数据。 run_in_subprocess 是主进程使用的接口,开启子进程: 它启动子进程并实时 get() 队列数据,然后 yield 给外界调用者。 当队列里读到 None,说明子进程运行完毕,就 break 循环并 p.join()。 main_func是真正执行的函数!!! def _child_target(main_func, queue, output_folder, file_path, file_type, unique_id): """ 子进程中调用 `main_func`(它是一个生成器函数), 将其 yield 出的数据逐条放进队列,最后放一个 None 表示结束。 """ try: for data in main_func(output_folder, file_path, file_type, unique_id): queue.put(data) except Exception as e: # 如果要把异常也传给父进程,以便父进程可感知 queue.put(json.dumps({'error': str(e)}, ensure_ascii=False)) finally: queue.put(None) def run_in_subprocess(main_func, output_folder, file_path, file_type, unique_id): """ 启动子进程调用 `main_func(...)`,并在父进程流式获取其输出(通过 Queue)。 子进程结束时,操作系统回收其内存;父进程则保持实时输出。 """ queue = multiprocessing.Queue() p = multiprocessing.Process( target=_child_target, args=(main_func, queue, output_folder, file_path, file_type, unique_id) ) p.start() while True: item = queue.get() # 阻塞等待子进程产出的数据 if item is None: break yield item p.join() 如果开子线程,线程共享同一进程的内存空间,所以如果发生内存泄漏,泄漏的内存会累积在整个进程中,影响所有线程。 开子进程的缺点:多进程通常消耗的系统资源(如内存、启动开销)比多线程要大,因为每个进程都需要独立的资源和上下文切换开销。 进程池 在判断上传的文件是否为招标文件时,需要快速准确地响应。因此既保证内存不泄漏,又保证速度的方案就是在项目启动时创建进程池。(因为创建进程需要耗时2到3秒!) 如果是Waitress服务器启动,这里的进程池是全局共享的;但如果Gunicorn启动,每个请求分配一个worker进程,进程池是在worker里面共享的!!! #创建app,启动时 def create_app(): # 创建全局日志记录器 app = Flask(__name__) app.process_pool = Pool(processes=10, maxtasksperchild=3) app.global_logger = create_logger_main('model_log') # 全局日志记录器 #调用时 pool = current_app.process_pool # 使用全局的进程池 def judge_zbfile_exec_sub(file_path): result = pool.apply( judge_zbfile_exec, # 你的实际执行函数 args=(file_path,) ) return result 但是存在一个问题:第一次发送请求执行时间较慢! 可以发现实际执行只需7.7s,但是接口实际耗时10.23秒,主要是因懒加载或按需初始化:有些模块或资源在子进程启动时并不会马上加载,而是在子进程首次真正执行任务时才进行初始化。 解决思路:提前热身(warm up)进程池 在应用启动后、还没正式接受请求之前,可以提交一个简单的“空任务”或非常小的任务给进程池,让子进程先完成相关的初始化。这种“预热”方式能在正式请求到来之前就完成大部分初始化,减少首次请求的延迟。 还可以快速验证服务是否正常启动 def warmup_request(): # 等待服务器完全启动,例如等待 1-2 秒 time.sleep(5) try: url = "http://127.0.0.1:5000/judge_zbfile" #url必须为永久地址,完成热启动,创建进程池 payload = {"file_url": "xxx"} # 根据实际情况设置 file_url headers = {"Content-Type": "application/json"} response = requests.post(url, json=payload, headers=headers) print(f"Warm-up 请求发送成功,状态码:{response.status_code}") except Exception as e: print(f"Warm-up 请求出错:{e}") threading.Thread(target=warmup_request, daemon=True).start() flask_app结构介绍 项目中做限制的地方 账号、服务器分流 服务器分流:目前linux服务器和windows服务器主要是硬件上的分流(文件切分需要消耗CPU资源),大模型基底还是调用阿里,共用的tpm qpm。 账号分流:qianwen_plus下的 api_keys = cycle([ os.getenv("DASHSCOPE_API_KEY"), # os.getenv("DASHSCOPE_API_KEY_BACKUP1"), # os.getenv("DASHSCOPE_API_KEY_BACKUP2") ]) api_keys_lock = threading.Lock() def get_next_api_key(): with api_keys_lock: return next(api_keys) api_key = get_next_api_key() 只需轮流使用不同的api_key即可。目前没有启用。 大模型的限制 general/llm下的doubao.py 和通义千问long_plus.py 目前是linux和windows各部署一套,因此项目中的qps是对半的,即calls=? 这是qianwen-long的限制(针对阿里qpm为1200,每秒就是20,又linux和windows服务器对半,就是10;TPM无上限) @sleep_and_retry @limits(calls=10, period=1) # 每秒最多调用10次 def rate_limiter(): pass # 这个函数本身不执行任何操作,只用于限流 这是qianwen-plus的限制(针对tpm为1000万,每个请求2万tokens,那么linux和windows总的qps为8时,8x60x2=960<1000。单个为4) 经过2.11号测试,calls=4时最高TPM为800,因此把目前稳定版把calls设为5 2.12,用turbo作为超限后的承载,目前把calls设为7 @sleep_and_retry @limits(calls=7, period=1) # 每秒最多调用7次 def qianwen_plus(user_query, need_extra=False): logger = logging.getLogger('model_log') # 通过日志名字获取记录器 qianwen_turbo的限制(TPM为500万,由于它是plus后的手段,稳妥一点,qps设为6,两个服务器分流即calls=3) @sleep_and_retry @limits(calls=3, period=1) # 500万tpm,每秒最多调用6次,两个服务器分流就是3次 (plus超限后的保底手段,稳妥一点) 重点!!后续阿里扩容之后成倍修改这块calls=? 如果不用linux和windows负载均衡,这里的calls也要乘2!! 接口的限制 start_up.py的def create_app()函数,限制了对每个接口同时100次请求。这里事实上不再限制了(因为100已经足够大了),默认限制做到大模型限制这块。 app.connection_limiters['upload'] = ConnectionLimiter(max_connections=100) app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=100) app.connection_limiters['default'] = ConnectionLimiter(max_connections=100) app.connection_limiters['judge_zbfile'] = ConnectionLimiter(max_connections=100) ConnectionLimiter.py以及每个接口上的装饰器,如 @require_connection_limit(timeout=1800) def zbparse(): 这里限制了每个接口内部执行的时间,暂时设置到了30分钟!(不包括排队时间)超时就是解析失败 后端的限制: 目前后端发起招标请求,如果发送超过100(max_connections=100)个请求,我这边会排队后面的请求,这时后端的计时器会将这些请求也视作正在解析中,事实上它们还在排队等待中,这样会导致在极端情况下,新进的解析文件速度大于解析的速度,排队越来越长,后面的文件会因为等待时间过长而直接失败,而不是'解析失败'。 general 是公共函数存放的文件夹,llm下是各类大模型,读取文件下是docx pdf文件的读取以及文档清理clean_pdf,去页眉页脚页码 general下的llm下的清除file_id.py 需要每周运行至少一次,防止file_id数量超出(我这边对每次请求结束都有file_id记录并清理,向应该还没加) llm下的model_continue_query是'模型继续回答'脚本,应对超长文本模型一次无法输出完的情况,继续提问,拼接成完整的内容。 general下的file2markdown是textin 文件--》markdown general下的format_change是pdf-》docx 或doc/docx->pdf general下的merge_pdfs.py是拼接文件的:1.拼接招标公告+投标人须知 2.拼接评标细则章节+资格审查章节 general中比较重要的!!! 后处理: general下的post_processing,解析后的后处理部分,包括extract_info、 资格审查、技术偏离 商务偏离 所需提交的证明材料,都在这块生成。 post_processing中的inner_post_processing专门提取extracted_info post_processing中的process_functions_in_parallel提取 资格审查、技术偏离、 商务偏离、 所需提交的证明材料 大解析upload用了post_processing完整版, little_zbparse.py、小解析main.py用了inner_post_processing get_deviation.py、偏离表数据解析main.py用了process_functions_in_parallel 截取pdf: 截取pdf_main.py是顶级函数, 二级是截取pdf货物标版.py和截取pdf工程标版.py (非general下) 三级是截取pdf通用函数.py 如何判断截取位置是否正确?根据output文件夹中的切分情况(打开各个文件查看是否切分准确,目前的逻辑主要是按大章切分,即'招标公告'章节) 如果切分不准确,如何定位正则表达式? 首先判断当前是工程标解析还是货物标解析,即zb_type=1还是2 如果是2,那么是货物标解析,那么就是截取pdf_main.py调用截取pdf货物标版.py,如下图,selection=1代表截取'招标公告',那么如果招标公告没有切准,就在这块修改。这里可以发现get_notice是通用函数,即截取pdf通用函数.py中的get_notice函数,那么继续往内部跳转。 若开头没截准,就改begin_pattern,末尾没截准,就改end_pattern 另外:在截取pdf货物标版.py中,还有extract_pages_twice函数,即第一次没有切分到之后,会运行该函数,这边又有一套begin_pattern和end_pattern,即二次提取 如何测试? 输入pdf_path,和你要切分的序号,selection=1代表切公告,依次类推,可以看切出来的效果如何。 无效标和废标公共代码 获取无效标与废标项的主要执行代码。对docx文件进行预处理=》正则=》temp.txt=》大模型筛选 如果提的不全,可能是正则没涵盖到位,也可能是大模型提示词漏选了。 这里:如果段落中既被正则匹配,又被follow_up_keywords中的任意一个匹配,那么不会添加到temp中(即不会被大模型筛选),它会直接添加到最后的返回中! 投标人须知正文条款提取成json文件 将截取到的ztbfile_tobidders_notice_part2.pdf ,即须知正文,转为clause1.json 文件,便于后续提取开评定标流程、投标文件要求、重新招标、不再招标和终止招标 这块的主要逻辑就是匹配形如'一、总则'这样的大章节 然后匹配形如'1.1' '1.1.1'这样的序号,由于是按行读取pdf,一个序号后面的内容可能有好几行,因此遇到下一个序号(如'2.1')开头,之前的内容都视为上一个序号的。 old_version 都是废弃文件代码,未在正式、测试环境中使用的,不用管 routes 是接口以及主要实现部分,一一对应 get_deviation对应偏离表数据解析main,获得偏离表数据 judge_zbfile对应判断是否是招标文件 little_zbparse对应小解析main,负责解析extract_info test_zbparse是测试接口,无对应 upload对应工程标解析和货物标解析,即大解析 混淆澄清:小解析可以指代一个过程,即从'投标文件生成'这个入口进去的解析,后端会同时调用little_zbparse和get_deviation。这个过程称为'小解析'。 但是little_zbparse也叫小解析,命名如此因为最初只需返回这些数据(extract_info),后续才陆续返回商务、技术偏离... utils是接口这块的公共功能函数。其中validate_and_setup_logger函数对不同的接口请求对应到不同的output文件夹,如upload->output1。后续增加接口也可直接在这里写映射关系。 重点关注大解析:upload.py和货物标解析main.py static 存放解析的输出和提示词 其中output用gitignore了,git push不会推送这块内容。 各个文件夹(output1 output2..)对应不同的接口请求 test_case&testdir test_case是测试用例,是对一些函数的测试。好久没更新了 testdir是平时写代码的测试的地方 它们都不影响正式和测试环境的解析 工程标&货物标 是两个解析流程中不一样的地方(一样的都写在general中了) 主要是货物标额外解析了采购要求(提取采购需求main+技术参数要求提取+商务服务其他要求提取) 最后: ConnectionLimiter.py定义了接口超时时间->超时后断开与后端的连接 logger_setup.py 为每个请求创建单独的log,每个log对应一个log.txt start_up.py是启动脚本,run_serve也是启动脚本,是对start_up.py的简单封装,目前dockerfile定义的直接使用run_serve启动 持续关注 yield sse_format(tech_deviation_response) yield sse_format(tech_deviation_star_response) yield sse_format(zigefuhe_deviation_response) yield sse_format(shangwu_deviation_response) yield sse_format(shangwu_star_deviation_response) yield sse_format(proof_materials_response) 工程标解析目前仍没有解析采购要求这一块,因此后处理返回的只有'资格审查'和''证明材料"和"extracted_info",没有''商务偏离''及'商务带星偏离',也没有'技术偏离'和'技术带星偏离',而货物标解析是完全版。 其中''证明材料"和"extracted_info"是直接返给后端保存的 大解析中返回了技术评分,后端接收后不仅显示给前端,还会返给向,用于生成技术偏离表 小解析时,get_deviation.py其实也可以返回技术评分,但是没有返回,因为没人和我对接,暂时注释了。 4.商务评议和技术评议偏离表,即评分细则的偏离表,暂时没做,但是商务评分、技术评分无论大解析还是小解析都解析了,稍微对该数据处理一下返回给后端就行。 这个是解析得来的结果,适合给前端展示,但是要生成商务技术评议偏离表的话,需要再调一次大模型,对该数据进行重新归纳,以字符串列表为佳。再传给后端。(未做) 如何定位问题 查看static下的output文件夹 (upload大解析对应output1) docker-compose文件中规定了数据卷挂载的路径:- /home/Z/zbparse_output_dev:/flask_project/flask_app/static/output 也就是说static/output映射到了服务器的Z/zbparse_output_dev文件夹 根据时间查找哪个子文件夹(uuid作为子文件名) 查看是否有final_result.json文件,如果有,说明解析流程正常结束了,问题可能出在后端(a.后端接口请求超限30分钟 b.后处理存在解析数据的时候出错) 也可能出现在自身解析,可以查看子文件内的log.txt,查看日志。 若解析正常(有final_result)但解析不准,可以根据以下定位: a.查看子文件夹下的文件切分是否准确,例如:如果评标办法不准确,那么查看ztbfile_evaluation_methon,是否正确切到了评分细则。如果切到了,那就改general/商务技术评分提取里的提示词;否则修改截取pdf那块关于'评标办法'的正则表达式。 b.总之是先看切的准不准,再看提示词能否优化,都要定位到对应的代码中! 学习总结 Flask + Waitress : Flask 和 Waitress 是两个不同层级的工具,在 Python Web 开发中扮演互补角色。它们的协作关系可以概括为:Flask 负责构建 Web 应用逻辑,而 Waitress 作为生产级服务器承载 Flask 应用。 # Flask 开发服务器(仅用于开发) if __name__ == '__main__': app.run(host='0.0.0.0', port=5000) # 使用 Waitress 启动(生产环境) from waitress import serve serve(app, host='0.0.0.0', port=8080) Waitress 的工作方式 作为 WSGI 服务器:Waitress 作为一个 WSGI 服务器,负责监听指定端口上的网络请求,并将请求传递给 WSGI 应用(如 Flask 应用)。 多线程处理:默认情况下,waitress 在单个进程内启用线程池。当请求到达时,waitress 会从线程池中分配一个线程来处理这个请求。由于 GIL 限制,同一时间只有一个线程在执行 Python 代码(只能使用一个核心,CPU利用率只能到100%)。 Flask 与 waitress 的协同工作 WSGI 接口:Flask 应用实现了 WSGI 接口。waitress 接收到请求后,会调用 Flask 应用对应的视图函数来处理请求,生成响应。 请求处理流程 请求进入 waitress waitress 分配一个线程并调用 Flask 应用 Flask 根据路由匹配并执行对应的处理函数 处理函数返回响应,waitress 将响应发送给客户端 Waitress 的典型使用场景 跨平台部署:尤其适合 Windows 环境(Gunicorn 等服务器不支持)。 简单配置:无需复杂设置即可获得比开发服务器(Flask自带)更强的性能。 中小型应用:对并发要求不极高的场景,Waitress 的轻量级特性优势明显。 Waitress的不足与处理 由于 waitress 是在单进程下工作,所有线程共享进程内存,如果业务逻辑简单且无复杂资源共享问题,这种方式是足够的。 引入子进程:如果需要每个请求实现内存隔离或者绕过 GIL 来利用多核 CPU,有时会在 Flask 视图函数内部启动子进程来处理实际任务。 直接采用多进程部署方案:使用 Gunicorn 的多 worker 模式 Gunicorn Gunicorn 的工作方式 预启动 Worker 进程。Gunicorn 启动时,会按照配置数量(例如 4 个 worker)创建多个 worker 进程。这些 worker 进程会一直运行,并监听同一个端口上的请求。不会针对每个请求单独创建新进程。 共享 socket:所有 worker 进程共享同一个监听 socket,当有请求到来时,操作系统会将请求分发给某个空闲的 worker。 推荐worker 数量 = (2 * CPU 核心数) + 1 如何启动: 要使用异步 worker,你需要: pip install gevent 启动 Gunicorn 时指定 worker 类型和数量,例如: gunicorn -k gevent -w 4 --max-requests 100 flask_app.start_up:create_app --bind 0.0.0.0:5000 使用 -k gevent(或者 -k eventlet)就可以使用异步 worker,单个 worker 能够处理多个 I/O 密集型请求。 使用--max-requests 100 。每个 worker 在处理完 100 个请求后会自动重启,从而释放可能累积的内存。 本项目的执行流程: 调用CPU进行PDF文件的读取与切分,CPU密集型,耗时半分钟 针对切分之后的不同部分,分别调用大模型,得到回答,IO密集型,耗时2分钟。 解决方案: 1.使用flask+waitress,waitress会为每个用户请求开新的线程处理,然后我的代码逻辑会在这个线程内开子进程来执行具体的代码,以绕过GIL限制,且正确释放内存资源。 **后续可以开一个共享的进程池代替为每个请求开子进程。以避免高并发下竞争多核导致的频繁CPU切换问题。 2.使用Gunicorn的异步worker,gunicorn为固定创建worker(进程),处理用户请求,一个异步 worker 可以同时处理多个用户请求,因为当一个请求在等待外部响应(例如调用大模型接口)时,worker 可以切换去处理其他请求。 全局解释器锁(GIL): Python(特别是 CPython 实现)中有一个叫做全局解释器锁(Global Interpreter Lock,简称 GIL)的机制,这个锁确保在任何时刻只有一个线程在执行 Python 字节码。 这意味着,即使你启动了多个线程,它们在执行 Python 代码时实际上是串行执行的,而不是并行利用多核 CPU。 在 Java 中,多线程通常能充分利用多核,因为 Java 的线程是真正的系统级线程,不存在类似 CPython 中的 GIL 限制。 影响: CPU密集型任务:由于 GIL 的存在,在 CPU 密集型任务中,多线程往往不能提高性能,因为同时只有一个线程在执行 Python 代码。 I/O密集型任务:如果任务主要等待 I/O(例如网络、磁盘读写),线程在等待时会释放 GIL,此时多线程可以提高程序的响应性和吞吐量。 NumPy能够在一定程度上绕过 Python 的 GIL 限制。许多 NumPy 的数值计算操作(如矩阵乘法、向量化运算等)是由高度优化的 C 或 Fortran 库(如 BLAS、LAPACK)实现的。这些库通常在执行计算密集型任务时会释放 GIL。C 扩展模块的方式将 C 代码嵌入到 Python 中,从而利用底层 C 库的高性能优势 进程与线程 1、进程是操作系统分配任务的基本单位,进程是python中正在运行的程序;当我们打开了1个浏览器时就是开始了一个浏览器进程; 线程是进程中执行任务的基本单元(执行指令集),一个进程中至少有一个线程、当只有一个线程时,称为主线程 2、线程的创建和销毁耗费资源少,进程的创建和销毁耗费资源多;线程很容易创建,进程不容易创建 3、线程的切换速度快,进程慢 4、一个进程中有多个线程时:线程之间可以进行通信;一个进程中有多个子进程时,进程与进程之间不可以相互通信,如果需要通信时,就必须通过一个中间代理实现,Queue、Pipe。 5、多进程可以利用多核cpu,多线程不可以利用多核cpu 6、一个新的线程很容易被创建,一个新的进程创建需要对父进程进行一次克隆 7、多进程的主要目的是充分使用CPU的多核机制,多线程的主要目的是充分利用某一个单核 ——————————————— 每个进程有自己的独立 GIL 多线程适用于 I/O 密集型任务 多进程适用于CPU密集型任务 因此,多进程用于充分利用多核,进程内开多线程以充分利用单核。 进程池 multiprocessing.Pool库:,通过 maxtasksperchild 指定每个子进程在退出前最多执行的任务数,这有助于防止某些任务中可能存在的内存泄漏问题 pool =Pool(processes=10, maxtasksperchild=3) concurrent.futures.ProcessPoolExecutor更高级、更统一,没有类似 maxtasksperchild 的参数,意味着进程在整个执行期内会一直存活,适合任务本身比较稳定的场景。 pool =ProcessPoolExecutor(max_workers=10) 最好创建的进程数等同于CPU核心数,如果大于,且每个进程都是CPU密集型(高负债一直用到CPU),那么进程之间会竞争CPU,导致上下文切换增加,反而会降低性质。 设置的工作进程数接近 CPU 核心数,以便每个进程能独占一个核运行。 进程、线程间通信 线程间通信: 线程之间可以直接共享全局变量、对象或数据结构,不需要额外的序列化过程,但这也带来了同步的复杂性(如竞态条件)。 import threading num=0 def work(): global num for i in range(1000000): num+=1 print('work',num) def work1(): global num for i in range(1000000): num+=1 print('work1',num) if __name__ == '__main__': t1=threading.Thread(target=work) t2=threading.Thread(target=work1) t1.start() t2.start() t1.join() t2.join() print('主线程执行结果',num) 运行结果: work 1551626 work1 1615783 主线程执行结果 1615783 这些数值都小于预期的 2000000,因为: 即使存在 GIL,num += 1 这样的操作实际上并不是原子的。GIL 确保同一时刻只有一个线程执行 Python 字节码,但在执行 num += 1 时,实际上会发生下面几步操作: 从内存中读取 num 的当前值 对读取到的值进行加 1 操作 将新的值写回到内存 由多个字节码组成!!! 因此会导致: 线程 A 读取到 num 的值 切换到线程 B,线程 B 也读取同样的 num 值并进行加 1,然后写回 当线程 A 恢复时,它依然基于之前读取的旧值进行加 1,最后写回,从而覆盖了线程 B 的更新 解决: from threading import Lock import threading num=0 def work(): global num for i in range(1000000): with lock: num+=1 print('work',num) def work1(): global num for i in range(1000000): with lock: num+=1 print('work1',num) if __name__ == '__main__': lock=Lock() t1=threading.Thread(target=work) t2=threading.Thread(target=work1) t1.start() t2.start() t1.join() t2.join() print('主线程执行结果',num) 进程间通信(IPC): 进程之间默认不共享内存,因此如果需要传递数据,就必须使用专门的通信机制。 在 Python 中,可以使用 multiprocessing.Queue、multiprocessing.Pipe、共享内存(如 multiprocessing.Value 和 multiprocessing.Array)等方式实现进程间通信。 from multiprocessing import Process, Queue def worker(process_id, q): # 每个进程将数据放入队列 q.put(f"data_from_process_{process_id}") print(f"Process {process_id} finished.") if __name__ == '__main__': q = Queue() processes = [] for i in range(5): p = Process(target=worker, args=(i, q)) processes.append(p) p.start() for p in processes: p.join() # 从队列中收集数据 results = [] while not q.empty(): results.append(q.get()) print("Collected data:", results) 当你在主进程中创建了一个 Queue 对象,然后将它作为参数传递给子进程时,子进程会获得一个能够与主进程通信的“句柄”。 子进程中的 q.put(...) 操作会将数据通过这个管道传送到主进程,而主进程可以通过 q.get() 来获取这些数据。 这种机制虽然看起来像是“共享”,但实际上是通过 IPC(进程间通信)实现的,而不是直接共享内存中的变量。 项目贡献 效果图
项目
zy123
3月21日
0
4
0
1
2
下一页