# Semi-structured data and Nested Struct handling

## Import Libraries

In [2]:
import json
import boto3
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt

## Import awswrangler for easy access to glue catalog and Athena

In [3]:
#!pip install -i https://pypi.tuna.tsinghua.edu.cn/simple awswrangler
import awswrangler as wr

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting awswrangler
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/aa/19/859c65e265fc38fa29afbc6f9766a12df1b51d020d68ee3fcc548eae610c/awswrangler-2.1.0-py3-none-any.whl (150 kB)
[K     |████████████████████████████████| 150 kB 49.3 MB/s eta 0:00:01
Collecting redshift-connector~=2.0.0
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/2f/73/fe9ca0306d0fa5a56b8ae65b2b7cb11849f70e15d0efd9c2222ed01c507f/redshift_connector-2.0.872-py3-none-any.whl (73 kB)
[K     |████████████████████████████████| 73 kB 161 kB/s  eta 0:00:01
[?25hCollecting pandas<1.2.0,>=1.1.0
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/c3/e2/00cacecafbab071c787019f00ad84ca3185952f6bb9bca9550ed83870d4d/pandas-1.1.5-cp36-cp36m-manylinux1_x86_64.whl (9.5 MB)
[K     |████████████████████████████████| 9.5 MB 56.7 MB/s eta 0:00:01
Collecting pg8000~=1.16.0
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/47/7b/0770c960a1e540

## Sample: Using code to create the glue catalog and athena query
Here I show the demo how to handle the json array with structure schema
- You can run crawler to create the table
- You can also use the code to create the table
  - You can also create a Parquet Table (Metadata Only) in the AWS Glue Catalog. 
  - Or you can create the external table on Athena

### Create the external table (AWS Glue Catalog table)

In [149]:
s3_bucket = 'ray-glue-streaming'
s3_csv_prefix = 'catalog_test/complextable/' 
s3_json_prefix = 'catalog_test/json/'
s3_parquet_prefix = 'catalog_test/parquet/'

s3_client = boto3.client('s3')

json_file_path = 's3://{}/{}'.format(s3_bucket, s3_json_prefix)
parquet_file_path = 's3://{}/{}'.format(s3_bucket, s3_parquet_prefix)

data = {"A": "foo", "B": "bar", "C":[{"C1":"dummyC1", "C2":10},{"C1":"dummyC11", "C2":11}]}
df = pd.DataFrame(data)
print(df.head())

# Option1: Athena create external table
# reference doc: https://aws.amazon.com/cn/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/
query = r'''CREATE EXTERNAL TABLE IF NOT EXISTS `sampledb`.`json_structure` (
      `A` string, 
      `B` string, 
      `C` struct<c1:string,c2:int>
      )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true', 'paths'='A,B,C')
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '{}'
    '''
query = query.format(json_file_path)
    
query_exec_id = wr.athena.start_query_execution(sql=query, database="sampledb")
wr.athena.wait_query(query_execution_id=query_exec_id)
res = wr.athena.get_query_execution(query_execution_id=query_exec_id)
print("Athena query {} result: {}".format(query_exec_id, res["Status"]["State"]))

# Upload to S3: Option1: Save to temporay file and Use the S3 client uplaod to S3
df.to_json("json_info.json", orient="records", lines=True)
s3_client.upload_file(Filename='json_info.json', Bucket=s3_bucket, Key=s3_json_prefix+"json_info.json")
# json.load Not well support array structure
# with open('json_info.json') as f:
#   parsed = json.load(f)
# print(parsed)

# Upload to S3: Option2: use the aws-data-wrangler https://aws-data-wrangler.readthedocs.io/en/stable/
wr.s3.to_json(
        df=df,
        path=json_file_path+'wr_s3_json_info.json',
        orient="records", 
        lines=True
    )

#Option2: wr.catalog.create_parquet_table, however, to_parquet right now can not well support structure data
wr.catalog.create_parquet_table(database="sampledb", table="complextable_parquet", path=parquet_file_path, 
                                columns_types={"A": "string", "B": "string", "C":"struct<C1:string,C2:int>"}
                               )
wr.s3.to_parquet(
    df, path=parquet_file_path, dataset=True, database="sampledb", table="complextable_parquet", 
    dtype={"A": "string", "B": "string", "C":"struct<C1:string,C2:int>"}
)

     A    B                             C
0  foo  bar   {'C1': 'dummyC1', 'C2': 10}
1  foo  bar  {'C1': 'dummyC11', 'C2': 11}
Athena query 7a4d9553-e828-449b-b1dd-0b9de02fea93 result: SUCCEEDED
string
string
struct<c1:string,c2:int>
string
int
string
string
struct<c1:string,c2:int>
string
int


{'paths': ['s3://ray-glue-streaming/catalog_test/parquet/1696c6cf44d248099b82679f66300dce.snappy.parquet'],
 'partitions_values': {}}

### Athena Query

In [150]:
query = r'''SELECT * FROM json_structure limit 10
    '''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)

query = r'''
SELECT A as A,
       C.C1 as C1, 
       C.C2 as C2
FROM json_structure
limit 10
'''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
# Check how many date athen scanned to get the result
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)

query = r'''SELECT * FROM complextable_parquet limit 10
    '''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
# Check how many date athen scanned to get the result
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)

     a    b                             c
0  foo  bar   {'c1': 'dummyC1', 'c2': 10}
1  foo  bar  {'c1': 'dummyC11', 'c2': 11}
0  foo  bar   {'c1': 'dummyC1', 'c2': 10}
1  foo  bar  {'c1': 'dummyC11', 'c2': 11}
0.19921875
     a        c1  c2
0  foo   dummyC1  10
1  foo  dummyC11  11
0  foo   dummyC1  10
1  foo  dummyC11  11
0.19921875
     a    b                         c
0  foo  bar  {'c1': None, 'c2': None}
1  foo  bar  {'c1': None, 'c2': None}
2  foo  bar  {'c1': None, 'c2': None}
3  foo  bar  {'c1': None, 'c2': None}
4.638671875


## Sample: Handle complex csv files

### Read the csv file from S3
```python
raw_board_csv_df = pd.read_csv(s3.open(file_path,mode='rb'), index_col=None, nrows=1, skipinitialspace=True, delim_whitespace=True)
raw_board_csv_df = wr.s3.read_csv(file_path, index_col=None, nrows=1, skipinitialspace=True, delim_whitespace=True)
```

In [19]:
# Reading remote files
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#reading-remote-files
# !pip install -i https://pypi.tuna.tsinghua.edu.cn/simple s3fs

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [166]:
import boto
import json
import os

"""
    List objects in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch objects whose key starts with
        this prefix (optional).
    :param suffix: Only fetch objects whose keys end with
        this suffix (optional).
"""
def get_all_s3_objects(bucket, prefix='', suffix=''):
    """Get a list of all keys in an S3 bucket."""
    paginator = s3_client.get_paginator("list_objects_v2")
    
    kwargs = {'Bucket': bucket}
    if isinstance(prefix, str):
        prefixes = (prefix, )
    else:
        prefixes = prefix
        
    for key_prefix in prefixes:
        kwargs["Prefix"] = key_prefix

        for page in paginator.paginate(**kwargs):
            try:
                contents = page["Contents"]
            except KeyError:
                break

            for obj in contents:
                key = obj["Key"]
                if key.endswith(suffix) and key != prefix:
                    yield obj

def get_all_s3_keys(bucket, prefix="", suffix=""):
    """
    Generate the keys in an S3 bucket.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only fetch keys that start with this prefix (optional).
    :param suffix: Only fetch keys that end with this suffix (optional).
    """
    for obj in get_all_s3_objects(bucket, prefix, suffix):
        yield obj["Key"]

"""
    conver the csv to json as 
    {barcode=xxx, index=xxx, ... job= xxx, component=[{Component ID=aaa, Volume=bbb, Height=ccc ...}, {Component ID=aaa1, Volume=bbb1, Height=ccc1 ...}, ....]}
"""
s3_csv_prefix = 'catalog_test/complextable/'
s3_json_prefix = 'catalog_test/complex_json/'
for s3_key in get_all_s3_keys(s3_bucket, s3_csv_prefix):
    file_path = 's3://{}/{}'.format(s3_bucket, s3_key)
    print(file_path)
    
    # Read the s3 csv file and create the DataFrame for BoardInfo with first 2 lines (L1 as header, L2 as data)
    # Option1：use the native pandas
    #raw_board_csv_df = pd.read_csv(s3.open(file_path,mode='rb'), index_col=None, nrows=1, skipinitialspace=True, delim_whitespace=True)
    # Option2: use the aws-data-wrangler https://aws-data-wrangler.readthedocs.io/en/stable/
    raw_board_csv_df = wr.s3.read_csv(file_path, index_col=None, nrows=1, skipinitialspace=True, delim_whitespace=True, keep_default_na=True, na_filter=True)
    #print(raw_board_csv_df.head())
    
    # Read the s3 csv file and create the DataFrame for ComponetInfo with rest lines (L3 as header, others as data)
    # Option1：use the native pandas
    #raw_component_csv_df = pd.read_csv(s3.open(file_path,mode='rb'), index_col=None, header=2, delimiter=',', skipinitialspace=True)
    # Option2: use the aws-data-wrangler https://aws-data-wrangler.readthedocs.io/en/stable/
    raw_component_csv_df = wr.s3.read_csv(file_path, index_col=None, header=2, delimiter=',', skipinitialspace=True, keep_default_na=True, na_filter=True)
    #print(raw_component_csv_df.head())
    
    
    # Iterate the DataFrame and convert to json
    raw_data_json = {}
    # BoardInfo
    for index, data in raw_board_csv_df.iterrows():
        for column_name in raw_board_csv_df.columns:
            column_name_str = column_name.replace(" ", "")
            column_name_str = column_name_str.replace("(", "_")
            column_name_str = column_name_str.replace(")", "_")
            column_name_str = column_name_str.replace("%", "percentage")
            column_name_str = column_name_str.replace(":", "_")
            raw_data_json[column_name_str] = data[column_name]
    #print('converted_raw_data:', json.dumps(raw_data_json))
    
    # ComponentInfo
    component_json_array = []
    for index, data in raw_component_csv_df.iterrows():
        component_json = {}
        for column_name in raw_component_csv_df.columns:
            column_name_str = column_name.replace(" ", "")
            column_name_str = column_name_str.replace("(", "_")
            column_name_str = column_name_str.replace(")", "_")
            column_name_str = column_name_str.replace("%", "percentage")
            column_name_str = column_name_str.replace(":", "_")
            component_json[column_name_str] = data[column_name]
        component_json_array.append(component_json)
    raw_data_json['aggrate_component_info']=component_json_array
    
    # Convert the dict to DataFrame
    raw_data_json_df = pd.DataFrame.from_dict(raw_data_json)
    print(raw_data_json_df.head())
    print('raw_data_json_df size: ', raw_data_json_df.shape)
    print(raw_data_json_df.at[0, 'aggrate_component_info'])
    
    # upload to S3
    s3_json_key = s3_json_prefix + os.path.basename(s3_key) + '.json'
    upload_file_path = 's3://{}/{}'.format(s3_bucket, s3_json_key)
    print('upload to S3 {}'.format(upload_file_path))
    # json.dump can NOT support flatten array to each line
    #with open("aggrate_component_info.json", "w") as outfile: 
    #    json.dump(raw_data_json, outfile)
    # upload to S3 # Option1: Save to temporay file and uplaod to S3, then Use the S3 client, here orient="records", lines=True to flatten array to each line
    #raw_data_json_df.to_json("aggrate_component_info.json", orient="records", lines=True)
    #s3_client.upload_file(Filename='aggrate_component_info.json', Bucket=s3_bucket, Key=s3_json_key)
    
    # Option2: use the aws-data-wrangler https://aws-data-wrangler.readthedocs.io/en/stable/, here orient="records", lines=True to flatten array to each line
    wr.s3.to_json(
        df=raw_data_json_df,
        path=upload_file_path,
        orient="records", 
        lines=True
    )

s3://ray-glue-streaming/catalog_test/complextable/MN63459620201110165647.csv
    BARCODE  INDEX        DATE    S.TIME    E.TIME  CYCLE                 JOB  \
0  M4634596  24878  11/10/2020  16:56:38  16:56:46      8  A5E41637164-04-TOP   
1  M4634596  24878  11/10/2020  16:56:38  16:56:46      8  A5E41637164-04-TOP   
2  M4634596  24878  11/10/2020  16:56:38  16:56:46      8  A5E41637164-04-TOP   
3  M4634596  24878  11/10/2020  16:56:38  16:56:46      8  A5E41637164-04-TOP   
4  M4634596  24878  11/10/2020  16:56:38  16:56:46      8  A5E41637164-04-TOP   

  RESULT USER   LOTINFO MACHINE  SIDE  \
0   GOOD   SV  KOHYOUNG       T   NaN   
1   GOOD   SV  KOHYOUNG       T   NaN   
2   GOOD   SV  KOHYOUNG       T   NaN   
3   GOOD   SV  KOHYOUNG       T   NaN   
4   GOOD   SV  KOHYOUNG       T   NaN   

                              aggrate_component_info  
0  {'ComponentID': '1:00', 'Volume_percentage_': ...  
1  {'ComponentID': '1:00', 'Volume_percentage_': ...  
2  {'ComponentID': '1:C6

## Create the external table (AWS Glue Catalog table)

- You can run crawler to create the table
- You can also create the external table on Athena

In [167]:
json_file_path = 's3://{}/{}'.format(s3_bucket, s3_json_prefix)

query = r'''
      CREATE EXTERNAL TABLE IF NOT EXISTS `sampledb`.`complex_json_structure` (
      `BARCODE` string, 
      `INDEX` int,
      `DATE` string,
      `S.TIME` string,
      `E.TIME` string,
      `JOB` string,
      `RESULT` string,
      `USER` string,
      `LOTINFO` string,
      `MACHINE` string,
      `SIDE` string,
      `aggrate_component_info` struct<ComponentID:string,Volume_percentage_:float,Height_um_:float,Area_percentage_:float,OffsetX_percentage_:float,OffsetY_percentage_:float,Volume_um3_:int,Area_um2_:int,Result:string,PinNumber:string,PadVerification:string,Shape:string,Library_Name:string,Vol_Min_percentage_:int,Vol_Max_percentage_:int,Height_Low_um_:int,Height_High_um_:int,Area_Min_percentage_:int,Area_Max_percentage_:int,OffsetX_Error_mm_:float,OffsetY_Error_mm_:float,Unnamed_21:string>
      )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
      'ignore.malformed.json'='true',
      'paths'='BARCODE,INDEX,DATE,S.TIME,E.TIME,JOB,RESULT,USER,LOTINFO,MACHINE,SIDE,aggrate_component_info'
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '{}'
    '''
query = query.format(json_file_path)
print(query)
    
query_exec_id = wr.athena.start_query_execution(sql=query, database="sampledb")
wr.athena.wait_query(query_execution_id=query_exec_id)
res = wr.athena.get_query_execution(query_execution_id=query_exec_id)
print("Athena query {} result: {}".format(query_exec_id, res["Status"]["State"]))


      CREATE EXTERNAL TABLE IF NOT EXISTS `sampledb`.`complex_json_structure` (
      `BARCODE` string, 
      `INDEX` int,
      `DATE` string,
      `S.TIME` string,
      `E.TIME` string,
      `JOB` string,
      `RESULT` string,
      `USER` string,
      `LOTINFO` string,
      `MACHINE` string,
      `SIDE` string,
      `aggrate_component_info` struct<ComponentID:string,Volume_percentage_:float,Height_um_:float,Area_percentage_:float,OffsetX_percentage_:float,OffsetY_percentage_:float,Volume_um3_:int,Area_um2_:int,Result:string,PinNumber:string,PadVerification:string,Shape:string,Library_Name:string,Vol_Min_percentage_:int,Vol_Max_percentage_:int,Height_Low_um_:int,Height_High_um_:int,Area_Min_percentage_:int,Area_Max_percentage_:int,OffsetX_Error_mm_:float,OffsetY_Error_mm_:float,Unnamed_21:string>
      )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
      'ignore.malformed.json'='true',
      'paths'='BARCODE,INDEX,DATE,S.TIME,E.TIME,J

## Athena Query

In [171]:
# Retrieving the data from Amazon Athena
query = r'''SELECT * FROM "sampledb"."complex_json_structure" limit 10;
    '''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)

query = r'''
SELECT * FROM "sampledb"."complex_json_structure" as t 
where t.barcode='MN634850' and t.index=24857 and t.JOB='A5E41637164-04-TOP' limit 10;
'''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
# Check how many date athen scanned to get the result
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)

query = r'''
SELECT * FROM "sampledb"."complex_json_structure" as t 
where t.barcode='MN635582' and t.index=24566 and t.JOB='A5E41637164-04-BOT' and t.aggrate_component_info.ComponentID='1:C82_01' 
limit 10;
'''
df = wr.athena.read_sql_query(sql=query, database="sampledb")
print(df.head())
# Check how many date athen scanned to get the result
scanned_bytes = df.query_metadata["Statistics"]["DataScannedInBytes"]
print(scanned_bytes/1024)


    barcode  index        date    s.time    e.time                 job result  \
0  MN635582  24566  11/10/2020  11:54:09  11:54:15  A5E41637164-04-BOT   FAIL   
1  MN635582  24566  11/10/2020  11:54:09  11:54:15  A5E41637164-04-BOT   FAIL   
2  MN635582  24566  11/10/2020  11:54:09  11:54:15  A5E41637164-04-BOT   FAIL   
3  MN635582  24566  11/10/2020  11:54:09  11:54:15  A5E41637164-04-BOT   FAIL   
4  MN635582  24566  11/10/2020  11:54:09  11:54:15  A5E41637164-04-BOT   FAIL   

  user   lotinfo machine  side  \
0   SV  KOHYOUNG    <NA>  <NA>   
1   SV  KOHYOUNG    <NA>  <NA>   
2   SV  KOHYOUNG    <NA>  <NA>   
3   SV  KOHYOUNG    <NA>  <NA>   
4   SV  KOHYOUNG    <NA>  <NA>   

                              aggrate_component_info  
0  {'componentid': '1:', 'volume_percentage_': 10...  
1  {'componentid': '1:', 'volume_percentage_': 10...  
2  {'componentid': '1:R615_04', 'volume_percentag...  
3  {'componentid': '1:R615_04', 'volume_percentag...  
4  {'componentid': '1:X161_04', '