我在 FTP 中有文件,需要将其复制到 Blob 存储。
将文件复制到 blob 存储时,我需要以表格格式获取复制活动的状态,并包含以下详细信息。
假设 FTP 中有 3 个文件。以下是我期望的输出。
序列号 | 文件名 | 错误信息 | 复制状态 |
---|---|---|---|
1 | 文件1 | 不适用 | 成功的 |
2 | 文件2 | 不适用 | 成功的 |
3 | 文件3 | 显示错误信息 | 失败的 |
我尝试了以下。
{
"name": "ABC_copy2",
"properties": {
"activities": [
{
"name": "Get Metadata",
"type": "GetMetadata",
"dependsOn": [
{
"activity": "Initialize counter to 1 for SNo 2x",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataset": {
"referenceName": "PH_FTP2_Dir",
"type": "DatasetReference",
"parameters": {
"foldername": "ABC"
}
},
"fieldList": [
"childItems"
],
"storeSettings": {
"type": "FtpReadSettings",
"recursive": true,
"enablePartitionDiscovery": false,
"useBinaryTransfer": true,
"disableChunking": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
}
},
{
"name": "ForEach_ABC",
"type": "ForEach",
"dependsOn": [
{
"activity": "Get Metadata",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@activity('Get Metadata').output.childItems",
"type": "Expression"
},
"activities": [
{
"name": "Move to Blobs",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "FtpReadSettings",
"recursive": true,
"enablePartitionDiscovery": false,
"useBinaryTransfer": true,
"disableChunking": true
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "DelimitedTextSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings"
},
"formatSettings": {
"type": "DelimitedTextWriteSettings",
"quoteAllText": true,
"fileExtension": ".txt"
}
},
"enableStaging": false,
"enableSkipIncompatibleRow": true,
"logSettings": {
"enableCopyActivityLog": true,
"copyActivityLogSettings": {
"logLevel": "Info",
"enableReliableLogging": false
},
"logLocationSettings": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage_dir",
"type": "LinkedServiceReference"
},
"path": "dir"
}
},
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": false,
"treatBooleanAsNumber": false,
"dateTimeFormat": "yyyy-MM-dd HH:mm:ss.SSS"
}
}
},
"inputs": [
{
"referenceName": "PH_FTP2_CopyActivity",
"type": "DatasetReference",
"parameters": {
"Filename": {
"value": "@item().name",
"type": "Expression"
},
"Foldername": "ABC"
}
}
],
"outputs": [
{
"referenceName": "PH_AzureStorage",
"type": "DatasetReference",
"parameters": {
"filename": {
"value": "@item().name",
"type": "Expression"
},
"foldername": "inputfiles-ABC"
}
}
]
},
{
"name": "Cpy blob Activity Success Status",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Move to Blobs",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status",
"value": {
"value": "@json(concat('{','\"S.No\":',variables('counter'),',\"File Name\":\"',item().Name,'\",\"Copy Status to Blobs\":\"Copied Successfully\"}'))",
"type": "Expression"
}
}
},
{
"name": "Cpy blob Activity Failure Status",
"type": "AppendVariable",
"dependsOn": [
{
"activity": "Move to Blobs",
"dependencyConditions": [
"Failed"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status",
"value": {
"value": "@json(concat('{','\"S.No\":',variables('counter'),',\"File Name\":\"',item().Name,'\",\"Copy Status to Blobs\":\"Failed\",\"Error\":\"',activity('Move to Blobs').output.errors[0].Message,'\"}'))\n",
"type": "Expression"
}
}
},
{
"name": "increment and store in temp 2x",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Cpy blob Activity Success Status",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp",
"value": {
"value": "@string(add(int(variables('counter')),1))",
"type": "Expression"
}
}
},
{
"name": "store temp to counter 2x",
"type": "SetVariable",
"dependsOn": [
{
"activity": "increment and store in temp 2x",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": {
"value": "@variables('temp')",
"type": "Expression"
}
}
},
{
"name": "increment and store in temp 2x_F",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Cpy blob Activity Failure Status",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "temp",
"value": {
"value": "@string(add(int(variables('counter')),1))",
"type": "Expression"
}
}
},
{
"name": "store temp to counter 2x_F",
"type": "SetVariable",
"dependsOn": [
{
"activity": "increment and store in temp 2x_F",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": {
"value": "@variables('temp')",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Initialize counter to 1 for SNo 2x",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "counter",
"value": "1"
}
},
{
"name": "Set Cpy_Blob_Status_final",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach_ABC",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "cpy_blob_status_final",
"value": {
"value": "@variables('cpy_blob_status')",
"type": "Expression"
}
}
}
],
"variables": {
"counter": {
"type": "String"
},
"temp": {
"type": "String"
},
"cpy_blob_status": {
"type": "Array"
},
"cpy_blob_status_final": {
"type": "Array"
}
},
"annotations": [],
"lastPublishTime": "2023-08-17T08:35:26Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
但 S.No 并未按预期增加。
您的方法很好,但我从您的管道 JSON 中观察到您没有在 ForEach 活动中启用顺序。
确保检查其中的顺序。
如果未启用,每个迭代将并行运行,并且它们将采用第一个初始化值,即,
1
并且不会采用先前迭代的更新值。因此,启用顺序允许迭代一个接一个地运行,并且每次迭代都将从之前的运行中获取更新的序列号(
1,2,3
)。