#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :redbook
@File    :scraper.py
@IDE     :PyCharm
@Author  :rengengchen
@Time    :2024/4/17 11:53
"""
import csv
import datetime
import random
import re
import time
from urllib.parse import urlparse

import execjs
import requests
from loguru import logger

from config import HEAD, INFO_COLUMNS, RESULT_COLUMNS, RESULT_PATH, ENCODING, KEYWORDS, resource_path


def getXs(api, cookie):
    """Sign the request path with the bundled JS so the sub-comment API accepts it."""
    file_path = resource_path("xhs_get_sub_comment.js")
    with open(file_path, 'r', encoding='utf-8') as f:
        jstext = f.read()
    ctx = execjs.compile(jstext)
    match = re.search(r'(a1=[^;]+)', cookie)
    if match:
        a1 = match.group(1)
    else:
        raise ValueError('Failed to extract the required a1 parameter; please check the cookie')
    result = ctx.call("sign", api, '', a1)
    return result['X-s']


def request_comment(note_id, cursor=None, root_id=None, s1=1, s2=3):
    if root_id is not None:
        # sub-comment pages require an X-s signature computed over path + query string
        url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/sub/page'
               f'?note_id={note_id}&root_comment_id={root_id}&num=10&cursor={cursor}'
               f'&image_formats=jpg,webp,avif')
        parsed_url = urlparse(url)
        # combine path and query string
        path_and_query = parsed_url.path
        if parsed_url.query:
            path_and_query += '?' + parsed_url.query
        HEAD['X-s'] = getXs(path_and_query, HEAD['Cookie'])
    else:
        if cursor:
            url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/page'
                   f'?note_id={note_id}&cursor={cursor}&image_formats=jpg,webp,avif')
        else:
            url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/page'
                   f'?note_id={note_id}&image_formats=jpg,webp,avif')
        # top-level comment pages are requested without a signature
        HEAD.pop('X-s', None)
    r = requests.get(url, headers=HEAD)
    respond = r.json()
    if respond['code'] == 0:
        respond = respond['data']
        # throttle: sleep a random interval between s1 and s2 seconds
        if s2 > s1:
            gap = random.uniform(s1, s2)
        elif s1 == s2:
            gap = s1
        else:
            gap = 0
        time.sleep(gap)
        return respond
    logger.error(f'fail to request {url}')
    logger.error(respond)
    raise ConnectionError(f'fail to request {url}, respond: {respond}')


def transform_comment(comment):
    # create_time is a millisecond timestamp
    dt_object = datetime.datetime.fromtimestamp(int(comment['create_time']) // 1000)
    comment['create_time'] = dt_object.strftime('%Y-%m-%d %H:%M:%S')
    comment['user_id'] = comment['user_info']['user_id']
    comment['nickname'] = comment['user_info']['nickname']
    comment1 = {k: comment[k] for k in INFO_COLUMNS}
    comment1['ip_location'] = comment.get('ip_location', '')
    return comment1


def parse_comment(level, comments):
    comments1 = []
    sub_comment_has_more = False
    subs = []
    for comment in comments:
        logger.debug(f'parse comment:\n{comment}')
        # sub-comment entries do not carry these fields, hence the defaults
        sub_comments = comment.get('sub_comments', [])
        if comment.get('sub_comment_has_more', False):
            sub_comment_has_more = True
            subs.append((comment['note_id'], comment['id'], comment['sub_comment_cursor']))
        # keep everything when KEYWORDS is empty, otherwise only comments containing a keyword
        if KEYWORDS and not any(word in comment['content'] for word in KEYWORDS):
            continue
        comment1 = transform_comment(comment)
        # read target_comment from the raw comment: transform_comment keeps only INFO_COLUMNS
        comment1['target'] = comment.get('target_comment', {}).get('id', '')
        comment1['level'] = level
        logger.debug(comment1)
        comments1.append(comment1)
        for sub_comment in sub_comments:
            # apply the same keyword filter to inline sub-comments
            if KEYWORDS and not any(word in sub_comment['content'] for word in KEYWORDS):
                logger.debug(f'sub comment filtered: {sub_comment["content"]}')
                continue
            sub_comment1 = transform_comment(sub_comment)
            sub_comment1['target'] = sub_comment['target_comment']['id']
            sub_comment1['level'] = level + 1
            logger.debug(sub_comment1)
            comments1.append(sub_comment1)
    with open(f'{RESULT_PATH}/comments.csv', mode='a', newline='', encoding=ENCODING, errors='ignore') as f:
        writer = csv.writer(f)
        for comment in comments1:
            # csv.writer quotes fields, so commas or newlines in content do not break rows
            writer.writerow([comment[k] for k in RESULT_COLUMNS])
    if sub_comment_has_more:
        logger.debug('load sub comments')
        for note_id, root_id, cursor in subs:
            read_comment(note_id, level=level + 1, root_id=root_id, cursor=cursor)
    return comments1


def read_comment(note_id, level=1, root_id=None, cursor=None, s1=1, s2=3):
    data = request_comment(note_id, cursor=cursor, root_id=root_id, s1=s1, s2=s2)
    parse_comment(level, data['comments'])
    while data['has_more']:
        logger.debug('load next page')
        data = request_comment(note_id, cursor=data['cursor'], root_id=root_id, s1=s1, s2=s2)
        parse_comment(level, data['comments'])
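

if __name__ == '__main__':
    # Minimal usage sketch (an assumption, not part of the original module):
    # crawl one note's full comment tree, appending rows to
    # RESULT_PATH/comments.csv as pages arrive. The note id below is a
    # hypothetical placeholder; a real id comes from the note URL, e.g.
    # https://www.xiaohongshu.com/explore/<note_id>, and HEAD['Cookie'] in
    # config must hold a valid logged-in web cookie for the signing to work.
    read_comment('0000000000000000000000000', level=1, s1=1, s2=3)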