#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@Project :redbook
@File    :scraper.py
@IDE     :PyCharm
@Author  :rengengchen
@Time    :2024/4/17 11:53

Fetch comments (and their replies) for a video through the GraphQL endpoint
in ``url`` and append the keyword-filtered rows to ``comments.csv`` under
``RESULT_PATH``.
"""
import csv
import datetime
import json
import random
import time

from loguru import logger
import requests

from config import HEAD, INFO_COLUMNS, RESULT_COLUMNS, RESULT_PATH, ENCODING, KEYWORDS

url = 'https://www.kuaishou.com/graphql'

# Cursor value that marks the last page of a comment list.
_NO_MORE = 'no_more'

# Request-body prefixes (everything up to, but excluding, the "variables"
# member).  The raw strings keep ``\n`` as a JSON escape sequence.
_SUB_COMMENT_QUERY = (
    r'{"query":"mutation visionSubCommentList($photoId: String, $rootCommentId: String, $pcursor: String) {\n'
    r' visionSubCommentList(photoId: $photoId, rootCommentId: $rootCommentId, pcursor: $pcursor) {\n'
    r' pcursor\n'
    r' subComments {\n'
    r' commentId\n'
    r' authorId\n'
    r' authorName\n'
    r' content\n'
    r' headurl\n'
    r' timestamp\n'
    r' likedCount\n'
    r' realLikedCount\n'
    r' liked\n'
    r' status\n'
    r' authorLiked\n'
    r' replyToUserName\n'
    r' replyTo\n'
    r' __typename\n'
    r' }\n'
    r' __typename\n'
    r' }\n'
    r'}\n",'
)

_ROOT_COMMENT_QUERY = (
    r'{"query":"query commentListQuery($photoId: String, $pcursor: String) {\n'
    r' visionCommentList(photoId: $photoId, pcursor: $pcursor) {\n'
    r' commentCount\n'
    r' pcursor\n'
    r' rootComments {\n'
    r' commentId\n'
    r' authorId\n'
    r' authorName\n'
    r' content\n'
    r' headurl\n'
    r' timestamp\n'
    r' likedCount\n'
    r' realLikedCount\n'
    r' liked\n'
    r' status\n'
    r' authorLiked\n'
    r' subCommentCount\n'
    r' subCommentsPcursor\n'
    r' subComments {\n'
    r' commentId\n'
    r' authorId\n'
    r' authorName\n'
    r' content\n'
    r' headurl\n'
    r' timestamp\n'
    r' likedCount\n'
    r' realLikedCount\n'
    r' liked\n'
    r' status\n'
    r' authorLiked\n'
    r' replyToUserName\n'
    r' replyTo\n'
    r' __typename\n'
    r' }\n'
    r' __typename\n'
    r' }\n'
    r' __typename\n'
    r' }\n'
    r'}\n",'
)


def _matches_keywords(content):
    """Return True when KEYWORDS is empty or any keyword occurs in *content*."""
    return not KEYWORDS or any(keyword in content for keyword in KEYWORDS)


def request_comment(photo_id, pcursor='', comment_id=None, s1=1, s2=3):
    """Request one page of comments and return the decoded list object.

    :param photo_id: id of the video whose comments are requested.
    :param pcursor: page cursor; ``''`` (or ``None``) requests the first page.
    :param comment_id: when truthy, the replies of this root comment are
        requested instead of the root comments.
    :param s1: lower bound, in seconds, of the pause after the request.
    :param s2: upper bound, in seconds, of the pause after the request.
    :return: ``response['data']['visionSubCommentList']`` for replies,
        ``response['data']['visionCommentList']`` otherwise.
    :raises Exception: whatever decoding or indexing the response raises;
        the failure is logged first and then re-raised.
    """
    variables = {'photoId': str(photo_id)}
    if comment_id:
        variables['rootCommentId'] = str(comment_id)
        query = _SUB_COMMENT_QUERY
        comment_key = 'visionSubCommentList'
    else:
        query = _ROOT_COMMENT_QUERY
        comment_key = 'visionCommentList'
    variables['pcursor'] = '' if pcursor is None else str(pcursor)

    # Serialise the variables with json.dumps so quotes or backslashes in an
    # id/cursor cannot break the request body.  Compact separators keep the
    # body identical to the hand-built one for ordinary values.
    params = f'{query}"variables":{json.dumps(variables, separators=(",", ":"))}}}'

    r = requests.post(url, headers=HEAD, data=params)
    try:
        respond = r.json()
    except Exception:
        logger.error(r.text)
        raise
    try:
        respond = respond['data'][comment_key]
    except Exception:
        logger.error(respond)
        raise

    # Pause between requests: random in [s1, s2] when s2 > s1, fixed when
    # both are equal, and no pause when the bounds are reversed.
    if s2 > s1:
        gap = random.uniform(s1, s2)
    elif s1 == s2:
        gap = s1
    else:
        gap = 0
    time.sleep(gap)
    return respond


def transform_comment(comment):
    """Build an output row from a raw comment without modifying *comment*.

    Adds ``create_time`` (local time, ``%Y-%m-%d %H:%M:%S``), turns
    ``authorId`` and ``replyTo`` into profile URLs (``replyTo`` becomes ``''``
    when absent or empty), keeps only the ``INFO_COLUMNS`` keys and adds
    ``ip_label`` (``''`` when the comment has none).

    :param comment: raw comment dict as returned by the API.
    :return: a new dict holding the ``INFO_COLUMNS`` keys plus ``ip_label``.
    """
    # The timestamp is divided by 1000 before conversion, i.e. it is
    # presumably expressed in milliseconds.
    created = datetime.datetime.fromtimestamp(int(comment['timestamp']) // 1000)

    enriched = dict(comment)
    enriched['create_time'] = created.strftime('%Y-%m-%d %H:%M:%S')
    enriched['authorId'] = f'https://www.kuaishou.com/profile/{comment["authorId"]}'
    reply_to = comment.get('replyTo')
    enriched['replyTo'] = f'https://www.kuaishou.com/profile/{reply_to}' if reply_to else ''

    row = {k: enriched[k] for k in INFO_COLUMNS}
    row['ip_label'] = comment.get('ip_label', '')
    return row


def parse_comment(photo_id, comments, level, s1=1, s2=3):
    """Filter one page of comments, append the rows to the CSV, load replies.

    :param photo_id: id of the video the comments belong to.
    :param comments: list of raw comment dicts (``None`` is treated as empty).
    :param level: nesting level recorded for these comments; inline replies
        are recorded with ``level + 1``.
    :param s1: lower bound of the request pause, forwarded to `read_comment`.
    :param s2: upper bound of the request pause, forwarded to `read_comment`.
    :return: the rows of this page that passed the keyword filter (rows
        loaded from further reply pages are written but not returned).
    """
    rows = []
    pending_replies = []  # (root comment id, cursor) pairs with more reply pages
    video_url = f'https://www.kuaishou.com/short-video/{photo_id}'

    for comment in comments or []:
        logger.debug(f'parse comment:\n{comment}')
        # Sub-comments do not have these attributes.
        sub_cursor = comment.get('subCommentsPcursor', _NO_MORE)
        # The key may be present with a null value, so `.get(key, [])` is
        # not enough to guarantee an iterable.
        sub_comments = comment.get('subComments') or []
        if sub_cursor is not None and sub_cursor != _NO_MORE:
            pending_replies.append((comment['commentId'], sub_cursor))

        # Does the comment contain one of the configured keywords?
        # NOTE(review): a root comment that fails the filter also drops its
        # inline replies, while its further reply pages are still loaded and
        # filtered on their own. Kept as in the original; confirm intent.
        if not _matches_keywords(comment['content']):
            continue
        row = transform_comment(comment)
        row['photo_id'] = video_url
        row['level'] = level
        rows.append(row)

        for sub_comment in sub_comments:
            if not _matches_keywords(sub_comment['content']):
                continue
            sub_row = transform_comment(sub_comment)
            sub_row['photo_id'] = video_url
            sub_row['level'] = level + 1
            logger.debug(sub_row)
            rows.append(sub_row)

    # csv.writer quotes fields containing commas, quotes or line breaks, which
    # plain ','.join() would write unescaped and thereby corrupt the file.
    with open(f'{RESULT_PATH}/comments.csv', mode='a', encoding=ENCODING,
              errors='ignore', newline='') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerows([str(row[k]) for k in RESULT_COLUMNS] for row in rows)

    if pending_replies:
        logger.debug('load sub comment')
        for comment_id, sub_cursor in pending_replies:
            read_comment(photo_id, comment_id=comment_id, pcursor=sub_cursor,
                         level=level + 1, s1=s1, s2=s2)
    return rows


def read_comment(photo_id, comment_id=None, pcursor='', level=1, s1=1, s2=3):
    """Walk every page of a comment list and hand each page to `parse_comment`.

    :param photo_id: id of the video whose comments are read.
    :param comment_id: when truthy, read the replies of this root comment
        instead of the root comments.
    :param pcursor: cursor of the first page to request.
    :param level: nesting level passed on to `parse_comment`.
    :param s1: lower bound, in seconds, of the pause after each request.
    :param s2: upper bound, in seconds, of the pause after each request.
    :raises Exception: errors from requesting or parsing a page are logged
        together with the offending page and re-raised.
    """
    if comment_id:
        logger.debug(f'load sub comment from {comment_id}')
        comment_col = 'subComments'
    else:
        comment_col = 'rootComments'

    first_page = True
    while True:
        if not first_page:
            logger.debug('load next page')
        first_page = False

        comment_list = request_comment(photo_id, pcursor=pcursor, comment_id=comment_id, s1=s1, s2=s2)
        logger.debug(comment_list)
        try:
            next_cursor = comment_list['pcursor']
            parse_comment(photo_id, comment_list[comment_col], level=level, s1=s1, s2=s2)
        except Exception:
            logger.error(comment_col)
            logger.error(comment_list)
            raise

        # Stop on the end marker, and also on a missing/empty or unchanged
        # cursor, which would otherwise request the same page forever.
        if not next_cursor or next_cursor == _NO_MORE:
            break
        if next_cursor == pcursor:
            logger.warning(f'cursor did not advance ({next_cursor}); stop paging')
            break
        pcursor = next_cursor
    logger.debug('done')