Tenho arquivos em FTP e eles precisam ser copiados para o armazenamento de blobs.
Ao copiar os arquivos para o armazenamento de blob, preciso obter um status da atividade de cópia em um formato de tabela com os detalhes abaixo.
Vamos supor que temos 3 arquivos no FTP. Abaixo está como eu espero que a saída seja.
S.Não | Nome do arquivo | Mensagem de erro | Copiar status |
---|---|---|---|
1 | Arquivo 1 | N / D | Bem-sucedido |
2 | Arquivo 2 | N / D | Bem-sucedido |
3 | Arquivo 3 | exibir mensagem de erro | Fracassado |
Eu tentei o abaixo.
{
"name": "ABC_copy2",
"properties": {
"activities": [
{
"name": "Get Metadata",
"type": "GetMetadata",
"dependsOn": [
{
"activity": "Initialize counter to 1 for SNo 2x",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "PH_FTP2_Dir",
"type": "DatasetReference",
"parameters": {
"foldername": "ABC"
}
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "FtpReadSettings",
"recursive": true,
"enablePartitionDiscovery": false,
"useBinaryTransfer": true,
"disableChunking": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "ForEach_ABC",
"type": "ForEach",
"dependsOn": [
{
"activity": "Get Metadata",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Get Metadata').output.childItems",
"type": "Expression"
},
"activities": [
{
"name": "Move to Blobs",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "FtpReadSettings",
"recursive": true,
"enablePartitionDiscovery": false,
"useBinaryTransfer": true,
"disableChunking": true
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"enableSkipIncompatibleRow": true,
"logSettings": {
"enableCopyActivityLog": true,
"copyActivityLogSettings": {
"logLevel": "Info",
"enableReliableLogging": false
},
"logLocationSettings": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage_dir",
"type": "LinkedServiceReference"
},
"path": "dir"
}
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": false,
"treatBooleanAsNumber": false,
"dateTimeFormat": "yyyy-MM-dd HH:mm:ss.SSS"
}
}
},
"inputs": [
{
"referenceName": "PH_FTP2_CopyActivity",
"type": "DatasetReference",
"parameters": {
"Filename": {
"value": "@item().name",
"type": "Expression"
},
"Foldername": "ABC"
}
}
],
"outputs": [
{
"referenceName": "PH_AzureStorage",
"type": "DatasetReference",
"parameters": {
"filename": {
"value": "@item().name",
"type": "Expression"
},
"foldername": "inputfiles-ABC"
}
}
]
},
{
"name": "Cpy blob Activity Success Status",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Move to Blobs",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status",
"value": {
"value": "@json(concat('{','\"S.No\":',variables('counter'),',\"File Name\":\"',item().Name,'\",\"Copy Status to Blobs\":\"Copied Successfully\"}'))",
"type": "Expression"
}
}
},
{
"name": "Cpy blob Activity Failure Status",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Move to Blobs",
"dependencyConditions": [
"Failed"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status",
"value": {
"value": "@json(concat('{','\"S.No\":',variables('counter'),',\"File Name\":\"',item().Name,'\",\"Copy Status to Blobs\":\"Failed\",\"Error\":\"',activity('Move to Blobs').output.errors[0].Message,'\"}'))\n",
"type": "Expression"
}
}
},
{
"name": "increment and store in temp 2x",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Cpy blob Activity Success Status",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp",
"value": {
"value": "@string(add(int(variables('counter')),1))",
"type": "Expression"
}
}
},
{
"name": "store temp to counter 2x",
"type": "SetVariable",
"dependsOn": [
{
"activity": "increment and store in temp 2x",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": {
"value": "@variables('temp')",
"type": "Expression"
}
}
},
{
"name": "increment and store in temp 2x_F",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Cpy blob Activity Failure Status",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp",
"value": {
"value": "@string(add(int(variables('counter')),1))",
"type": "Expression"
}
}
},
{
"name": "store temp to counter 2x_F",
"type": "SetVariable",
"dependsOn": [
{
"activity": "increment and store in temp 2x_F",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": {
"value": "@variables('temp')",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Initialize counter to 1 for SNo 2x",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": "1"
}
},
{
"name": "Set Cpy_Blob_Status_final",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach_ABC",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status_final",
"value": {
"value": "@variables('cpy_blob_status')",
"type": "Expression"
}
}
}
],
"variables": {
"counter": {
"type": "String"
},
"temp": {
"type": "String"
},
"cpy_blob_status": {
"type": "Array"
},
"cpy_blob_status_final": {
"type": "Array"
}
},
"annotations": [],
"lastPublishTime": "2023-08-17T08:35:26Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Mas o S.No não está sendo incrementado como esperado .
Sua abordagem está correta, mas observei em seu pipeline JSON que você não habilitou o Sequential na atividade ForEach.
Certifique-se de verificar o sequencial nele.
If its not enabled, every iteration would run parallelly and they will take the first initialized value i.e,
1
and won't take the updated values from previous iterations.So, Enabling Sequential allows the iterations to run one after another and each iteration will take the updated Serial Numbers from the previous runs(
1,2,3
).