diff --git a/poetry.lock b/poetry.lock index e688033..df423ba 100644 --- a/poetry.lock +++ b/poetry.lock @@ -721,7 +721,7 @@ version = "0.6.2" description = "Pythonic argument parser, that will make you smile" optional = false python-versions = "*" -groups = ["main"] +groups = ["dev"] files = [ {file = "docopt-0.6.2.tar.gz", hash = "sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491"}, ] @@ -1978,7 +1978,7 @@ version = "0.4.13" description = "Pip requirements.txt generator based on imports in project" optional = false python-versions = ">=3.7" -groups = ["main"] +groups = ["dev"] files = [ {file = "pipreqs-0.4.13-py2.py3-none-any.whl", hash = "sha256:e522b9ed54aa3e8b7978ff251ab7a9af2f75d2cd8de4c102e881b666a79a308e"}, {file = "pipreqs-0.4.13.tar.gz", hash = "sha256:a17f167880b6921be37533ce4c81ddc6e22b465c107aad557db43b1add56a99b"}, @@ -3437,7 +3437,7 @@ version = "0.1.10" description = "A semi hard Cornish cheese, also queries PyPI (PyPI client)" optional = false python-versions = "*" -groups = ["main"] +groups = ["dev"] files = [ {file = "yarg-0.1.10-py2.py3-none-any.whl", hash = "sha256:4413145825e5d0e8b370754b3e829cf7c95ab0131dbc29fb5b52a8e27e1b7234"}, {file = "yarg-0.1.10.tar.gz", hash = "sha256:1fbc94af89d5ebc2d2dfe46a5006a570de549c43e78f0913ce8390a1c5848b7e"}, @@ -3581,4 +3581,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.12" -content-hash = "4547098cab9c49f56c37d396c4ba4d23baef5fd9d5bbf18d53e98b165d5fbef5" +content-hash = "44b63b352175fe0d10dff5124b8bb638af5c5cb42107e85afbfd0032ac2f25ba" diff --git a/pyproject.toml b/pyproject.toml index fca99d0..d20031e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,6 @@ click = "^8.1.8" loguru = "^0.7.3" numpy = "*" pandas = "*" -pipreqs = "*" pydantic = "2.13.1" pyspark = "3.5.1" python = ">=3.10,<3.12" @@ -115,6 +114,7 @@ build = "*" coverage = ">=7.0.0,<8.0.0" ipykernel = "^6.29.5" mypy = "*" +pipreqs = "*" poetry-dynamic-versioning = {extras = ["plugin"], version = "^1.8.2"} pre-commit = "*" pytest = "*" diff --git a/src/datacustomcode/run.py b/src/datacustomcode/run.py index 6322270..006055c 100644 --- a/src/datacustomcode/run.py +++ b/src/datacustomcode/run.py @@ -201,8 +201,20 @@ def run_function_with_test(entrypoint: str, test_file: str) -> None: def add_py_folder(entrypoint: str): - default_py_folder = "py-files" # Hardcoded folder name + """Add py-files subfolder and entrypoint directory to sys.path. + + This ensures: + 1. py-files/ is available for additional dependencies + 2. The entrypoint directory is available for local module imports + """ + default_py_folder = "py-files" cwd = Path.cwd().joinpath(entrypoint) - py_folder = cwd.parent.joinpath(default_py_folder) + entrypoint_dir = cwd.parent + py_folder = entrypoint_dir.joinpath(default_py_folder) + + # Add py-files folder if it exists + if py_folder.exists(): + sys.path.insert(0, str(py_folder)) - sys.path.append(str(py_folder)) + # Add entrypoint directory to allow local module imports + sys.path.insert(0, str(entrypoint_dir)) diff --git a/src/datacustomcode/templates/function/payload/entrypoint.py b/src/datacustomcode/templates/function/payload/entrypoint.py index 5231174..81dcb32 100644 --- a/src/datacustomcode/templates/function/payload/entrypoint.py +++ b/src/datacustomcode/templates/function/payload/entrypoint.py @@ -1,173 +1,67 @@ import logging -from typing import List -from uuid import uuid4 -from datacustomcode.einstein_predictions.types import ( - PredictionColumBuilder, - PredictionRequestBuilder, - PredictionType, -) +from utility import extract_citations, split_text_into_chunks + from datacustomcode.function import Runtime -from datacustomcode.llm_gateway.types.generate_text_request_builder import ( - GenerateTextRequestBuilder, +from datacustomcode.function.feature_types.chunking import ( + ChunkType, + SearchIndexChunkingV1Output, + SearchIndexChunkingV1Request, + SearchIndexChunkingV1Response, ) logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) +# Default max chunk size (can be overridden if contract adds max_characters field) +DEFAULT_MAX_CHUNK_SIZE = 50 -def chunk_text(text: str, chunk_size: int = 1000) -> List[str]: - """ - Split text into chunks of approximately chunk_size characters. - Tries to split at sentence boundaries when possible. + +def function( + request: SearchIndexChunkingV1Request, runtime: Runtime +) -> SearchIndexChunkingV1Response: + """Chunk documents into smaller pieces for search indexing. + + Args: + request: SearchIndexChunkingV1Request with input documents + runtime: Runtime context (unused but required by contract) + + Returns: + SearchIndexChunkingV1Response with chunked output """ - if not text: - return [] + print(f"Received {len(request.input)} documents to chunk") chunks = [] - current_chunk = "" - - # Split text into sentences (simple split by period) - sentences = text.split(". ") - - for sentence in sentences: - if len(current_chunk) + len(sentence) <= chunk_size: - current_chunk += sentence + ". " - else: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = sentence + ". " - - if current_chunk: - chunks.append(current_chunk.strip()) - - return chunks - - -def make_einstein_prediction(runtime: Runtime) -> None: - column = ( - PredictionColumBuilder() - .set_column_name("col1") - .set_string_values(["str1", "str2"]) - .build() - ) - prediction_request = ( - PredictionRequestBuilder() - .set_prediction_type(PredictionType.REGRESSION) - .set_model_api_name("regressionModel") - .set_prediction_columns([column]) - .build() - ) - - prediction_response = runtime.einstein_predictions.predict(prediction_request) - logger.info( - f"Einstein prediction results - success: [{prediction_response.is_success}] " - f"response data: {prediction_response.data}" - ) - - -def generate_text(runtime: Runtime, prompt: str, model: str = "sfdc_ai__DefaultGPT52"): - builder = GenerateTextRequestBuilder() - llm_request = builder.set_prompt(prompt).set_model(model).build() - llm_response = runtime.llm_gateway.generate_text(llm_request) - logger.info( - f"LLM Gateway generate text results - success: [{llm_response.is_success}] " - f"response data: {llm_response.data}" - ) - - -def function(request: dict, runtime: Runtime) -> dict: - logger.info("Inside Function") - logger.info(request) - - items = request["input"] - output_chunks = [] - current_seq_no = 1 # Start sequence number from 1 + seq_no = 1 - """ - You can use your AI models configured in Salesforce to generate texts - or predict an outcome. See README.md for how to test locally before - deploying to Data Cloud. + # Use default max chunk size + max_chunk_size = DEFAULT_MAX_CHUNK_SIZE - Example: + # Process each document + for doc_idx, doc in enumerate(request.input): + text = doc.text + metadata = doc.metadata - >>> generated_text = generate_text(runtime, "Generate a greeting message") - ... prediction = make_einstein_prediction(runtime) + print(f"Processing document {doc_idx + 1}: {len(text)} characters") - """ + # Split the text using our simple chunking algorithm + text_chunks = split_text_into_chunks(text, max_chunk_size, overlap=20) + + # Create chunk outputs + for chunk_text in text_chunks: + citations = extract_citations(metadata) + + chunk_output = SearchIndexChunkingV1Output( + chunk_type=ChunkType.TEXT, + text=chunk_text.strip(), + seq_no=seq_no, + citations=citations, + ) + chunks.append(chunk_output) + + print(f"Chunk {seq_no}: {len(chunk_text)} chars") + seq_no += 1 + + print(f"Generated {len(chunks)} chunks total") - for item in items: - # Item is DocElement as dict - logger.info(f"Processing item: {item}") - - text = item.get("text", "") - metadata = item.get("metadata", {}) - - # Create chunks from the text - text_chunks = chunk_text(text, chunk_size=100) # Using a larger chunk size - - # Create chunk dictionaries for each text chunk - for chunk_content in text_chunks: - chunk_dict = { - "text": chunk_content, - "metadata": metadata, - "seq_no": current_seq_no, - "chunk_type": "text", - "chunk_id": str(uuid4()), - "tag_metadata": {}, - "citations": {}, - "source_record": item, - } - output_chunks.append(chunk_dict) - current_seq_no += 1 # Increment sequence number for next chunk - - logger.info("Completed chunking") - response = { - "output": output_chunks, - "status": {"status_type": "success", "status_message": "Chunking completed"}, - } - logger.info(response) - return response - - -# Test the function -if __name__ == "__main__": - # Configure logging - logging.basicConfig(level=logging.INFO) - - # Create test data with two DocElements - test_request = { - "input": [ - { - "text": ( - """This is the first sentence of the first document, which is - intentionally made longer to test chunking. """ - """Here is the second sentence of the first document, which is also - quite long and should ensure that the chunking function splits - this text into two chunks when the chunk size is set to 100.""" - ), - "metadata": {"source": "test1", "type": "document"}, - }, - { - "text": ( - """This is the first sentence of the second document, and it is - also extended to be longer than usual for testing purposes. """ - """The second sentence of the second document is similarly lengthy, - so that the chunking function will again create two chunks for - this document.""" - ), - "metadata": {"source": "test2", "type": "document"}, - }, - ] - } - - # Run the function - result = function(test_request, Runtime()) - - # Print the results in a more readable format - print("\nChunking Results:") - print("----------------") - for chunk in result["output"]: - print(f"\nChunk #{chunk['seq_no']}:") - print(f"Text: {chunk['text'][:100]}...") # Print first 100 chars of each chunk - print(f"Source: {chunk['metadata']['source']}") - print(f"Chunk ID: {chunk['chunk_id']}") + return SearchIndexChunkingV1Response(output=chunks) diff --git a/src/datacustomcode/templates/function/payload/utility.py b/src/datacustomcode/templates/function/payload/utility.py new file mode 100644 index 0000000..06c3dfd --- /dev/null +++ b/src/datacustomcode/templates/function/payload/utility.py @@ -0,0 +1,104 @@ +"""Utility functions for text chunking operations.""" + +import logging +from typing import ( + Dict, + List, + Optional, +) + +from datacustomcode.function.feature_types.chunking import SearchIndexChunkingV1Metadata + +logger = logging.getLogger(__name__) + + +def split_text_into_chunks(text: str, max_size: int, overlap: int = 20) -> List[str]: + """Split text into chunks with overlap, trying to break at natural boundaries. + + Tries to break at natural boundaries in order of preference: + 1. Paragraph boundaries (\\n\\n) + 2. Line boundaries (\\n) + 3. Sentence boundaries (. ! ?) + 4. Word boundaries (space) + 5. Hard cut if no good boundary found + + Args: + text: Text to split + max_size: Maximum characters per chunk + overlap: Number of characters to overlap between chunks + + Returns: + List of text chunks + """ + if len(text) <= max_size: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + # Determine end position for this chunk + end = start + max_size + + if end >= len(text): + # Last chunk + chunks.append(text[start:]) + break + + # Try to find a good breaking point (in order of preference) + chunk_text = text[start:end] + break_point = None + + # Try to break at paragraph boundary (\n\n) + last_paragraph = chunk_text.rfind("\n\n") + if last_paragraph > max_size * 0.5: # Only if it's past halfway + break_point = start + last_paragraph + 2 # +2 to skip the \n\n + + # Try to break at line boundary (\n) + if break_point is None: + last_newline = chunk_text.rfind("\n") + if last_newline > max_size * 0.5: + break_point = start + last_newline + 1 + + # Try to break at sentence boundary (. ! ?) + if break_point is None: + for punct in [". ", "! ", "? "]: + last_sentence = chunk_text.rfind(punct) + if last_sentence > max_size * 0.5: + break_point = start + last_sentence + len(punct) + break + + # Try to break at word boundary (space) + if break_point is None: + last_space = chunk_text.rfind(" ") + if last_space > max_size * 0.5: + break_point = start + last_space + 1 + + # If no good breaking point, just hard cut + if break_point is None: + break_point = end + + chunks.append(text[start:break_point].strip()) + + # Move start position with overlap + start = max(break_point - overlap, start + 1) + + return chunks + + +def extract_citations( + metadata: Optional[SearchIndexChunkingV1Metadata], +) -> Dict[str, str]: + """Extract citations from document metadata. + + Args: + metadata: Document metadata containing source DMO fields + + Returns: + Dictionary of citation key-value pairs + """ + citations = {} + if metadata and metadata.source_dmo_fields: + for key, value in metadata.source_dmo_fields.items(): + citations[key] = str(value) + return citations diff --git a/tests/test_run.py b/tests/test_run.py index dcb8225..1eace88 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -237,6 +237,48 @@ def test_run_entrypoint_with_dependencies(): sys.path.remove(module_dir) +def test_add_py_folder_enables_local_imports(): + """Test that add_py_folder adds entrypoint directory to sys.path.""" + from datacustomcode.run import add_py_folder + + # Create a temporary directory structure + temp_dir = tempfile.mkdtemp() + + try: + # Create a utility module in the temp directory + utility_path = os.path.join(temp_dir, "utility.py") + with open(utility_path, "w") as f: + f.write("TEST_VALUE = 'local_module_works'\n") + + # Create an entrypoint file + entrypoint_path = os.path.join(temp_dir, "entrypoint.py") + with open(entrypoint_path, "w") as f: + f.write("# Test entrypoint\n") + + # Save original sys.path + original_path = sys.path.copy() + + # Call add_py_folder with relative path from current directory + relative_entrypoint = os.path.relpath(entrypoint_path) + add_py_folder(relative_entrypoint) + + # verify we can now import the utility module + import utility + + assert hasattr(utility, "TEST_VALUE"), "utility module should have TEST_VALUE" + assert ( + utility.TEST_VALUE == "local_module_works" + ), f"Expected 'local_module_works', got {utility.TEST_VALUE}" + + finally: + # Cleanup + sys.path = original_path + if "utility" in sys.modules: + del sys.modules["utility"] + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + class TestDataspaceScenarios: """Test dataspace functionality in run_entrypoint."""