diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b4ea66c..5c5481d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,22 +11,21 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: 3.x - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install setuptools wheel twine + - name: Install build dependencies + run: python -m pip install --upgrade build - - name: Build and publish to PyPI - run: | - python setup.py sdist bdist_wheel - twine upload dist/* - env: - TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }} + - name: Build package + run: python -m build + + - name: Publish package to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/README.md b/README.md index eeabe3c..28c1e22 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,66 @@ -[![contributions welcome](https://img.shields.io/badge/contributions-welcome-brightgreen?logo=github)](CODE_OF_CONDUCT.md) -[![Slack](.github/slack.svg)](https://join.slack.com/t/keploy/shared_invite/zt-12rfbvc01-o54cOG0X1G6eVJTuI_orSA) -[![License](.github/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +# Keploy Python Coverage Agent -# Keploy -[Keploy](https://keploy.io) is a no-code testing platform that generates tests from API calls. +The Keploy Python Coverage Agent is a lightweight, sidecar module designed to integrate with the Keploy integration testing platform (enterprise version). When imported into a Python application, it enables Keploy to capture and report code coverage on a per-test-case basis. +## Installation and Usage -## Community support -We'd love to collaborate with you to make Keploy great. To get started: -* [Slack](https://join.slack.com/t/keploy/shared_invite/zt-12rfbvc01-o54cOG0X1G6eVJTuI_orSA) - Discussions with the community and the team. -* [GitHub](https://github.com/keploy/keploy/issues) - For bug reports and feature requests. +Follow these steps to integrate the coverage agent into your Python project. +### Prerequisites + +You must have the `coverage` library installed in your project's Python environment. + +```bash +pip install coverage +``` + +### Step 1: Install the Agent + +Install the `keploy-agent` package into your virtual environment. If you are developing the agent, you can install it in an editable mode from its source directory: + +```bash +pip install -e /path/to/keploy_agent +``` + +### Step 2: Integrate into Your Application + +To enable coverage tracking, import the `keploy_agent` module at the **very top** of your application's main entry point file (e.g., `app.py`, `main.py`). + +It is crucial that this is one of the first imports, as this ensures the agent is initialized before your application code begins to execute. + +**Example `app.py`:** + +```python +import keploy_agent # <-- Add this line at the top +import os +from flask import Flask + +app = Flask(__name__) + +@app.get("/") +def hello(): + # This function will be tracked by the coverage agent + # when its endpoint is hit during a Keploy test. + return "Hello, World!" + +if __name__ == "__main__": + app.run() +``` + +### Step 3: Run with Keploy + +Now, you can run your application tests using the Keploy CLI. The agent will automatically connect with Keploy. + +```bash +sudo -E keploy-enterprise test -c "python3 app.py" --language python --dedup +``` + +Now you will see `dedupData.yaml` getting created. + +Run `sudo -E keploy-enterprise dedup` to get the tests which are duplicate in `duplicates.yaml` file + +In order to remove the duplicate tests, run the following command: + +```bash +sudo -E keploy-enterprise dedup --rm +``` diff --git a/keploy/__init__.py b/keploy/__init__.py deleted file mode 100644 index 095b5ac..0000000 --- a/keploy/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .keploy import Keploy -from .models import Dependency, AppConfig, ServerConfig, FilterConfig, Config, TestCase, TestCaseRequest, TestReq, HttpReq, HttpResp -from .contrib.flask import KFlask -from .mode import setMode, getMode -from .utils import capture_test diff --git a/keploy/client.py b/keploy/client.py deleted file mode 100644 index e69de29..0000000 diff --git a/keploy/constants.py b/keploy/constants.py deleted file mode 100644 index 692fc89..0000000 --- a/keploy/constants.py +++ /dev/null @@ -1,7 +0,0 @@ -MODE_RECORD = "record" -MODE_TEST = "test" -MODE_OFF = "off" -USE_HTTPS = 'https' -USE_HTTP = 'http' -ALLOWED_DEPENDENCY_TYPES = ['NO_SQL_DB', 'SQL_DB', 'GRPC', 'HTTP_CLIENT'] -ALLOWED_METHODS = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS', 'TRACE'] \ No newline at end of file diff --git a/keploy/contrib/flask/__init__.py b/keploy/contrib/flask/__init__.py deleted file mode 100644 index 3dcc9c5..0000000 --- a/keploy/contrib/flask/__init__.py +++ /dev/null @@ -1,62 +0,0 @@ -# BSD 3-Clause License - -import copy -import io -from typing import Any -from flask import Flask, request -from keploy.constants import MODE_OFF -import keploy as k -from keploy.contrib.flask.utils import get_request_data -from keploy.models import HttpResp -from keploy.utils import capture_test -from werkzeug import Response - -class KFlask(object): - - def __init__(self, keploy:k.Keploy=None, app:Flask=None): - self.app = app - self.keploy = keploy - - if not app: - raise ValueError("Flask app instance not passed, Please initiate flask instance and pass it as keyword argument.") - - if not keploy or k.getMode() == MODE_OFF: - return - - app.wsgi_app = KeployMiddleware(keploy, app.wsgi_app) - - -class KeployMiddleware(object): - def __init__(self, kep, app) -> None: - self.app = app - self.keploy = kep - - def __call__(self, environ, start_response) -> Any: - - if not self.keploy: - return self.app(environ, start_response) - - req = {} - res = {} - - def _start_response(status, response_headers, *args): - nonlocal req - nonlocal res - req = get_request_data(request) - res['header'] = {key: [value] for key,value in response_headers} - res['status_code'] = int(status.split(' ')[0]) - return start_response(status, response_headers, *args) - - def _end_response(resp_body): - nonlocal res - res['body'] = b"".join(resp_body).decode("utf8") - return [res['body'].encode('utf-8')] - - resp = _end_response(self.app(environ, _start_response)) - - if environ.get("HTTP_KEPLOY_TEST_ID", None): - self.keploy.put_resp(environ.get('HTTP_KEPLOY_TEST_ID'), HttpResp(**res)) - else: - capture_test(self.keploy, req, res) - - return resp diff --git a/keploy/contrib/flask/utils.py b/keploy/contrib/flask/utils.py deleted file mode 100644 index 17dbac0..0000000 --- a/keploy/contrib/flask/utils.py +++ /dev/null @@ -1,20 +0,0 @@ - -def get_request_data(request) -> dict: - req_data = {} - - req_data['header'] = {key: [value] for key,value in request.headers.to_wsgi_list()} - req_data['method'] = request.method - req_data['body'] = request.get_data(as_text=True) - # req_data['form_data'] = request.form.to_dict() - # req_data['file_data'] = { k: v[0].read() for k, v in request.files.lists()} - req_data['uri'] = request.url_rule.rule - req_data['url'] = request.path - req_data['base'] = request.url - req_data['params'] = request.args.to_dict() - - protocol = request.environ.get('SERVER_PROTOCOL', None) - if protocol: - req_data['proto_major'] = int(protocol.split(".")[0][-1]) - req_data['proto_minor'] = int(protocol.split(".")[1]) - - return req_data \ No newline at end of file diff --git a/keploy/keploy.py b/keploy/keploy.py deleted file mode 100644 index 87f5ba5..0000000 --- a/keploy/keploy.py +++ /dev/null @@ -1,269 +0,0 @@ -import json -import logging -import http.client -import re -import threading -import time -from typing import Iterable, List, Mapping, Optional, Sequence -from keploy.mode import getMode -from keploy.constants import MODE_TEST, USE_HTTPS - -from keploy.models import Config, Dependency, HttpResp, TestCase, TestCaseRequest, TestReq - - -class Keploy(object): - - def __init__(self, conf:Config) -> None: - - if not isinstance(conf, Config): - raise TypeError("Please provide a valid keploy configuration.") - - logger = logging.getLogger('keploy') - logger.setLevel(logging.DEBUG) - - self._config = conf - self._logger = logger - self._dependencies = {} - self._responses = {} - self._client = None - - if self._config.server.protocol == USE_HTTPS: - self._client = http.client.HTTPSConnection(host=self._config.server.url, port=self._config.server.port) - else: - self._client = http.client.HTTPConnection(host=self._config.server.url, port=self._config.server.port) - - # self._client.connect() - - if getMode() == MODE_TEST: - t = threading.Thread(target=self.test) - t.start() - - - def get_dependencies(self, id: str) -> Optional[Iterable[Dependency]]: - return self._dependencies.get(id, None) - - - def get_resp(self, t_id: str) -> Optional[HttpResp]: - return self._responses.get(t_id, None) - - - def put_resp(self, t_id:str, resp: HttpResp) -> None: - self._responses[t_id] = resp - - - def capture(self, request:TestCaseRequest): - self.put(request) - - - def start(self, total:int) -> Optional[str]: - try: - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - self._client.request("GET", "/{}/regression/start?app={}&total={}".format(self._config.server.suffix, self._config.app.name, total), None, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("Error occured while fetching start information. Please try again.") - return - - body = json.loads(response.read().decode()) - if body.get('id', None): - return body['id'] - - self._logger.error("failed to start operation.") - return - except: - self._logger.error("Exception occured while starting the test case run.") - - - def end(self, id:str, status:bool) -> None: - try: - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - self._client.request("GET", "/{}/regression/end?id={}&status={}".format(self._config.server.suffix, id, status), None, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("failed to perform end operation.") - - return - except: - self._logger.error("Exception occured while ending the test run.") - - - def put(self, rq: TestCaseRequest): - try: - filters = self._config.app.filters - if filters: - match = re.search(filters.urlRegex, rq.uri) - if match: - return None - - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - bytes_data = json.dumps(rq, default=lambda o: o.__dict__).encode() - self._client.request("POST", "/{}/regression/testcase".format(self._config.server.suffix), bytes_data, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("failed to send testcase to backend") - - body = json.loads(response.read().decode()) - if body.get('id', None): - self.denoise(body['id'], rq) - - except: - self._logger.error("Exception occured while storing the request information. Try again.") - - - def denoise(self, id:str, tcase:TestCaseRequest): - try: - unit = TestCase(id=id, captured=tcase.captured, uri=tcase.uri, http_req=tcase.http_req, http_resp={}, deps=tcase.deps) - res = self.simulate(unit) - if not res: - self._logger.error("failed to simulate request while denoising") - return - - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - bin_data = json.dumps(TestReq(id=id, app_id=self._config.app.name, resp=res), default=lambda o: o.__dict__).encode() - self._client.request("POST", "/{}/regression/denoise".format(self._config.server.suffix), bin_data, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("failed to de-noise request to backend") - - body = response.read().decode() - - except: - self._logger.error("Error occured while denoising the test case request. Skipping...") - - - def simulate(self, test_case:TestCase) -> Optional[HttpResp]: - try: - self._dependencies[test_case.id] = test_case.deps - - heads = test_case.http_req.header - heads['KEPLOY_TEST_ID'] = [test_case.id] - - cli = http.client.HTTPConnection(self._config.app.host, self._config.app.port) - cli._http_vsn = int(str(test_case.http_req.proto_major) + str(test_case.http_req.proto_minor)) - cli._http_vsn_str = 'HTTP/{}.{}'.format(test_case.http_req.proto_major, test_case.http_req.proto_minor) - - cli.request( - method=test_case.http_req.method, - url=self._config.app.suffix + test_case.http_req.url, - body=json.dumps(test_case.http_req.body).encode(), - headers={key: value[0] for key, value in heads.items()} - ) - - time.sleep(2.0) - # TODO: getting None in case of regular execution. Urgent fix needed. - response = self.get_resp(test_case.id) - if not response or not self._responses.pop(test_case.id, None): - self._logger.error("failed loading the response for testcase.") - return - - self._dependencies.pop(test_case.id, None) - cli.close() - - return response - - except Exception as e: - self._logger.exception("Exception occured in simulation of test case with id: %s" %test_case.id) - - - def check(self, r_id:str, tc: TestCase) -> bool: - try: - resp = self.simulate(tc) - if not resp: - self._logger.error("failed to simulate request on local server.") - return False - - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - bytes_data = json.dumps(TestReq(id=tc.id, app_id=self._config.app.name, run_id=r_id, resp=resp), default=lambda o: o.__dict__).encode() - self._client.request("POST", "/{}/regression/test".format(self._config.server.suffix), bytes_data, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("failed to read response from backend") - - body = json.loads(response.read().decode()) - if body.get('pass', False): - return body['pass'] - - return False - - except: - self._logger.exception("[SKIP] Failed to check testcase with id: %s" %tc.id) - return False - - - def get(self, id:str) -> Optional[TestCase]: - try: - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - self._client.request("GET", "/{}/regression/testcase/{}".format(self._config.server.suffix, id), None, headers) - - response = self._client.getresponse() - if response.status != 200: - self._logger.error("failed to get request.") - - body = json.loads(response.read().decode()) - unit = TestCase(**body) - - return unit - - except: - self._logger.error("Exception occured while fetching the test case with id: %s" %id) - return - - - def fetch(self, offset:int=0, limit:int=25) -> Optional[Sequence[TestCase]]: - try: - test_cases = [] - headers = {'Content-type': 'application/json', 'key': self._config.server.licenseKey} - - while True: - self._client.request("GET", "/{}/regression/testcase?app={}&offset={}&limit={}".format(self._config.server.suffix, self._config.app.name, offset, limit), None, headers) - response = self._client.getresponse() - if response.status != 200: - self._logger.error("Error occured while fetching test cases. Please try again.") - return - - body = json.loads(response.read().decode()) - if body: - for idx, case in enumerate(body): - body[idx] = TestCase(**case) - test_cases.extend(body) - offset += limit - else: - break - return test_cases - - except: - self._logger.exception("Exception occured while fetching test cases.") - return - - - def test(self): - passed = True - time.sleep(self._config.app.delay) - - self._logger.info("Started test operations on the captured test cases.") - cases = self.fetch() - count = len(cases) - - self._logger.info("Total number of test cases to be checked = %d" %count) - run_id = self.start(count) - - if not run_id: - return - - self._logger.info("Started with testing...") - for case in cases: - ok = self.check(run_id, case) - if not ok: - passed = False - self._logger.info("Finished with testing...") - - self._logger.info("Cleaning up things...") - self.end(run_id, passed) - - return passed - diff --git a/keploy/mode.py b/keploy/mode.py deleted file mode 100644 index 4b51eb6..0000000 --- a/keploy/mode.py +++ /dev/null @@ -1,21 +0,0 @@ -from typing import Literal -from keploy.constants import MODE_OFF, MODE_RECORD, MODE_TEST - - -mode = MODE_RECORD - -def isValidMode(mode): - if mode in [MODE_OFF, MODE_RECORD, MODE_TEST]: - return True - return False - -def getMode(): - return mode - -MODES = Literal["off", "record", "test"] -def setMode(m:MODES): - if isValidMode(m): - global mode - mode = m - return - raise Exception("Mode:{} not supported by keploy. Please enter a valid mode.".format(m)) diff --git a/keploy/models.py b/keploy/models.py deleted file mode 100644 index 5a22048..0000000 --- a/keploy/models.py +++ /dev/null @@ -1,179 +0,0 @@ - -from ast import literal_eval -from typing import Any, Iterable, List, Literal, Optional, Mapping, Sequence - -from keploy.constants import ALLOWED_DEPENDENCY_TYPES, ALLOWED_METHODS - - -def getHostPort(h:str = None, p:int = None): - host = '' - suffix = '' - port = 0 - - if not h and not p: - raise ValueError("Invalid host or port values.") - - if h.startswith(("http:", "https:")): - raise ValueError("please pass host name without http:// or https://") - - host = h - i = h.find("/") - if i > 0: - host = h[:i] - suffix = h[i+1:] - - port = p or 80 - i = h.find(":") - if i > 0: - if p and str(p) != host[i+1:]: - raise ValueError("2 Ports found. Please pass the port as a function argumet only.") - port = int(host[i+1:]) - host = host[:i] - - return (host, port, suffix.strip('/')) - - -class FilterConfig(object): - def __init__(self, regex: str) -> None: - self.urlRegex = regex - - -class AppConfig(object): - def __init__(self, name:str=None, host:str=None, port:int=None, delay:int=5, timeout:int=60, filters:FilterConfig=None): - self.name = name or 'test-keploy' - self.host = host - self.port = port - self.delay = delay - self.timeout = timeout - self.filters = filters - self.suffix = None - - if not host: - raise ValueError("Host not provided in AppConfig.") - - self.host, self.port, self.suffix = getHostPort(host, port) - - -PROTOCOL = Literal['https', 'http'] -class ServerConfig(object): - def __init__(self, protocol:PROTOCOL='https', host:str='api.keploy.io', port:int=None, licenseKey:str=''): - - self.protocol = protocol - self.url = host - self.port = port - self.licenseKey = licenseKey - self.suffix = '' - - if protocol not in ["http", "https"]: - raise ValueError("Invalid protocol type. Please use from available options.") - - if not licenseKey: - self.url, self.port, self.suffix = getHostPort(self.url, self.port) - - -class Config(object): - def __init__(self, appConfig:AppConfig, serverConfig:ServerConfig) -> None: - - if not isinstance(appConfig, AppConfig): - raise TypeError("Please provide a valid app configuration.") - - if not isinstance(serverConfig, ServerConfig): - raise TypeError("Please provide a valid server configuration.") - - self.app = appConfig - self.server = serverConfig - - -TYPES= Literal['NO_SQL_DB', 'SQL_DB', 'GRPC', 'HTTP_CLIENT'] -class Dependency(object): - def __init__(self, name:str, type:TYPES, meta:Mapping[str, str]=None, data:List[bytearray]=None) -> None: - self.name = name - self.type = type - self.meta = meta - self.data = data - - if not type in ALLOWED_DEPENDENCY_TYPES: - raise TypeError("Please provide a valid dependency type.") - - -METHODS = Literal['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS', 'TRACE'] -class HttpReq(object): - def __init__(self, method:METHODS=None, proto_major:int=1, proto_minor:int=1, url:str=None, url_params:Mapping[str, str]=None, header:Mapping[str, Sequence[str]]=None, body:str=None) -> None: - self.method = method - self.proto_major = proto_major - self.proto_minor = proto_minor - self.url = url - self.url_params = url_params - self.header = header - self.body = body - - if method not in ALLOWED_METHODS: - raise ValueError("{} method is not supported. Please try some other method.".format(self.method)) - - -class HttpResp(object): - def __init__(self, status_code:int=None, header:Mapping[str, Sequence[str]]=None, body:str=None) -> None: - self.status_code = status_code - self.header = header - self.body = body - - -class TestCase(object): - def __init__(self, id:str=None, created:int=None, updated:int=None, captured:int=None, - cid:str=None, app_id:str=None, uri:str=None, http_req:HttpReq=None, - http_resp: HttpResp=None, deps:Sequence[Dependency]=None, all_keys:Mapping[str, Sequence[str]]=None, - anchors:Mapping[str, Sequence[str]]=None, noise:Sequence[str]=None ) -> None: - - #TODO: Need to handle the case when the API response is None instead of [] for deps - if not deps: - deps = [] - - self.id = id - self.created = created - self.updated = updated - self.captured = captured - self.cid = cid - self.app_id = app_id - self.uri = uri - self.http_req = http_req - self.http_resp = http_resp - self.deps = [Dependency(**dep) if not isinstance(dep, Dependency) else dep for dep in deps] - self.all_keys = all_keys - self.anchors = anchors - self.noise = noise - - if not isinstance(http_req, HttpReq): - self.http_req = HttpReq(**http_req) - - if not isinstance(http_resp, HttpResp): - self.http_resp = HttpResp(**http_resp) - - -class TestCaseRequest(object): - def __init__(self, captured:int=None, app_id:str=None, uri:str=None, http_req:HttpReq=None, http_resp:HttpResp=None, deps:Iterable[Dependency]=None) -> None: - self.captured = captured - self.app_id = app_id - self.uri = uri - self.http_req = http_req - self.http_resp = http_resp - self.deps = deps - - if not captured: - raise ValueError("Captured time cannot be empty.") - - if not app_id: - raise ValueError("valid App ID is required to link the TestReq object.") - - -class TestReq(object): - def __init__(self, id:str=None, app_id:str=None, run_id:str=None, resp:HttpResp=None) -> None: - self.id = id - self.app_id = app_id - self.run_id = run_id - self.resp = resp - - if not id: - raise ValueError("ID is required in the TestReq object.") - - if not app_id: - raise ValueError("valid App ID is required to link the TestReq object.") \ No newline at end of file diff --git a/keploy/setup.py b/keploy/setup.py deleted file mode 100644 index ec374ea..0000000 --- a/keploy/setup.py +++ /dev/null @@ -1,27 +0,0 @@ -from setuptools import setup, find_packages - -VERSION = '0.0.1' -DESCRIPTION = 'Keploy' -LONG_DESCRIPTION = 'Keploy Python SDK' - -# Setting up -setup( - name="keploy", - version=VERSION, - author="Keploy Inc.", - author_email="contact@keploy.io", - description=DESCRIPTION, - long_description=LONG_DESCRIPTION, - packages=find_packages(), - install_requires=[], # add any additional packages that needs to be installed along with your package. - - keywords=['keploy', 'python', 'sdk'], - classifiers= [ - "Development Status :: 3 - Alpha", - "Intended Audience :: Education", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 3", - "Operating System :: MacOS :: MacOS X", - "Operating System :: Microsoft :: Windows", - ] -) \ No newline at end of file diff --git a/keploy/utils.py b/keploy/utils.py deleted file mode 100644 index da68a04..0000000 --- a/keploy/utils.py +++ /dev/null @@ -1,32 +0,0 @@ -from keploy.keploy import Keploy -from keploy.models import Dependency, HttpReq, HttpResp, TestCaseRequest -import time - -def capture_test(k, reqst, resp): - - deps = [ Dependency('demo_dep', 'HTTP_CLIENT', {}, None), ] - - test = TestCaseRequest( - captured=int(time.time()), - app_id=k._config.app.name, - uri=reqst['uri'], - http_req=HttpReq( - method=reqst['method'], - proto_major=reqst['proto_major'], - proto_minor=reqst['proto_minor'], - url=reqst['url'], - url_params=reqst['params'], - header=reqst['header'], - body=reqst['body'] - ), - http_resp=HttpResp( - status_code=resp['status_code'], - header=resp['header'], - body=resp['body'] - ), - deps=deps - ) - - k.capture(test) - - return \ No newline at end of file diff --git a/keploy/v2/__init__.py b/keploy/v2/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/keploy/v2/keployCLI.py b/keploy/v2/keployCLI.py deleted file mode 100644 index 56b5a38..0000000 --- a/keploy/v2/keployCLI.py +++ /dev/null @@ -1,144 +0,0 @@ -import json -import os -import requests -import subprocess -import threading -import logging -import coverage - -logger = logging.getLogger('KeployCLI') - -GRAPHQL_ENDPOINT = "/query" -HOST = "http://localhost:" -server_port = 6789 -user_command_pid = 0 - -class TestRunStatus: - RUNNING = 1 - PASSED = 2 - FAILED = 3 - -def start_user_application(run_cmd): - global user_command_pid - - command = run_cmd.split(" ") - process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) - user_command_pid = process.pid - -def stop_user_application(): - kill_process_by_pid(user_command_pid) - cov = coverage.Coverage() - cov.load() - cov.save() - - -def get_test_run_status(status_str): - status_mapping = { - 'RUNNING': TestRunStatus.RUNNING, - 'PASSED': TestRunStatus.PASSED, - 'FAILED': TestRunStatus.FAILED - } - return status_mapping.get(status_str) - -def set_http_client(): - try: - url = f"{HOST}{server_port}{GRAPHQL_ENDPOINT}" - logger.debug(f"Connecting to: {url}") - headers = { - "Content-Type": "application/json; charset=UTF-8", - "Accept": "application/json" - } - session = requests.Session() - return session, url, headers - except Exception as e: - logger.error("Error setting up HTTP client", e) - return None, None, None - -def fetch_test_sets(): - sessions,url, headers = set_http_client() - if url is None or headers is None: - return None - - payload = {"query": "{ testSets }"} - - try: - response = requests.post(url, headers=headers, json=payload, timeout=10) - logger.debug(f"Status code received: {response.status_code}") - - if response.ok: - res_body = response.json() - logger.debug(f"Response body received: {res_body}") - return res_body.get('data', {}).get('testSets') - except Exception as e: - logger.error("Error fetching test sets", e) - return None - -def fetch_test_set_status(test_run_id): - try: - session, url, headers = set_http_client() - payload = { - "query": f"{{ testSetStatus(testRunId: \"{test_run_id}\") {{ status }} }}" - } - - response = session.post(url, headers=headers, json=payload) - logger.debug(f"status code received: {response.status_code}") - - if response.ok: - res_body = response.json() - logger.debug(f"response body received: {res_body}") - status = res_body['data']['testSetStatus']['status'] - return get_test_run_status(status) - except Exception as e: - logger.error("Error fetching test set status", e) - return None - -def run_test_set(test_set_name): - try: - session, url, headers = set_http_client() - payload = { - "query": f"mutation {{ runTestSet(testSet: \"{test_set_name}\") {{ success testRunId message }} }}" - } - - response = session.post(url, headers=headers, json=payload, timeout=5) # Timeout set to 5 seconds - logger.debug(f"Status code received: {response.status_code}") - - if response.ok: - res_body = response.json() - logger.debug(f"Response body received: {res_body}") - if 'data' in res_body and 'runTestSet' in res_body['data']: - return res_body['data']['runTestSet']['testRunId'] - else: - logger.error(f"Unexpected response format: {res_body}") - return None - except Exception as e: - logger.error("Error running test set", e) - return None - -def find_coverage(test_set): - # Ensure the coverage-report directory exists - coverage_report_dir = f'coverage-report/{test_set}' - if not os.path.exists(coverage_report_dir): - os.makedirs(coverage_report_dir) - - cov = coverage.Coverage() - cov.load() - - # Generate text coverage report - text_report_file = f"{coverage_report_dir}/{test_set}_coverage_report.txt" - with open(text_report_file, 'w') as f: - cov.report(file=f) - - # Generate HTML coverage report - html_report_dir = f"{coverage_report_dir}/{test_set}_coverage_html" - cov.html_report(directory=html_report_dir) - - # Log the report generation - logger.info(f"Coverage reports generated in {coverage_report_dir}") - -def kill_process_by_pid(pid): - try: - # Using SIGTERM (signal 15) to gracefully terminate the process - subprocess.run(["kill", "-15", str(pid)]) - logger.debug(f"Killed process with PID: {pid}") - except Exception as e: - logger.error(f"Failed to kill process with PID {pid}", e) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..16377cd --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,34 @@ +# pyproject.toml + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "keploy-agent" +version = "0.1.0" +authors = [ + { name="Keploy Team", email="hello@keploy.io" }, +] +description = "A side-effect agent for collecting Python code coverage during Keploy test runs." +readme = "README.md" +requires-python = ">=3.8" +license = { file="LICENSE" } +classifiers = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Intended Audience :: Developers", + "Topic :: Software Development :: Testing", + "Topic :: System :: Monitoring", +] + +# This is where you list the package's dependencies +dependencies = [ + "coverage>=7.0", +] + +[project.urls] +Homepage = "https://github.com/keploy/python-sdk" +"Bug Tracker" = "https://github.com/keploy/python-sdk/issues" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ + diff --git a/src/keploy_agent/__init__.py b/src/keploy_agent/__init__.py new file mode 100644 index 0000000..121fc71 --- /dev/null +++ b/src/keploy_agent/__init__.py @@ -0,0 +1,153 @@ +""" +Keploy Python agent – always-on trace, per-test diff, std-lib filtered. +""" + +from __future__ import annotations +import os, sys, json, time, socket, threading, logging, trace +import sysconfig, site, pathlib +from typing import Dict, Tuple + +# ------------------------------------------------ configuration +CONTROL_SOCK = "/tmp/coverage_control.sock" +DATA_SOCK = "/tmp/coverage_data.sock" + +APP_ROOT = pathlib.Path(os.getenv("KEPLOY_APP_SOURCE_DIR", os.getcwd())).resolve() +AGENT_DIR = pathlib.Path(__file__).parent.resolve() +STDLIB = pathlib.Path(sysconfig.get_paths()["stdlib"]).resolve() +SITE_PKGS = [pathlib.Path(p).resolve() for p in site.getsitepackages()] + + +logging.basicConfig(level=logging.INFO, + format='[Keploy Agent] %(asctime)s - %(message)s') + +# ------------------------------------------------ global tracer +_tracer = trace.Trace(count=1, trace=0) +sys.settrace(_tracer.globaltrace) # main thread +threading.settrace(_tracer.globaltrace) # all future threads +logging.info("Global tracer started for every thread") + +# ------------------------------------------------ agent state +lock = threading.Lock() +current_id = None # active test-case id +baseline_counts: Dict[Tuple[str, int], int] = {} # empty after clear +baseline_tids: set[int] = set() + + +# ------------------------------------------------ helpers +def _copy_counts_once() -> dict[tuple[str, int], int]: + while True: + try: return dict(_tracer.results().counts) + except RuntimeError: continue # mutated – retry + +def _stable_snapshot(max_wait: float = .5) -> dict[tuple[str, int], int]: + start = time.time(); snap = _copy_counts_once() + while True: + time.sleep(0.07) + snap2 = _copy_counts_once() + if snap2 == snap or (time.time() - start) >= max_wait: + return snap2 + snap = snap2 + +def _ack(c: socket.socket): + try: c.sendall(b"ACK\n") + except OSError: pass + +def _is_app_file(raw: str, p: pathlib.Path) -> bool: + if " before.get(key, 0): + yield key + +def _emit(test_id: str): + after = _stable_snapshot() + data = {} + for (raw, line) in _diff(after, baseline_counts): + p = pathlib.Path(raw).resolve() + if _is_app_file(raw, p): + data.setdefault(str(p), []).append(line) + + payload = json.dumps({"id": test_id, + "executedLinesByFile": data}, + separators=(",", ":")).encode() + + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: + s.connect(DATA_SOCK); s.sendall(payload) + + logging.info(f"[{test_id}] sent {len(data)} file(s) / " + f"{sum(len(v) for v in data.values())} line(s)") + +# ------------------------------------------------ per-connection handler +def _handle(conn: socket.socket): + global current_id, baseline_counts, baseline_tids + fp = conn.makefile("r") + try: + cmd = fp.readline().strip() + if not cmd: return + action, test_id = cmd.split(" ", 1) + + with lock: + if action == "START": + logging.info(f"START {test_id}") + + _tracer.results().counts.clear() # start from 0 hits + baseline_counts = {} # diff against empty dict + baseline_tids = _current_tids() # remember threads + + current_id = test_id + _ack(conn) + return + + if action == "END": + logging.info(f"END {test_id}") + if test_id == current_id: + _wait_for_worker_exit(baseline_tids) # <- NEW + time.sleep(0.02) + _emit(test_id) + current_id = None + _ack(conn) + return + + logging.warning(f"Unknown command: {action}") + _ack(conn) + + except Exception as e: + logging.error(f"Handler error: {e}", exc_info=True); _ack(conn) + finally: + try: fp.close(); conn.close() + except Exception: pass + +# ------------------------------------------------ control server +def _server(): + if os.path.exists(CONTROL_SOCK): os.remove(CONTROL_SOCK) + srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + srv.bind(CONTROL_SOCK); srv.listen() + logging.info(f"Keploy control server at {CONTROL_SOCK}") + try: + while True: + c, _ = srv.accept() + threading.Thread(target=_handle, args=(c,), daemon=True).start() + finally: + srv.close() + +threading.Thread(target=_server, daemon=False, + name="KeployControlServer").start() +logging.info("Keploy agent ready (always-on tracer, clean start per test)") + + +# ---------------------------------------------------------------- thread helper +def _current_tids() -> set[int]: + return {t.ident for t in threading.enumerate() if t.ident is not None} + +def _wait_for_worker_exit(baseline: set[int], timeout: float = 1.0): + """Block until no extra threads (vs. baseline) remain, or timeout.""" + deadline = time.time() + timeout + while time.time() < deadline: + if _current_tids() <= baseline: # no extra threads left + return + time.sleep(0.02) # wait 20 ms and retry