AutoFaker - A Python library to minimize unit testing ceremony

Around a year ago, I found myself working full time in Python. Coming from a C# background, I really missed the unit testing tools you have in .NET. To be more specific, I missed things like xUnit, AutoFixture, and Fluent Assertions. This immediately got me started working on a project I called AutoFaker.

AutoFaker is a Python library designed to minimize the setup/arrange phase of your unit tests by removing the need to manually write code that creates anonymous variables.

This library is heavily inspired by AutoFixture and was initially created to simplify writing unit tests for ETL (Extract-Transform-Load) code running from a Python library on an Apache Spark cluster in Big Data solutions.

When writing unit tests you normally start by creating objects that represent the initial state of the test. This phase is called the arrange or setup phase of the test. In most cases, the system you want to test will force you to specify much more information than you really care about, so you frequently end up creating objects that have no influence on the test itself, simply to satisfy the compiler or interpreter.
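To make the problem concrete, here is a small sketch of a test whose arrange phase is mostly ceremony. The Customer, Order and order_total names are hypothetical and exist only for this illustration. Only the prices affect the result, yet every other field still has to be filled in by hand.

```python
import unittest
from dataclasses import dataclass


# Hypothetical domain types, used only to illustrate the problem.
@dataclass
class Customer:
    id: int
    name: str
    email: str


@dataclass
class Order:
    id: int
    customer: Customer
    prices: list


def order_total(order: Order) -> float:
    """Sum the prices on an order."""
    return sum(order.prices)


class OrderTotalTests(unittest.TestCase):
    def test_total_is_sum_of_prices(self):
        # arrange: only `prices` matters here, everything else is filler
        customer = Customer(id=1, name="any name", email="any@example.com")
        order = Order(id=42, customer=customer, prices=[10.0, 2.5])
        # act
        result = order_total(order)
        # assert
        self.assertEqual(12.5, result)
```

The id, name and email values say nothing about the behaviour under test, which is exactly the kind of noise anonymous variables are meant to remove.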

AutoFaker is available from PyPI and can be installed using pip:

pip install autofaker

AutoFaker can help by creating such anonymous variables for you. Here’s a simple example:

import unittest
from autofaker import Autodata

class Calculator:
  def add(self, number1: int, number2: int):
    return number1 + number2

class CalculatorTests(unittest.TestCase):
    def test_can_add_two_numbers(self):      
        # arrange
        numbers = Autodata.create_many(int, 2)
        sut = Autodata.create(Calculator)        
        # act
        result = sut.add(numbers[0], numbers[1])        
        # assert
        self.assertEqual(numbers[0] + numbers[1], result)

Since the point of this library is to simplify the arrange step of writing unit tests, the @autodata and @fakedata decorators are available to explicitly state whether to use anonymous variables or fake data when constructing the test arguments and the system under test. To use them, either pass the types as arguments to the decorator or annotate the arguments of the test function:

import unittest
from autofaker import autodata

class Calculator:
  def add(self, number1: int, number2: int):
    return number1 + number2

class CalculatorTests(unittest.TestCase):
    @autodata(Calculator, int, int)
    def test_can_add_two_numbers_using_test_arguments(self, sut, number1, number2):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)

    @autodata()
    def test_can_add_two_numbers_using_annotated_arguments(self, 
                                                           sut: Calculator, 
                                                           number1: int, 
                                                           number2: int):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)
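For readers curious how a decorator can fill in test arguments from either explicit types or annotations, here is a rough sketch using only the standard library. It is not AutoFaker's implementation: toy_autodata, create_anonymous and Checks are hypothetical names, and only int, str and classes with a no-argument constructor are handled.

```python
import functools
import inspect
import random
import uuid


def create_anonymous(t):
    """Return an arbitrary value of type t (toy version, a few types only)."""
    if t is int:
        return random.randint(0, 2**31 - 1)
    if t is str:
        return str(uuid.uuid4())
    return t()  # assumes the type has a no-argument constructor


def toy_autodata(*types):
    """Hypothetical stand-in for a decorator such as @autodata."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self):
            if types:
                # types were passed explicitly to the decorator
                wanted = types
            else:
                # otherwise read the annotations, skipping `self`
                params = list(inspect.signature(func).parameters.values())[1:]
                wanted = [p.annotation for p in params]
            return func(self, *[create_anonymous(t) for t in wanted])
        return wrapper
    return decorator


class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2


class Checks:
    @toy_autodata(Calculator, int, int)
    def explicit_types(self, sut, number1, number2):
        return sut.add(number1, number2) == number1 + number2

    @toy_autodata()
    def annotated(self, sut: Calculator, number1: int, number2: int):
        return sut.add(number1, number2) == number1 + number2
```

Calling Checks().explicit_types() or Checks().annotated() runs the body with freshly generated arguments, which is the same shape as the two test styles shown above.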

There are times when completely anonymous variables don’t make much sense, especially in data-centric scenarios. For these use cases this library uses Faker to generate fake data. This option is enabled by setting use_fake_data to True when calling the Autodata.create() function:

from dataclasses import dataclass
from autofaker import Autodata

@dataclass
class DataClass:
    id: int
    first_name: str
    last_name: str
    address: str
    job: str

data = Autodata.create(DataClass, use_fake_data=True)

print(f'id:     {data.id}')
print(f'name:   {data.first_name} {data.last_name}')
print(f'job:    {data.job}\n')

The code above might output something like:

id:     8952
name:   Justin Wise
job:    Chief Operating Officer

Supported data types

Currently autofaker supports creating anonymous variables for the following data types:

Built-in types:

  • int
  • float
  • str
  • complex
  • range
  • bytes
  • bytearray

Datetime types:

  • datetime
  • date

Classes:

  • Simple classes
  • @dataclass
  • Nested classes (and recursion)
  • Classes containing lists of other types

Dataframes:

  • Pandas dataframe

Example usages

Create anonymous built-in types like int, float, str and datetime types like datetime and date

from datetime import date, datetime

from autofaker import Autodata

print(f'anonymous string:    {Autodata.create(str)}')
print(f'anonymous int:       {Autodata.create(int)}')
print(f'anonymous float:     {Autodata.create(float)}')
print(f'anonymous complex:   {Autodata.create(complex)}')
print(f'anonymous range:     {Autodata.create(range)}')
print(f'anonymous bytes:     {Autodata.create(bytes)}')
print(f'anonymous bytearray: {Autodata.create(bytearray)}')
print(f'anonymous datetime:  {Autodata.create(datetime)}')
print(f'anonymous date:      {Autodata.create(date)}')

The code above might output something like the following (the complex, range, bytes and bytearray lines are not shown)

anonymous string:    f91954f1-96df-463f-a427-665c99213395
anonymous int:       2066712686
anonymous float:     725758222.8712853
anonymous datetime:  2017-06-19 02:40:41.000084
anonymous date:      2019-11-10 00:00:00

Create an anonymous class


class SimpleClass:
    id = -1
    text = 'test'

cls = Autodata.create(SimpleClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')

The code above might output the following

id = 2020177162
text = ac54a65d-b4a3-4eda-a840-eb948ad10d5f

Create a collection of an anonymous class

class SimpleClass:
    id = -1
    text = 'test'

classes = Autodata.create_many(SimpleClass)
for cls in classes:
  print(f'id = {cls.id}')
  print(f'text = {cls.text}')
  print()

The code above might output the following

id = 242996515
text = 5bb60504-ccca-4104-9b7f-b978e52a6518

id = 836984239
text = 079df61e-a87e-4f26-8196-3f44157aabd6

id = 570703150
text = a3b86f08-c73a-4730-bde7-4bdff5360ef4

Create an anonymous dataclass

from dataclasses import dataclass

@dataclass
class DataClass:
    id: int
    text: str

cls = Autodata.create(DataClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')

The code above might output the following

id = 314075507
text = 4a3b3cae-f4cf-4502-a7f3-61115a1e0d2a

Create an anonymous dataclass using fake data

@dataclass
class DataClass:
    id: int

    name: str
    address: str
    job: str

    country: str
    currency_name: str
    currency_code: str

    email: str
    safe_email: str
    company_email: str

    hostname: str
    ipv4: str
    ipv6: str

    text: str


data = Autodata.create(DataClass, use_fake_data=True)

print(f'id:               {data.id}')
print(f'name:             {data.name}')
print(f'job:              {data.job}\n')
print(f'address:\n{data.address}\n')

print(f'country:          {data.country}')
print(f'currency name:    {data.currency_name}')
print(f'currency code:    {data.currency_code}\n')

print(f'email:            {data.email}')
print(f'safe email:       {data.safe_email}')
print(f'work email:       {data.company_email}\n')

print(f'hostname:         {data.hostname}')
print(f'IPv4:             {data.ipv4}')
print(f'IPv6:             {data.ipv6}\n')

print(f'text:\n{data.text}')

The code above might output the following

id:               8952
name:             Justin Wise
job:              Chief Operating Officer

address:
65939 Hernandez Parks
Rochaport, NC 41760

country:          Equatorial Guinea
currency name:    Burmese kyat
currency code:    ERN

email:            smithjohn@example.com
safe email:       kent11@example.com
work email:       marissagreen@brown-cole.com

hostname:         db-90.hendricks-west.org
IPv4:             66.139.143.242
IPv6:             895d:82f7:7c13:e7cb:f35d:c93:aeb2:8eeb

text:
Movie author culture represent. Enjoy myself over physical green lead but home.
Share wind factor far minute produce significant. Sense might fact leader.

Create an anonymous class with nested types


class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

cls = Autodata.create(NestedClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')
print(f'inner.id = {cls.inner.id}')
print(f'inner.text = {cls.inner.text}')

The code above might output the following

id = 1565737216
text = e66ecd5c-c17a-4426-b755-36dfd2082672
inner.id = 390282329
inner.text = eef94b5c-aa95-427a-a9e6-d99e2cc1ffb2
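To give an intuition for how nested types can be filled in recursively, here is a minimal standard-library sketch. It is an illustration under simplifying assumptions, not AutoFaker's actual code: anonymous_value and populate are hypothetical helpers, the type of each attribute is inferred from its class-level default, and only int, float, str and nested classes with a no-argument constructor are supported.

```python
import random
import uuid


class SimpleClass:
    id = -1
    text = 'test'


class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()


def anonymous_value(t):
    """Return an arbitrary value for a type (toy version)."""
    if t is int:
        return random.randint(0, 2**31 - 1)
    if t is float:
        return random.uniform(0.0, 1e9)
    if t is str:
        return str(uuid.uuid4())
    # anything else is treated as a nested class and populated recursively
    return populate(t())


def populate(obj):
    """Overwrite each public class attribute on obj with an anonymous value."""
    for name, default in vars(type(obj)).items():
        if name.startswith('_') or callable(default):
            continue  # skip dunder entries and methods
        setattr(obj, name, anonymous_value(type(default)))
    return obj


cls = populate(NestedClass())
print(f'id = {cls.id}')
print(f'inner.text = {cls.inner.text}')
```

The values are set on the instance, so the class-level defaults (-1 and 'test') stay untouched for other tests.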

Create a collection of an anonymous class with nested types

class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

classes = Autodata.create_many(NestedClass)
for cls in classes:
  print(f'id = {cls.id}')
  print(f'text = {cls.text}')
  print(f'inner.id = {cls.inner.id}')
  print(f'inner.text = {cls.inner.text}')
  print()

The code above might output the following

id = 1116454042
text = ceeecf0c-7375-4f3a-8d4b-6d7a4f2b20fd
inner.id = 1067027444
inner.text = 079573ce-1ef4-408d-8984-1dbc7b0d0b80

id = 730390288
text = ff3ca474-a69d-4ff6-95b4-fbdb1bea7cdb
inner.id = 1632771208
inner.text = 9423e824-dc8f-4145-ba47-7301351a91f8

id = 187364960
text = b31ca191-5031-43a2-870a-7bc7c99e4110
inner.id = 1705149100
inner.text = e703a117-ba4f-4201-a31b-10ab8e54a673

Create a Pandas DataFrame using anonymous data generated from a specified type

class DataClass:
    id = -1
    type = '' 
    value = 0

pdf = Autodata.create_pandas_dataframe(DataClass)
print(pdf)

The code above might output the following

          id                                  type       value
0  778090854  13537c5a-62e7-488b-836e-a4b17f2f3ae9  1049015695
1  602015506  c043ca8d-e280-466a-8bba-ec1e0539fe28  1016359353
2  387753717  986b3b1c-abf4-4bc1-95cf-0e979390e4f3   766159839

Create a Pandas DataFrame using fake data generated from a specified type

from dataclasses import dataclass

@dataclass
class DataClass:
    first_name: str
    id: int
    last_name: str
    phone_number: str

pdf = Autodata.create_pandas_dataframe(DataClass, use_fake_data=True)
print(pdf)

The code above might output the following

  first_name    id last_name          phone_number
0   Lawrence  7670   Jimenez  001-172-307-0561x471
1      Bryan  9084    Walker         (697)893-6767
2       Paul  9824    Thomas    960.555.3577x65487



Orchestrated ETL Design Pattern for Apache Spark and Databricks

This article describes ideas, based on existing software design patterns, that can be applied to designing data processing libraries for use with Databricks.

As a data platform grows bigger it becomes harder and more complex to maintain a large set of Notebooks. Although reusing code between Notebooks is possible using the %run command, it is neither elegant nor efficient.

The goal in mind when writing this is to move as much of the ETL code as possible, if not all of it, away from Notebooks and into a Python library. This library should also contain reusable components for ingesting data in various formats from various data sources into a variety of data stores.

Databricks Notebooks should be used for only 3 things:

  • Data exploration
  • Executing library code
  • Interacting with the dbutils API (i.e. reading secrets or setting up widgets for parameterized jobs)

All components must be designed with unit and integration testing in mind, and tests must execute in the CI/CD pipeline

In the past, some of us have used and implemented variations on the Model-View-Whatever (MVC, MVP, MVVM, etc) design pattern. Such patterns solve the problems regarding separating concerns between the following:

  • Data layer
  • Business logic
  • Application logic
  • User interface

A similar pattern can be derived for separating the following concerns:

  • Extraction
  • Transformation
  • Loading

The OETL Design Pattern

OETL, short for Orchestrated Extract-Transform-Load, is a pattern that borrows the ideas behind variations of the Model-View-Whatever design pattern.

Orchestrated ETL

The Orchestrator is responsible for conducting the interactions between the Extractor -> Transformer -> Loader.

The Orchestrator reads data from the Extractor, passes the result as a parameter to the Transformer, and saves the transformed result using the Loader. The Transformer is optional, as there are scenarios where data transformation is not needed, e.g. raw data ingestion to the landing zone (bronze).

Each layer may have a single or multiple implementations, and this is handled automatically in the Orchestrator

In Python, an example of an Orchestrator with single implementations of the Extractor, Transformer, and Loader would look something like this:

class Orchestrator:
    def __init__(self,
                 extractor: Extractor,
                 transformer: Transformer,
                 loader: Loader):
        self.loader = loader
        self.transformer = transformer
        self.extractor = extractor

    def execute(self):
        df = self.extractor.read()
        df = self.transformer.process(df)
        self.loader.save(df)
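Since the Transformer can be optional, one way to express that is to let it default to None. The sketch below is only an illustration that uses plain Python lists instead of Spark DataFrames. SimpleOrchestrator, ListExtractor, UpperCaseTransformer and ListLoader are hypothetical names, and this is not how atc-dataplatform implements it.

```python
from typing import List, Optional


class ListExtractor:
    def __init__(self, rows: List[str]):
        self.rows = rows

    def read(self) -> List[str]:
        return list(self.rows)


class UpperCaseTransformer:
    def process(self, rows: List[str]) -> List[str]:
        return [row.upper() for row in rows]


class ListLoader:
    def __init__(self):
        self.saved: List[str] = []

    def save(self, rows: List[str]) -> None:
        self.saved = rows


class SimpleOrchestrator:
    def __init__(self,
                 extractor: ListExtractor,
                 loader: ListLoader,
                 transformer: Optional[UpperCaseTransformer] = None):
        self.extractor = extractor
        self.loader = loader
        self.transformer = transformer

    def execute(self) -> None:
        data = self.extractor.read()
        # the transform step is skipped for raw ingestion
        if self.transformer is not None:
            data = self.transformer.process(data)
        self.loader.save(data)


silver = ListLoader()
SimpleOrchestrator(ListExtractor(["fender", "gibson"]), silver,
                   UpperCaseTransformer()).execute()

bronze = ListLoader()
SimpleOrchestrator(ListExtractor(["fender", "gibson"]), bronze).execute()
```

Because every collaborator is passed in through the constructor, each one can be swapped for an in-memory fake like these in a unit test.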

ATC-DataPlatform

A framework for this design pattern is implemented in a Python library called atc-dataplatform, available from PyPI:

pip install atc-dataplatform

Orchestration Fluent Interface

atc-dataplatform provides common simple implementations and base classes for implementing the OETL design pattern. To simplify object construction, this library provides the Orchestrator fluent interface from atc.etl

from atc.etl import Extractor, Transformer, Loader, Orchestrator

(Orchestrator()
    .extract_from(Extractor())
    .transform_with(Transformer())
    .load_into(Loader())
    .execute())
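The chaining style works because each configuration method returns the orchestrator itself. Here is a minimal sketch of that idea with plain Python lists as data. FluentOrchestrator, Numbers, Double and Collect are hypothetical names used for illustration only, and the real atc.etl Orchestrator may be structured differently.

```python
class FluentOrchestrator:
    """Toy fluent interface: every configuration method returns self."""

    def __init__(self):
        self.extractor = None
        self.transformers = []
        self.loaders = []

    def extract_from(self, extractor):
        self.extractor = extractor
        return self

    def transform_with(self, transformer):
        self.transformers.append(transformer)
        return self

    def load_into(self, loader):
        self.loaders.append(loader)
        return self

    def execute(self):
        data = self.extractor.read()
        # transformers run in the order they were registered
        for transformer in self.transformers:
            data = transformer.process(data)
        # every loader receives the final result
        for loader in self.loaders:
            loader.save(data)


class Numbers:
    def read(self):
        return [1, 2, 3]


class Double:
    def process(self, data):
        return [x * 2 for x in data]


class Collect:
    def __init__(self):
        self.saved = None

    def save(self, data):
        self.saved = data


sink = Collect()
(FluentOrchestrator()
    .extract_from(Numbers())
    .transform_with(Double())
    .transform_with(Double())
    .load_into(sink)
    .execute())
print(sink.saved)
```

Keeping transformers and loaders in lists is what lets the same chain express zero, one, or several steps of each kind.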

Usage examples:

Here are some example usages and implementations of the ETL class provided

Example-1

Here’s an example of reading data from a single location, transforming it once and saving to a single destination. This is the simplest ETL case, and it is used as the base for the more complex examples below.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """
            id STRING,
            brand STRING,
            model STRING,
            year STRING
            """,
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using a single simple transformer")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces the following output:

Current DataFrame schema
root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)

New DataFrame schema
root
 |-- id: integer (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)

+---+------+----------+----+
| id| brand|     model|year|
+---+------+----------+----+
|  1|Fender|Telecaster|1950|
|  2|Gibson|  Les Paul|1959|
|  3|Ibanez|        RG|1987|
+---+------+----------+----+

Example-2

Here’s an example of having multiple Transformer instances, where a single implementation is reused to change the data type of a given column and the column name is parameterized.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class IntegerColumnTransformer(Transformer):
    def __init__(self, col_name: str):
        super().__init__()
        self.col_name = col_name

    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn(self.col_name, f.col(self.col_name).cast(IntegerType()))
        return df


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple transformers")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(IntegerColumnTransformer("id"))
    .transform_with(IntegerColumnTransformer("year"))
    .load_into(NoopLoader())
)
etl.execute()

Example-3

Here’s an example of having multiple Extractor implementations and applying transformations using the process_many method.

The result of the read() function of each Extractor is placed in a dictionary that uses the type name of the Extractor as the key and the DataFrame as its value. The key used can be overridden in the constructor.

Transformer provides the function process_many(dataset: {}) and returns a single DataFrame.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

from atc.etl import Extractor, Loader, Orchestrator, Transformer
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def __init__(self):
        super().__init__(dataset_key="japanese")

    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["japanese"].withColumn("country", f.lit("Japan"))
        return usa_df.union(jap_df)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple extractors")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)
etl.execute()

The code above produces the following output:

root
 |-- id: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: string (nullable = true)
 |-- country: string (nullable = false)

+---+--------+----------+----+-------+
| id|   brand|     model|year|country|
+---+--------+----------+----+-------+
|  1|  Fender|Telecaster|1950|    USA|
|  2|  Gibson|  Les Paul|1959|    USA|
|  3|  Ibanez|        RG|1987|  Japan|
|  4|Takamine|Pro Series|1959|  Japan|
+---+--------+----------+----+-------+
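The keying rule used in this example (type name by default, overridable in the constructor) can be sketched in a few lines of plain Python. KeyedExtractor and read_all are hypothetical names, lists stand in for DataFrames, and this is only my reading of the behaviour described above, not the library's code.

```python
class KeyedExtractor:
    """Toy base class: the dataset key defaults to the class name."""

    def __init__(self, dataset_key=None):
        self.dataset_key = dataset_key or type(self).__name__

    def read(self):
        raise NotImplementedError


class AmericanGuitars(KeyedExtractor):
    def read(self):
        return ["Fender", "Gibson"]


class JapaneseGuitars(KeyedExtractor):
    def __init__(self):
        # override the default key, as JapaneseGuitarExtractor does above
        super().__init__(dataset_key="japanese")

    def read(self):
        return ["Ibanez", "Takamine"]


def read_all(extractors):
    """Collect the output of every extractor under its dataset key."""
    return {e.dataset_key: e.read() for e in extractors}


datasets = read_all([AmericanGuitars(), JapaneseGuitars()])
print(sorted(datasets))
```

A transformer that takes multiple inputs then looks up each dataset by key, which is why the keys in process_many must match either the class name or the overridden value.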

Example-4

Here’s an example of raw data ingestion without applying any transformations.

from pyspark.sql import DataFrame

from atc.etl import Extractor, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """id STRING, brand STRING, model STRING, year STRING""",
        )


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator with no transformations")
etl = Orchestrator().extract_from(GuitarExtractor()).load_into(NoopLoader())
etl.execute()

Example-5

Here’s an example of having multiple Loader implementations that write the transformed data into multiple destinations.

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-6

Using Example-2, Example-3 and Example-5 as reference, any combinations for single/multiple implementations of Extractor, Transformer or Loader can be created.

Here’s an example of having multiple Extractor, Transformer and Loader implementations.

When there are multiple extractors, it is important that the first transformer accepts multiple inputs (a MultiInputTransformer, here one that implements process_many).

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn(
            "country", f.lit("Japan")
        )
        return usa_df.union(jap_df)


class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df


class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()


class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using multiple extractors, transformers and loaders")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()

Example-7

This example illustrates the use of an orchestrator as just another ETL step. The principle is called composite orchestration:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from atc.etl import Extractor, Transformer, Loader, Orchestrator, dataset_group
from atc.spark import Spark


class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("3", "Ibanez", "RG", "1987"),
                    ("4", "Takamine", "Pro Series", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )


class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: dataset_group) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", F.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn("country", F.lit("Japan"))
        return usa_df.union(jap_df)


class OrchestratorLoader(Loader):
    def __init__(self, orchestrator: Orchestrator):
        super().__init__()
        self.orchestrator = orchestrator

    def save_many(self, datasets: dataset_group) -> None:
        self.orchestrator.execute(datasets)


class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()


print("ETL Orchestrator using composite inner orchestrator")
etl_inner = (
    Orchestrator()
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)

etl_outer = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .load_into(OrchestratorLoader(etl_inner))
)

etl_outer.execute()

If you find this interesting then you should definitely check out atc-dataplatform on GitHub.



AppCenter Extensions for ASP.NET Core and Application Insights

In my previous post, I wrote about an open source project called AppCenterExtensions, available on GitHub and nuget.org. I recently updated this project and added a few components for ASP.NET Core that enable including AppCenter diagnostic information in Application Insights.

The NuGet package is called AppCenterExtensions.AppInsights and contains extension methods and ITelemetryInitializer implementations to be used in an ASP.NET Core web app for including AppCenter diagnostic information when logging to Application Insights.

Enabling this is easy. Assuming that the project is already configured to use Application Insights, just add the AppCenterExtensions.AppInsights NuGet package mentioned above to your ASP.NET Core project and call services.AddAppCenterTelemetry() in the ConfigureServices method of the Startup class.

Here’s an example:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Configure and register services to the IoC

        services.AddApplicationInsightsTelemetry();
        services.AddAppCenterTelemetry();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // Configure app
    }
}

Once this is set up, AppCenter diagnostic information should be searchable and visible in Application Insights.

[Screenshot: search results for the x-supportkey header]

[Screenshot: details of a single request containing AppCenter diagnostic information logged in Application Insights]

With this flow you can now correlate Crash Reports and Analytics data from AppCenter with the HTTP requests for your backend systems in Application Insights. In the systems that I have been involved in building, we pass the AppCenter diagnostic information from our API Gateway along with all calls to our internal microservices.



AppCenter Extensions for Xamarin.Forms

For the past 3 years or so I have been using AppCenter for Crash Reporting and Analytics in Xamarin based apps. During this time, I have mostly built enterprise focused apps using Xamarin.Forms, and as a developer I always think about code reuse, which usually comes in the form of a library. Early this year, I decided to create and open source a set of convenience classes and extension methods to simplify Crash Reporting and Analytics using AppCenter and called it AppCenterExtensions.

The core features of the project are the following:

  • Simplified user interaction reporting using ICommand implementations
  • Automatic page tracking in Xamarin.Forms including time spent on screen
  • Extension methods for crash reporting
  • Anonymous user information configuration

This library is distributed as 2 NuGet packages

Getting Started

This library is configured almost the same way as the AppCenter SDK. You provide the AppCenter secrets, and specify whether to anonymize the user information. Both Crash Reporting and Analytics are always enabled when using AppCenterSetup.

AppCenterSetup.Instance.Start(
    "[iOS AppCenter secret]",
    "[Android AppCenter secret]",
    anonymizeAppCenterUser: true);

or

await AppCenterSetup.Instance.StartAsync(
    "[iOS AppCenter secret]",
    "[Android AppCenter secret]",
    anonymizeAppCenterUser: true);

The async API exists because anonymizeAppCenterUser internally relies on an async API. The synchronous APIs for starting AppCenter are non-blocking methods that do a fire-and-forget call to StartAsync(string,bool).

Anonymous User Information
The AppCenterSetup component exposes a method called UseAnonymousUserIdAsync() which sets the UserId in AppCenter to the first 8 characters of a GUID that is unique per app installation. This can be used as a support key for uniquely identifying application users for instrumentation and troubleshooting. The support key can be attached to all HTTP calls by using the DiagnosticDelegatingHandler.
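
To make the support key concrete, here is a minimal sketch of deriving one from a GUID. The SupportKeySketch class and its FromGuid method are hypothetical names used only for illustration, and the way the library obtains its per-installation GUID is not shown here, so treat this as an approximation rather than the library's implementation.

```csharp
using System;

// Hypothetical helper for illustration; not part of AppCenterExtensions.
public static class SupportKeySketch
{
    // Guid.ToString() uses the "D" format (8-4-4-4-12 hex digits),
    // so the first 8 characters are the first group of hex digits.
    public static string FromGuid(Guid id)
        => id.ToString().Substring(0, 8);
}
```

For example, SupportKeySketch.FromGuid(Guid.Parse("3f2504e0-4f89-11d3-9a0c-0305e82c3301")) returns "3f2504e0".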

AppCenter Crash Report

Error Reporting
The library exposes extension methods to the Exception class for conveniently reporting Exceptions to AppCenter

try
{
    // Something that blows up
    explosives.Detonate();
}
catch (Exception e)
{
    // Safely handle error then report
    e.Report();
}

HTTP Error Logging
The library provides an HttpMessageHandler implementation that logs unsuccessful HTTP responses to AppCenter Analytics. This component will also attach HTTP headers describing the AppCenter SDK Version, Install ID, and a support key to all HTTP requests. The logged failed responses will contain the Endpoint URL (including the HTTP verb), the response status code, and the duration of the HTTP call. This will be logged under the event name HTTP Error.

In most (if not all) cases you will want to keep a singleton instance of the HttpClient. The DiagnosticDelegatingHandler is designed with unit testing in mind and accepts an IAnalytics and an IAppCenterSetup interface. It also accepts an inner HttpMessageHandler if you wish to chain multiple delegating handlers.
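
For readers who have not written a delegating handler before, the general technique can be sketched roughly as follows. This is not the library's DiagnosticDelegatingHandler: the class name HttpErrorLoggingHandler, the logError callback, and the constructor shape are assumptions made for illustration, and only the x-supportkey header is attached here.

```csharp
using System;
using System.Diagnostics;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; not the library's DiagnosticDelegatingHandler.
public class HttpErrorLoggingHandler : DelegatingHandler
{
    private readonly string supportKey;
    private readonly Action<string, int, TimeSpan> logError;

    public HttpErrorLoggingHandler(
        string supportKey,
        Action<string, int, TimeSpan> logError,
        HttpMessageHandler innerHandler = null)
        : base(innerHandler ?? new HttpClientHandler())
    {
        this.supportKey = supportKey;
        this.logError = logError;
    }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Attach the support key so the backend can correlate the request
        request.Headers.TryAddWithoutValidation("x-supportkey", supportKey);

        // Time the call, including any inner handlers
        var stopwatch = Stopwatch.StartNew();
        var response = await base
            .SendAsync(request, cancellationToken)
            .ConfigureAwait(false);
        stopwatch.Stop();

        // Only non-successful responses are passed to the logging callback
        if (!response.IsSuccessStatusCode)
        {
            logError(
                $"{request.Method} {request.RequestUri}",
                (int)response.StatusCode,
                stopwatch.Elapsed);
        }

        return response;
    }
}
```

Taking the logging callback and the inner handler as constructor arguments is what makes a handler like this easy to unit test, since both can be replaced with stubs.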

var httpClient = new HttpClient(new DiagnosticDelegatingHandler());
await httpClient.GetAsync("https://entbpr4b9bdpo.x.pipedream.net/");

In the example above we made an HTTP GET call to the RequestBin endpoint https://entbpr4b9bdpo.x.pipedream.net. This resulted in the following request when inspected in RequestBin

ITrackingCommand

This library provides 3 convenience implementations of ICommand that will report the action to AppCenter Analytics after successfully invoking the execute callback method

  • TrackingCommand - This implementation accepts an Action as the Execute callback and a Func<bool> as the CanExecute callback
  • TrackingCommand<T> - This implementation accepts an Action<T> as the Execute callback and a Func<T, bool> as the CanExecute callback
  • AsyncTrackingCommand - This implementation accepts a Func<Task> as the execute callback and a Func<bool> as the CanExecute callback. This also exposes a CompletionTask property that the consumer can await if desired. The Execute(object parameter) method here is a non-blocking call

using System.Threading.Tasks;
using System.Windows.Input;
using ChristianHelle.DeveloperTools.AppCenterExtensions.Commands;
using ChristianHelle.DeveloperTools.AppCenterExtensions.Extensions;
using Microsoft.AppCenter.Crashes;
using Xamarin.Essentials;

namespace SampleApp.ViewModels
{
    public class AboutViewModel : BaseViewModel
    {
        public AboutViewModel()
        {
            AsyncButtonTappedCommand = new AsyncTrackingCommand(
                OnAsyncButtonTapped,
                nameof(AsyncButtonTappedCommand).ToTrackingEventName(),
                nameof(AboutViewModel).ToTrackingEventName());

            ButtonOneTappedCommand = new TrackingCommand(
                OnButtonOneTapped,
                nameof(ButtonOneTappedCommand).ToTrackingEventName(),
                nameof(AboutViewModel).ToTrackingEventName());

            ButtonTwoTappedCommand = new TrackingCommand<string>(
                OnButtonTwoTapped,
                nameof(ButtonTwoTappedCommand).ToTrackingEventName(),
                nameof(AboutViewModel).ToTrackingEventName());
        }

        public ICommand AsyncButtonTappedCommand { get; }
        public ICommand ButtonOneTappedCommand { get; }
        public ICommand ButtonTwoTappedCommand { get; }

        private Task OnAsyncButtonTapped()
            => Browser.OpenAsync("https://xamarin.com");

        private void OnButtonOneTapped() { }

        private void OnButtonTwoTapped(string obj) { }
    }
}

Specifying the screenName argument in the constructor is optional. When it is not provided, the library uses the name of the declaring Type of the method that instantiated the ITrackingCommand instance and converts it to a more analytics-friendly event name using the ToTrackingEventName() extension method. In the example above, if the nameof(AboutViewModel).ToTrackingEventName() argument is omitted, the declaring Type is AboutViewModel and the ScreenName will be set to "About".

Automatic Page Tracking
Automatic page tracking is enabled by changing the base class of your ContentPage classes to TrackingContentPage. By doing so, the library will send page tracking information to AppCenter after leaving every page. Currently, the library sends the page Type, Title, and the duration spent on the screen. The library is rather opinionated on how to log information, and this will only change if I get a request to do so. Duration spent on screen is measured using a Stopwatch that is started in the Page's OnAppearing and is reported to Analytics in OnDisappearing. The event name is based on the Type name of the Page: it is split into multiple words based on Pascal case rules, and words like Page, View, Model, and Async are then removed. For example, UserSettingsPage or UserSettingsView becomes User Settings.

XAML Example:

<?xml version="1.0" encoding="utf-8"?>
<ext:TrackingContentPage 
    xmlns="http://xamarin.com/schemas/2014/forms" 
    xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml" 
    xmlns:d="http://xamarin.com/schemas/2014/forms/design" 
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    xmlns:ext="clr-namespace:AppCenterExtensions.XamarinForms;assembly=AppCenterExtensions.XamarinForms"
    mc:Ignorable="d" 
    x:Class="SampleApp.Views.ItemDetailPage" 
    Title="{Binding Title}">

    <StackLayout Spacing="20" Padding="15">
        <Label Text="Text:" FontSize="Medium" />
        <Label Text="{Binding Item.Text}" d:Text="Item name" FontSize="Small" />
        <Label Text="Description:" FontSize="Medium" />
        <Label Text="{Binding Item.Description}" d:Text="Item description" FontSize="Small" />
    </StackLayout>

</ext:TrackingContentPage>
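
The event name rule described above (split on Pascal case, then drop words like Page, View, Model, and Async) can be approximated with a few lines of C#. The EventNameSketch class and ToEventName method below are hypothetical names for illustration; the library's own ToTrackingEventName() may handle edge cases such as acronyms differently.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical sketch of the naming rule; not the library's implementation.
public static class EventNameSketch
{
    private static readonly string[] NoiseWords =
        { "Page", "View", "Model", "Async" };

    public static string ToEventName(string typeName)
    {
        // Split at every position where a lowercase letter or digit
        // is followed by an uppercase letter (Pascal case boundary)
        var words = Regex.Split(typeName, "(?<=[a-z0-9])(?=[A-Z])");

        // Drop the noise words and join the rest with spaces
        return string.Join(" ", words.Where(w => !NoiseWords.Contains(w)));
    }
}
```

With this sketch, ToEventName("UserSettingsPage") and ToEventName("UserSettingsView") both return "User Settings", and ToEventName("AboutViewModel") returns "About".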

Custom Trace Listener
This library includes a trace listener implementation that reports to AppCenter. This caters to those who have implemented error handling or reporting using Trace Listeners: they can just swap out (or add on) the AppCenterTraceListener.

This implementation implements the following methods:

  • Write(object obj)
  • Write(object obj, string category)
  • WriteLine(object obj)
  • WriteLine(object obj, string category)

If the object provided is an Exception then this is reported to AppCenter Crash Reporting. If the object provided is an instance of AnalyticsEvent then this is sent to AppCenter Analytics

The AnalyticsEvent exposes 2 properties:

  • string EventName { get; } - self explanatory
  • IDictionary<string,string> Properties { get; } - Additional properties to attach to the Analytics event
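
The routing behaviour described above can be sketched with a small TraceListener subclass. This is only an illustration of the technique: RoutingTraceListener, AnalyticsEventSketch, and the two callbacks are hypothetical stand-ins, whereas the library's AppCenterTraceListener reports to AppCenter directly.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for the library's AnalyticsEvent type.
public class AnalyticsEventSketch
{
    public AnalyticsEventSketch(string eventName, IDictionary<string, string> properties)
    {
        EventName = eventName;
        Properties = properties;
    }

    public string EventName { get; }
    public IDictionary<string, string> Properties { get; }
}

// Hypothetical sketch; not the library's AppCenterTraceListener.
public class RoutingTraceListener : TraceListener
{
    private readonly Action<Exception> reportCrash;
    private readonly Action<AnalyticsEventSketch> trackEvent;

    public RoutingTraceListener(
        Action<Exception> reportCrash,
        Action<AnalyticsEventSketch> trackEvent)
    {
        this.reportCrash = reportCrash;
        this.trackEvent = trackEvent;
    }

    public override void Write(object o) => Route(o);
    public override void Write(object o, string category) => Route(o);
    public override void WriteLine(object o) => Route(o);
    public override void WriteLine(object o, string category) => Route(o);

    // TraceListener requires these overrides; plain strings are ignored here
    public override void Write(string message) { }
    public override void WriteLine(string message) { }

    private void Route(object o)
    {
        switch (o)
        {
            case Exception e:
                reportCrash(e);   // exceptions go to crash reporting
                break;
            case AnalyticsEventSketch ev:
                trackEvent(ev);   // analytics events go to analytics
                break;
        }
    }
}
```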

To set it up you simply add an instance of AppCenterTraceListener to your existing Trace listeners:

Trace.Listeners.Add(new AppCenterTraceListener());

Here’s an example of how to use System.Diagnostics.Trace to report errors

try
{
    // Something that blows up
    explosives.Detonate();
}
catch (Exception e)
{
    // Safely handle error then report
    Trace.Write(e);

    // or
    Trace.Write(e, "Error");

    // or
    Trace.WriteLine(e);

    // or
    Trace.WriteLine(e, "Error");
}

and here’s an example of how to use System.Diagnostics.Trace to send analytics data

public partial class App : Application
{
    private const string StateKey = "State";

    public App()
    {
        // Some initialization code ...

        Trace.Listeners.Add(new AppCenterTraceListener());
    }

    protected override void OnStart()
        => Trace.Write(
            new AnalyticsEvent(
                nameof(Application),
                new Dictionary<string, string>
                {
                    { StateKey, nameof(OnStart) }
                }));

    protected override void OnSleep()
        => Trace.Write(
            new AnalyticsEvent(
                nameof(Application),
                new Dictionary<string, string>
                {
                    { StateKey, nameof(OnSleep) }
                }));

    protected override void OnResume()
        => Trace.Write(
            new AnalyticsEvent(
                nameof(Application),
                new Dictionary<string, string>
                {
                    { StateKey, nameof(OnResume) }
                }));
}

Task Extensions
This library includes a few Task extension methods with AppCenter error reporting in mind. Possible exceptions that occur in the async operation are swallowed and reported to AppCenter. These extension methods will internally wrap the Task in a try/catch and await the Task using ConfigureAwait(false).

Here are some usage examples

Fire and Forget on a Task (Note: Forget() returns void)

var task = someClass.SomethingAsync();
task.Forget();

Awaitable Task (also available for Task<T>)

var task = someClass.SomethingAsync();
await task.WhenErrorReportAsync();
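
As a rough sketch of how such extension methods can be built, the following wraps a Task in a try/catch, awaits it with ConfigureAwait(false), and hands any exception to a callback. The names ReportErrorsAsync and FireAndForget and the report callback are assumptions for illustration; the library's Forget() and WhenErrorReportAsync() report to AppCenter instead of taking a callback.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch; not the library's Task extensions.
public static class TaskSketchExtensions
{
    // Awaitable variant: exceptions are swallowed and passed to the callback
    public static async Task ReportErrorsAsync(
        this Task task,
        Action<Exception> report)
    {
        try
        {
            await task.ConfigureAwait(false);
        }
        catch (Exception e)
        {
            report(e);
        }
    }

    // Fire-and-forget variant: returns void and discards the wrapped task,
    // which cannot fault because the wrapper catches every exception
    // (as long as the report callback itself does not throw)
    public static void FireAndForget(
        this Task task,
        Action<Exception> report)
    {
        _ = task.ReportErrorsAsync(report);
    }
}
```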



Generate Android Translations from Google Sheets

In my previous articles, Generating ResX translations from Google Sheets and Generate iOS InfoPlist.strings Translations from Google Sheets, I wrote about using Google Sheets as a translation tool, using the built-in GOOGLETRANSLATE function to generate translation files for a Xamarin based solution. For this post, I will demonstrate something very similar, but instead of ResX files or InfoPlist.strings, I’ll generate strings.xml files for Android. For the sake of this article I created a sample Google Sheet.

For a quick recap, we will use a tool called csvtrans written by my colleague and good friend, Ricky Kaare Engelharth. The tool is built with .NET Core and can be installed using this command

dotnet tool install -g csvtrans

Using the tool is also straightforward, and it comes with some quick start instructions

USAGE: csvtrans [--help] [--sheet <document id> <sheet name>]
            [--csv <url or path>] [--format <apple|android|resx>]
            [--outputdir <directory path>] [--name <string>]
            [--convert-placeholders <regex pattern>]

OPTIONS:

    --sheet, -s <document id> <sheet name>
                        specify a Google Sheet as input.
    --csv, -c <url or path>
                        specify a online or local cvs file as input.
    --format, -f <apple|android|resx>
                        specify the output format.
    --outputdir, -o <directory path>
                        specify the output directory.
    --name, -n <string>   specify an optional name for the output.
    --convert-placeholders, -p <regex pattern>
                        convert placeholders to match the output format.
    --help                display this list of options.

Here’s an example usage of the tool

csvtrans --sheet 1mrMkhItrIDsPwEKMlR8JJ3Pgj1K6zUv0AhmBT4jWRqs Android --format android --outputdir .\Resources\

The first argument, --sheet, takes the Google Sheet document ID followed by the sheet name. The next argument, --format, specifies the output file format, and the last argument, --outputdir, specifies the output folder.

You can get the Document ID from the URL of the Google Sheet

Here’s an example output

Now I can just bring these files into my project and use them directly. Well, almost! There’s one little problem: by default, the Xamarin.Android csproj tooling explicitly adds each strings.xml file as an AndroidResource. Oddly enough, the csproj format does allow wildcard folders, so if we want to enable dynamic generation of values/strings.xml translations, we need to manually edit the csproj.

This is actually very easy to do. We just need to replace the lines like

with

This opens the door to generating translations dynamically at build time using your CI/CD build tools of choice.