Nokia and Du deliver residential broadband services over fixed
Jun 13, 2023IoT News
Jun 07, 2023Today's deals: $99 AirPods, $22 Fire Stick, $39 Ring Video Doorbell, eero WiFi router sale, more
May 22, 2023Positive Technologies helps fix vulnerabilities in routers and other Zyxel devices
Apr 30, 2023Broadband CPE Market 2023 Growth Analysis, Opportunities, Trends and Developments to 2030
Apr 24, 2023Creating source-aligned data products in Adevinta Spain - Adevinta
Our product vision for the Data Platform team in Adevinta in Spain is centred on empowering every Adevintan to consume and share high-quality purposeful data to make decisions faster. It aims to maximise the value generated from data throughout the entire value chain, promoting autonomous value extraction and expediting the generation of data products through robust quality processes and governance automation. In order to achieve this we set a goal defined as: Improve process, responsibilities and tooling to ensure data quality.
This goal is built on four main principles:
Focusing on data governance by design, we define it as being the team owner of a business domain who is the owner of the data produced in it. They are also accountable for publishing that data in the data platform and the metadata in the catalogue.
To support this definition, the data platform team and our users, developed a solution based on data contracts. This approach maintains producer ownership while ensuring governance and seamless access for consumers through automation.
Before we implemented data contracts, data integration was being carried out by the Data Platform team, along with Data Engineers from each marketplace. These teams worked together to integrate each new source in the platform with an almost individual process to fulfil the needs of the analytics teams.
The diagram below shows how this process used to work. As you can see, for most of the sources from the different marketplaces, a microservice pushes the data as json to a Kafka topic. The data platform loads from those topics, checking schema references via a URL or an API. Even though there is a schema, the process lacks key features such as versioning, runtime contract checks, SLOs and governance metadata. So we found ourselves dealing with breaking schema changes on a daily basis.
We defined several goals to implement data contracts for ingestion.
Data product oriented: We aim to provide more than just a dataset in S3 and the metastore. We want the whole data product experience. This means delivering consumable output designed to solve a business problem. In order to be effective it should be trustable, discoverable, well-known and versioned. The data contract will include all necessary information to meet these needs. Each output table will be versioned to prevent schema changes from affecting usability.
Ownership: Data should be pushed by the sources, organised and scheduled by the data producers, so they can hold accountability and ownership of them.
Agreement: The contract will serve as an agreement between the producer, who defines how the data is stored and how it will be processed, and the consumer, who needs knowledge, enablement and access to the data. The consumer should be able to gather all this information from the contract.
Automation: We aim to minimise human interaction as much as possible, letting the producer and consumer focus on the contract, which holds all the necessary information. The contract will allow a process to consume data from the source and make it available for the consumer.
Standardisation: Data is loaded each time in the same standardised way. A single app is in charge of loading all kinds of data, with some minor adjustments. This will make it easier to evolve the underlying technology without causing disruption to the business.
Safe and secure: There will be a focus on privacy management. Removing the personal fields with no analytical value, while ingesting the ones with value into separated tables. This method ensures that reading rights are given only to the authorised consumers while making it easier to handle GDPR requirements, such as the right to be forgotten
A data contract is an agreement between a data producer, data consumers and a third party responsible for programmatic standardisation. It captures expectations around schema life cycle, semantic metadata, security and compliance requirements, quality specifications, SLOs/SLAs and other forms of data governance. A data contract for ingestion in Adevinta Spain is an immutable json file stored in a repository in GitHub containing the following fields:
Once defined in another GitHub repository, schema is accessible through URL. Schema versions are X.Y where X represents the major and Y the minor version.
Each field indicates if it is required for analytical purposes or not. This prevents the process from loading unnecessary data.
Once the contract is defined, we need to schedule its consumption. In the first MVP, the Data Platform team, as a facilitator, was responsible for building an airflow DAG for each contract. This worked well for the MVP but it took around five days from the contract definition to get the data available for consumption.
We automated the process, by creating the DAG through the GitHub workflow described above. This workflow is called once the contract is deployed.
This workflow will perform several steps (checkout repo, install Python) and finally run a sh script that will group modified contracts by source, a logical aggregation of them defined by the user.
Once aggregated a Python process is called, and based on a template will create the DAG itself. Some fields from the contract, such as periodicity and execution_time will be used to fill the DAG scheduling information. Each contract from the same source will be a task in the resulting DAG.
This automation speeds up the Time to Effective Data (TtED), representing the time that elapses from the data contract being promoted to production to the time the data is available in the data lake, from around 5 days to around 30 minutes.
Contract creation is easy but repetitive, so we were able to help the users by implementing some automations to propose a contract each time a schema is defined.
The schema is defined as a Json file in a GitHub repository. This process and repository has existed for a long time in Adevinta, we are simply using it more to apply data contracts to ensure governance and compliance.
Once the user, creates the definition they trigger a github workflow that will create a contract proposal:
The contract proposal workflow will first detect which schemas were added. A schema is never modified. If the schema must evolve, a new schema will be created with a minor or major version increase depending on whether the change is forward-compatible or not. This ensures that an event with a given version will always remain with the same schema, guaranteeing compatibility with the destination table as explained in the following chapter “Ingestion process”.
The workflow will then launch a Python script that first checks if the event’s schema represents a new event (version 1) or an updated version of an existing event. The process will split depending on that check.
If it is the first version, the process will use a template, creating a simple contract. It will perform several queries to Adevinta repositories in order to fill information in the contract proposal. Many fields that cannot be known at the time will be left for the user to fill. For instance which of the fields contain personal data and SLOs. A templated contract proposal is shown below:
If the version is higher than one, the process will look for a contract matching the schema name to copy. It will select the latest one, and copy it with the new version, while, at the same time, expiring the previous contract.
In both cases, a pull request will be raised for the user to review. In the first case, the user will need to complete several fields. In the second, the user will only need to modify the proposal if a new personal data field was included or if some other information has changed (execution hour for instance).
Despite this automation, the data producer can still create a contract manually if needed. But the idea is for them to focus on the business value and data definition, creating only the schema and filling some fields inside the contract. Everything (contract proposal, deployment and scheduling) should then run automatically until the data is available and ready to be exploited in the data lake.
Once the contract is created and the DAG is deployed and activated, it will trigger as defined in the contract (hourly or daily at 8am for instance). Then a Spark process written in Scala will run for each of the contracts with the following parameters:
As the first step, the Spark process will look for all contracts matching source and name in the S3 bucket where they are stored. It will load only the contracts valid as per the start_ts and end_ts received as parameter. This is done because an event may have several versions alive at the same time.
It then connects to the Kafka topic defined in the contract and reads data for the given time frame (start_ts -> end_ts), filtering the read data by two clauses:
Then the data is loaded, and it’s time to clean it up. The consumer will check for personal fields in the contract, separating them to another dataFrame if they have analytical purposes or removing them directly otherwise. See the GDPR management section of this article for more information.
There is no other transformation apart from the add/delete of the column explained above. The process also lacks any kind of schema evolution thus relying on all the evolution logic described in the contract lifecycle section. Once the data is split between personal and non-personal it will be stored in the proper table partitioned by processingTime, thus ensuring idempotency and allowing consistent retries.
Many times, when ingesting data to a data platform, we fall under the garbage-in-garbage-out paradigm, where we store as much data as possible regardless of its analytical value. This approach may avoid schema evolutions and backfills in the future. However, it also ingests more data than needed, increasing storage demands and, more importantly, management and maintenance costs.
Another issue when ingesting data to a data platform is that personal fields span across several database tables, making it difficult to confine access to authorised users. This complexity increases the time and cost of maintaining compliance, such as the right to be forgotten.
With that in mind, we defined the main goals of the GDPR management in this ingestion process as:
The first goal is achieved by defining in the contract if a personal field has analytical value and removing any that don’t.
The second goal is based on a personal table per website whose schema evolves only with the approval of the security office. So if a producer has a personal field that is not yet present in said table, an assessment must take place to ensure that the field has analytical value.
Here are the details of the personal data related fields defined in the contract:
This way, if the input data comes with personal data, two output tables will be updated (regular data and personal data). This raises a new problem: we need to make the whole write atomic. This means that if the write on the personal data fails we need to rollback both that write and the one for the regular data. In order to do that, we take advantage of the “time travel” feature from Delta Tables.
As the first step, we read the current version in the Delta history for the regular data table, before writing to that table. If there is a failure there, it will rollback thanks to Delta atomicity and the process will finish with error. If, on the other hand, it finishes successfully, we’ll go forward to write to the personal data table, capturing any error if it fails so we can time travel to the previous version in the regular data table before finishing the process.
When there is an error, both the producer and the consumer want to be aware of it, as they may need to fix it or manage the delay of the dependent processes respectively. The Data Platform team, as facilitators, need to be aware of errors in case some kind of bug in the code was promoted to production or if there is any issue with the underlying infrastructure.
In order to help the producer to analyse, find and fix the error, the process will write the input data in a special table called Quarantine.
The Quarantine table contains the following fields:
This table allows the producers to check if the error is due to something related to the source code, to the contract definition or some other issue. Knowing the root cause they can then perform the necessary actions to fix it, whether they are correcting the source, fixing the contract, modifying the schema or something else as required.
Once the error cause is known and fixed, the process can be rerun. Quarantine data is not re-ingested in any case, it’s only there for debugging purposes.
In order to keep track of the errors, data volumetry and everything that may affect data availability there are several processes in place.
First if an Airflow DAG fails, an alert will be sent to a Slack channel, so both the producers, consumers and Data Platform team as facilitators can be aware of it, check it and deal with it in the proper way.
Second, there is a Grafana dashboard to monitor the process. It’s a public dashboard, so anyone in the company can check it, both interested producers and consumers, as well as Data Platform team members as facilitators and builders of the process.
There are three sections in the dashboard:
Errors: Here, the errors raised during the process will be shown. As said before, each error will result in the data being loaded in the Quarantine zone and an alert being sent to a Slack channel.
With this solution, we achieved most of the goals we defined at the beginning of the project:
Improve process, responsibilities and tooling to ensure data quality.for ingestion.Data product oriented: OwnershipAgreementAutomation: StandardisationSafe and secure:Process monitorTable management: Errors: consumable outputproperly versionedallows the consumer to know how to consumePersonal fields are separated from the data,single code to ingest several sources of data in a standardised way.so there is no risk to break the process swiftly detect errorsThe Producer is the owner of the whole process, without intervention from the facilitator team