Optimize parallel API calls

We have the Python code below. It uses 2 APIs:

  • a) to get all customers: get_all_customers()
  • b) to get the details of a specific customer: get_specifc_customer_details()

Currently (b) get_specifc_customer_details runs sequentially. Is there any way to use Spark parallelism so that we can make these calls in parallel? Something like the example below, or any other, better approach to calling APIs from Spark.

Store all customers in a DataFrame df, one customer per row:

df = df.withColumn("cust_status_1", <invoke API, passing the specific customer>) \
       .withColumn("cust_status_2", <invoke API, passing the specific customer>)

Finally, df will have 3 columns: customer_id, cust_status_1, cust_status_2.

Original code:

import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Schema of the result (not shown in the question; assumed here, adjust types to the API)
schema = "customer_id STRING, cust_status_1 STRING, cust_status_2 STRING"

def get_all_customers():

    url = "https://test.url.com/api/v1/customers?status=Customer"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    customer_id_list = []
    response = requests.request("GET", url, headers=headers)
    for cust in response.json()["customers"]:
        customer_id_list.append(cust["uid"])

    return customer_id_list

def get_specifc_customer_details(customer_id_list):

    customer_status_overall = []
    # One request per customer, issued sequentially on the driver
    for customer_id in customer_id_list:

        url = f"https://test.url.com/api/v1/customers/{customer_id}"
        headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': 'Bearer Token'
        }
        response = requests.request("GET", url, headers=headers)

        customer = response.json()["customer"]  # parse the response body once
        cust_status_1 = customer["status_1"]
        cust_status_2 = customer["status_2"]
        customer_status_overall.append((customer_id, cust_status_1, cust_status_2))

    customer_status_df = spark.createDataFrame(customer_status_overall, schema)
    return customer_status_df
This question is marked as solved.

Answers: 3

I think this should be possible. You can even chain API calls one after another.

import json
from pyspark import SparkContext, SQLContext
import requests
from pyspark.sql import Row

sc = SparkContext('local')
sqlContext = SQLContext(sc)


data1 = [

[1],
[2],
[3],
[4],

]
columns1 =["postId"]

df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)

## Repartition so the rows are spread over several tasks that Spark runs in parallel.
## Increase the number of partitions if you still get OOM errors.
df1 = df1.repartition(4).cache()

def my_method(input):
    # The API endpoint
    url = "https://jsonplaceholder.typicode.com/posts/"
    # Adding a payload
    payload = {"id": [input], "userId": 1}
    # A get request to the API
    response = requests.get(url, params=payload)
    if response.status_code == 200:
        response_json = response.json()
        return response_json
    else:
        return ""


def mapPartition_inference(partitioned_rows):

    print("Within Map partition")
    result_array_list = []
    print("For loop below")
    for row in partitioned_rows:
        result = my_method(row["postId"])
        print(result)
        result_array_list.append(Row(json.dumps(result))) ## wrap the result in the Row()
    print("result array list")
    print(result_array_list)
    return iter(result_array_list)

result_df = df1.rdd.mapPartitions(mapPartition_inference).toDF(["api_response"])


print("Result DF")
result_df.show(n=30, truncate=False)

Output:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|api_response                                                                                                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}]|
|[{"userId": 1, "id": 3, "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut", "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"}]         |
|[{"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}]              |
|[{"userId": 1, "id": 4, "title": "eum et est occaecati", "body": "ullam et saepe reiciendis voluptatem adipisci\nsit amet autem assumenda provident rerum culpa\nquis hic commodi nesciunt rem tenetur doloremque ipsam iure\nquis sunt voluptatem rerum illo velit"}]                      |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
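What mapPartitions adds over a per-row call is that the function receives a whole partition at once, so setup work can be done once and reused for every row in it (the mapPartition_inference above does not use this and calls requests.get per row). A stdlib-only sketch of such a partition function, where make_client is a hypothetical factory standing in for "open a requests.Session and return a function that performs one GET with it":

```python
from typing import Callable, Iterable, Iterator, Tuple

# Hypothetical client interface: maps one post id to the raw response text
Client = Callable[[int], str]


def fetch_partition(
    post_ids: Iterable[int], make_client: Callable[[], Client]
) -> Iterator[Tuple[int, str]]:
    """Partition function suitable for rdd.mapPartitions.

    make_client is called once when the partition starts being consumed,
    so its setup cost is shared by every row of that partition.
    """
    client = make_client()
    for post_id in post_ids:
        yield post_id, client(post_id)
```

In the Spark job this might be used as df1.rdd.map(lambda r: r["postId"]).mapPartitions(lambda ids: fetch_partition(ids, make_client)).toDF(["postId", "api_response"]), keeping the postId next to its response instead of returning the response alone.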

Is it possible to do this with DataFrames only, instead of an RDD?

Matthew 05.04.2024 07:02

Take a look at PySpark UDFs.

user238607 05.04.2024 12:01
Accepted answer

Here is a useful example that uses DataFrames and makes the API calls in parallel.

import json

from pyspark import SparkContext
from pyspark.sql import SQLContext
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [[1], [2] ]
columns1 = ["postId"]

df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)

## Repartition so the rows are spread over several tasks that Spark runs in parallel.
## Increase the number of partitions if you still get OOM errors.
df1 = df1.repartition(4).cache()


def my_method_for_schema(post_id):
    # The API endpoint
    post_url = "https://jsonplaceholder.typicode.com/posts/"
    # Adding a payload
    payload = {"id": [post_id], "userId": 1}
    # A get request to the API
    post_response = None
    response = requests.get(post_url, params=payload)
    if response.status_code == 200:
        post_response = response.json()[0]
    else:
        return ""

    # The API endpoint
    comments_url = "https://jsonplaceholder.typicode.com/comments"
    # Adding a payload
    comment_ids = [1, 2, 3]
    comments_retrieved = []
    for com_id in comment_ids:
        payload = {"id": com_id, "postId": 1}
        # A get request to the API
        response = requests.get(comments_url, params=payload)
        if response.status_code == 200:
            response_json = response.json()[0]
            comments_retrieved.append(response_json)
        else:
            return ""

    overall_json = {"post_response": post_response, "comments_response": comments_retrieved}
    str_repr = json.dumps(overall_json)

    return str_repr





@udf
def my_method_copy_for_pyspark(post_id):
    # The API endpoint
    post_url = "https://jsonplaceholder.typicode.com/posts/"
    # Adding a payload
    payload = {"id": [post_id], "userId": 1}
    # A get request to the API
    post_response = None
    response = requests.get(post_url, params=payload)
    if response.status_code == 200:
        post_response = response.json()[0]
    else:
        return ""

    # The API endpoint
    comments_url = "https://jsonplaceholder.typicode.com/comments"
    # Adding a payload
    comment_ids = [1, 2, 3]
    comments_retrieved = []
    for com_id in comment_ids:
        payload = {"id": com_id, "postId": 1}
        # A get request to the API
        response = requests.get(comments_url, params=payload)
        if response.status_code == 200:
            response_json = response.json()[0]
            comments_retrieved.append(response_json)
        else:
            return ""

    overall_json = {"post_response": post_response, "comments_response": comments_retrieved}
    str_repr = json.dumps(overall_json)
    return str_repr


## EXPLICIT SCHEMA REPRESENTATION

post_schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('title', StringType(), False),
    StructField('body', StringType(), False)
]
)
comments_schema = StructType([
    StructField('postId', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('email', StringType(), True),
    StructField('body', StringType(), True)]
)

schema_struct = StructType([
    StructField('post_response', post_schema, True),
    StructField('comments_response', ArrayType(comments_schema, True), True)
]
)

# my_method_for_schema is a plain Python copy of the UDF, so that it can be called
# once on the driver to retrieve a sample API response; schema_of_json() then infers
# from it the JSON schema needed to parse the results into a struct.
retrived_value = my_method_for_schema("1")

print("API RESPONSE")
result_df = df1.withColumn("api_response", my_method_copy_for_pyspark(col("postId")))
result_df.cache().show(n=30, truncate=False)


print("USING EXPLICIT STRUCT SCHEMA")
first_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_struct)).drop("api_response")
first_method_df.show(n=30, truncate=False)


print("USING SCHEMA FROM ONE TEST API CALL")
second_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_of_json(retrived_value))).drop("api_response")
second_method_df.show(n=30, truncate=False)

Output:

Given dataframe
+------+
|postId|
+------+
|1     |
|2     |
+------+

API RESPONSE

|postId|api_response|

|2     |{"post_response": {"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "[email protected]", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "[email protected]", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "[email protected]", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]}              |
|1     |{"post_response": {"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "[email protected]", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "[email protected]", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "[email protected]", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]}|


USING EXPLICIT STRUCT SCHEMA

|postId|api_parsed_struct|

|2     |{{1, 2, qui est esse, est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla}, [{1, 1, id labore ex et quam laborum, [email protected], laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, [email protected], est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, [email protected], quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]}              |
|1     |{{1, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto}, [{1, 1, id labore ex et quam laborum, [email protected], laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, [email protected], est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, [email protected], quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]}|


USING SCHEMA FROM ONE TEST API CALL

|postId|api_parsed_struct|

|2     |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, [email protected], 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, [email protected], 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, [email protected], 3, odio adipisci rerum aut animi, 1}], {est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla, 2, qui est esse, 1}}              |
|1     |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, [email protected], 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, [email protected], 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, [email protected], 3, odio adipisci rerum aut animi, 1}], {quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, 1}}|
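The answer keeps two identical copies of the request logic, one for the driver-side sample call and one decorated with @udf. One possible refactor, sketched below, keeps a single plain function and injects the HTTP call through a hypothetical get_json(url, params) callable that returns (status_code, parsed_json); the URLs and the hard-coded comment ids and postId are taken from the answer.

```python
import json
from typing import Callable, Tuple

# Hypothetical transport: (url, params) -> (HTTP status code, parsed JSON body)
GetJson = Callable[[str, dict], Tuple[int, object]]

POSTS_URL = "https://jsonplaceholder.typicode.com/posts/"
COMMENTS_URL = "https://jsonplaceholder.typicode.com/comments"


def fetch_post_with_comments(
    post_id, get_json: GetJson, comment_ids: Tuple[int, ...] = (1, 2, 3)
) -> str:
    """Return the post plus its comments as one JSON string, or "" on any non-200."""
    status, body = get_json(POSTS_URL, {"id": [post_id], "userId": 1})
    if status != 200:
        return ""
    post_response = body[0]

    comments_retrieved = []
    for com_id in comment_ids:
        # As in the answer, comments are always requested for postId 1
        status, body = get_json(COMMENTS_URL, {"id": com_id, "postId": 1})
        if status != 200:
            return ""
        comments_retrieved.append(body[0])

    return json.dumps({"post_response": post_response,
                       "comments_response": comments_retrieved})
```

In the Spark job, get_json could wrap requests.get(url, params=params) and return (response.status_code, response.json()). The same function then serves both the one-off driver call used for schema_of_json and, wrapped with udf(...) from pyspark.sql.functions with a StringType return type, the api_response column. An empty string parses to null with from_json, so failed calls show up as null structs.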


Check the code below.

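import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# The session is created on the driver and pickled together with the UDF, so each
# Python worker gets its own copy and reuses its connections for the rows it processes.
session = requests.Session()

headers = {
    'Content-Type': 'application/json; charset=utf-8',
    'Authorization': 'Bearer Token'
}

# DDL schemas of the two JSON responses ("uid" as in the question's list response)
customers_schema = 'customers ARRAY<STRUCT<uid STRING>>'
status_schema = 'customer STRUCT<status_1 STRING, status_2 STRING>'

def get_customers():
    # Runs once on the driver and returns the raw JSON text of the customer list
    url = "https://test.url.com/api/v1/customers?status=Customer"
    return session.get(url, headers=headers).text

def get_customer_details(customer_id):
    # Runs on the executors, once per row, and returns the raw JSON text for one customer
    url = f"https://test.url.com/api/v1/customers/{customer_id}"
    return session.get(url, headers=headers).text

# Marked nondeterministic so the optimizer is not free to duplicate (and re-run) the API call
customer_details_udf = udf(get_customer_details, StringType()).asNondeterministic()

(spark.range(1)  # one dummy row to carry the list response
    .select(from_json(lit(get_customers()), customers_schema).alias('customers'))
    .selectExpr("inline(customers)")
    .withColumnRenamed("uid", "customer_id")
    .repartition(8)  # spread the customers over several tasks so the calls run in parallel
    .withColumn("status", from_json(customer_details_udf(col("customer_id")), status_schema))
    .selectExpr("customer_id", "status.customer.*")
    .show(10, False))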
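When the number of customers is modest, an alternative to the Spark-based answers above (a different technique: plain thread-level concurrency on the driver) is to overlap the per-customer requests with a thread pool and build the DataFrame afterwards. A minimal sketch, assuming a hypothetical fetch_customer(customer_id) callable that performs one GET /customers/{id} request and returns the parsed "customer" object:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, List, Tuple

# (customer_id, cust_status_1, cust_status_2), the row shape asked for in the question
StatusRow = Tuple[str, str, str]


def get_details_concurrently(
    customer_ids: Iterable[str],
    fetch_customer: Callable[[str], Dict[str, str]],
    max_workers: int = 8,
) -> List[StatusRow]:
    """Fetch every customer's details with up to max_workers requests in flight."""

    def one(customer_id: str) -> StatusRow:
        customer = fetch_customer(customer_id)
        return (customer_id, customer["status_1"], customer["status_2"])

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order even though the calls overlap in time
        return list(pool.map(one, customer_ids))
```

Calling rows = get_details_concurrently(get_all_customers(), fetch_customer) and then spark.createDataFrame(rows, schema) would give the same three-column DataFrame. Threads fit here because the work is I/O-bound; the API's rate limits are often the real ceiling rather than the number of threads.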
