Zelfstudie 5: Een functieset ontwikkelen met een aangepaste bron

Met een Azure Machine Learning beheerde functieopslag kunt u functies detecteren, maken en operationeel maken. Functies fungeren als het bindweefsel in de machine learning-levenscyclus, vanaf de prototypefase, waar u experimenteer met verschillende functies. Die levenscyclus vervolgt zich in de operationalisatiefase, waar u uw modellen implementeert en inference stappen de kenmerkgegevens opzoeken. Ga voor meer informatie over functiearchieven naar de resource voor functieopslagconcepten .

Belangrijk

Azure Cache voor Redis heeft zijn tijdlijn voor buitengebruikstelling aangekondigd voor alle SKU's. We raden u aan uw bestaande Azure Cache voor Redis exemplaren zo snel mogelijk te verplaatsen naar Azure Managed Redis.

Migratierichtlijnen:

Voor meer informatie over de buitengebruikstelling:

Deel 1 van deze reeks zelfstudies liet zien hoe u een specificatie van een functieset maakt met aangepaste transformaties, materialisatie inschakelt en een backfill uitvoert. Deel 2 liet zien hoe u kunt experimenteren met functies in de experimenten en trainingsstromen. Deel 3 legde herhaalde materialisatie uit voor het transactions functieset en toonde hoe men een batch-inferentie-pijplijn kunt uitvoeren op het geregistreerde model. In deel 4 wordt beschreven hoe batchinferentie wordt uitgevoerd.

In deze handleiding leer je

  • Definieer de logica voor het laden van gegevens uit een aangepaste gegevensbron.
  • Configureer en registreer een functieset die moet worden gebruikt vanuit deze aangepaste gegevensbron.
  • Test de geregistreerde functieset.

Vereisten

Notitie

In deze zelfstudie wordt een Azure Machine Learning-notebook gebruikt met Serverloze Spark Compute.

  • Zorg ervoor dat u de vorige zelfstudies in deze reeks voltooit. In deze zelfstudie worden de feature store en andere resources die in de eerdere zelfstudies zijn gemaakt, opnieuw gebruikt.

Instellen

In deze zelfstudie wordt gebruikgemaakt van de core SDK van de Python feature store (azureml-featurestore). De Python SDK wordt gebruikt voor het maken, lezen, bijwerken en verwijderen (CRUD) van functiearchieven, onderdelensets en onderdelenarchiefentiteiten.

U hoeft deze resources niet expliciet te installeren voor deze zelfstudie, omdat in de hier weergegeven installatie-instructies het conda.yml bestand deze behandelt.

Het Azure Machine Learning Spark-notebook configureren

U kunt een nieuw notebook maken en de instructies in deze zelfstudie stapsgewijs uitvoeren. U kunt ook het bestaande notebook featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb openen en uitvoeren. Houd deze zelfstudie open en raadpleeg deze voor documentatiekoppelingen en meer uitleg.

  1. Selecteer in het bovenste menu de optie Serverless Spark Compute in de vervolgkeuzelijst Compute onder Azure Machine Learning Serverless Spark.

  2. Configureer de sessie:

    1. Selecteer Sessie configureren in de bovenste statusbalk
    2. Selecteer het tabblad Python-pakketten en selecteer Upload Conda-bestand
    3. Selecteer Conda-bestand uploaden
    4. Het conda.yml-bestand uploaden dat u in de eerste zelfstudie hebt geüpload
    5. Optioneel de time-out van de sessie (inactieve periode) verlengen om frequente herhalingen van vereiste processen te voorkomen.

De hoofdmap voor de voorbeelden instellen

In deze codecel wordt de hoofdmap voor de voorbeelden ingesteld. Het duurt ongeveer 10 minuten om alle afhankelijkheden te installeren en de Spark-sessie te starten.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Initialiseer de CRUD-client van de feature store-werkruimte

Initialiseer het MLClient voor de functionaliteitenopslagwerkruimte om de CRUD-bewerkingen (Create, Read, Update en Delete) voor de functionaliteitenopslagwerkruimte af te handelen.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Initialiseer de kern-SDK-client voor de feature store

