本文中使用python多进程进行接口的并发请求

python多进程实践

  • fake_useragent(UserAgent):生成随机User-Agent以模拟真实请求
  • peewee:轻量级ORM模块
  • multiprocessing:最简单基础的多进程实现
  • logging:调试以及记录程序中出现的问题
  • traceback:提取、格式化和打印程序的stack traces信息
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/local/bin/python3
import requests
from fake_useragent import UserAgent
import json
from peewee import *
import datetime
import time
from multiprocessing import Process, Pool
import os
import logging
import traceback

# Append log records to a file; only ERROR and above pass the level filter.
logging.basicConfig(
    level=logging.ERROR,
    filename='collect-multiprocess.log',
    filemode='a',
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
)

# Genre display name -> numeric id; the id is what gets substituted into the
# `genres` query parameter of `comment_api`.
id_all = {
    "动作": 1,
    "角色扮演": 5,
    "横版过关": 41,
    "冒险": 4,
    "射击": 48,
    "第一人称射击": 32,
    "策略": 2,
    "益智": 18,
    "模拟": 7,
    "体育": 3,
    "竞速": 6,
    "格斗": 9,
    "乱斗/清版": 37,
    "即时战略": 12,
    "音乐/旋律": 19,
}

# Search endpoint template: first placeholder is the genre id, second is the
# `more` paging value.
comment_api = 'https://www.douban.com/j/ilmen/game/search?genres={}&platforms=&q=&sort=rating&more={}'

# Database handle shared by the peewee models declared in this file.
db = MySQLDatabase('thinkphp', user='root', charset='utf8mb4')


class DouBanGame(Model):
    """peewee model for one scraped game, stored in the ``douban_games`` table.

    All data columns are declared as ``CharField``; ``create_at`` is a
    ``DateTimeField``. Rows are written in bulk by ``get_data`` through
    ``DouBanGame.insert_many``.
    """
    # The columns below are filled from the same-named keys of each game
    # object returned by the search API, except where noted.
    title = CharField()
    cover = CharField()
    star = CharField()
    type = CharField()  # genre display name (a key of `id_all`), not an API field
    rating = CharField()
    platforms = CharField()
    n_ratings = CharField()
    genres = CharField()
    content = CharField()  # review text; '' when the API supplies no string content
    create_at = DateTimeField()  # local time at which the row dict was built

    class Meta:
        # Bind the model to the module-level database and fix the table name.
        database = db
        table_name = 'douban_games'


def _fetch_page(genres, more, headers, timeout=30):
    """Request one page of search results and return the decoded JSON object."""
    response = requests.get(
        comment_api.format(genres, more),
        headers=headers,
        # Without a timeout a stalled connection would block the worker forever.
        timeout=timeout,
    )
    return json.loads(response.text)


def _build_rows(games, game_type):
    """Convert the API's game objects into row dicts for ``DouBanGame.insert_many``."""
    rows = []
    for game in games:
        # `review` may be absent or not a dict; treat that as "no review text"
        # instead of failing the whole page.
        review = game.get('review')
        content = review.get('content') if isinstance(review, dict) else None
        rows.append({
            'title': game['title'],
            'cover': game['cover'],
            'type': game_type,
            'star': game['star'],
            'rating': game['rating'],
            'platforms': game['platforms'],
            'n_ratings': game['n_ratings'],
            'genres': game['genres'],
            'content': content if isinstance(content, str) else '',
            'create_at': datetime.datetime.now(),
        })
    return rows


def get_data(genres):
    """Scrape every game of one genre and store the results in the database.

    Pages are fetched one after another from ``comment_api``; each page's
    ``more`` value is used to request the next page. Every page is inserted
    with a single ``DouBanGame.insert_many`` call. A page that cannot be
    converted or inserted is logged and skipped so the rest of the genre is
    still collected.

    Args:
        genres: Numeric genre id; must be one of the values of ``id_all``.

    Returns:
        None.

    Raises:
        KeyError: If ``genres`` is not a value of ``id_all``, or a response
            lacks the ``total`` / ``more`` keys.
        Exception: Network and JSON decoding errors from fetching a page are
            not caught here and propagate to the caller.
    """
    logging.info(genres)

    # Reverse lookup: numeric id -> display name used in output and in `type`.
    game_type = {genre_id: name for name, genre_id in id_all.items()}[genres]

    headers = {"User-Agent": UserAgent(verify_ssl=False).random}

    # The first page supplies both the total count and the first batch of
    # games, so it is fetched once and reused by the loop below.
    page = _fetch_page(genres, 1, headers)
    logging.info(page)

    total = page['total']
    print('{}类别共{}个游戏,开始爬取!'.format(game_type, total))

    fetched = 0
    while fetched < total:
        games = page.get('games') or []
        if not games:
            # The API ran out of results before `total` was reached.
            print('empty data!')
            print('NO%s' % fetched)
            break

        try:
            # All games of the page are stored, including the last one.
            last_id = DouBanGame.insert_many(_build_rows(games, game_type)).execute()
            print(last_id)
        except Exception:
            # Best effort: record the failure at ERROR level (with traceback)
            # so it reaches the log file, then continue with the next page.
            logging.exception('failed to store a page for genres=%s', genres)

        fetched += len(games)
        if fetched >= total:
            break
        page = _fetch_page(genres, page['more'], headers)


if __name__ == '__main__':
    # Entry point: scrape every genre in `id_all` using a pool of 4 worker
    # processes, one `get_data` task per genre id.
    print('Parent process %s.' % os.getpid())

    def _log_worker_error(exc):
        # Called in the parent process when a task raised. Without this
        # callback the exception would be dropped, because the AsyncResult
        # objects returned by apply_async are never inspected.
        logging.error('get_data worker failed', exc_info=exc)

    try:
        p = Pool(4)

        for genres in id_all.values():
            p.apply_async(get_data, args=(genres,),
                          error_callback=_log_worker_error)

        print("waiting for all subProcesses done...")
        # close() stops new submissions; join() blocks until all tasks finish.
        p.close()
        p.join()
        print('All subProcesses done.')
    except Exception:
        # logging.exception records the traceback at ERROR level, which the
        # file handler configured for this script actually keeps.
        logging.exception('scraping run failed in the parent process')

github