原文:
towardsdatascience.com/mastering-data-streaming-in-python-a88d4b3abf8b
在本文中,我将讨论数据工程师在设计流数据管道时可能遇到的关键挑战。我们将探讨用例场景,提供 Python 代码示例,讨论使用流式框架进行的窗口计算,并分享与这些主题相关的最佳实践。
在许多应用中,访问实时和持续更新的数据至关重要。欺诈检测、客户流失预防和推荐是流式处理的最佳候选者。这些数据管道实时处理来自各种源到多个目标目的地的数据,捕捉事件的发生,并使它们的转换、丰富和分析成为可能。
流数据管道
在我之前的一篇文章中,我描述了最常见的数据管道设计模式和何时使用它们 [1]。
数据管道设计模式
数据管道是一系列数据处理步骤的序列,其中每个阶段的输出成为下一个阶段的输入,创建一个逻辑数据流。
当数据在两点之间被处理时,就存在数据管道,例如从源到目的地。
数据管道的三个关键组件是源、处理步骤和目的地。例如,从外部 API(源)提取的数据可以加载到数据仓库(目的地),这说明了源和目的地不同的常见场景。
在流式处理中,源通常是发布者服务,而目的地是消费者– 例如应用程序或另一个处理数据的端点。这些数据通常通过窗口计算进行转换。一个很好的例子是定义在最后事件之后的非活动期间的会话窗口(例如 Google Analytics 4 等)。
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/05ebed7b97a127a78e91f09e48399498.png
概念性数据管道设计。图片由作者提供
实际应用示例
考虑以下使用 Python、Kafka 和 Faust 构建的简单流数据处理应用程序示例。高级应用逻辑如下:
- API 服务
app/app_main.py允许将有效的用户参与事件 POST 到 Kafka 的producer主题。这些事件可以来自网站、移动应用程序,或由其他服务(如某种类型的数据发布者)发送。
{"event_type":"page_view","user_id":"e659e3e7-22e1-4a6b","action":"alternative_handset","timestamp":"2024-06-27T15:43:43.315342","metadata":{"session_id":"4b481fd1-9973-4498-89fb","page":"/search","item_id":"05efee91","user_agent":"Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00"}}事件验证可以通过pydantic进行,并接受具有有效类型和操作等的事件 [2]
我们的应用程序不断从consumer主题消费处理过的事件,并将它们发送到WebSocket,以便可视化实时处理。
Python 数据工程师
数据处理服务从
producer主题消费原始事件,然后每 10 秒应用一个窗口计算(滚动表),并将按user分组的聚合结果发送到consumer主题。这可能是一种流式框架,如kafka-streams或faust。这不断处理的数据流可以立即通过 API 服务在
localhost:8000/monitor上可视化。
例如,我们可以每 10 秒处理一次原始用户参与事件,根据每个用户的简单事件计数生成用户排行榜。
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8b110767140eead76ebd39d44d7fd75e.png
流式排行榜应用示例。图片由作者提供。
importosimportjsonimportloggingimportasyncioimportuvicornfromcontextlibimportasynccontextmanagerfromfastapiimportFastAPI,WebSocket,HTTPExceptionfromfastapi.staticfilesimportStaticFilesfromaiokafkaimportAIOKafkaConsumer,AIOKafkaProducerfromapp.modelsimportEventfrom.httpimportevents,usersfromdotenvimportload_dotenv load_dotenv()logging.basicConfig(level=logging.INFO)# kafka_brokers = os.getenv("REDPANDA_BROKERS")kafka_brokers=("redpanda-0:9092"ifos.getenv("RUNTIME_ENVIRONMENT")=="DOCKER"else"localhost:19092")consumer_topic=os.getenv("CONSUMER_TOPIC")producer_topic=os.getenv("PRODUCER_TOPIC")error_topic=os.getenv("ERROR_TOPIC")defkafka_json_deserializer(serialized):returnjson.loads(serialized)@asynccontextmanagerasyncdefstartup(app):app.producer=AIOKafkaProducer(bootstrap_servers=[kafka_brokers],)awaitapp.producer.start()app.consumer=AIOKafkaConsumer(consumer_topic,# "agg-events",# group_id="demo-group",# loop=loop,bootstrap_servers=[kafka_brokers],enable_auto_commit=True,auto_commit_interval_ms=1000,# commit every secondauto_offset_reset="earliest",# If committed offset not found, start from beginningvalue_deserializer=kafka_json_deserializer,)awaitapp.consumer.start()yieldapp=FastAPI(lifespan=startup)app.mount("/static",StaticFiles(directory="static"),name="static")app.include_router(events.router)app.include_router(users.router)# WebSocket endpoint@app.websocket("/ws")asyncdefwebsocket_endpoint(websocket:WebSocket):awaitwebsocket.accept()asyncdefsend_message_to_websocket(msg):text=str(msg.value)awaitwebsocket.send_text(text)asyncdefconsume_from_topic(topic,callback):print(f"Consuming from{topic}")asyncformsginapp.consumer:print(f"Received message:{msg.value}")awaitcallback(msg)# Start consumingasyncio.create_task(consume_from_topic(consumer_topic,send_message_to_websocket))# Keep the connection openwhileTrue:awaitasyncio.sleep(3)@app.post("/track")asyncdefsend_event_to_topic(event:Event):try:data=event.model_dump_json()data=data.encode()# # Validate the presence of required fields# We could do something like this but Pydantic will do# everything for us.# if "user_id" not in data or "action" not in data:# raise HTTPException(# status_code=422, detail="Incomplete data provided")user_id=event.user_id# Send filename to Redpandaawaitapp.producer.send(producer_topic,data)# Returning a confirmation messagereturn{"message":"User data submitted successfully!","user_data":{"user_id":user_id}}exceptHTTPExceptionase:# Re-raise HTTPException to return the specified# status code and detailprint(e)raiseeexceptExceptionase:# Handle other unexpected exceptions and return a# 500 Internal Server Errorprint(e)raiseHTTPException(status_code=500,detail=f"An error occurred:{str(e)}")if__name__=="__main__":uvicorn.run("app.app_main:app",host="0.0.0.0",port=8000)# Run:# uvicorn app.app_main:app --reload --host 0.0.0.0 --port 8000# python -m app.app_main api -l info流处理服务的代码可以在下面的部分找到。
Kafka、Kinesis、RabbitMQ 以及其他流处理工具
让我们来看看在过去几年中证明自己最有用的流行数据流平台和框架。
Apache Spark– 一个用于大规模分析和复杂数据转换的分布式数据计算框架。
Apache Kafka– 一个具有分布式消息系统的实时数据管道工具,用于应用程序。它使用发布-订阅模型,其中生产者向主题发送数据,消费者从这些主题中拉取数据。每个主题都被分割成分区,这些分区在不同的服务器上复制,以提高可用性和平衡负载。此外,Kafka 具有内置的容错功能,因此你可以为每个主题设置复制因子和所需的同步副本(ISR)数量。这意味着即使某些服务器宕机,数据仍然可访问。
AWS Kinesis是一个用于分析和应用的实时流平台。我之前在这里写过关于它的内容 [3]。
使用 Redshift Serverless 和 Kinesis 构建流数据管道
Google Cloud Dataflow– Google 的实时事件处理和分析管道流平台。
Apache Flink– 一个专为低延迟数据处理设计的分布式流数据平台。
RabbitMQ是一个开源的消息代理,它促进了应用程序之间的通信。它使用基于高级消息队列协议(AMQP)的排队系统,允许你高效地发送、接收和路由消息。RabbitMQ 有助于解耦应用程序组件,使它们能够独立工作并轻松扩展。
Kafka 是我最喜欢的分布式流平台之一,它允许你发布和订阅数据流,实时处理它们,并可靠地存储它们。它最初是为 Java 构建的,但现在也适用于 Python 开发者(kafka-python)。
我喜欢它的一个特点是内置的窗口方法,这简化了会话计算。
例如,使用faust-streaming框架,这可以轻松实现。考虑以下代码。它展示了我们的应用程序如何为每个用户计算窗口聚合:
importosimportrandomfromdatetimeimportdatetime,timedeltaimportfaust SINK=os.getenv("CONSUMER_TOPIC")TOPIC=os.getenv("PRODUCER_TOPIC")BROKER=("redpanda-0:9092"ifos.getenv("RUNTIME_ENVIRONMENT")=="DOCKER"else"localhost:19092")TABLE="tumbling-events"CLEANUP_INTERVAL=1.0WINDOW=10# 10 seconds windowWINDOW_EXPIRES=10PARTITIONS=1app=faust.App("event-stream",broker=f"kafka://{BROKER}")app.conf.table_cleanup_interval=CLEANUP_INTERVAL source=app.topic(TOPIC,value_type=Event)sink=app.topic(SINK,value_type=UserStats)@app.timer(interval=3.0,on_leader=True)asyncdefgenerate_event_data():events_topic=app.topic(TOPIC,key_type=str,value_type=Event)allowed_events=[e.valueforeinAllowedEvents]allowed_actions=[e.valueforeinAllowedActions]# Create a loop to send data to the Redpanda topic# Send 20 messages every time the timer is triggered (every 5 seconds)foriinrange(20):# Send the data to the Redpanda topicawaitevents_topic.send(key=random.choice(["User1","User2","User3","User4","User5"]),value=Event(event_type=random.choice(["page_view","scroll"]),user_id=random.choice(["User1","User2","User3","User4","User5"]),# noqa: E501action=random.choice(["action_1","action_2"]),timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),),)print("Producer is sleeping for 3 seconds 😴 ")defwindow_processor(key,events):try:timestamp=key[1][0]# key[1] is the tuple (ts, ts + window)print(f'key::{key}')users=[event.user_idforeventinevents]count=len(users)user_counter=Counter(event.user_idforeventinevents)fori,(user,count)inenumerate(user_counter.items()):print(f"{i}.{user}:{count}")aggregated_event=UserStats(timestamp=timestamp,user_id=user,count=count)print(f"Processing window:{len(users)}events, Aggreged results:{aggregated_event}"# noqa: E501)sink.send_soon(value=aggregated_event)exceptExceptionase:print(e)tumbling_table=(app.Table(TABLE,default=list,key_type=str,value_type=Event,partitions=PARTITIONS,on_window_close=window_processor,).tumbling(WINDOW,expires=timedelta(seconds=WINDOW_EXPIRES)).relative_to_field(Event.timestamp))@app.agent(app.topic(TOPIC,key_type=str,value_type=Event))asyncdefcalculate_tumbling_events(events):asyncforeventinevents:value_list=tumbling_table["events"].value()value_list.append(event)tumbling_table["events"]=value_listif__name__=="__main__":app.main()现在在命令行中输入以下内容以启动我们的流模块:
faust-A streamer.event_stream worker-l info因此,我们应该看到事件聚合在实时发生:
https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8290d2b708ebbd08872a83c59ea85480.png
作者图片
选择合适的 Python 客户端
一些流行的 Kafka Python 客户端包括confluent-kafka、pykafka和kafka-python。
confluent-kafka是高性能librdkafkaC 库的 Python 包装器,提供了完整的 Kafka 功能支持,以及额外的 AVRO 序列化和模式注册表集成。它包括高级和低级 API,以提供更大的灵活性和对 Kafka 应用程序的控制。
大数据文件格式,解释
pykafka和kafka-python客户端是最广泛使用的,它们的设计考虑了性能和简单性。它们支持所有 Kafka 功能,并包括平衡消费者、Zookeeper 集成和分区管理等功能。
每个 Python 库都有其自身的优缺点。
使用简单性对于初学者来说可能很重要。这主要涉及文档、代码可读性、API 设计和错误处理。Pykafka通常被认为是简单的,具有清晰的 API、良好的文档和用户友好的错误消息。相反,kafka-python和confluent-kafka-python则不那么简单,具有更复杂的 API、更不全面的文档和不太清晰的错误消息。
另一方面,confluent-kafka-python和kafka-python提供了最全面的Kafka 功能支持,涵盖了所有 Kafka 功能,并提供了高级和低级 API。相比之下,pykafka的功能支持有限,仅提供高级 API。一般来说,confluent-kafka-python由于其基于优化的librdkafkaC 库而提供最佳性能,而kafka-python和pykafka由于是纯 Python 实现且开销更大,性能较低。
您可以使用 pip 安装kafka-python:pip install kafka-python,我们的应用程序文件将如下所示:
# app.pyfromkafkaimportKafkaProducer,KafkaConsumerimportjson producer=KafkaProducer(bootstrap_servers=['brokerA:9092','brokerB:9092'],key_serializer=lambdak:json.dumps(k).encode(),value_serializer=lambdav:json.dumps(v).encode())# Send data to Kafka topic 'my_topic'future=producer.send('my_topic',key='hello',value='world')try:result=future.get(timeout=10)print(result)exceptExceptionase:print(e)要从主题中消费数据,我们希望使用poll方法。它将返回包含我们的 JSON 记录的主题分区列表:
importjson consumer=KafkaConsumer(bootstrap_servers=['brokerA:9092','brokerB:9092'],group_id='my-group',topics=['my_topic'],key_deserializer=lambdak:json.loads(k.decode()),value_deserializer=lambdav:json.loads(v.decode()),auto_offset_reset='earliest',enable_auto_commit=True,max_poll_records=10)forrecordinconsumer:print(record.key,record.value)Kafka 最佳实践
那么,我们如何提高我们的 Kafka 应用程序的可靠性和性能?
优化 Kafka 应用程序没有通用的解决方案
我们 Kafka 应用程序的性能和可靠性可能受到各种因素的影响,包括以下方面:
配置和生产者与消费者的数量,
消息大小和频率,
网络带宽和延迟,
硬件和软件资源,
可能的故障场景。
通常,以下是一般性的容错和性能设计建议:
使用复制:复制在多个代理之间创建多个数据副本,确保在故障期间的可访问性,并通过负载均衡提高性能。然而,它可能导致磁盘使用量和网络流量增加。根据您的可用性和一致性要求设置复制因子和同步副本(ISR)的数量。
启用批处理:批处理将多个消息组合成一个请求,减少网络开销并提高吞吐量。它还提高了压缩效率。在生产和消费端都启用批处理,调整批大小和 linger 时间以满足您的延迟和吞吐量目标。
明智地分区:每个主题的分区数量会影响可伸缩性、并行性和性能。更多的分区允许更大的生产者和消费者容量,有效地在代理之间分配负载。然而,过多的分区会增加网络流量。因此,根据您预期的吞吐量和延迟选择分区数量。
启用压缩:压缩可以减少消息大小。就是这样简单。它可以减少磁盘空间使用量和数据传输。虽然它会产生一些处理压缩的 CPU 开销,但选择您的压缩算法和级别也被认为是最佳实践。
错误处理
Python Kafka 客户端提供了一系列方法和选项来管理错误和异常,包括:
错误处理策略:策略涉及在错误发生时重试、忽略或快速失败。数据工程师可以确定如何处理不同的错误类型,同时确保可靠性和数据一致性。例如,可以使用
max_in_flight_requests_per_connection、retries和retry_backoff_ms等参数来配置kafka-python生产者。在confluent-kafka-python消费者中,我们可以使用enable.auto.commit、auto.commit.interval.ms和enable.auto.offset.store等参数。异常处理:可以使用
try、except和finally块来处理各种异常类型并执行清理或恢复程序。这是标准的 Python 异常处理。例如,在kafka-python和confluent-kafka-python客户端中有一个KafkaError类。在pykafka客户端中,我们有PyKafkaException类。错误回调:当我们需要记录错误、引发异常或实现恢复操作时,回调函数非常有用。回调函数可以被分配给生产者和消费者的构造函数,在客户端级别发生错误或异常时触发。例如,
error_cb参数在kafka-python和confluent-kafka-python客户端中可用,而on_error参数在pykafka客户端中使用。
例如,最常见的一种错误处理模式是死信队列。来自源主题的事件可以分成两条独立的路径:
应用程序成功处理源主题中的每个事件,并在一切正常的情况下将它们发布到目标主题(一个汇点)。
无法处理的事件(例如,缺少预期格式或缺少所需属性的事件)将被定向到错误主题。
另一个有用的错误处理模式是实现重试队列。例如,如果此时项目的参数不可用,事件可以被路由到重试主题。例如,如果该功能正在由另一个服务处理,并且请求时不可用,这种情况可能会发生。
这种可恢复的状态不应被归类为错误
相反,它应该定期重试,直到满足所需条件。引入重试主题允许立即处理大多数事件,同时将某些事件的处理推迟到满足必要条件。
结论
流式处理技术(如滚动和跳跃窗口、会话等)与 Python 结合是一种多功能的解决方案。像 Kafka 这样的工具帮助数据工程师创建能够支持实时分析、消息传递和事件驱动系统的流式应用程序。例如,Kafka 的关键优势包括高吞吐量和可伸缩性,使其在其用途上极为强大**。** Kafka 通过批量写入和读取,降低磁盘 I/O 和网络开销,并利用数据压缩,以低延迟处理每秒数百万条消息。它可以通过添加代理进行水平扩展,通过增加分区和副本进行垂直扩展。它支持消费者组以有效地分配工作负载。Kafka 通过领导者-跟随者模型确保数据可用性,即使在故障期间,跟随者也会复制数据。如果领导者失败,跟随者将接管,Kafka 的提交日志允许进行持久和可重放的数据存储。
选择正确的 Python 库将是首要任务。如果您最关心的是性能指标,如吞吐量、延迟、CPU 使用率和内存消耗,那么请选择confluent-kafka-python。如果您在寻找简单性,那么pykafka或kafka-python将是更好的解决方案。
适当的错误处理以及以最大化延迟、吞吐量和可用性的方式设计我们的流式应用程序也是需要考虑的另一件事。我希望遵循这个故事中的提示能帮助您正确配置 Kafka 集群并启用最佳设置,以进行复制、分区、批处理和压缩。
数据工程中的流式处理
推荐阅读
[1]towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3
[2]towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd
[3]towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603
[4]towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603
[5]medium.com/towards-data-science/big-data-file-formats-explained-275876dc1fc9