You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
5.7 KiB

2 months ago
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Models\DAG;
use App\Models\dag_run;
use App\Models\TaskInstance;
use App\Models\dag_event;
use App\Models\Airflow;
use Illuminate\Support\Facades\Http;
class SyncAirflowData extends Command
{
protected $signature = 'airflow:sync-data';
protected $description = 'Синхронизация данных DAG и задач из Airflow';
public function handle()
{
// Получаем все активные кластеры Airflow из базы данных
$airflows = Airflow::where('is_active', true)->get();
foreach ($airflows as $airflow) {
$this->info("Синхронизация данных для кластера: {$airflow->name}");
// Получаем DAG'и, привязанные к этому кластеру
$dags = DAG::where('airflow_id', $airflow->id)->get();
foreach ($dags as $dag) {
// Получаем DAG Runs из Airflow
$response = Http::withBasicAuth($airflow->username, $airflow->password)
->get("{$airflow->url}/dags/{$dag->name}/dagRuns");
if ($response->ok()) {
$dagRunsData = $response->json()['dag_runs'];
foreach ($dagRunsData as $dagRunData) {
// Обновляем или создаем DAGRun
$dagRun = dag_run::updateOrCreate(
['run_id' => $dagRunData['dag_run_id']],
[
'dag_id' => $dag->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'status' => $dagRunData['state'],
'execution_date' => $dagRunData['execution_date'],
'start_date' => $dagRunData['start_date'] ?? null,
'end_date' => $dagRunData['end_date'] ?? null,
'queue' => null,
]
);
// Получаем Task Instances для этого DAGRun
$taskInstancesResponse = Http::withBasicAuth($airflow->username, $airflow->password)
->get("{$airflow->url}/dags/{$dag->name}/dagRuns/{$dagRun->run_id}/taskInstances");
if ($taskInstancesResponse->ok()) {
$taskInstancesData = $taskInstancesResponse->json()['task_instances'];
foreach ($taskInstancesData as $taskInstanceData) {
// Обновляем или создаем TaskInstance
TaskInstance::updateOrCreate(
[
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'task_id' => $taskInstanceData['task_id'],
],
[
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'state' => $taskInstanceData['state'],
'execution_date' => $taskInstanceData['execution_date'],
'start_date' => $taskInstanceData['start_date'] ?? null,
'end_date' => $taskInstanceData['end_date'] ?? null,
'metadata' => $taskInstanceData,
]
);
// Логирование событий задач
dag_event::create([
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'event_type' => 'task_' . $taskInstanceData['state'],
'message' => "Задача {$taskInstanceData['task_id']} находится в состоянии {$taskInstanceData['state']}",
'metadata' => $taskInstanceData,
]);
}
}
// Логирование событий DAGRun
dag_event::updateOrCreate(
[
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'event_type' => $dagRunData['state'],
],
[
'message' => "DAGRun {$dagRun->run_id} находится в состоянии {$dagRunData['state']}",
'metadata' => $dagRunData,
]
);
}
} else {
$this->error("Не удалось получить данные DAGRun для DAG {$dag->name} в кластере {$airflow->name}");
}
}
}
$this->info('Синхронизация данных Airflow завершена.');
}
}