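Running SEO scripts by hand is painful. I built an automated data pipeline on Apache Airflow that collects data, analyzes it, and sends a report every day. This post walks through the Airflow DAG design and the code.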
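1. Why Airflow
What Airflow brings:
- Visualization: the DAG graph shows task dependencies at a glance
- Scheduling: cron expressions give precise control over when a run starts
- Retries: failed tasks are retried automatically
- Monitoring: the web UI shows the state of every task
- Extensibility: new tasks are easy to add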
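To make the dependency-graph idea concrete without needing an Airflow install, here is a minimal sketch using Python's standard `graphlib` module. It is not how Airflow schedules work internally; it only shows how a declared dependency map determines which tasks can run together and which must wait. The task names and the `execution_waves` helper are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Each key depends on every task in its value set (illustrative task names).
dependencies = {
    "analyze": {"collect_serp", "collect_competitors"},
    "generate_report": {"analyze"},
    "send_email": {"generate_report"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks in one wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the wave finished so dependents unlock
    return waves


waves = execution_waves(dependencies)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The two collection tasks land in the first wave because nothing blocks them, which is the same parallelism the DAG graph in the web UI makes visible.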
2. Core DAG Design
2.1 Daily SEO Pipeline
    # dags/daily_seo_pipeline.py
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.email import EmailOperator
    from airflow.operators.python import PythonOperator

    # The task callables used below (collect_serp_task, collect_competitor_task,
    # analyze_task, generate_report_task) are the functions from section 2.2;
    # define or import them before this point.

    default_args = {
        'owner': 'seo-team',
        'depends_on_past': False,
        'email': ['seo@company.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    }

    with DAG(
        'daily_seo_pipeline',
        default_args=default_args,
        description='Daily SEO data collection and reporting',
        # Every day at 06:00. `schedule_interval` still works on Airflow 2.8
        # but has been deprecated since 2.4 in favor of `schedule`.
        schedule_interval='0 6 * * *',
        start_date=datetime(2026, 1, 1),
        catchup=False,
        tags=['seo', 'daily'],
    ) as dag:

        # Task 1: collect SERP data
        collect_serp = PythonOperator(
            task_id='collect_serp_data',
            python_callable=collect_serp_task,
            op_kwargs={
                'keywords': '{{ var.value.seo_keywords }}',
                'api_key': '{{ conn.serpbase.password }}',
            },
        )

        # Task 2: collect competitor data
        collect_competitors = PythonOperator(
            task_id='collect_competitor_data',
            python_callable=collect_competitor_task,
        )

        # Task 3: analyze the data
        analyze = PythonOperator(
            task_id='analyze_data',
            python_callable=analyze_task,
        )

        # Task 4: generate the report
        generate_report = PythonOperator(
            task_id='generate_report',
            python_callable=generate_report_task,
        )

        # Task 5: send the email
        send_email = EmailOperator(
            task_id='send_email',
            to=['team@company.com'],
            subject='Daily SEO Report - {{ ds }}',
            html_content="""
            <h3>Daily SEO Report - {{ ds }}</h3>
            <p>Report generated. Please check the dashboard.</p>
            """,
        )

        # Dependencies: both collection tasks must finish before analysis starts
        [collect_serp, collect_competitors] >> analyze >> generate_report >> send_email

2.2 Task Functions
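Both collection tasks in this section receive their inputs as a single comma-separated string (the `seo_keywords` and `seo_competitors` Variables). One easy mistake is that `"".split(',')` returns `['']`, so an empty Variable still looks like one item. A minimal sketch of a parsing helper that avoids this follows; the name `parse_csv_variable` is hypothetical and not part of Airflow.

```python
def parse_csv_variable(raw: str) -> list[str]:
    """Split a comma-separated setting into trimmed, non-empty, unique items.

    Order of first appearance is preserved so runs stay reproducible.
    """
    seen = set()
    items = []
    for part in raw.split(","):
        item = part.strip()
        if item and item not in seen:
            seen.add(item)
            items.append(item)
    return items


keywords = parse_csv_variable(" seo tools, rank tracker ,, seo tools")
print(keywords)
```

Counting `len(parse_csv_variable(raw))` then reports the number of keywords actually processed rather than the number of comma-separated fragments.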
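    from datetime import datetime

    import requests
    from airflow.models import Variable

    # store_serp_data, track_competitor, calculate_ranking_changes,
    # detect_anomalies, generate_insights, store_analysis and
    # generate_html_report are project helpers that are not shown in this post.


    def collect_serp_task(keywords: str, api_key: str):
        """Collect SERP data for a comma-separated list of keywords."""
        keyword_list = [k.strip() for k in keywords.split(',') if k.strip()]
        headers = {"X-API-Key": api_key, "Content-Type": "application/json"}
        for keyword in keyword_list:
            body = {"q": keyword, "hl": "en", "gl": "us", "page": 1}
            r = requests.post(
                "https://api.serpbase.dev/google/search",
                headers=headers,
                json=body,
                timeout=30,
            )
            # Fail the task on HTTP errors so Airflow's retries kick in
            r.raise_for_status()
            # Store the result in the database
            store_serp_data(keyword, r.json())
        return f"Collected {len(keyword_list)} keywords"


    def collect_competitor_task():
        """Collect competitor data."""
        raw = Variable.get("seo_competitors", default_var="")
        # Drop blank entries so the reported count matches what was tracked
        competitors = [c.strip() for c in raw.split(',') if c.strip()]
        for competitor in competitors:
            track_competitor(competitor)
        return f"Tracked {len(competitors)} competitors"


    def analyze_task():
        """Analyze the collected data."""
        # Compute ranking changes
        ranking_changes = calculate_ranking_changes()
        # Detect anomalies
        anomalies = detect_anomalies()
        # Generate insights
        insights = generate_insights()
        # Store the analysis results
        store_analysis(ranking_changes, anomalies, insights)
        return "Analysis complete"


    def generate_report_task():
        """Generate the report."""
        # Build the HTML report
        report_html = generate_html_report()
        # Save it to a file
        with open(f"/reports/seo_report_{datetime.now().strftime('%Y%m%d')}.html", 'w') as f:
            f.write(report_html)
        return "Report generated"

3. Advanced Features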
3.1 Dynamic Task Generation
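Generating one task per keyword means deriving a `task_id` from free-form text. Replacing spaces alone is not enough for keywords that contain characters such as `+`, `/`, or `?`, because Airflow validates task ids and, to my knowledge, accepts only letters, digits, dashes, dots, and underscores up to 250 characters. A minimal sketch of a stricter slug function follows; `keyword_to_task_id` is a hypothetical helper, and the 250-character cap is an assumption based on Airflow's key validation.

```python
import re

MAX_TASK_ID_LENGTH = 250  # assumed limit, matching Airflow's key validation


def keyword_to_task_id(keyword: str, prefix: str = "collect_") -> str:
    """Turn a free-form keyword into a conservative ASCII task id."""
    # Collapse every run of disallowed characters into a single underscore
    slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", keyword.strip()).strip("_")
    if not slug:
        raise ValueError(f"keyword {keyword!r} has no usable characters")
    return f"{prefix}{slug}"[:MAX_TASK_ID_LENGTH]


print(keyword_to_task_id("best seo tools"))
print(keyword_to_task_id(" c++ / rust? "))
```

Different keywords can collapse to the same id (for example `a b` and `a/b`), so a real pipeline would also need to detect or disambiguate duplicates before creating tasks.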
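    from airflow.models import Variable
    from airflow.operators.python import BranchPythonOperator, PythonOperator


    def create_dynamic_tasks(dag):
        """Create one collection task per keyword.

        Call this at DAG-parse time, at module level in the DAG file. Tasks
        added from inside a running task via context['dag'].add_task(...) are
        never scheduled, because the scheduler only knows the tasks that
        exist when the DAG file is parsed.
        """
        keywords = Variable.get("seo_keywords", default_var="").split(',')
        tasks = []
        for keyword in keywords:
            if keyword.strip():
                tasks.append(PythonOperator(
                    task_id=f'collect_{keyword.strip().replace(" ", "_")}',
                    # collect_single_keyword is a project helper not shown here
                    python_callable=collect_single_keyword,
                    op_kwargs={'keyword': keyword.strip()},
                    dag=dag,  # attach the task to the DAG
                ))
        return tasks


    # Use BranchPythonOperator for conditional branching
    def branch_on_anomaly(**context):
        """Pick a branch depending on whether an anomaly was detected."""
        # check_if_anomaly_exists is a project helper not shown here
        has_anomaly = check_if_anomaly_exists()
        if has_anomaly:
            return 'send_alert_task'
        else:
            return 'skip_alert_task'


    # Define this inside the `with DAG(...)` block; tasks with the ids
    # 'send_alert_task' and 'skip_alert_task' must exist directly downstream.
    branch_task = BranchPythonOperator(
        task_id='branch_on_anomaly',
        python_callable=branch_on_anomaly,
    )

3.2 Monitoring and Alerting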
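The health check in this section comes down to filtering task states and formatting a message, which is easy to unit-test when it is kept separate from the Airflow context. A minimal sketch under that assumption follows: `build_failure_alert` is a hypothetical helper that takes plain `(task_id, state)` pairs, and it also treats `upstream_failed` as a failure, which is a design choice here rather than something the original check does.

```python
from typing import Iterable, Optional, Tuple

# States treated as failures; 'upstream_failed' marks tasks that never ran
# because something before them failed.
FAILURE_STATES = {"failed", "upstream_failed"}


def build_failure_alert(
    task_states: Iterable[Tuple[str, Optional[str]]],
) -> Optional[str]:
    """Return an alert message for failed tasks, or None if all is well."""
    failed = sorted(
        task_id for task_id, state in task_states if state in FAILURE_STATES
    )
    if not failed:
        return None
    return "Tasks failed: " + ", ".join(failed)


message = build_failure_alert([
    ("collect_serp_data", "failed"),
    ("collect_competitor_data", "success"),
    ("analyze_data", "upstream_failed"),
])
print(message)
```

Inside a task, the pairs could be built from the task instances of the current DAG run, for example `[(t.task_id, t.state) for t in ti.get_dagrun().get_task_instances()]`.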
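    def check_task_health(**context):
        """Check the health of the current DAG run.

        Attach this task with trigger_rule='all_done'. Under the default
        'all_success' rule it would be marked upstream_failed and never run
        in exactly the case it is meant to catch.
        """
        ti = context['ti']
        # Fetch every task instance in this DAG run (not only direct upstream tasks)
        task_instances = ti.get_dagrun().get_task_instances()
        failed_tasks = [t for t in task_instances if t.state == 'failed']
        if failed_tasks:
            # send_alert is a project helper that is not shown in this post
            send_alert(f"Tasks failed: {[t.task_id for t in failed_tasks]}")
        return "Health check complete"

4. Deployment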
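    # docker-compose.yml
    # One-time setup not shown: initialize the metadata database
    # (`airflow db migrate`) and create a web UI user.
    version: '3.8'
    services:
      airflow-webserver:
        image: apache/airflow:2.8.0
        command: webserver
        ports:
          - "8080:8080"
        volumes:
          - ./dags:/opt/airflow/dags
          - ./logs:/opt/airflow/logs
        environment:
          - AIRFLOW__CORE__EXECUTOR=LocalExecutor
          - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      airflow-scheduler:
        image: apache/airflow:2.8.0
        command: scheduler
        volumes:
          - ./dags:/opt/airflow/dags
          - ./logs:/opt/airflow/logs
        # The scheduler needs the same settings as the webserver; without
        # them it falls back to the default SQLite database.
        environment:
          - AIRFLOW__CORE__EXECUTOR=LocalExecutor
          - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      postgres:
        image: postgres:15
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow

Airflow turned our SEO automation from a "collection of scripts" into an "engineered system". The visual DAG graph lets the whole team understand the flow, automatic retries cut down on manual intervention, and monitoring and alerting make sure problems are noticed promptly. Deployment cost: a single server with 2 cores and 4 GB of RAM is enough to run it.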