Я пытаюсь использовать утилиту mocks3 Boto3 moto для проверки моего кода, который подключается к s3, функция в основном перечисляет все папки с датой раздела и возвращает последнюю. Я не вижу исключений в насмешках над ведрами s3, однако тестовый код, похоже, не находит этот ведро
Моя тестовая спецификация
import os
import unittest
from botocore.client import ClientError
from moto import mock_s3
from src.utils.aws_utils import *
import logging
log = logging.getLogger("my-logger")
MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder"
@mock_s3
class TestPysparkUtils(unittest.TestCase):
def setUp(self):
s3 = boto3.resource(
"s3",
region_name = "us-east-1",
aws_access_key_id = "fake_access_key",
aws_secret_access_key = "fake_secret_key",
)
s3.create_bucket(Bucket = "{}".format(MY_BUCKET))
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
.format(MY_BUCKET, MY_PREFIX), Body='def')
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
.format(MY_BUCKET, MY_PREFIX), Body='abc')
def tearDown(self):
s3 = boto3.resource(
"s3",
region_name = "us-east-1",
aws_access_key_id = "fake_access_key",
aws_secret_access_key = "fake_secret_key",
)
bucket = s3.Bucket(MY_BUCKET)
for key in bucket.objects.all():
key.delete()
bucket.delete()
def test_get_latest_file_path_inter(self):
print('{}/{}'.format(MY_BUCKET, MY_PREFIX))
s3 = boto3.resource(
"s3",
region_name = "us-east-1",
aws_access_key_id = "fake_access_key",
aws_secret_access_key = "fake_secret_key",
)
try:
s3.meta.client.head_bucket(Bucket=MY_BUCKET)
print("Bucket Exists!")
except ClientError:
log.info('The bucket does not exist or you have no access.')
result = get_latest_file_path_inter(log, s3, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX), 'partition_date')
print('------------------------')
print(result)
desired_result = ["foo.json", "bar.json"]
self.assertCountEqual(result, desired_result)
if __name__ == "__main__":
unittest.main()
Тестовая функция
def get_latest_file_path_inter(logger, s3_client, base_path, partition):
"""
Returns full s3 path of latest partition assuming partition date is of format yyyyMMdd
:type (object, str, str) -> (str)
:parameter
:param logger Logger object
:param s3_client boto3 s3 client object
:param base_path Base s3 path
:param partition column name
"""
print("Inside get_latest_file_path_inter() : Given: {} {}".format(base_path, partition))
start = base_path.find("//") + 2
end = base_path.find("/", start)
bucket_in = base_path[start:end]
prefix_in = base_path[base_path.find(bucket_in) + len(bucket_in) + 1:]
print(
"bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
bucket_in, prefix_in))
objects = list(s3_client.Bucket(bucket_in).objects.filter(Prefix=prefix_in))
print("total objects found: {}".format(len(objects)))
dict_out = {}
if len(objects) == 0:
logger.info("Error. no files found")
return
for i in range(0, len(objects)):
file_str = objects[i].key
start = file_str.find(partition) + len(partition)
end = file_str.find("/", start)
part_found = file_str[start:end]
partial_path = file_str[:file_str.find(partition) + len(partition) + 8]
dict_out[part_found] = partial_path
dict_sort = collections.OrderedDict(sorted(dict_out.items()))
last = list(dict_sort.keys())[len(dict_sort) - 1]
path_final = "s3://{}/{}/".format(bucket_in, dict_sort.get(last))
print("path_final: {} for base_path: {} and partition: {} and last: {} and dict_sort: {}".format(
path_final, base_path, partition, last, dict_sort))
return path_final
Выход
mock_s3_bucket/mock_folder
Inside get_latest_file_path_inter() : Given: s3://mock_s3_bucket/mock_folder/ partition_date
bucket: mock_s3_bucket | prefix: mock_folder/ | partition: partition_date | path: s3://mock_s3_bucket/mock_folder/
s3.Bucket(name='mock_s3_bucket')
total objects found: 0
------------------------
None
Все заработало, я смешивал клиент boto3 и api ресурса boto3 в тестовой спецификации и соответствующих функциях. Выяснив разницу между ними, я изменил все на boto3 client api и заработал. Ниже представлена модифицированная функция и соответствующая ей спецификация.
ssl._create_default_https_context = ssl._create_unverified_context
MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder/mock_sub_folder"
MY_ANOTHER_PREFIX = "mock_folder/mock_another_sub_folder"
class TestPysparkUtils(unittest.TestCase):
mock_s3 = mock_s3()
LOGGER = logging.getLogger("my-logger")
def setUp(self):
self.mock_s3.start()
s3 = boto3.resource(
"s3",
region_name = "us-east-1",
aws_access_key_id = "fake_access_key",
aws_secret_access_key = "fake_secret_key",
)
s3.create_bucket(Bucket = "{}".format(MY_BUCKET))
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
.format(MY_BUCKET, MY_PREFIX), Body='def')
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
.format(MY_BUCKET, MY_PREFIX), Body='abc')
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file1_20201222.txt'
.format(MY_BUCKET, MY_PREFIX), Body='xyz')
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/file_20201225.txt'
.format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='mno')
s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/_SUCCESS'
.format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='pqr')
def tearDown(self):
self.mock_s3.stop()
def test_get_latest_file_path_inter(self):
boto3_s3_client = boto3.client("s3")
result = get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX),
'partition_date')
desired_result = 's3://mock_s3_bucket/mock_folder/mock_sub_folder/partition_date=20201223/'
self.assertEqual(result, desired_result)
with pytest.raises(KeyError):
get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, 'unavailable_prefix'),
'partition_date')
def get_latest_file_path_from_s3(logger, boto_s3_client, base_path, partition):
"""
Returns full s3 path of latest partition assuming partition date is of format yyyyMMdd
:type (object, str, str) -> (str)
:parameter
:param logger Logger object
:param boto_s3_client boto3 s3 client object
:param base_path Base s3 path i.e. path till partition column name
:param partition final partition column name
"""
logger.info("Inside get_latest_file_path_inter() : Given: {} {}".format(base_path, partition))
start = base_path.find("//") + 2
end = base_path.find("/", start)
bucket_in = base_path[start:end]
prefix_in = base_path[base_path.find(bucket_in) + len(bucket_in) + 1:]
logger.info("bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
bucket_in, prefix_in))
try:
s3_files = boto_s3_client.list_objects_v2(Bucket=bucket_in, Prefix='{}/{}'.format(bucket_in, prefix_in))['Contents']
except KeyError:
logger.error("Exception while listing objects from path : {}/{}".format(bucket_in, prefix_in))
raise
if len(s3_files) == 0:
raise FileNotFoundError("Error. no files found at provided path, path: s3://{}/{} "
"and partition: {}".format(bucket_in, prefix_in, partition))
latest_partition_date = 0
for obj in s3_files:
file_str = obj['Key']
folder_path = file_str.rsplit('/', 1)[0]
partition_date = int(folder_path.rpartition('=')[2])
if partition_date > latest_partition_date:
latest_partition_date = partition_date
path_final = "s3://{}/{}{} = {}/".format(bucket_in, prefix_in, partition, latest_partition_date)
logger.info("path_final: {} for base_path: {} and partition: {}".format(
path_final, base_path, partition))
return path_final
@chandan добавил дополнительные сведения и код для справки. проголосуйте за это, если это полезно :)
не могли бы вы добавить объяснение, в чем была проблема, из-за которой она не работала, и что вы изменили, чтобы решить ее, заранее спасибо.