xhs/scraper.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project redbook
@File scraper.py
@IDE PyCharm
@Author rengengchen
@Time 2024/4/17 11:53
"""
import csv
import datetime
import random
import re
import time
from urllib.parse import urlparse

import execjs
import requests
from loguru import logger

from config import HEAD, INFO_COLUMNS, RESULT_COLUMNS, RESULT_PATH, ENCODING, KEYWORDS, resource_path


def getXs(api, cookie):
    """Compute the X-s signature for an API path using the bundled JS helper."""
    file_path = resource_path("xhs_get_sub_comment.js")
    with open(file_path, 'r', encoding='utf-8') as f:
        jstext = f.read()
    ctx = execjs.compile(jstext)
    # The signing script needs the a1 token from the cookie.
    match = re.search(r'(a1=[^;]+)', cookie)
    if match:
        a1 = match.group(1)
    else:
        raise Exception('Failed to extract required parameter a1; please check the cookie')
    result = ctx.call("sign", api, '', a1)
    return result['X-s']


def request_comment(note_id, cursor=None, root_id=None, s1=1, s2=3):
    if root_id is not None:
        # Sub-comment pages must be signed: compute X-s over the path+query.
        url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/sub/page'
               f'?note_id={note_id}&root_comment_id={root_id}&num=10'
               f'&cursor={cursor}&image_formats=jpg,webp,avif')
        parsed_url = urlparse(url)
        # Combine the path and the query string.
        path_and_query = parsed_url.path
        if parsed_url.query:
            path_and_query += '?' + parsed_url.query
        HEAD['X-s'] = getXs(path_and_query, HEAD['Cookie'])
    else:
        if cursor:
            url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/page'
                   f'?note_id={note_id}&cursor={cursor}&image_formats=jpg,webp,avif')
        else:
            url = (f'https://edith.xiaohongshu.com/api/sns/web/v2/comment/page'
                   f'?note_id={note_id}&image_formats=jpg,webp,avif')
        # Top-level comment pages are requested without the X-s signature.
        if 'X-s' in HEAD:
            del HEAD['X-s']
    r = requests.get(url, headers=HEAD)
    respond = r.json()
    if respond['code'] == 0:
        respond = respond['data']
        # Sleep a random interval in [s1, s2] seconds between requests
        # to avoid hammering the API.
        if s2 > s1:
            gap = random.uniform(s1, s2)
        elif s1 == s2:
            gap = s1
        else:
            gap = 0
        time.sleep(gap)
        return respond
    logger.error(f'fail to request {url}')
    logger.error(respond)
    raise ConnectionError(f'fail to request {url}, respond: {respond}')
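
# The 'data' dict returned above (as consumed by parse_comment and
# read_comment below) is expected to contain:
#     comments: list of comment dicts with create_time (ms timestamp),
#               user_info, content, and usually ip_location; top-level
#               comments also carry sub_comments, sub_comment_has_more,
#               sub_comment_cursor, note_id and id
#     cursor:   pagination cursor for the next request
#     has_more: whether another page exists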


def transform_comment(comment):
    # create_time is a millisecond timestamp; convert it to a readable string.
    dt_object = datetime.datetime.fromtimestamp(int(comment['create_time']) // 1000)
    comment['create_time'] = dt_object.strftime('%Y-%m-%d %H:%M:%S')
    comment['user_id'] = comment['user_info']['user_id']
    comment['nickname'] = comment['user_info']['nickname']
    # Keep only the configured columns, plus ip_location if present.
    comment1 = {k: comment[k] for k in INFO_COLUMNS}
    comment1['ip_location'] = comment.get('ip_location', '')
    return comment1


def parse_comment(level, comments):
    comments1 = []
    sub_comment_has_more = False
    subs = []
    for comment in comments:
        logger.debug(f'parse comment:\n{comment}')
        # Sub-comments do not carry these attributes.
        sub_comments = comment.get('sub_comments', [])
        if comment.get('sub_comment_has_more', False):
            sub_comment_has_more = True
            subs.append((comment['note_id'], comment['id'], comment['sub_comment_cursor']))
        # Keep a top-level comment only if no keywords are configured or its
        # content contains at least one keyword.
        if KEYWORDS and not any(word in comment['content'] for word in KEYWORDS):
            continue
        # Read the reply target before transform_comment drops extra keys.
        target = comment.get('target_comment', {}).get('id', '')
        comment = transform_comment(comment)
        comment['target'] = target
        comment['level'] = level
        logger.debug(comment)
        comments1.append(comment)
        for sub_comment in sub_comments:
            # Drop inlined sub-comments whose content matches any keyword.
            matched = next((word for word in KEYWORDS if word in sub_comment['content']), None)
            if matched is not None:
                logger.debug(f'comment {sub_comment["content"]!r} filtered because of {matched!r}')
                continue
            sub_comment1 = transform_comment(sub_comment)
            sub_comment1['target'] = sub_comment['target_comment']['id']
            sub_comment1['level'] = level + 1
            logger.debug(sub_comment1)
            comments1.append(sub_comment1)
    with open(f'{RESULT_PATH}/comments.csv', mode='a', encoding=ENCODING,
              errors='ignore', newline='') as f:
        writer = csv.writer(f)
        for comment in comments1:
            # csv.writer quotes fields, so commas or newlines inside the
            # comment text do not break the CSV layout.
            writer.writerow([comment[k] for k in RESULT_COLUMNS])
    if sub_comment_has_more:
        logger.debug('load sub comment')
        for note_id, root_id, cursor in subs:
            read_comment(note_id,
                         level=level + 1,
                         root_id=root_id,
                         cursor=cursor)
    return comments1


def read_comment(note_id, level=1, root_id=None, cursor=None, s1=1, s2=3):
    """Fetch one page of comments, then keep paginating while has_more is set."""
    data = request_comment(note_id, cursor=cursor, root_id=root_id, s1=s1, s2=s2)
    parse_comment(level, data['comments'])
    while data['has_more']:
        logger.debug('load next page')
        data = request_comment(note_id, cursor=data['cursor'], root_id=root_id, s1=s1, s2=s2)
        parse_comment(level, data['comments'])
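

# A minimal usage sketch, not part of the original file. It assumes
# config.py supplies a valid logged-in Cookie in HEAD and that RESULT_PATH
# points at an existing directory; the note id below is a hypothetical
# placeholder.
if __name__ == '__main__':
    read_comment('<note_id>', s1=1, s2=3)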