diff --git a/.github/workflows/run-test-suite.yml b/.github/workflows/run-test-suite.yml index 273f359df..0577be247 100644 --- a/.github/workflows/run-test-suite.yml +++ b/.github/workflows/run-test-suite.yml @@ -74,18 +74,6 @@ jobs: if: matrix.dependencies == 'highest' run: composer update --no-interaction --no-progress - - name: Cache Roadrunner - id: cache-roadrunner - uses: actions/cache@v3 - if: inputs.run-temporal-test-server == true - with: - path: | - ./rr* - .rr.yaml - key: ${{ matrix.os }}-roadrunner-${{ hashFiles('**/rr') }} - restore-keys: | - ${{ matrix.os }}-roadrunner- - - name: Download RoadRunner if: inputs.run-temporal-test-server == true && steps.cache-roadrunner.outputs.cache-hit != 'true' run: | diff --git a/testing/src/Replay/QueryReplayHelper.php b/testing/src/Replay/QueryReplayHelper.php new file mode 100644 index 000000000..958580d4d --- /dev/null +++ b/testing/src/Replay/QueryReplayHelper.php @@ -0,0 +1,69 @@ +workflowTaskHandler = $workflowTaskHandler; + } + + public function queryWorkflowExecution( + string $queryType, + ?Payloads $payloads, + WorkflowExecutionHistory $workflowExecutionHistory, + string $nextPageToken + ): Payloads { + $query = (new WorkflowQuery())->setQueryType($queryType); + if ($payloads !== null) { + $query->setQueryArgs($payloads); + } + + $task = (new PollWorkflowTaskQueueResponse()) + ->setWorkflowExecution($workflowExecutionHistory->getWorkflowExecution()->toProtoWorkflowExecution()) + ->setStartedEventId(PHP_INT_MAX) + ->setPreviousStartedEventId(PHP_INT_MAX) + ->setNextPageToken($nextPageToken) + ->setQuery($query); + + $events = $workflowExecutionHistory->getEvents(); + $startedEvent = $workflowExecutionHistory->getFirstEvent(); + if (!$startedEvent->hasWorkflowExecutionStartedEventAttributes()) { + throw new IllegalStateException( + 'First event of the history is not WorkflowExecutionStarted: ' . $startedEvent->getEventType() + ); + } + $startedAttributes = $startedEvent->getWorkflowExecutionStartedEventAttributes(); + $workflowType = $startedAttributes->getWorkflowType(); + $task->setWorkflowType($workflowType); + $task->setHistory((new History(['events' => $events]))); + + $result = $this->workflowTaskHandler->handleWorkflowTask($task); + if ($result->getQueryCompleted() != null) { + $request = $result->getQueryCompleted(); + if ($request->getErrorMessage() !== '') { + throw new RuntimeException(); // add message + } + + if ($request->hasQueryResult()) { + return $request->getQueryResult(); + } else { + return new Payloads(); + } + } + + throw new RuntimeException('Query returned wrong response'); + } +} diff --git a/testing/src/Replay/ReplayWorkflow.php b/testing/src/Replay/ReplayWorkflow.php new file mode 100644 index 000000000..2e2d221dc --- /dev/null +++ b/testing/src/Replay/ReplayWorkflow.php @@ -0,0 +1,41 @@ +workflowServiceClient = $workflowServiceClient; + $this->replayWorkflowFactory = $replayWorkflowFactory; + } + + public function handleWorkflowTask(PollWorkflowTaskQueueResponse $workflowTask): WorkflowTaskHandlerResult + { + $historyIterator = new ServiceWorkflowHistoryIterator($this->workflowServiceClient, $workflowTask); + $workflowRunTaskHandler = $this->getOrCreateWorkflowExecutor($workflowTask); + try { + $result = $workflowRunTaskHandler->handleWorkflowTask($workflowTask, $historyIterator); + return $this->createCompletedWFTRequest($workflowTask, $result); + } catch (\Throwable $e) { + return $this->failureToWFTResult($workflowTask, $e); + } finally { + $workflowRunTaskHandler->close(); + } + } + + private function getOrCreateWorkflowExecutor( + PollWorkflowTaskQueueResponse $workflowTask + ): ReplayWorkflowRunTaskHandler { + $workflowType = $workflowTask->getWorkflowType()->getName(); + $events = $workflowTask->getHistory()->getEvents(); + if ($events->count() === 0 || $events->offsetGet(0)->getEventId() > 1) { + $getHistoryRequest = (new GetWorkflowExecutionHistoryRequest()) + ->setExecution($workflowTask->getWorkflowExecution()); + /** @var GetWorkflowExecutionHistoryResponse $getHistoryResponse */ + $getHistoryResponse = $this->workflowServiceClient->GetWorkflowExecutionHistory($getHistoryRequest); + $workflowTask + ->setHistory($getHistoryResponse->getHistory()) + ->setNextPageToken($getHistoryResponse->getNextPageToken()); + } + + $replayWorkflow = $this->replayWorkflowFactory->getWorkflow($workflowType); + return new ReplayWorkflowRunTaskHandler($replayWorkflow, $workflowTask); + } + + /** + * @TODO: add implementation + */ + private function createCompletedWFTRequest( + PollWorkflowTaskQueueResponse $workflowTask, + WorkflowTaskResult $result + ): WorkflowTaskHandlerResult { + return new WorkflowTaskHandlerResult( + 'WorkflowType', + new RespondWorkflowTaskCompletedRequest(), + null, + (new RespondQueryTaskCompletedRequest()) + ->setQueryResult( + (new Payloads())->setPayloads( + [(new Payload())->setData('"hello"')->setMetadata(['encoding' => 'json/plain'])] + ) + ), + false + ); + } + + /** + * @TODO: add implementation + */ + private function failureToWFTResult( + PollWorkflowTaskQueueResponse $response, + Throwable $e + ): WorkflowTaskHandlerResult { + return new WorkflowTaskHandlerResult( + 'WorkflowType', + null, + (new RespondWorkflowTaskFailedRequest()), + null, + false + ); + } +} diff --git a/testing/src/Replay/ServiceWorkflowHistoryIterator.php b/testing/src/Replay/ServiceWorkflowHistoryIterator.php new file mode 100644 index 000000000..82a5ca343 --- /dev/null +++ b/testing/src/Replay/ServiceWorkflowHistoryIterator.php @@ -0,0 +1,30 @@ +workflowServiceClient = $workflowServiceClient; + $this->task = $task; + } + + public function getIterator(): ArrayIterator + { + return new ArrayIterator(); + } +} diff --git a/testing/src/Replay/WorkflowExecutionHistory.php b/testing/src/Replay/WorkflowExecutionHistory.php index 36c2551f3..5691e6d8e 100644 --- a/testing/src/Replay/WorkflowExecutionHistory.php +++ b/testing/src/Replay/WorkflowExecutionHistory.php @@ -6,13 +6,10 @@ use Google\Protobuf\Internal\RepeatedField; use InvalidArgumentException; -use Spiral\Attributes\AttributeReader; -use Spiral\Attributes\Factory; -use Temporal\Api\Common\V1\Payload; -use Temporal\Api\Common\V1\Payloads; use Temporal\Api\Enums\V1\EventType; use Temporal\Api\History\V1\History; use Temporal\Api\History\V1\HistoryEvent; +use Temporal\Workflow\WorkflowExecution; /** * Provides a wrapper with convenience methods over raw protobuf {@link History} object representing @@ -21,11 +18,13 @@ final class WorkflowExecutionHistory { private History $history; + private WorkflowExecution $workflowExecution; public function __construct(History $history) { self::checkHistory($history); $this->history = $history; + $this->workflowExecution = new WorkflowExecution('workflow_id_in_replay', 'run_id_in_replay',); } public function getHistory(): History @@ -48,6 +47,16 @@ public static function fromJson(string $json): self return new self($history); } + public static function fromEvents(HistoryEvent ...$events): self + { + $events = new RepeatedField(HistoryEvent::class); + foreach ($events as $index => $event) { + $events->offsetSet($index, $event); + } + + return new self(new History($events)); + } + private static function checkHistory(History $history): void { $events = $history->getEvents(); @@ -83,8 +92,23 @@ public function getLastEvent(): HistoryEvent return $events->offsetGet($events->count() - 1); } + public function getEventsCount(): int + { + return $this->history->getEvents()->count(); + } + public function getFirstEvent(): HistoryEvent { return $this->history->getEvents()->offsetGet(0); } + + public function getWorkflowExecution(): WorkflowExecution + { + return $this->workflowExecution; + } + + public function isEmpty(): bool + { + return $this->history->getEvents() === null; + } } diff --git a/testing/src/Replay/WorkflowHistoryIterator.php b/testing/src/Replay/WorkflowHistoryIterator.php deleted file mode 100644 index af6e3be21..000000000 --- a/testing/src/Replay/WorkflowHistoryIterator.php +++ /dev/null @@ -1,13 +0,0 @@ -dataConverter = $dataConverter; + $this->workflowTaskHandler = $workflowTaskHandler; + } + + public static function create(): self + { + return new self(DataConverter::createDefault(), new ReplayWorkflowTaskHandler()); + } + /** * Replays workflow from a resource that contains a json serialized history. */ - public function replayWorkflowExecutionFromFile(string $filename, string $workflowClass): void + public function replayWorkflowExecutionFromFile(string $filename): EncodedValues { $workflowExecutionHistory = WorkflowExecutionHistory::fromFile($filename); - $this->replayWorkflowExecution($workflowExecutionHistory, $workflowClass); + return $this->replayWorkflowExecution($workflowExecutionHistory); } - private function replayWorkflowExecution(WorkflowExecutionHistory $workflowExecutionHistory, string $workflowClass): void + private function replayWorkflowExecution(WorkflowExecutionHistory $workflowExecutionHistory, array $args = []): EncodedValues { - // @TODO: not implemented yet + $values = EncodedValues::fromValues($args, $this->dataConverter); + $queryHelper = new QueryReplayHelper($this->workflowTaskHandler); + $result = $queryHelper->queryWorkflowExecution( + self::QUERY_TYPE_REPLAY_ONLY, + $values->toPayloads(), + $workflowExecutionHistory, + '' + ); + + return EncodedValues::fromPayloads($result, $this->dataConverter); } } diff --git a/testing/src/Replay/WorkflowTaskHandlerResult.php b/testing/src/Replay/WorkflowTaskHandlerResult.php new file mode 100644 index 000000000..ef64bd41e --- /dev/null +++ b/testing/src/Replay/WorkflowTaskHandlerResult.php @@ -0,0 +1,47 @@ +workflowType = $workflowType; + $this->taskCompleted = $taskCompleted; + $this->taskFailed = $taskFailed; + $this->queryCompleted = $queryCompleted; + $this->completionCommand = $completionCommand; + } + + public function getTaskCompleted(): ?RespondWorkflowTaskCompletedRequest + { + return $this->taskCompleted; + } + + public function getTaskFailed(): ?RespondWorkflowTaskFailedRequest + { + return $this->taskFailed; + } + + public function getQueryCompleted(): ?RespondQueryTaskCompletedRequest + { + return $this->queryCompleted; + } +} diff --git a/testing/src/Replay/WorkflowTaskResult.php b/testing/src/Replay/WorkflowTaskResult.php new file mode 100644 index 000000000..a8cd44e1b --- /dev/null +++ b/testing/src/Replay/WorkflowTaskResult.php @@ -0,0 +1,54 @@ +commands = $commands; + $this->queryResults = $queryResults; + $this->hasFinalCommand = $hasFinalCommand; + $this->forceWorkflowTask = $forceWorkflowTask; + } + + /** + * @return Command[] + */ + public function getCommands(): array + { + return $this->commands; + } + + /** + * @return WorkflowQueryResult[] + */ + public function getQueryResults(): array + { + return $this->queryResults; + } + + /** + * Does this result contain a workflow completion command + */ + public function hasFinalCommand(): bool + { + return $this->hasFinalCommand; + } + + public function isForceWorkflowTask(): bool + { + return $this->forceWorkflowTask; + } +} diff --git a/tests/Unit/Replay/WorkflowReplayerTestCase.php b/tests/Unit/Replay/WorkflowReplayerTestCase.php index d6ec220e2..1a4f64744 100644 --- a/tests/Unit/Replay/WorkflowReplayerTestCase.php +++ b/tests/Unit/Replay/WorkflowReplayerTestCase.php @@ -4,22 +4,52 @@ namespace Temporal\Tests\Unit\Replay; +use PHPUnit\Framework\MockObject\MockObject; +use Temporal\Api\Common\V1\Payload; +use Temporal\Api\Common\V1\Payloads; +use Temporal\Api\Workflowservice\V1\RespondQueryTaskCompletedRequest; +use Temporal\Api\Workflowservice\V1\RespondWorkflowTaskCompletedRequest; +use Temporal\DataConverter\DataConverter; +use Temporal\Testing\Replay\ReplayWorkflowTaskHandler; use Temporal\Testing\Replay\WorkflowReplayer; +use Temporal\Testing\Replay\WorkflowTaskHandlerResult; use Temporal\Tests\Unit\Declaration\Fixture\SimpleWorkflow; use Temporal\Tests\Unit\UnitTestCase; final class WorkflowReplayerTestCase extends UnitTestCase { private WorkflowReplayer $replayer; + /** @var MockObject|ReplayWorkflowTaskHandler|ReplayWorkflowTaskHandler&MockObject */ + private $taskHandler; protected function setUp(): void { - $this->replayer = new WorkflowReplayer(); + $this->taskHandler = $this->createMock(ReplayWorkflowTaskHandler::class); + $this->replayer = new WorkflowReplayer(DataConverter::createDefault(), $this->taskHandler); parent::setUp(); } public function testHistoryIsReplayedFromFile(): void { - $this->replayer->replayWorkflowExecutionFromFile(__DIR__ . '/history.json', SimpleWorkflow::class); + $taskHandlerResult = new WorkflowTaskHandlerResult( + 'WorkflowType', + new RespondWorkflowTaskCompletedRequest(), + null, + (new RespondQueryTaskCompletedRequest()) + ->setQueryResult( + (new Payloads())->setPayloads( + [(new Payload())->setData('"hello"')->setMetadata(['encoding' => 'json/plain'])] + ) + ), + false + ); + + $this->taskHandler + ->expects($this->once()) + ->method('handleWorkflowTask') + ->willReturn($taskHandlerResult); + + $result = $this->replayer->replayWorkflowExecutionFromFile(__DIR__ . '/history.json', SimpleWorkflow::class); + $this->assertSame('hello', $result->getValue(0)); } }