Zoals eerder vermeld, maakt deze zelfstudie gebruik van de Python feature store core SDK (azureml-featurestore). Deze geïnitialiseerde SDK-client behandelt CRUD-bewerkingen (create, read, update en delete) in functieopslagplaatsen, functiesets en entiteiten van functieopslagplaatsen.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Aangepaste brondefinitie

U kunt uw eigen bronlaadlogica definiëren vanuit elke gegevensopslag met een aangepaste brondefinitie. Implementeer een door de gebruiker gedefinieerde functieklasse (UDF) van een bronprocessor (CustomSourceTransformer in deze zelfstudie) om deze functie te gebruiken. Deze klasse moet een __init__(self, **kwargs) functie en een process(self, start_time, end_time, **kwargs) functie definiëren. De kwargs woordenlijst wordt geleverd als onderdeel van de definitie van de functiesetspecificatie. Deze definitie wordt vervolgens doorgegeven aan de UDF. De parameters start_time en end_time worden berekend en doorgegeven aan de UDF-functie.

Dit is voorbeeldcode voor de UDF-klasse van de bronprocessor:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Een specificatie voor een functieset maken met een aangepaste bron en er lokaal mee experimenteren

Maak nu een specificatie van een functieset met een aangepaste brondefinitie en gebruik deze in uw ontwikkelomgeving om te experimenteren met de functieset. Het zelfstudienotebook dat is gekoppeld aan Serverloze Spark Compute , fungeert als de ontwikkelomgeving.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Definieer vervolgens een functievenster en geef de functiewaarden weer in dit functievenster.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Exporteren als een specificatie van een functieset

Als u de specificatie van de functieset wilt registreren bij de functiesopslag, sla die specificatie eerst op in een specifiek formaat. Controleer de specificatie van de gegenereerde transactions_custom_source functieset. Open dit bestand vanuit de bestandsstructuur om de specificatie weer te geven: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

De specificatie heeft deze elementen:

  • features: Een lijst met functies en hun gegevenstypen.
  • index_columns: De joinsleutels die vereist zijn voor toegang tot waarden uit de functieset.

Ga voor meer informatie over de specificatie naar de entiteiten op hoog niveau in de beheerde functieopslag en de CLI-functieset (v2) YAML-schema's bronnen.

De persistentie van de functiesetspecificatie biedt een ander voordeel: de specificatie van de onderdelenset kan worden beheerd door de bron.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

De transactiefunctieset registreren bij de feature store

Gebruik deze code om een functieset te registreren die is geladen vanuit een aangepaste bron bij het functiemagazijn. U kunt die asset vervolgens opnieuw gebruiken en deze eenvoudig delen. Registratie van een functiesetasset biedt beheerde mogelijkheden, waaronder versiebeheer en materialisatie.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Haal de geregistreerde kenmerken op en print de bijbehorende informatie.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Het genereren van een features testen vanuit een geregistreerde featureset

Gebruik de to_spark_dataframe() functie van de onderdelenset om het genereren van functies uit de geregistreerde onderdelenset te testen en de functies weer te geven. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

U moet de geregistreerde functieset kunnen ophalen als een Spark-dataframe en deze vervolgens weergeven. U kunt deze functies nu gebruiken voor een punt-in-time-join met observatiegegevens en de volgende stappen in uw machine learning-pijplijn.

Opschonen

Als u een resourcegroep hebt gemaakt voor de zelfstudie, kunt u die resourcegroep verwijderen, waardoor alle resources die aan deze zelfstudie zijn gekoppeld, worden verwijderd. Anders kunt u de resources afzonderlijk verwijderen:

  • Als u het functiearchief wilt verwijderen, opent u de resourcegroep in de Azure-portal, selecteert u het functiearchief en verwijdert u deze.
  • De door de gebruiker toegewezen beheerde identiteit (UAI) die is toegewezen aan de werkruimte van de feature store, wordt niet verwijderd wanneer we de feature store verwijderen. Volg deze instructies om de UAI te verwijderen.
  • Als u een offline-opslag van het opslagaccounttype wilt verwijderen, opent u de resourcegroep in de Azure-portal, selecteert u de opslag die u hebt gemaakt en verwijdert u deze.
  • Als u een Azure Cache voor Redis exemplaar wilt verwijderen, opent u de resourcegroep in de Azure-portal, selecteert u het exemplaar dat u hebt gemaakt en verwijdert u deze.

Volgende stappen