管道 - 过滤器Pipe-and-Filter架构风格一、核心定义与本质管道过滤器是数据流驱动的经典架构模式属于结构化 / 数据流架构最早源于 Unix Shell 设计思想。核心组成两部分过滤器Filter独立、无状态、单一职责的数据处理单元只做一件事接收输入数据流 → 转换 / 加工 / 过滤数据 → 输出数据流不共享全局数据不依赖其他过滤器内部状态可独立复用、替换、并行运行。管道Pipe连接两个过滤器的通道只负责传输数据流不修改数据标准单向传输上游 Filter 输出 → Pipe → 下游 Filter 输入解耦过滤器之间互不感知对方实现只约定数据格式。核心思想数据流经一系列独立处理组件每个组件只完成单一处理组件之间仅通过数据流通信。二、关键特性优点高复用单个过滤器可在多个流程中重复使用如日志过滤、格式转换易修改扩展新增处理逻辑只需插入新过滤器无需改动原有代码可并行执行多过滤器可流水线并发处理提升吞吐量松耦合过滤器完全隔离替换其中一个不影响整条链路可调试管道中间可截流、打印数据流快速定位异常无状态天然容错单个过滤器故障不直接击穿全链路可加熔断。缺点数据格式强约束上下游必须统一数据流格式格式变更改动全链路额外 IO 开销数据反复序列化 / 传输大数据场景性能损耗明显不适合交互场景同步、低延迟人机交互不适合适合批处理、流式处理复杂分支难维护多分支、循环管道会大幅提升架构复杂度。三、管道过滤器常见拓扑结构线性串联最基础Filter1 → Pipe → Filter2 → Pipe → Filter3分叉广播一个过滤器输出通过管道分发给多个下游过滤器并行处理汇聚合并多个过滤器输出管道汇入同一个下游过滤器如多日志合并循环管道处理后数据回流上游再次过滤较少用易死循环四、最经典真实底层示例Unix/Linux Shell 命令原生管道过滤器原理Unix 每个命令就是Filter|符号就是Pipe 每个命令默认标准输入 stdin、标准输出 stdout管道把前一个 stdout 接到后一个 stdin。示例 1日志过滤完整链路# 原始日志文件读取(Filter1) → 筛选错误日志(Filter2) → 提取时间字段(Filter3) → 排序(Filter4) → 输出文件(Filter5) cat app.log | grep ERROR | awk {print $1,$2} | sort error_time.log拆解对应架构Filter1cat— 读取文件输出原始日志流Pipe1|— 传输全部日志文本Filter2grep ERROR— 过滤器只保留包含 ERROR 的行Pipe2|— 传输错误日志行Filter3awk— 过滤器截取每行前两列日期、时间Pipe3|— 传输时间文本流Filter4sort— 过滤器按时间排序Pipe4— 管道写入目标文件特点每个命令独立、单一职责可单独拿出来使用管道只传文本不处理业务随意增删过滤器加head -10只看前 10 条不改动原有逻辑。五、工程级真实业务系统示例后端开发常用示例 1实时日志处理系统ELK 架构 标准管道过滤器整体链路原始日志 → Filebeat → Logstash → Elasticsearch → Kibana 逐个映射Filter1 Filebeat采集服务器日志文件、切割日志流Pipe1网络 TCP 通道传输日志原始文本Filter2 Logstash多过滤器串联grok 过滤器解析非结构化日志为 JSON 结构化数据date 过滤器统一时间字段格式drop 过滤器丢弃无用调试日志mutate 过滤器新增业务标签字段Pipe2http 管道传输结构化 JSONFilter3 Elasticsearch存储、索引、分词过滤数据Pipe3查询数据流Filter4 Kibana聚合、图表过滤、可视化输出特点任意一层过滤器可替换Filebeat 替换为 Flink 采集Logstash 替换为 Flink Transform完全符合管道过滤器设计。示例 2大数据实时计算 Flink/Spark Streaming流式计算是管道过滤器工业级落地 数据流链路消息队列 (Kafka) → 数据源 Filter → 清洗 Filter → 转换 Filter → 聚合 Filter → 入库 Filter每个算子 (map/filter/flatMap) 独立过滤器上下游算子之间的数据通道 管道可随意插入新算子脱敏、风控过滤不修改原有计算逻辑。业务场景电商用户行为流处理Filter1读取 Kafka 用户点击日志Filter2清洗过滤非法埋点、空数据Filter3脱敏手机号、身份证加密处理Filter4转换拆分行为字段、关联商品基础数据Filter5聚合统计每小时商品点击量Filter6输出写入 MySQL 报表库六、日志处理Demopackage test; import java.util.ArrayList; import java.util.List; import java.util.Random; // 统一数据流载体管道传输的数据 class DataRecord { private String content; public DataRecord(String content) { this.content content; } public String getContent() { return content; } } // 过滤器顶层接口 所有处理单元实现该接口 interface Filter { ListDataRecord process(ListDataRecord input); } // 管道工具类串联所有过滤器 class PipeLine { // 链式执行所有过滤器数据在管道中流转 public static ListDataRecord run(ListDataRecord sourceData, Filter... filters) { ListDataRecord data sourceData; for (Filter filter : filters) { // 管道传输数据到下一个过滤器 data filter.process(data); } return data; } } // 自定义实体用户行为 class UserAction { private String userId; private String action; private Long productId; public UserAction(String userId, String action, Long productId) { this.userId userId; this.action action; this.productId productId; } Override public String toString() { return UserAction{ userId userId \ , action action \ , productId productId }; } } // 各个独立过滤器对应Flink算子 // 过滤器1模拟数据源批量生成原始日志 class SourceFilter implements Filter { Override public ListDataRecord process(ListDataRecord input) { ListDataRecord result new ArrayList(); Random random new Random(); String[] actions {click, buy, collect, invalid}; // 模拟生成10条埋点日志 for (int i 0; i 10; i) { String uid u random.nextInt(9999); String act actions[random.nextInt(actions.length)]; long pid random.nextInt(1000); String log uid , act , pid; result.add(new DataRecord(log)); } return result; } } // 过滤器2清洗过滤丢弃非法脏数据 class CleanFilter implements Filter { Override public ListDataRecord process(ListDataRecord input) { ListDataRecord result new ArrayList(); for (DataRecord record : input) { String line record.getContent(); String[] arr line.split(,); // 过滤格式错误、非法行为 if (arr.length ! 3) { continue; } String action arr[1]; if (click.equals(action) || buy.equals(action) || collect.equals(action)) { result.add(record); } } return result; } } // 过滤器3字符串日志转结构化对象模拟map转换算子 class ConvertFilter implements Filter { // 转换后存储对象单独输出 public ListUserAction convert(ListDataRecord input) { ListUserAction list new ArrayList(); for (DataRecord record : input) { String[] arr record.getContent().split(,); UserAction action new UserAction(arr[0], arr[1], Long.valueOf(arr[2])); list.add(action); } return list; } Override public ListDataRecord process(ListDataRecord input) { return input; } } // 过滤器4用户ID脱敏处理 class MaskFilter { public ListUserAction mask(ListUserAction input) { ListUserAction res new ArrayList(); for (UserAction ua : input) { String rawId ua.toString().split(userId)[1].split()[0]; String maskId rawId.substring(0, 2) ****; String action ua.toString().split(action)[1].split()[0]; String pid ua.toString().split(productId)[1].split(})[0]; res.add(new UserAction(maskId, action, Long.valueOf(pid))); } return res; } } // 主程序组装整条管道过滤器链路 public class SimplePipeFilterDemo { public static void main(String[] args) { // 1. 组装过滤器链路 Filter source new SourceFilter(); Filter clean new CleanFilter(); ConvertFilter convert new ConvertFilter(); MaskFilter mask new MaskFilter(); // 2. 管道流转数据生成原始数据 - 清洗 ListDataRecord rawData source.process(new ArrayList()); ListDataRecord cleanData PipeLine.run(rawData, clean); // 3. 管道下游转换实体 脱敏 ListUserAction entityList convert.convert(cleanData); ListUserAction finalResult mask.mask(entityList); // 4. 输出过滤器打印最终结果 System.out.println( 管道过滤器最终输出结果 ); for (UserAction item : finalResult) { System.out.println(item); } } }