AutoFaker - A Python library to minimize unit testing ceremony

Around a year ago, I found myself working full time in Python. Coming from a C# background, I really missed the unit testing tools you have in .NET. To be more specific, I missed things like xUnit, AutoFixture, and Fluent Assertions. This immediately got me started on a project I called AutoFaker.

AutoFaker is a Python library designed to minimize the setup/arrange phase of your unit tests by removing the need to manually write code that creates anonymous variables.

This library is heavily inspired by AutoFixture and was initially created to simplify writing unit tests for ETL (Extract-Transform-Load) code running from a Python library on an Apache Spark cluster in Big Data solutions.

When writing unit tests, you normally start by creating objects that represent the initial state of the test. This phase is called the arrange or setup phase of the test. In most cases, the system you want to test will force you to specify much more information than you really care about, so you frequently end up creating objects that have no influence on the test itself, simply to satisfy the compiler/interpreter.
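To make the problem concrete, here is a small sketch of an arrange phase written by hand. The Customer and Order classes and the total_price function are hypothetical and exist only for illustration; notice how most of the values are filler that the test does not care about.

```python
from dataclasses import dataclass


@dataclass
class Customer:
    id: int
    name: str
    email: str


@dataclass
class Order:
    id: int
    customer: Customer
    quantity: int
    unit_price: float


def total_price(order: Order) -> float:
    return order.quantity * order.unit_price


# arrange: only quantity and unit_price matter for this test,
# everything else is filler needed to construct the objects
customer = Customer(id=1, name="some name", email="some@example.com")
order = Order(id=42, customer=customer, quantity=3, unit_price=2.5)

# act
result = total_price(order)

# assert
assert result == 7.5
```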

AutoFaker is available from PyPI and can be installed using pip:

pip install autofaker

AutoFaker can help by creating such anonymous variables for you. Here’s a simple example:

import unittest
from autofaker import Autodata

class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2

class CalculatorTests(unittest.TestCase):
    def test_can_add_two_numbers(self):
        # arrange
        numbers = Autodata.create_many(int, 2)
        sut = Autodata.create(Calculator)
        # act
        result = sut.add(numbers[0], numbers[1])
        # assert
        self.assertEqual(numbers[0] + numbers[1], result)

Since the point of this library is to simplify the arrange step of writing unit tests, the @autodata and @fakedata decorators are available to explicitly state whether to use anonymous variables or fake data when constructing the test arguments, including the system under test. To use them, you can either pass the types of the arguments to the decorator, or specify argument annotations on the test function:

import unittest
from autofaker import autodata

class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2

class CalculatorTests(unittest.TestCase):
    @autodata(Calculator, int, int)
    def test_can_add_two_numbers_using_test_arguments(self, sut, number1, number2):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)

    @autodata()
    def test_can_add_two_numbers_using_annotated_arguments(self, 
                                                           sut: Calculator, 
                                                           number1: int, 
                                                           number2: int):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)
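To give a feel for how a decorator can fill in test arguments from annotations, here is a simplified, standard-library-only sketch. The fill_arguments decorator and the anonymous helper are made up for this illustration and are not AutoFaker's actual implementation.

```python
import functools
import inspect
import random
import uuid


def anonymous(t):
    """Return a throwaway value for a few simple types."""
    if t is int:
        return random.randint(0, 2**31 - 1)
    if t is float:
        return random.uniform(0, 1_000_000)
    if t is str:
        return str(uuid.uuid4())
    # fall back to calling the type with no arguments
    return t()


def fill_arguments(test_function):
    """Hypothetical decorator: supply a value for every annotated parameter."""
    signature = inspect.signature(test_function)

    @functools.wraps(test_function)
    def wrapper(self):
        values = {
            name: anonymous(parameter.annotation)
            for name, parameter in signature.parameters.items()
            if name != "self"
        }
        return test_function(self, **values)

    return wrapper


class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2


class CalculatorTests:
    @fill_arguments
    def test_add(self, sut: Calculator, number1: int, number2: int):
        assert sut.add(number1, number2) == number1 + number2


CalculatorTests().test_add()
```

The wrapper only takes self, so a test runner can call it without knowing about the extra parameters.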

There are times when completely anonymous variables don’t make much sense, especially in data-centric scenarios. For these use cases, this library uses Faker to generate fake data. This option is enabled by setting use_fake_data to True when calling the Autodata.create() function:

from dataclasses import dataclass
from autofaker import Autodata

@dataclass
class DataClass:
    id: int
    name: str
    job: str

data = Autodata.create(DataClass, use_fake_data=True)

print(f'id:     {data.id}')
print(f'name:   {data.name}')
print(f'job:    {data.job}\n')

The code above might output something like:

id:     8952
name:   Justin Wise
job:    Chief Operating Officer
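The difference between anonymous and fake data can be pictured as a lookup on the field name. The sketch below is a standard-library-only illustration of that idea; the FAKE_GENERATORS table is made up and merely stands in for Faker's providers, so it should not be read as how AutoFaker works internally.

```python
import random
import uuid
from dataclasses import dataclass, fields

# Hypothetical stand-ins for Faker providers, keyed by field name
FAKE_GENERATORS = {
    "name": lambda: random.choice(["Ada Lovelace", "Alan Turing", "Grace Hopper"]),
    "job": lambda: random.choice(["Engineer", "Analyst", "Architect"]),
}


def anonymous(t):
    """Anonymous value: a random int, or a GUID string for everything else."""
    if t is int:
        return random.randint(0, 9999)
    return str(uuid.uuid4())


def create(cls, use_fake_data=False):
    values = {}
    for field in fields(cls):
        # use a name-based generator only when fake data is requested
        generator = FAKE_GENERATORS.get(field.name) if use_fake_data else None
        values[field.name] = generator() if generator else anonymous(field.type)
    return cls(**values)


@dataclass
class Person:
    id: int
    name: str
    job: str


person = create(Person, use_fake_data=True)
print(person)
```

Fields without a matching generator, such as id here, fall back to an anonymous value.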

Supported data types

Currently autofaker supports creating anonymous variables for the following data types:

Built-in types:

  • int
  • float
  • str
  • complex
  • range
  • bytes
  • bytearray

Datetime types:

  • datetime
  • date

Classes:

  • Simple classes
  • @dataclass
  • Nested classes (and recursion)
  • Classes containing lists of other types

Dataframes:

  • Pandas dataframe

Example usages

Create anonymous built-in types like int, float, str and datetime types like datetime and date

from datetime import date, datetime

from autofaker import Autodata

print(f'anonymous string:    {Autodata.create(str)}')
print(f'anonymous int:       {Autodata.create(int)}')
print(f'anonymous float:     {Autodata.create(float)}')
print(f'anonymous complex:   {Autodata.create(complex)}')
print(f'anonymous range:     {Autodata.create(range)}')
print(f'anonymous bytes:     {Autodata.create(bytes)}')
print(f'anonymous bytearray: {Autodata.create(bytearray)}')
print(f'anonymous datetime:  {Autodata.create(datetime)}')
print(f'anonymous date:      {Autodata.create(date)}')

The code above might output something like the following (the complex, range, bytes and bytearray lines are left out here)

anonymous string:    f91954f1-96df-463f-a427-665c99213395
anonymous int:       2066712686
anonymous float:     725758222.8712853
anonymous datetime:  2017-06-19 02:40:41.000084
anonymous date:      2019-11-10 00:00:00

Create an anonymous class


class SimpleClass:
    id = -1
    text = 'test'

cls = Autodata.create(SimpleClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')

The code above might output the following

id = 2020177162
text = ac54a65d-b4a3-4eda-a840-eb948ad10d5f

Create a collection of an anonymous class

class SimpleClass:
    id = -1
    text = 'test'

classes = Autodata.create_many(SimpleClass)
for cls in classes:
  print(f'id = {cls.id}')
  print(f'text = {cls.text}')
  print()

The code above might output the following

id = 242996515
text = 5bb60504-ccca-4104-9b7f-b978e52a6518

id = 836984239
text = 079df61e-a87e-4f26-8196-3f44157aabd6

id = 570703150
text = a3b86f08-c73a-4730-bde7-4bdff5360ef4

Create an anonymous dataclass

from dataclasses import dataclass

@dataclass
class DataClass:
    id: int
    text: str

cls = Autodata.create(DataClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')

The code above might output the following

id = 314075507
text = 4a3b3cae-f4cf-4502-a7f3-61115a1e0d2a

Create an anonymous dataclass using fake data

@dataclass
class DataClass:
    id: int

    name: str
    address: str
    job: str

    country: str
    currency_name: str
    currency_code: str

    email: str
    safe_email: str
    company_email: str

    hostname: str
    ipv4: str
    ipv6: str

    text: str


data = Autodata.create(DataClass, use_fake_data=True)

print(f'id:               {data.id}')
print(f'name:             {data.name}')
print(f'job:              {data.job}\n')
print(f'address:\n{data.address}\n')

print(f'country:          {data.country}')
print(f'currency name:    {data.currency_name}')
print(f'currency code:    {data.currency_code}\n')

print(f'email:            {data.email}')
print(f'safe email:       {data.safe_email}')
print(f'work email:       {data.company_email}\n')

print(f'hostname:         {data.hostname}')
print(f'IPv4:             {data.ipv4}')
print(f'IPv6:             {data.ipv6}\n')

print(f'text:\n{data.text}')

The code above might output the following

id:               8952
name:             Justin Wise
job:              Chief Operating Officer

address:
65939 Hernandez Parks
Rochaport, NC 41760

country:          Equatorial Guinea
currency name:    Burmese kyat
currency code:    ERN

email:            smithjohn@example.com
safe email:       kent11@example.com
work email:       marissagreen@brown-cole.com

hostname:         db-90.hendricks-west.org
IPv4:             66.139.143.242
IPv6:             895d:82f7:7c13:e7cb:f35d:c93:aeb2:8eeb

text:
Movie author culture represent. Enjoy myself over physical green lead but home.
Share wind factor far minute produce significant. Sense might fact leader.

Create an anonymous class with nested types


class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

cls = Autodata.create(NestedClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')
print(f'inner.id = {cls.inner.id}')
print(f'inner.text = {cls.inner.text}')

The code above might output the following

id = 1565737216
text = e66ecd5c-c17a-4426-b755-36dfd2082672
inner.id = 390282329
inner.text = eef94b5c-aa95-427a-a9e6-d99e2cc1ffb2
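Nested types lend themselves to a recursive approach: to build the outer object, first build a value for each of its fields. The following is a minimal sketch of that idea using dataclasses and the standard library only. The create function here is a hypothetical helper, not AutoFaker's implementation, and it assumes the field annotations are real types rather than strings.

```python
import random
import uuid
from dataclasses import dataclass, fields, is_dataclass


def create(t):
    """Build a value of type t, recursing into dataclass fields."""
    if is_dataclass(t):
        # field.type is the annotated type (assumed not to be a string)
        return t(**{field.name: create(field.type) for field in fields(t)})
    if t is int:
        return random.randint(0, 2**31 - 1)
    if t is str:
        return str(uuid.uuid4())
    raise TypeError(f"unsupported type: {t!r}")


@dataclass
class Inner:
    id: int
    text: str


@dataclass
class Outer:
    id: int
    text: str
    inner: Inner


outer = create(Outer)
print(f'inner.id = {outer.inner.id}')
print(f'inner.text = {outer.inner.text}')
```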

Create a collection of an anonymous class with nested types

class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

classes = Autodata.create_many(NestedClass)
for cls in classes:
  print(f'id = {cls.id}')
  print(f'text = {cls.text}')
  print(f'inner.id = {cls.inner.id}')
  print(f'inner.text = {cls.inner.text}')
  print()

The code above might output the following

id = 1116454042
text = ceeecf0c-7375-4f3a-8d4b-6d7a4f2b20fd
inner.id = 1067027444
inner.text = 079573ce-1ef4-408d-8984-1dbc7b0d0b80

id = 730390288
text = ff3ca474-a69d-4ff6-95b4-fbdb1bea7cdb
inner.id = 1632771208
inner.text = 9423e824-dc8f-4145-ba47-7301351a91f8

id = 187364960
text = b31ca191-5031-43a2-870a-7bc7c99e4110
inner.id = 1705149100
inner.text = e703a117-ba4f-4201-a31b-10ab8e54a673

Create a Pandas DataFrame using anonymous data generated from a specified type

class DataClass:
    id = -1
    type = '' 
    value = 0

pdf = Autodata.create_pandas_dataframe(DataClass)
print(pdf)

The code above might output the following

          id                                  type       value
0  778090854  13537c5a-62e7-488b-836e-a4b17f2f3ae9  1049015695
1  602015506  c043ca8d-e280-466a-8bba-ec1e0539fe28  1016359353
2  387753717  986b3b1c-abf4-4bc1-95cf-0e979390e4f3   766159839

Create a Pandas DataFrame using fake data generated from a specified type

class DataClass:
    first_name = ''
    id = -1
    last_name = ''
    phone_number = ''

pdf = Autodata.create_pandas_dataframe(DataClass, use_fake_data=True)
print(pdf)

The code above might output the following

  first_name    id last_name          phone_number
0   Lawrence  7670   Jimenez  001-172-307-0561x471
1      Bryan  9084    Walker         (697)893-6767
2       Paul  9824    Thomas    960.555.3577x65487



Orchestrated ETL Design Pattern for Apache Spark and Databricks

This article describes ideas, based on existing software design patterns, that can be applied to designing data processing libraries for Databricks.

As a data platform grows, it becomes harder and more complex to maintain a large set of Notebooks. Although reusing code between Notebooks is possible using the %run command, it is neither elegant nor efficient.

The goal is to move as much of the ETL code as possible, if not all of it, away from Notebooks and into a Python library. This library should also contain reusable components for ingesting data in various formats from various data sources into a variety of data stores.

Databricks Notebooks should be used for only 3 things:

  • Data exploration
  • Executing library code
  • Interacting with the dbutils API (e.g. reading secrets or setting up widgets for parameterized jobs)

All components must be designed with unit and integration testing in mind, and tests must execute in the CI/CD pipeline.
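As a rough illustration of what "designed with testing in mind" can mean in practice: when a component receives its collaborators through the constructor, a unit test can replace them with mocks and run without a cluster. The IngestJob class below is hypothetical and only serves this example.

```python
import unittest
from unittest.mock import Mock


class IngestJob:
    """Hypothetical library component: collaborators are passed in."""

    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer

    def run(self):
        rows = self.reader.read()
        # drop empty rows before handing the data to the writer
        self.writer.write([row for row in rows if row is not None])


class IngestJobTests(unittest.TestCase):
    def test_run_writes_non_null_rows(self):
        # arrange
        reader = Mock()
        reader.read.return_value = [1, None, 2]
        writer = Mock()

        # act
        IngestJob(reader, writer).run()

        # assert
        writer.write.assert_called_once_with([1, 2])


# run the test case programmatically, e.g. from a CI script
suite = unittest.defaultTestLoader.loadTestsFromTestCase(IngestJobTests)
result = unittest.TextTestRunner().run(suite)
```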

In the past, some of us have used and implemented variations on the Model-View-Whatever (MVC, MVP, MVVM, etc.) design pattern. Such patterns address the problem of separating the following concerns:

  • Data layer
  • Business logic
  • Application logic
  • User interface

A similar pattern can be derived for separating the following concerns:

  • Extraction
  • Transformation
  • Loading

The OETL Design Pattern

OETL, short for Orchestrated Extract-Transform-Load, is a pattern that takes the ideas behind variations of the Model-View-Whatever design pattern and applies them to separating extraction, transformation and loading.

Orchestrated ETL

The Orchestrator is responsible for conducting the interactions between the Extractor -> Transformer -> Loader.

The Orchestrator reads data from the Extractor, passes the result to the Transformer, and hands the transformed result to the Loader to be saved. The Transformer is optional, as there are scenarios where data transformation is not needed, e.g. raw data ingestion to the landing zone (bronze).

Each layer may have a single or multiple implementations, and this is handled automatically in the Orchestrator.

In Python, an example of an Orchestrator with single implementations of the Extractor, Transformer, and Loader would look something like this:

class Orchestrator:
    def __init__(self,
                 extractor: Extractor,
                 transformer: Transformer,
                 loader: Loader):
        self.loader = loader
        self.transformer = transformer
        self.extractor = extractor

    def execute(self):
        df = self.extractor.read()
        df = self.transformer.process(df)
        self.loader.save(df)
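The same idea extends to several transformers and loaders, and to the case where no transformer is needed at all. Here is a Spark-free sketch that uses plain lists in place of DataFrames; ListOrchestrator and the small stand-in classes are hypothetical names invented for this illustration.

```python
class ListOrchestrator:
    """Hypothetical variant that accepts any number of transformers and loaders."""

    def __init__(self, extractor, transformers=(), loaders=()):
        self.extractor = extractor
        self.transformers = list(transformers)
        self.loaders = list(loaders)

    def execute(self):
        data = self.extractor.read()
        for transformer in self.transformers:  # may be empty (raw ingestion)
            data = transformer.process(data)
        for loader in self.loaders:  # every loader receives the same result
            loader.save(data)
        return data


class NumbersExtractor:
    def read(self):
        return [1, 2, 3]


class DoubleTransformer:
    def process(self, data):
        return [x * 2 for x in data]


class MemoryLoader:
    def __init__(self):
        self.saved = None

    def save(self, data):
        self.saved = data


silver, gold = MemoryLoader(), MemoryLoader()
ListOrchestrator(
    NumbersExtractor(),
    transformers=[DoubleTransformer(), DoubleTransformer()],
    loaders=[silver, gold],
).execute()
print(gold.saved)  # [4, 8, 12]

raw = MemoryLoader()
ListOrchestrator(NumbersExtractor(), loaders=[raw]).execute()
print(raw.saved)  # [1, 2, 3]
```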

ATC-DataPlatform

A framework for this design pattern is implemented in a Python library called atc-dataplatform, available from PyPI:

pip install atc-dataplatform

Orchestration Fluent Interface

atc-dataplatform provides common simple implementations and base classes for implementing the OETL design pattern. To simplify object construction, this library provides the Orchestrator fluent interface from atc.etl:

from atc.etl import Extractor, Transformer, Loader, Orchestrator

(Orchestrator()
    .extract_from(Extractor())
    .transform_with(Transformer())
    .load_into(Loader())
    .execute())
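For readers unfamiliar with fluent interfaces, the chaining works because every builder method returns the object itself. The sketch below shows the mechanism with a hypothetical FluentOrchestrator and plain lists; it is not the atc.etl implementation.

```python
class FluentOrchestrator:
    """Hypothetical sketch of a fluent builder; not the atc.etl implementation."""

    def __init__(self):
        self.steps = []

    def extract_from(self, extractor):
        self.steps.append(("extract", extractor))
        return self  # returning self is what allows chaining

    def transform_with(self, transformer):
        self.steps.append(("transform", transformer))
        return self

    def load_into(self, loader):
        self.steps.append(("load", loader))
        return self

    def execute(self):
        data = None
        for kind, step in self.steps:
            if kind == "extract":
                data = step.read()
            elif kind == "transform":
                data = step.process(data)
            else:
                step.save(data)
        return data


class Words:
    def read(self):
        return ["fender", "gibson"]


class Upper:
    def process(self, data):
        return [word.upper() for word in data]


class Printer:
    def save(self, data):
        print(data)


result = (
    FluentOrchestrator()
    .extract_from(Words())
    .transform_with(Upper())
    .load_into(Printer())
    .execute()
)
```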

Usage examples:

Here are some example usages and implementations of the ETL classes provided.

Example-1

Here’s an example of reading data from a single location, transforming it once and saving it to a single destination. This is the simplest ETL case and will be used as the base for the more complex examples below.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """
            id STRING,
            brand STRING,
            model STRING,
            year STRING
            """,
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using a single simple transformer")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces output similar to the following:

Current DataFrame schema
root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)

New DataFrame schema
root
 |-- id: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)

+---+------+----------+----+
| id| brand|     model|year|
+---+------+----------+----+
|  1|Fender|Telecaster|1950|
|  2|Gibson|  Les Paul|1959|
|  3|Ibanez|        RG|1987|
+---+------+----------+----+

Example-2

Here’s an example of a single Transformer implementation that is reused to change the data type of several columns, with the column name passed in as a parameter.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class IntegerColumnTransformer(Transformer):
    def __init__(self, col_name: str):
        super().__init__()
        self.col_name = col_name

    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn(self.col_name, f.col(self.col_name).cast(IntegerType()))
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple transformers")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(IntegerColumnTransformer("id"))
    .transform_with(IntegerColumnTransformer("year"))
    .load_into(NoopLoader())
)
etl.execute()

Example-3

Here’s an example of having multiple Extractor implementations and applying transformations using the process_many method.

The read() function in Extractor will return a dictionary that uses the type name of the Extractor as the key and a DataFrame as its value. The key used can be overridden in the constructor.

Transformer provides the function process_many(dataset: {}) and returns a single DataFrame.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

from atc.etl import Extractor, Loader, Orchestrator, Transformer
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def __init__(self):
        super().__init__(dataset_key="japanese")

    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["japanese"].withColumn("country", f.lit("Japan"))
        return usa_df.union(jap_df)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple extractors")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces the following output:

root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- country: string (nullable = false)

+---+--------+----------+----+-------+
| id|   brand|     model|year|country|
+---+--------+----------+----+-------+
|  1|  Fender|Telecaster|1950|    USA|
|  2|  Gibson|  Les Paul|1959|    USA|
|  3|  Ibanez|        RG|1987|  Japan|
|  4|Takamine|Pro Series|1959|  Japan|
+---+--------+----------+----+-------+
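The keying behaviour used in Example-3 can be illustrated without Spark. In the sketch below, the Extractor base class, extract_all and process_many are simplified stand-ins written for this illustration (lists of tuples replace DataFrames); only the dataset_key constructor argument and the default of using the class name are taken from the example above.

```python
class Extractor:
    """Hypothetical base class mimicking the keying behaviour described above."""

    def __init__(self, dataset_key=None):
        # the default key is the class name of the concrete extractor
        self.dataset_key = dataset_key or type(self).__name__

    def read(self):
        raise NotImplementedError


class AmericanGuitarExtractor(Extractor):
    def read(self):
        return [("1", "Fender"), ("2", "Gibson")]


class JapaneseGuitarExtractor(Extractor):
    def __init__(self):
        super().__init__(dataset_key="japanese")  # override the default key

    def read(self):
        return [("3", "Ibanez")]


def extract_all(extractors):
    """Collect the result of every extractor under its dataset key."""
    return {extractor.dataset_key: extractor.read() for extractor in extractors}


def process_many(dataset):
    """Combine the keyed datasets into a single result."""
    usa = [row + ("USA",) for row in dataset["AmericanGuitarExtractor"]]
    jap = [row + ("Japan",) for row in dataset["japanese"]]
    return usa + jap


dataset = extract_all([AmericanGuitarExtractor(), JapaneseGuitarExtractor()])
rows = process_many(dataset)
for row in rows:
    print(row)
```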

Example-4

Here’s an example of raw data ingestion without applying any transformations.

from pyspark.sql import DataFrame

from atc.etl import Extractor, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """id STRING, brand STRING, model STRING, year STRING""",
        )


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator with no transformations")
etl = Orchestrator().extract_from(GuitarExtractor()).load_into(NoopLoader())
etl.execute()

Example-5

Here’s an example of having multiple Loader implementations that write the transformed data to multiple destinations.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-6

Using Example-2, Example-3 and Example-5 as reference, any combination of single or multiple implementations of Extractor, Transformer and Loader can be created.

Here’s an example with multiple Extractor, Transformer and Loader implementations.

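When there are multiple extractors, it is important that the first transformer is a multi-input transformer (MultiInputTransformer), which in this example means one that implements process_many, as CountryOfOriginTransformer does below.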

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn(
            "country", f.lit("Japan")
        )
        return usa_df.union(jap_df)


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple extractors, transformers and loaders")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-7

This example illustrates the use of an orchestrator as just another ETL step. The principle is called composite orchestration:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from atc.etl import Extractor, Transformer, Loader, Orchestrator, dataset_group
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("3", "Ibanez", "RG", "1987"),
                    ("4", "Takamine", "Pro Series", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: dataset_group) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", F.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn("country", F.lit("Japan"))
        return usa_df.union(jap_df)


class OrchestratorLoader(Loader):
    def __init__(self, orchestrator: Orchestrator):
        super().__init__()
        self.orchestrator = orchestrator

    def save_many(self, datasets: dataset_group) -> None:
        self.orchestrator.execute(datasets)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using composite inner orchestrator")
etl_inner = (
    Orchestrator()
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)

etl_outer = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .load_into(OrchestratorLoader(etl_inner))
)

etl_outer.execute()
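The composite idea can also be shown in a few lines of plain Python. In this sketch, every step is a function that takes a dictionary of datasets and returns one, which means an orchestrator's execute method is itself a valid step. The Orchestrator class and the step functions here are hypothetical simplifications, not the atc.etl classes.

```python
class Orchestrator:
    """Hypothetical: every step maps a dict of datasets to a dict of datasets."""

    def __init__(self, *steps):
        self.steps = steps

    def execute(self, datasets=None):
        datasets = dict(datasets or {})
        for step in self.steps:
            datasets = step(datasets)
        return datasets


def extract_american(datasets):
    return {**datasets, "american": ["Fender", "Gibson"]}


def extract_japanese(datasets):
    return {**datasets, "japanese": ["Ibanez", "Takamine"]}


def merge(datasets):
    return {"all": datasets["american"] + datasets["japanese"]}


saved = []


def save(datasets):
    saved.extend(datasets["all"])
    return datasets


inner = Orchestrator(merge, save)
# the inner orchestrator's execute method is used as the last step of the outer one
outer = Orchestrator(extract_american, extract_japanese, inner.execute)
outer.execute()
print(saved)  # ['Fender', 'Gibson', 'Ibanez', 'Takamine']
```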

If you find this interesting, then you should definitely check out atc-dataplatform on GitHub.



AppCenter Extensions for ASP.NET Core and Application Insights

In my previous post, I wrote about an open source project called AppCenterExtensions, available on GitHub and nuget.org. I recently updated this project and added a few components for ASP.NET Core that enable including AppCenter diagnostic information in Application Insights.

The NuGet package is called AppCenterExtensions.AppInsights and contains extension methods and ITelemetryInitializer implementations to be used in an ASP.NET Core web app for including AppCenter diagnostic information when logging to Application Insights.

Enabling this is easy. Assuming that the project is already configured to use Application Insights, just add the AppCenterExtensions.AppInsights NuGet package mentioned above to your ASP.NET Core project and call services.AddAppCenterTelemetry() in the ConfigureServices method of the Startup class.

Here’s an example:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Configure and register services to the IoC

        services.AddApplicationInsightsTelemetry();
        services.AddAppCenterTelemetry();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // Configure app
    }
}

Once this is set up, AppCenter diagnostic information should be searchable and visible in Application Insights.

Here’s a screenshot of search results for the x-supportkey header

and here’s a screenshot of the details of a single request containing AppCenter diagnostic information logged in Application Insights

With this flow, you can correlate Crash Reports and Analytics data from AppCenter with the HTTP requests for your backend systems in Application Insights. In the systems that I have been involved in building, we forward the AppCenter diagnostic information from our API Gateway to all calls to our internal microservices.
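Forwarding the diagnostic headers is independent of language. As a rough sketch of the idea, written in Python for brevity, a gateway could copy the relevant headers from the incoming request onto every outgoing call. The function name and the header tuple below are assumptions made for this illustration; only the x-supportkey header name comes from the text above.

```python
# Header name taken from the text above; the tuple can be extended as needed.
DIAGNOSTIC_HEADERS = ("x-supportkey",)


def headers_for_downstream(incoming_headers, extra_headers=None):
    """Copy diagnostic headers from the incoming request to an outgoing one."""
    # HTTP header names are case-insensitive, so compare in lower case
    incoming = {name.lower(): value for name, value in incoming_headers.items()}
    outgoing = dict(extra_headers or {})
    for name in DIAGNOSTIC_HEADERS:
        if name in incoming:
            outgoing[name] = incoming[name]
    return outgoing


incoming = {"X-SupportKey": "abc-123", "Accept": "application/json"}
print(headers_for_downstream(incoming, {"Authorization": "Bearer <token>"}))
```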