get(); foreach ($airflows as $airflow) { $this->info("Синхронизация данных для кластера: {$airflow->name}"); // Получаем DAG'и, привязанные к этому кластеру $dags = DAG::where('airflow_id', $airflow->id)->get(); foreach ($dags as $dag) { // Получаем DAG Runs из Airflow $response = Http::withBasicAuth($airflow->username, $airflow->password) ->get("{$airflow->url}/dags/{$dag->name}/dagRuns"); if ($response->ok()) { $dagRunsData = $response->json()['dag_runs']; foreach ($dagRunsData as $dagRunData) { // Обновляем или создаем DAGRun $dagRun = dag_run::updateOrCreate( ['run_id' => $dagRunData['dag_run_id']], [ 'dag_id' => $dag->id, 'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow 'status' => $dagRunData['state'], 'execution_date' => $dagRunData['execution_date'], 'start_date' => $dagRunData['start_date'] ?? null, 'end_date' => $dagRunData['end_date'] ?? null, 'queue' => null, ] ); // Получаем Task Instances для этого DAGRun $taskInstancesResponse = Http::withBasicAuth($airflow->username, $airflow->password) ->get("{$airflow->url}/dags/{$dag->name}/dagRuns/{$dagRun->run_id}/taskInstances"); if ($taskInstancesResponse->ok()) { $taskInstancesData = $taskInstancesResponse->json()['task_instances']; foreach ($taskInstancesData as $taskInstanceData) { // Обновляем или создаем TaskInstance TaskInstance::updateOrCreate( [ 'dag_id' => $dag->id, 'dag_run_id' => $dagRun->id, 'task_id' => $taskInstanceData['task_id'], ], [ 'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow 'state' => $taskInstanceData['state'], 'execution_date' => $taskInstanceData['execution_date'], 'start_date' => $taskInstanceData['start_date'] ?? null, 'end_date' => $taskInstanceData['end_date'] ?? null, 'metadata' => $taskInstanceData, ] ); // Логирование событий задач dag_event::create([ 'dag_id' => $dag->id, 'dag_run_id' => $dagRun->id, 'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow 'event_type' => 'task_' . $taskInstanceData['state'], 'message' => "Задача {$taskInstanceData['task_id']} находится в состоянии {$taskInstanceData['state']}", 'metadata' => $taskInstanceData, ]); } } // Логирование событий DAGRun dag_event::updateOrCreate( [ 'dag_id' => $dag->id, 'dag_run_id' => $dagRun->id, 'airflow_id' => $airflow->id, // Добавляем связь с кластером Airflow 'event_type' => $dagRunData['state'], ], [ 'message' => "DAGRun {$dagRun->run_id} находится в состоянии {$dagRunData['state']}", 'metadata' => $dagRunData, ] ); } } else { $this->error("Не удалось получить данные DAGRun для DAG {$dag->name} в кластере {$airflow->name}"); } } } $this->info('Синхронизация данных Airflow завершена.'); } }