7.3.4 Scraping and Parsing Financial Data from Yahoo Finance
The file yahoo_finance_utils.py defines a module containing a set of functions for scraping and parsing financial data from the Yahoo Finance website. These functions asynchronously fetch earnings-calendar data, market movers (such as the biggest gainers, biggest losers, and most actively traded stocks), and related information. The code uses the httpx library to send HTTP requests and relies on pandas and BeautifulSoup to parse and process the data. It also includes a decorator, retry_on_rate_limit, which retries a function call when a rate-limit error is encountered.
(1) The following code defines a set of HTTP request headers, YAHOO_HEADERS, that mimic a typical browser environment. They are sent with requests to the Yahoo Finance website so that the traffic looks like an ordinary browser session rather than a crawler, reducing the chance of being blocked.
YAHOO_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
}
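These headers can also be used directly with a plain httpx request, independent of the cached client defined next. The snippet below is a minimal sketch; the helper name and the URL are illustrative only and are not part of the module.

import asyncio
import httpx

async def fetch_with_browser_headers(url: str) -> int:
    # Send a single GET request carrying the browser-like headers defined above.
    async with httpx.AsyncClient(headers=YAHOO_HEADERS, follow_redirects=True, timeout=30.0) as client:
        response = await client.get(url)
        return response.status_code

# Example (illustrative URL): print the HTTP status code returned by Yahoo Finance.
# print(asyncio.run(fetch_with_browser_headers("https://finance.yahoo.com/most-active")))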
(2) The following code defines create_cached_async_client, which creates an asynchronous HTTP client with caching enabled through the hishel library, so that repeated requests for the same resource are served more efficiently. The function also accepts custom request headers.

def create_cached_async_client(headers: dict | None = None) -> httpx.AsyncClient:
    """Create an httpx.AsyncClient with hishel caching enabled."""
    hishel.install_cache()
    return httpx.AsyncClient(
        timeout=30.0,
        follow_redirects=True,
        headers=headers,
    )
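The client is used exactly like a regular httpx.AsyncClient, as an async context manager. The following sketch assumes the module-level imports (httpx, hishel) and YAHOO_HEADERS are in place; the helper name and URL are illustrative.

import asyncio

async def fetch_quote_page() -> str:
    # The cached client behaves like a normal httpx.AsyncClient;
    # hishel transparently caches responses, so repeated requests hit the cache.
    async with create_cached_async_client(headers=YAHOO_HEADERS) as client:
        response = await client.get("https://finance.yahoo.com/most-active")
        response.raise_for_status()
        return response.text

# html = asyncio.run(fetch_quote_page())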
(3) The following code defines convert_to_numeric, which converts strings carrying a unit suffix (such as K, M, B, or T) into numeric values so that they can be used in later calculations and analysis.

def convert_to_numeric(value_str):
    """Convert string values like '1.2M', '3.45B', or '123.4K' into numbers."""
    if pd.isna(value_str) or value_str in ('', '-'):
        return None
    value_str = str(value_str).strip().replace(',', '')
    try:
        return float(value_str)
    except ValueError:
        pass
    # Handle values with a suffix (K, M, B, T)
    multipliers = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000, 'T': 1_000_000_000_000}
    for suffix, multiplier in multipliers.items():
        if value_str.upper().endswith(suffix):
            try:
                return float(value_str[:-1]) * multiplier
            except ValueError:
                pass
    return value_str
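A few representative calls make the conversion rules concrete; the expected values in the comments follow directly from the multiplier table above.

print(convert_to_numeric("1.2M"))      # 1200000.0
print(convert_to_numeric("3.45B"))     # 3450000000.0
print(convert_to_numeric("1,234.5"))   # 1234.5  (thousands separator removed)
print(convert_to_numeric("-"))         # None    (placeholder for missing data)
print(convert_to_numeric("N/A"))       # 'N/A'   (unparseable strings are returned unchanged)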
(4) The following code defines the asynchronous helper _parse_earnings_json, which parses earnings-related JSON data from a given Yahoo Finance URL. The function tries a set of predefined regular-expression patterns to extract the JSON embedded in the page content and converts it into a Python dictionary.

async def _parse_earnings_json(url: str) -> dict:
    """Parse earnings JSON from a Yahoo Finance URL using the existing async infrastructure."""
    async with create_cached_async_client(headers=YAHOO_HEADERS) as client:
        response = await client.get(url)
        response.raise_for_status()
        content = response.text

    # Try the patterns that have worked in the past
    patterns = [
        r'root\.App\.main\s*=\s*({.*?});',
        r'window\.App\.main\s*=\s*({.*?});'
    ]
    for pattern_name, pattern in zip(['root.App.main', 'window.App.main'], patterns):
        match = re.search(pattern, content, re.DOTALL)
        if match:
            try:
                data = json.loads(match.group(1))
                logger.info(f"Successfully parsed earnings data using the {pattern_name} pattern")
                return data
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse {pattern_name} JSON: {e}")

    raise ValueError("Could not find earnings data using the original patterns")
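The dictionary returned by this helper mirrors Yahoo's embedded application state. The sketch below uses only the keys that get_earnings_for_date (shown next) relies on; the helper name and example URL are illustrative, and the key layout should not be treated as a stable schema.

import asyncio

async def peek_earnings_store(url: str) -> list:
    # Parse the embedded JSON and navigate to the screener results,
    # using the same keys as get_earnings_for_date below.
    data = await _parse_earnings_json(url)
    stores = data['context']['dispatcher']['stores']
    return stores['ScreenerResultsStore']['results']['rows']

# rows = asyncio.run(peek_earnings_store(
#     "https://finance.yahoo.com/calendar/earnings?day=2024-01-15&offset=0&size=100"))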
(5) The following code defines the asynchronous function get_earnings_for_date, which fetches the earnings data for a specific date with pagination support. The function calls itself recursively to walk through every page of results and merges them into a single list.

async def get_earnings_for_date(date, offset=0, count=1):
    """Get earnings for a specific date, with pagination (async version of the original working code)."""
    base_earnings_url = 'https://finance.yahoo.com/calendar/earnings'
    if offset >= count:
        return []

    temp = pd.Timestamp(date)
    date = temp.strftime("%Y-%m-%d")
    dated_url = '{0}?day={1}&offset={2}&size={3}'.format(
        base_earnings_url, date, offset, 100)

    result = await _parse_earnings_json(dated_url)
    stores = result['context']['dispatcher']['stores']
    earnings_count = stores['ScreenerCriteriaStore']['meta']['total']

    new_offset = offset + 100
    more_earnings = await get_earnings_for_date(date, new_offset, earnings_count)

    current_earnings = stores['ScreenerResultsStore']['results']['rows']
    total_earnings = current_earnings + more_earnings
    return total_earnings
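Being a coroutine, the function has to be driven by an event loop. A minimal invocation might look like the following sketch; the helper name and the date are examples only.

import asyncio

async def main():
    # Fetch every earnings row reported for a single (example) day.
    rows = await get_earnings_for_date("2024-01-15")
    print(f"Fetched {len(rows)} earnings rows")

# asyncio.run(main())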
(6) The following code defines the asynchronous function get_earnings_in_date_range, which fetches earnings data for a date range. It computes the number of days between the start and end dates, calls get_earnings_for_date for each day in turn, and merges the results.

async def get_earnings_in_date_range(start_date, end_date):
    """Get earnings within a date range (async version of the original working code)."""
    import datetime
    earnings_data = []

    days_diff = pd.Timestamp(end_date) - pd.Timestamp(start_date)
    days_diff = days_diff.days

    current_date = pd.Timestamp(start_date)
    dates = [current_date + datetime.timedelta(diff) for diff in range(days_diff + 1)]
    dates = [d.strftime("%Y-%m-%d") for d in dates]

    i = 0
    while i < len(dates):
        try:
            earnings_data += await get_earnings_for_date(dates[i])
        except Exception:
            pass
        i += 1
    return earnings_data
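A typical call covers a short window such as a single trading week. The sketch below uses example dates and, for convenience, loads the merged rows into a pandas DataFrame; each row is expected to be a dictionary of Yahoo's screener fields, so the exact columns depend on what Yahoo returns.

import asyncio
import pandas as pd

async def weekly_earnings():
    # Collect all earnings rows between the two (example) dates, inclusive.
    rows = await get_earnings_in_date_range("2024-01-15", "2024-01-19")
    # Each row is a dictionary; pandas makes the result easier to inspect.
    return pd.DataFrame(rows)

# df = asyncio.run(weekly_earnings())
# print(df.head())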
(7) The following code defines the asynchronous function get_earnings_calendar_data, which uses the Playwright browser-automation tool to fetch earnings-calendar data with pagination support. The function drives a headless browser through Yahoo Finance's earnings-calendar pages, extracts the table rows on each page, and converts them into structured earnings records.

async def get_earnings_calendar_data(
    start_date: str = None,
    end_date: str = None,
    limit: int = 100
) -> dict:
    """Get earnings calendar data using Playwright with pagination support."""
    try:
        from playwright.async_api import async_playwright
    except ImportError:
        raise ImportError("Playwright is not available. Install with: uvx investor-agent[playwright]")

    # Set default dates
    start_date = start_date or datetime.date.today().strftime('%Y-%m-%d')
    end_date = end_date or (pd.Timestamp(start_date) + pd.DateOffset(days=7)).strftime('%Y-%m-%d')

    all_earnings = []
    offset = 0

    async with async_playwright() as p:
        async with await p.chromium.launch(headless=True) as browser:
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            )
            page = await context.new_page()

            # Keep fetching pages until we have enough data or there are no more pages
            while len(all_earnings) < limit:
                url = f"https://finance.yahoo.com/calendar/earnings?from={start_date}&to={end_date}&offset={offset}&size=100"
                logger.info(f"Loading earnings page {offset//100 + 1} with Playwright")

                await page.goto(url, wait_until='domcontentloaded', timeout=60000)
                await page.wait_for_timeout(3000)

                # Extract earnings data from the current page
                earnings_data = await page.eval_on_selector_all('table tr', '''
                    (rows) => rows.slice(1).map(row => {
                        const cells = Array.from(row.querySelectorAll('td, th')).map(c => c.textContent.trim());
                        // Only require a valid symbol; let the other fields default to empty
                        const symbol = cells[0] || '';
                        if (!symbol || symbol === 'Symbol' || cells.length === 0) return null;
                        return {
                            symbol,
                            company: cells[1] || '',
                            event_name: cells[2] || '',
                            time: cells[3] || '',
                            eps_estimate: cells[4] || '',
                            eps_actual: cells[5] || '',
                            surprise_percent: cells[6] || '',
                            market_cap: cells[7] || ''
                        };
                    }).filter(Boolean)
                ''')

                if not earnings_data:
                    logger.info("No more earnings data found; reached the end of the results")
                    break

                all_earnings.extend(earnings_data)
                logger.info(f"Fetched {len(earnings_data)} earnings rows from page {offset//100 + 1}")

                # Fewer rows than the page size means this was the last page
                if len(earnings_data) < 100:
                    logger.info(f"Fetched {len(earnings_data)} < 100 earnings rows; reached the last page")
                    break

                offset += 100
                await page.wait_for_timeout(1000)

    # Convert to structured earnings data (limited to the requested count)
    earnings_list = [
        {
            'symbol': str(item.get('symbol', '')),
            'company': str(item.get('company', '')),
            'event_name': str(item.get('event_name', '')),
            'time': str(item.get('time', '')),
            'eps_estimate': convert_to_numeric(item.get('eps_estimate', '')),
            'eps_actual': convert_to_numeric(item.get('eps_actual', '')),
            'surprise_percent': convert_to_numeric(item.get('surprise_percent', '')),
            'market_cap': str(item.get('market_cap', ''))
        }
        for item in all_earnings[:limit] if item.get('symbol')
    ]

    return {
        'metadata': {
            'start_date': start_date,
            'end_date': end_date,
            'count': len(earnings_list),
            'total_found': len(all_earnings),
            'pages_fetched': (offset // 100) + 1,
            'timestamp': pd.Timestamp.now().isoformat(),
            'source': 'Yahoo Finance Playwright Scraping (Paginated)'
        },
        'earnings': earnings_list
    }
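Because this function launches a headless browser, it is usually called once per query and its result consumed through the 'metadata' and 'earnings' keys shown above. The driver below is a sketch with example dates and an illustrative helper name.

import asyncio

async def show_upcoming_earnings():
    result = await get_earnings_calendar_data(
        start_date="2024-01-15",   # example date
        end_date="2024-01-22",     # example date
        limit=50,
    )
    print(result['metadata']['count'], "earnings records fetched")
    for item in result['earnings'][:5]:
        print(item['symbol'], item['company'], item['eps_estimate'])

# asyncio.run(show_upcoming_earnings())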
(8) The following code defines the asynchronous function get_market_movers_data, which fetches market-mover data from Yahoo Finance, such as the biggest gainers, biggest losers, and most actively traded stocks. Based on the requested category and market session it builds the corresponding URL, fetches the page asynchronously with httpx, parses the HTML table with pandas, and returns the cleaned data as a dictionary.

async def get_market_movers_data(
    category: Literal["gainers", "losers", "most-active"] = "most-active",
    count: int = 25,
    market_session: Literal["regular", "pre-market", "after-hours"] = "regular"
) -> dict:
    """Fetch and parse market movers data from Yahoo Finance."""
    from bs4 import BeautifulSoup

    count = min(max(count, 1), 100)

    # Build the URL based on the category and market session
    if category == "most-active":
        if market_session == "regular":
            url = f"https://finance.yahoo.com/most-active?count={count}&offset=0"
        elif market_session == "pre-market":
            url = f"https://finance.yahoo.com/markets/stocks/pre-market?count={count}&offset=0"
        elif market_session == "after-hours":
            url = f"https://finance.yahoo.com/markets/stocks/after-hours?count={count}&offset=0"
        else:
            raise ValueError(f"Invalid market session: {market_session}")
    else:
        # Gainers and losers are only available for the regular session
        url_map = {
            "gainers": f"https://finance.yahoo.com/gainers?count={count}&offset=0",
            "losers": f"https://finance.yahoo.com/losers?count={count}&offset=0"
        }
        url = url_map.get(category)
        if not url:
            raise ValueError(f"Invalid category: {category}")

    async with create_cached_async_client(headers=YAHOO_HEADERS) as client:
        logger.info(f"Fetching {category} ({market_session} session) data from: {url}")
        response = await client.get(url)
        response.raise_for_status()

        # Parse with pandas
        tables = pd.read_html(response.content)
        if not tables:
            raise ValueError(f"No data found for {category}")

        df = tables[0].copy()

        # Clean the data
        df = df.drop('52 Week Range', axis=1, errors='ignore')

        # Clean the percentage-change column
        if '% Change' in df.columns:
            df['% Change'] = df['% Change'].astype(str).str.replace('[%+,]', '', regex=True)
            df['% Change'] = pd.to_numeric(df['% Change'], errors='coerce')

        # Clean the numeric columns
        numeric_cols = [col for col in df.columns
                        if any(x in col for x in ['Vol', 'Volume', 'Market Cap', 'Market'])]
        for col in numeric_cols:
            df[col] = df[col].astype(str).apply(convert_to_numeric)

        return {
            'metadata': {
                'category': category,
                'market_session': market_session if category == "most-active" else "regular",
                'count': len(df),
                'timestamp': pd.Timestamp.now().isoformat(),
                'source': 'Yahoo Finance'
            },
            'stocks': df.head(count).to_dict('records')
        }
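As with the other coroutines in this module, the function runs on an event loop. The sketch below retrieves the ten most actively traded stocks in the regular session; the helper name is illustrative, and the column names in the returned records come from Yahoo's table, so they may change if the page layout changes.

import asyncio

async def top_most_active():
    result = await get_market_movers_data(category="most-active", count=10)
    for stock in result['stocks']:
        # Column names such as 'Symbol' and '% Change' are taken from Yahoo's table
        # and are not guaranteed to stay stable.
        print(stock.get('Symbol'), stock.get('% Change'))

# asyncio.run(top_most_active())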