Ниже приведён код на Python. В нём используются 2 API: (а) get_all_customers и (б) get_specifc_customer_details.
Сейчас (б) get_specifc_customer_details выполняется последовательно. Можно ли как-то использовать параллелизм Spark, чтобы вызывать API параллельно — например, как показано ниже? Или, может быть, есть более удачные подходы к вызову API из Spark?
Все клиенты хранятся в датафрейме df, по одному в строке:
df = df.withColumn(cust_status_1, <invoke api pass specific customer >) \
.withColumn(cust_status_2, <invoke api pass specific customer >)
В итоге в df будет 3 столбца: customer_id, cust_status_1, cust_status_2.
Исходный код:
def get_all_customers(timeout=30):
    """Return the ``uid`` of every customer listed with status "Customer".

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled server.

    Returns:
        A list of customer ids, in the order the API returned them.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    url = "https://test.url.com/api/v1/customers?status=Customer"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail loudly on HTTP errors instead of a confusing KeyError on the body.
    response.raise_for_status()
    return [cust["uid"] for cust in response.json()["customers"]]
def get_specifc_customer_details(customer_id_list, timeout=30):
    """Fetch status_1/status_2 for each customer and return them as a DataFrame.

    The requests are issued one after another on the driver.

    Args:
        customer_id_list: Iterable of customer ids to look up.
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        ``spark.createDataFrame(rows, schema)`` with one
        ``(customer_id, status_1, status_2)`` tuple per input id.

    Raises:
        requests.HTTPError: If any lookup answers with a 4xx/5xx status.
    """
    # Identical for every customer, so build it once outside the loop.
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    customer_status_overall = []
    # One Session lets consecutive calls to the same host reuse the connection.
    with requests.Session() as session:
        for customer_id in customer_id_list:
            url = f"https://test.url.com/api/v1/customers/{customer_id}"
            response = session.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            customer = response.json()["customer"]  # parse the body only once
            customer_status_overall.append(
                (customer_id, customer["status_1"], customer["status_2"])
            )
    # NOTE(review): `spark` and `schema` are module-level names not visible in
    # this snippet — presumably a SparkSession and a 3-column schema.
    customer_status_df = spark.createDataFrame(customer_status_overall, schema)
    return customer_status_df
Думаю, это возможно. Можно даже выполнять вызовы API цепочкой, один за другим.
import json
from pyspark import SparkContext, SQLContext
import requests
from pyspark.sql import Row
# Master 'local' runs Spark with a single worker thread, so repartitioned data
# would still be processed one partition at a time; 'local[*]' uses one worker
# thread per core so the per-partition API calls can overlap.
sc = SparkContext('local[*]')
# NOTE(review): SQLContext is the legacy entry point; newer Spark versions
# recommend SparkSession — confirm the target Spark version.
sqlContext = SQLContext(sc)
# Sample input: a single "postId" column with the ids 1..4, one per row.
columns1 = ["postId"]
data1 = [[post_id] for post_id in range(1, 5)]
df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)
# Spread the rows over 4 partitions; each partition becomes its own task.
# Raise the partition count if executors run out of memory.
df1 = df1.repartition(4).cache()
def my_method(input, timeout=10):
    """GET the jsonplaceholder post(s) matching ``input`` and return the JSON.

    Args:
        input: Post id sent as the ``id`` query parameter. The name shadows
            the built-in ``input``; it is kept so keyword callers still work.
        timeout: Seconds to wait for the response. When this runs inside a
            Spark task, a request without a timeout could hang the task forever.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``""``.
    """
    url = "https://jsonplaceholder.typicode.com/posts/"
    # Query-string parameters for the GET request.
    payload = {"id": [input], "userId": 1}
    response = requests.get(url, params=payload, timeout=timeout)
    if response.status_code != 200:
        return ""
    return response.json()
def mapPartition_inference(partitioned_rows):
    """Call the API for each row of one partition and yield the responses.

    Intended for ``RDD.mapPartitions``: Spark passes an iterator over the
    rows of a single partition, and this yields one ``Row`` per input row
    holding the JSON-serialised API response for that row's ``postId``.

    Rows are yielded lazily, so only one response is held in memory at a
    time. The previous version collected the whole partition into a list and
    printed every response and the full list before returning.
    """
    for row in partitioned_rows:
        result = my_method(row["postId"])
        # Wrap the result in a Row so .toDF() can build a one-column DataFrame.
        yield Row(json.dumps(result))
# Run the API calls partition by partition and turn the yielded Rows into a
# DataFrame with a single "api_response" column.
result_df = (
    df1.rdd
    .mapPartitions(mapPartition_inference)
    .toDF(["api_response"])
)
print("Result DF")
result_df.show(n=30, truncate=False)
Вывод:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|api_response |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}]|
|[{"userId": 1, "id": 3, "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut", "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"}] |
|[{"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}] |
|[{"userId": 1, "id": 4, "title": "eum et est occaecati", "body": "ullam et saepe reiciendis voluptatem adipisci\nsit amet autem assumenda provident rerum culpa\nquis hic commodi nesciunt rem tenetur doloremque ipsam iure\nquis sunt voluptatem rerum illo velit"}] |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Посмотрите в сторону UDF в PySpark.
Вот полезный пример использования DataFrame и параллельных вызовов API.
import json
import sys
from pyspark.sql import SQLContext
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *
# SparkContext is not provided by the imports of this snippet (only SQLContext
# and the pyspark.sql wildcards are), so without this line the call raises NameError.
from pyspark import SparkContext

# 'local' would run a single worker thread; 'local[*]' uses one per core so the
# UDF's API calls for different partitions can run concurrently.
sc = SparkContext('local[*]')
sqlContext = SQLContext(sc)
# Sample input: a single "postId" column with the ids 1 and 2.
columns1 = ["postId"]
data1 = [[post_id] for post_id in (1, 2)]
df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)
# Spread the rows over 4 partitions; each partition becomes its own task.
# Raise the partition count if executors run out of memory.
df1 = df1.repartition(4).cache()
def _get_first_item(session, url, params, timeout):
    """GET ``url`` and return the first element of its JSON-array body, or None.

    None is returned both for a non-200 status and for an empty array, so the
    caller can treat "request failed" and "nothing found" the same way.
    """
    response = session.get(url, params=params, timeout=timeout)
    if response.status_code != 200:
        return None
    items = response.json()
    return items[0] if items else None


def my_method_for_schema(post_id, timeout=10):
    """Fetch one post plus a fixed set of comments and return them as JSON.

    Args:
        post_id: Post id sent as the ``id`` query parameter of the posts call.
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        A JSON string ``{"post_response": {...}, "comments_response": [...]}``,
        or ``""`` if any request fails or matches nothing. Previously an
        empty match raised IndexError instead of returning ``""``.
    """
    post_url = "https://jsonplaceholder.typicode.com/posts/"
    comments_url = "https://jsonplaceholder.typicode.com/comments"
    # NOTE: the comment ids and postId=1 are hard-coded demo values and do not
    # depend on post_id, so every post gets the same three comments of post 1.
    comment_ids = [1, 2, 3]
    # One Session reuses the connection across the four requests below.
    with requests.Session() as session:
        post_response = _get_first_item(
            session, post_url, {"id": [post_id], "userId": 1}, timeout
        )
        if post_response is None:
            return ""
        comments_retrieved = []
        for com_id in comment_ids:
            comment = _get_first_item(
                session, comments_url, {"id": com_id, "postId": 1}, timeout
            )
            if comment is None:
                return ""
            comments_retrieved.append(comment)
    overall_json = {"post_response": post_response, "comments_response": comments_retrieved}
    return json.dumps(overall_json)
@udf
def my_method_copy_for_pyspark(post_id):
    """Spark UDF version of ``my_method_for_schema`` (one API round per row).

    Delegates to ``my_method_for_schema`` instead of duplicating its body, so
    the driver-side sample call and the per-row UDF always share one
    implementation. ``@udf`` without a return type declares a string result,
    which matches the JSON string returned here.
    """
    return my_method_for_schema(post_id)
## EXPLICIT SCHEMA REPRESENTATION
# Each schema is listed as (field name, Spark type, nullable) triples and
# expanded into StructFields, so the field tables are easy to compare.
post_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ('userId', IntegerType(), True),
        ('id', IntegerType(), True),
        ('title', StringType(), False),
        ('body', StringType(), False),
    )
])
comments_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ('postId', IntegerType(), True),
        ('id', IntegerType(), True),
        ('name', StringType(), True),
        ('email', StringType(), True),
        ('body', StringType(), True),
    )
])
# Shape of the JSON string built by the API helper:
# {"post_response": {...post...}, "comments_response": [{...comment...}, ...]}
schema_struct = StructType([
    StructField('post_response', post_schema, True),
    StructField('comments_response', ArrayType(comments_schema, True), True),
])
# my_method_for_schema is the plain (non-UDF) API helper, so it can be called
# once here on the driver; its sample response is used further down to infer
# the JSON schema for parsing the UDF results.
retrived_value = my_method_for_schema("1")
print("API RESPONSE")
result_df = df1.withColumn("api_response", my_method_copy_for_pyspark(col("postId")))
# Cache so the later show() calls can reuse the API results computed here.
result_df.cache().show(n=30, truncate=False)
print("USING EXPLICIT STRUCT SCHEMA")
first_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_struct)).drop("api_response")
first_method_df.show(n=30, truncate=False)
print("USING SCHEMA FROM ONE TEST API CALL")
if not retrived_value:
    # my_method_for_schema returns "" when a request fails; there is then no
    # sample for schema_of_json to infer from, so fail with a clear message.
    raise RuntimeError("Sample API call returned no data; cannot infer the JSON schema")
second_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_of_json(retrived_value))).drop("api_response")
second_method_df.show(n=30, truncate=False)
Вывод:
Given dataframe
+------+
|postId|
+------+
|1 |
|2 |
+------+
API RESPONSE

|postId|api_response |

|2 |{"post_response": {"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "[email protected]", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "[email protected]", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "[email protected]", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]} |
|1 |{"post_response": {"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "[email protected]", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "[email protected]", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "[email protected]", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]}|

USING EXPLICIT STRUCT SCHEMA

|postId|api_parsed_struct |

|2 |{{1, 2, qui est esse, est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla}, [{1, 1, id labore ex et quam laborum, [email protected], laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, [email protected], est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, [email protected], quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]} |
|1 |{{1, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto}, [{1, 1, id labore ex et quam laborum, [email protected], laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, [email protected], est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, [email protected], quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]}|

USING SCHEMA FROM ONE TEST API CALL

|postId|api_parsed_struct |

|2 |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, [email protected], 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, [email protected], 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, [email protected], 3, odio adipisci rerum aut animi, 1}], {est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla, 2, qui est esse, 1}} |
|1 |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, [email protected], 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, [email protected], 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, [email protected], 3, odio adipisci rerum aut animi, 1}], {quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, 1}}|

Посмотрите код ниже:
import pyspark
from pyspark.sql.functions import col, from_json, current_timestamp, udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
import requests
# Module-level HTTP session, created once so the request helpers can reuse it
# (including when they run inside a UDF) across rows and partitions.
# NOTE(review): when a UDF referencing it is shipped to executors, the session
# is presumably serialized along with it, so each Python worker gets its own
# copy — confirm this is the reuse that was intended.
session = requests.Session()
# DDL schema for the list-customers response body: {"customers": [{"amagi_id": ...}, ...]}.
# NOTE(review): the id field is named amagi_id here, while the query code refers
# to customer_id and get_all_customers reads cust["uid"] — confirm the real key.
customers_schema = 'customers ARRAY<STRUCT <amagi_id STRING>>'
# DDL schema for a single-customer response body: {"customer": {"status_1": ..., "status_2": ...}}.
status_schema = 'customer STRUCT <status_1 STRING, status_2 STRING>'
def get_customers(timeout=30):
    """Return the raw JSON text of the "list customers" endpoint.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body as text (not parsed here).
    """
    url = "https://test.url.com/api/v1/customers?status=Customer"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    # Session.get() accepts only the URL positionally; passing headers as a
    # second positional argument raises TypeError, so it must be a keyword.
    return session.get(url, headers=headers, timeout=timeout).text
def get_customer_details(customer_id, timeout=30):
    """Return the raw JSON text describing one customer.

    Args:
        customer_id: Id interpolated into the customer URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body as text (not parsed here).
    """
    url = f"https://test.url.com/api/v1/customers/{customer_id}"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    # Session.get() accepts only the URL positionally; headers must be a keyword.
    return session.get(url, headers=headers, timeout=timeout).text
# SparkSession is not among this snippet's imports; it is needed to build the
# seed DataFrame from the driver-side API response.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# get_customer_details is a plain Python function. Calling it with
# col("customer_id") would hand it a Column object once, on the driver.
# Wrapping it in a UDF makes Spark call it per row, on the executors.
get_customer_details_udf = udf(get_customer_details, StringType())

# One driver-side call returns every customer as a JSON string. from_json
# parses a *column*, so the string is first placed into a one-row DataFrame.
customers_df = (
    spark.createDataFrame([(get_customers(),)], ["customers_json"])
    .withColumn("customers", from_json(col("customers_json"), customers_schema))
    .selectExpr("inline(customers)")
    # customers_schema names the id field amagi_id.
    .withColumnRenamed("amagi_id", "customer_id")
    # Every customer comes from one source row, so all of them start in a
    # single partition; spread them out so the per-customer calls run as
    # parallel tasks.
    .repartition(spark.sparkContext.defaultParallelism)
)

result_df = (
    customers_df
    .withColumn("status", from_json(get_customer_details_udf(col("customer_id")), status_schema))
    # status_schema nests both fields under a top-level "customer" struct.
    .selectExpr(
        "customer_id",
        "status.customer.status_1 AS cust_status_1",
        "status.customer.status_2 AS cust_status_2",
    )
)
result_df.show(10, False)
Можно ли сделать это только средствами DataFrame, без RDD?