You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
5.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Models\DAG;
use App\Models\dag_run;
use App\Models\TaskInstance;
use App\Models\dag_event;
use App\Models\Airflow;
use Illuminate\Support\Facades\Http;
class SyncAirflowData extends Command
{
protected $signature = 'airflow:sync-data';
protected $description = 'Синхронизация данных DAG и задач из Airflow';
public function handle()
{
// Получаем все активные кластеры Airflow из базы данных
$airflows = Airflow::where('is_active', true)->get();
foreach ($airflows as $airflow) {
$this->info("Синхронизация данных для кластера: {$airflow->name}");
// Получаем DAG'и, привязанные к этому кластеру
$dags = DAG::where('airflow_id', $airflow->id)->get();
foreach ($dags as $dag) {
// Получаем DAG Runs из Airflow
$response = Http::withBasicAuth($airflow->username, $airflow->password)
->get("{$airflow->url}/dags/{$dag->name}/dagRuns");
if ($response->ok()) {
$dagRunsData = $response->json()['dag_runs'];
foreach ($dagRunsData as $dagRunData) {
// Обновляем или создаем DAGRun
$dagRun = dag_run::updateOrCreate(
['run_id' => $dagRunData['dag_run_id']],
[
'dag_id' => $dag->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'status' => $dagRunData['state'],
'execution_date' => $dagRunData['execution_date'],
'start_date' => $dagRunData['start_date'] ?? null,
'end_date' => $dagRunData['end_date'] ?? null,
'queue' => null,
]
);
// Получаем Task Instances для этого DAGRun
$taskInstancesResponse = Http::withBasicAuth($airflow->username, $airflow->password)
->get("{$airflow->url}/dags/{$dag->name}/dagRuns/{$dagRun->run_id}/taskInstances");
if ($taskInstancesResponse->ok()) {
$taskInstancesData = $taskInstancesResponse->json()['task_instances'];
foreach ($taskInstancesData as $taskInstanceData) {
// Обновляем или создаем TaskInstance
TaskInstance::updateOrCreate(
[
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'task_id' => $taskInstanceData['task_id'],
],
[
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'state' => $taskInstanceData['state'],
'execution_date' => $taskInstanceData['execution_date'],
'start_date' => $taskInstanceData['start_date'] ?? null,
'end_date' => $taskInstanceData['end_date'] ?? null,
'metadata' => $taskInstanceData,
]
);
// Логирование событий задач
dag_event::create([
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'event_type' => 'task_' . $taskInstanceData['state'],
'message' => "Задача {$taskInstanceData['task_id']} находится в состоянии {$taskInstanceData['state']}",
'metadata' => $taskInstanceData,
]);
}
}
// Логирование событий DAGRun
dag_event::updateOrCreate(
[
'dag_id' => $dag->id,
'dag_run_id' => $dagRun->id,
'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow
'event_type' => $dagRunData['state'],
],
[
'message' => "DAGRun {$dagRun->run_id} находится в состоянии {$dagRunData['state']}",
'metadata' => $dagRunData,
]
);
}
} else {
$this->error("Не удалось получить данные DAGRun для DAG {$dag->name} в кластере {$airflow->name}");
}
}
}
$this->info('Синхронизация данных Airflow завершена.');
}
}