AutoFaker - A Python library to minimize unit testing ceremony
Around a year ago, I found myself working full time in Python. Coming from a C# background, I really missed the unit testing tools available in .NET; to be more specific, I missed things like xUnit, AutoFixture, and Fluent Assertions. This immediately got me started working on a project I called AutoFaker.
AutoFaker is a Python library designed to minimize the setup/arrange phase of your unit tests by removing the need to manually write the code that creates anonymous variables for a test case.
This library is heavily inspired by AutoFixture and was initially created to simplify writing unit tests for ETL (Extract-Transform-Load) code running from a Python library on an Apache Spark cluster in Big Data solutions.
When writing unit tests, you normally start by creating objects that represent the initial state of the test. This phase is called the arrange or setup phase of the test. In most cases, the system you want to test forces you to specify much more information than you really care about, so you frequently end up creating objects that have no influence on the test itself, simply to satisfy the compiler/interpreter.
AutoFaker is available from PyPI and can be installed using pip:
pip install autofaker
AutoFaker can help by creating such anonymous variables for you. Here’s a simple example:
import unittest
from autofaker import Autodata

class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2

class CalculatorTests(unittest.TestCase):
    def test_can_add_two_numbers(self):
        # arrange
        numbers = Autodata.create_many(int, 2)
        sut = Autodata.create(Calculator)
        # act
        result = sut.add(numbers[0], numbers[1])
        # assert
        self.assertEqual(numbers[0] + numbers[1], result)
Since the point of this library is to simplify the arrange step of writing unit tests, the @autodata and @fakedata decorators are available to explicitly state whether to use anonymous variables or fake data and to construct our system under test. To use them, you can either pass the types of the arguments as arguments to the decorator, or specify argument annotations.
import unittest
from autofaker import autodata

class Calculator:
    def add(self, number1: int, number2: int):
        return number1 + number2

class CalculatorTests(unittest.TestCase):
    @autodata(Calculator, int, int)
    def test_can_add_two_numbers_using_test_arguments(self, sut, number1, number2):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)

    @autodata()
    def test_can_add_two_numbers_using_annotated_arguments(self,
                                                           sut: Calculator,
                                                           number1: int,
                                                           number2: int):
        result = sut.add(number1, number2)
        self.assertEqual(number1 + number2, result)
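The tests above only use @autodata. Assuming @fakedata mirrors the same usage (this is a sketch; the Customer dataclass below is my own illustrative type, not one from the library's docs), a fake-data variant could look like this:

import unittest
from dataclasses import dataclass

from autofaker import fakedata

@dataclass
class Customer:
    id: int
    name: str
    email: str

class CustomerTests(unittest.TestCase):
    @fakedata()
    def test_customer_is_filled_with_fake_values(self, customer: Customer):
        # the decorator is expected to supply a Customer populated with fake data
        self.assertIsNotNone(customer.name)
        self.assertIsNotNone(customer.email)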
There are times when completely anonymous variables don’t make much sense, especially in data-centric scenarios. For these use cases this library uses Faker for generating fake data. This option is enabled by setting use_fake_data to True when calling the Autodata.create() function.
from dataclasses import dataclass
from autofaker import Autodata

@dataclass
class DataClass:
    id: int
    name: str
    job: str

data = Autodata.create(DataClass, use_fake_data=True)
print(f'id: {data.id}')
print(f'name: {data.name}')
print(f'job: {data.job}\n')
The code above might output something like:
id: 8952
name: Justin Wise
job: Chief Operating Officer
Supported data types
Currently autofaker supports creating anonymous variables for the following data types:
Built-in types:
- int
- float
- str
- complex
- range
- bytes
- bytearray
Datetime types:
- datetime
- date
Classes:
- Simple classes
- @dataclass
- Nested classes (and recursion)
- Classes containing lists of other types (see the sketch after this list)
Dataframes:
- Pandas dataframe
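The example usages below don't cover the last of the class cases, classes containing lists of other types. Here is a rough sketch of what that could look like; the Order and OrderLine classes are my own illustration, and I'm assuming a dataclass field annotated with typing.List is populated like any other supported type:

from dataclasses import dataclass
from typing import List

from autofaker import Autodata

@dataclass
class OrderLine:
    id: int
    text: str

@dataclass
class Order:
    id: int
    lines: List[OrderLine]  # a list of another supported type (assumed to be populated)

order = Autodata.create(Order)
print(f'order id = {order.id}')
for line in order.lines:
    print(f'line id = {line.id}, text = {line.text}')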
Example usages
Create anonymous built-in types like int, float, str and datetime types like datetime and date
print(f'anonymous string: {Autodata.create(str)}')
print(f'anonymous int: {Autodata.create(int)}')
print(f'anonymous float: {Autodata.create(float)}')
print(f'anonymous complex: {Autodata.create(complex)}')
print(f'anonymous range: {Autodata.create(range)}')
print(f'anonymous bytes: {Autodata.create(bytes)}')
print(f'anonymous bytearray: {Autodata.create(bytearray)}')
print(f'anonymous datetime: {Autodata.create(datetime)}')
print(f'anonymous date: {Autodata.create(datetime.date)}')
The code above might output the following
anonymous string: f91954f1-96df-463f-a427-665c99213395
anonymous int: 2066712686
anonymous float: 725758222.8712853
anonymous datetime: 2017-06-19 02:40:41.000084
anonymous date: 2019-11-10 00:00:00
Create an anonymous class
class SimpleClass:
    id = -1
    text = 'test'

cls = Autodata.create(SimpleClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')
The code above might output the following
id = 2020177162
text = ac54a65d-b4a3-4eda-a840-eb948ad10d5f
Create a collection of an anonymous class
class SimpleClass:
    id = -1
    text = 'test'

classes = Autodata.create_many(SimpleClass)
for cls in classes:
    print(f'id = {cls.id}')
    print(f'text = {cls.text}')
    print()
The code above might output the following
id = 242996515
text = 5bb60504-ccca-4104-9b7f-b978e52a6518
id = 836984239
text = 079df61e-a87e-4f26-8196-3f44157aabd6
id = 570703150
text = a3b86f08-c73a-4730-bde7-4bdff5360ef4
Create an anonymous dataclass
from dataclasses import dataclass

@dataclass
class DataClass:
    id: int
    text: str

cls = Autodata.create(DataClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')
The code above might output the following
id = 314075507
text = 4a3b3cae-f4cf-4502-a7f3-61115a1e0d2a
Create an anonymous dataclass using fake data
@dataclass
class DataClass:
    id: int
    name: str
    address: str
    job: str
    country: str
    currency_name: str
    currency_code: str
    email: str
    safe_email: str
    company_email: str
    hostname: str
    ipv4: str
    ipv6: str
    text: str

data = Autodata.create(DataClass, use_fake_data=True)
print(f'id: {data.id}')
print(f'name: {data.name}')
print(f'job: {data.job}\n')
print(f'address:\n{data.address}\n')
print(f'country: {data.country}')
print(f'currency name: {data.currency_name}')
print(f'currency code: {data.currency_code}\n')
print(f'email: {data.email}')
print(f'safe email: {data.safe_email}')
print(f'work email: {data.company_email}\n')
print(f'hostname: {data.hostname}')
print(f'IPv4: {data.ipv4}')
print(f'IPv6: {data.ipv6}\n')
print(f'text:\n{data.text}')
The code above might output the following
id: 8952
name: Justin Wise
job: Chief Operating Officer
address:
65939 Hernandez Parks
Rochaport, NC 41760
country: Equatorial Guinea
currency name: Burmese kyat
currency code: ERN
email: smithjohn@example.com
safe email: kent11@example.com
work email: marissagreen@brown-cole.com
hostname: db-90.hendricks-west.org
IPv4: 66.139.143.242
IPv6: 895d:82f7:7c13:e7cb:f35d:c93:aeb2:8eeb
text:
Movie author culture represent. Enjoy myself over physical green lead but home.
Share wind factor far minute produce significant. Sense might fact leader.
Create an anonymous class with nested types
class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

cls = Autodata.create(NestedClass)
print(f'id = {cls.id}')
print(f'text = {cls.text}')
print(f'inner.id = {cls.inner.id}')
print(f'inner.text = {cls.inner.text}')
The code above might output the following
id = 1565737216
text = e66ecd5c-c17a-4426-b755-36dfd2082672
inner.id = 390282329
inner.text = eef94b5c-aa95-427a-a9e6-d99e2cc1ffb2
Create a collection of an anonymous class with nested types
class NestedClass:
    id = -1
    text = 'test'
    inner = SimpleClass()

classes = Autodata.create_many(NestedClass)
for cls in classes:
    print(f'id = {cls.id}')
    print(f'text = {cls.text}')
    print(f'inner.id = {cls.inner.id}')
    print(f'inner.text = {cls.inner.text}')
    print()
The code above might output the following
id = 1116454042
text = ceeecf0c-7375-4f3a-8d4b-6d7a4f2b20fd
inner.id = 1067027444
inner.text = 079573ce-1ef4-408d-8984-1dbc7b0d0b80
id = 730390288
text = ff3ca474-a69d-4ff6-95b4-fbdb1bea7cdb
inner.id = 1632771208
inner.text = 9423e824-dc8f-4145-ba47-7301351a91f8
id = 187364960
text = b31ca191-5031-43a2-870a-7bc7c99e4110
inner.id = 1705149100
inner.text = e703a117-ba4f-4201-a31b-10ab8e54a673
Create a Pandas DataFrame using anonymous data generated from a specified type
class DataClass:
    id = -1
    type = ''
    value = 0

pdf = Autodata.create_pandas_dataframe(DataClass)
print(pdf)
The code above might output the following
id type value
0 778090854 13537c5a-62e7-488b-836e-a4b17f2f3ae9 1049015695
1 602015506 c043ca8d-e280-466a-8bba-ec1e0539fe28 1016359353
2 387753717 986b3b1c-abf4-4bc1-95cf-0e979390e4f3 766159839
Create a Pandas DataFrame using fake data generated from a specified type
class DataClass:
    id = -1
    type = ''
    value = 0

pdf = Autodata.create_pandas_dataframe(DataClass, use_fake_data=True)
print(pdf)
The code above might output the following
first_name id last_name phone_number
0 Lawrence 7670 Jimenez 001-172-307-0561x471
1 Bryan 9084 Walker (697)893-6767
2 Paul 9824 Thomas 960.555.3577x65487
Orchestrated ETL Design Pattern for Apache Spark and Databricks
This article describes ideas based on existing software design patterns that can be applied when designing data processing libraries for Databricks.
As a data platform grows bigger, it becomes harder and more complex to maintain a large set of Notebooks. Although reusing code between Notebooks is possible using the %run command, it is neither elegant nor efficient.
The goal when writing this is to move as much ETL code as possible, if not all of it, away from Notebooks and into a Python library. This library should also contain reusable components for ingesting data in various formats from various data sources into a variety of data stores.
Databricks Notebooks should be used for only three things (a minimal notebook sketch follows the list below):
- Data exploration
- Executing library code
- Interacting with the dbutils API (e.g. reading secrets or setting up widgets for parameterized jobs)
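To make this concrete, a thin notebook might look something like the sketch below. Only dbutils.widgets and dbutils.secrets are actual Databricks APIs here; the library module, factory function, and secret scope/key names are hypothetical placeholders for your own library code:

# Databricks notebook cell - the notebook only wires parameters to library code

# Set up widgets for a parameterized job and read their values
dbutils.widgets.text("source_path", "")
source_path = dbutils.widgets.get("source_path")

# Read a secret needed by the job (scope and key names are placeholders)
storage_key = dbutils.secrets.get(scope="platform", key="storage-account-key")

# Execute library code - all ETL logic lives in the Python library
from my_dataplatform.ingestion import build_ingestion_orchestrator  # hypothetical module

etl = build_ingestion_orchestrator(source_path, storage_key)
etl.execute()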
All components must be designed with unit and integration testing in mind, and the tests must execute in the CI/CD pipeline.
In the past, some of us have used and implemented variations on the Model-View-Whatever (MVC, MVP, MVVM, etc.) design patterns. Such patterns address the problem of separating concerns between the following:
- Data layer
- Business logic
- Application logic
- User interface
A similar pattern can be derived for separating the following concerns:
- Extraction
- Transformation
- Loading
The OETL Design Pattern
OETL, short for Orchestrated Extract-Transform-Load, is a pattern that takes the ideas behind variations of the Model-View-Whatever design pattern.
The Orchestrator is responsible for conducting the interactions between the Extractor -> Transformer -> Loader.
The Orchestrator reads data from the Extractor, passes the result as a parameter to the Transformer, and saves the transformed result using the Loader. The Transformer is optional, as there are scenarios where data transformation is not needed, e.g. raw data ingestion into the landing zone (bronze).
Each layer may have a single implementation or multiple implementations, and this is handled automatically by the Orchestrator.
In Python, an example of an Orchestrator with single implementations of the Extractor, Transformer, and Loader would look something like this (a sketch of those base classes follows the code below):
class Orchestrator:
    def __init__(self,
                 extractor: Extractor,
                 transformer: Transformer,
                 loader: Loader):
        self.loader = loader
        self.transformer = transformer
        self.extractor = extractor

    def execute(self):
        df = self.extractor.read()
        df = self.transformer.process(df)
        self.loader.save(df)
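For reference, the Extractor, Transformer, and Loader used above are nothing more than small base classes with a single responsibility each. This is only a conceptual sketch of the idea, not the actual atc-dataplatform classes described later:

from pyspark.sql import DataFrame

class Extractor:
    def read(self) -> DataFrame:
        # Read data from a source and return it as a DataFrame
        raise NotImplementedError

class Transformer:
    def process(self, df: DataFrame) -> DataFrame:
        # Transform the incoming DataFrame and return the result
        raise NotImplementedError

class Loader:
    def save(self, df: DataFrame) -> None:
        # Persist the DataFrame to a destination
        raise NotImplementedError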
ATC-DataPlatform
A framework implementing this design pattern is available in a Python library called atc-dataplatform, available from PyPI:
pip install atc-dataplatform
Orchestration Fluent Interface
atc-dataplatform provides common simple implementations and base classes for implementing the OETL design pattern.
To simplify object construction, this library provides the Orchestrator fluent interface from atc.etl:
from atc.etl import Extractor, Transformer, Loader, Orchestrator
(Orchestrator()
    .extract_from(Extractor())
    .transform_with(Transformer())
    .load_into(Loader())
    .execute())
Usage examples:
Here are some example usages and implementations of the ETL classes provided.
Example-1
Here’s an example of reading data from a single location, transforming it once, and saving to a single destination. This is the simplest ETL case and will be used as the base for the more complex examples below.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark

class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """
            id STRING,
            brand STRING,
            model STRING,
            year STRING
            """,
        )

class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using a single simple transformer")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopLoader())
)
etl.execute()
The code above produces the following output:
Current DataFrame schema
root
|-- id: string (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: string (nullable = true)
New DataFrame schema
root
|-- id: integer (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: integer (nullable = true)
+---+------+----------+----+
| id| brand| model|year|
+---+------+----------+----+
| 1|Fender|Telecaster|1950|
| 2|Gibson| Les Paul|1959|
| 3|Ibanez| RG|1987|
+---+------+----------+----+
Example-2
Here’s an example of a Transformer implementation that is reused multiple times to change the data type of a given column, where the column name is parameterized.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark

class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class IntegerColumnTransformer(Transformer):
    def __init__(self, col_name: str):
        super().__init__()
        self.col_name = col_name

    def process(self, df: DataFrame) -> DataFrame:
        df = df.withColumn(self.col_name, f.col(self.col_name).cast(IntegerType()))
        return df

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using multiple transformers")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(IntegerColumnTransformer("id"))
    .transform_with(IntegerColumnTransformer("year"))
    .load_into(NoopLoader())
)
etl.execute()
Example-3
Here’s an example of having multiple Extractor implementations and applying transformations using the process_many method. The read() function in Extractor returns a dictionary that uses the type name of the Extractor as the key and a DataFrame as its value; the key used can be overridden in the constructor. Transformer provides the function process_many(dataset: {}) and returns a single DataFrame.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

from atc.etl import Extractor, Loader, Orchestrator, Transformer
from atc.spark import Spark

class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class JapaneseGuitarExtractor(Extractor):
    def __init__(self):
        super().__init__(dataset_key="japanese")

    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["japanese"].withColumn("country", f.lit("Japan"))
        return usa_df.union(jap_df)

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using multiple extractors")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)
etl.execute()
The code above produces the following output:
root
|-- id: string (nullable = true)
|-- brand: string (nullable = true)
|-- model: string (nullable = true)
|-- year: string (nullable = true)
|-- country: string (nullable = false)
+---+--------+----------+----+-------+
| id| brand| model|year|country|
+---+--------+----------+----+-------+
| 1| Fender|Telecaster|1950| USA|
| 2| Gibson| Les Paul|1959| USA|
| 3| Ibanez| RG|1987| Japan|
| 4|Takamine|Pro Series|1959| Japan|
+---+--------+----------+----+-------+
Example-4
Here’s an example of raw data ingestion without applying any transformations.
from pyspark.sql import DataFrame

from atc.etl import Extractor, Loader, Orchestrator
from atc.spark import Spark

class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            """id STRING, brand STRING, model STRING, year STRING""",
        )

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator with no transformations")
etl = Orchestrator().extract_from(GuitarExtractor()).load_into(NoopLoader())
etl.execute()
Example-5
Here’s an example of having multiple Loader implementations that write the transformed data into multiple destinations.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark

class GuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                    ("3", "Ibanez", "RG", "1987"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df

class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()

class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(GuitarExtractor())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()
Example-6
Using Example-2, Example-3, and Example-5 as reference, any combination of single or multiple implementations of Extractor, Transformer, or Loader can be created. Here’s an example that has multiple Extractor, Transformer, and Loader implementations. It is important that the first transformer is a MultiInputTransformer when there are multiple extractors.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from atc.etl import Extractor, Transformer, Loader, Orchestrator
from atc.spark import Spark

class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [("3", "Ibanez", "RG", "1987"), ("4", "Takamine", "Pro Series", "1959")]
            ),
            StructType(
                [
                    StructField("id", StringType()),
                    StructField("brand", StringType()),
                    StructField("model", StringType()),
                    StructField("year", StringType()),
                ]
            ),
        )

class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: {}) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", f.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn(
            "country", f.lit("Japan")
        )
        return usa_df.union(jap_df)

class BasicTransformer(Transformer):
    def process(self, df: DataFrame) -> DataFrame:
        print("Current DataFrame schema")
        df.printSchema()

        df = df.withColumn("id", f.col("id").cast(IntegerType()))
        df = df.withColumn("year", f.col("year").cast(IntegerType()))

        print("New DataFrame schema")
        df.printSchema()
        return df

class NoopSilverLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()

class NoopGoldLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using multiple loaders")
etl = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .transform_with(CountryOfOriginTransformer())
    .transform_with(BasicTransformer())
    .load_into(NoopSilverLoader())
    .load_into(NoopGoldLoader())
)
etl.execute()
Example-7
This example illustrates the use of an orchestrator as just another ETL step. The principle is called composite orchestration:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame

from atc.etl import Extractor, Transformer, Loader, Orchestrator, dataset_group
from atc.spark import Spark

class AmericanGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("1", "Fender", "Telecaster", "1950"),
                    ("2", "Gibson", "Les Paul", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )

class JapaneseGuitarExtractor(Extractor):
    def read(self) -> DataFrame:
        return Spark.get().createDataFrame(
            Spark.get().sparkContext.parallelize(
                [
                    ("3", "Ibanez", "RG", "1987"),
                    ("4", "Takamine", "Pro Series", "1959"),
                ]
            ),
            T.StructType(
                [
                    T.StructField("id", T.StringType()),
                    T.StructField("brand", T.StringType()),
                    T.StructField("model", T.StringType()),
                    T.StructField("year", T.StringType()),
                ]
            ),
        )

class CountryOfOriginTransformer(Transformer):
    def process_many(self, dataset: dataset_group) -> DataFrame:
        usa_df = dataset["AmericanGuitarExtractor"].withColumn("country", F.lit("USA"))
        jap_df = dataset["JapaneseGuitarExtractor"].withColumn("country", F.lit("Japan"))
        return usa_df.union(jap_df)

class OrchestratorLoader(Loader):
    def __init__(self, orchestrator: Orchestrator):
        super().__init__()
        self.orchestrator = orchestrator

    def save_many(self, datasets: dataset_group) -> None:
        self.orchestrator.execute(datasets)

class NoopLoader(Loader):
    def save(self, df: DataFrame) -> None:
        df.write.format("noop").mode("overwrite").save()
        df.printSchema()
        df.show()

print("ETL Orchestrator using composite inner orchestrator")
etl_inner = (
    Orchestrator()
    .transform_with(CountryOfOriginTransformer())
    .load_into(NoopLoader())
)

etl_outer = (
    Orchestrator()
    .extract_from(AmericanGuitarExtractor())
    .extract_from(JapaneseGuitarExtractor())
    .load_into(OrchestratorLoader(etl_inner))
)

etl_outer.execute()
If you find this interesting, then you should definitely check out atc-dataplatform on GitHub.
AppCenter Extensions for ASP.NET Core and Application Insights
In my previous post, I wrote about an open source project called AppCenterExtensions, available on GitHub and NuGet.org. I recently updated this project and added a few components for ASP.NET Core that enable including AppCenter diagnostic information in Application Insights.
The NuGet package is called AppCenterExtensions.AppInsights and contains extension methods and ITelemetryInitializer implementations to be used in an ASP.NET Core web app for including AppCenter diagnostic information when logging to Application Insights.
Enabling this is easy. Assuming that the project is already configured to use Application Insights, just add the AppCenterExtensions.AppInsights NuGet package mentioned above to your ASP.NET Core project and call services.AddAppCenterTelemetry() in the ConfigureServices method of the Startup class.
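For reference, the package can be added with the standard .NET CLI (assuming a typical project setup):
dotnet add package AppCenterExtensions.AppInsights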
Here’s an example Startup class:
public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Configure and register services to the IoC
        services.AddApplicationInsightsTelemetry();
        services.AddAppCenterTelemetry();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // Configure app
    }
}
Once this is set up, AppCenter diagnostic information should be searchable and visible in Application Insights.
Here’s a screenshot of search results for the x-supportkey header, and here’s a screenshot of the details of a single request containing AppCenter diagnostic information logged in Application Insights.
With this flow, you can now correlate Crash Reports and Analytics data from AppCenter with the HTTP requests to your backend systems in Application Insights. In the systems I have been involved in building, we include the AppCenter diagnostic information from our API Gateway in all calls to our internal microservices.