Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ click = "^8.1.8"
loguru = "^0.7.3"
numpy = "*"
pandas = "*"
pipreqs = "*"
pydantic = "2.13.1"
pyspark = "3.5.1"
python = ">=3.10,<3.12"
Expand All @@ -115,6 +114,7 @@ build = "*"
coverage = ">=7.0.0,<8.0.0"
ipykernel = "^6.29.5"
mypy = "*"
pipreqs = "*"
poetry-dynamic-versioning = {extras = ["plugin"], version = "^1.8.2"}
pre-commit = "*"
pytest = "*"
Expand Down
18 changes: 15 additions & 3 deletions src/datacustomcode/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,20 @@ def run_function_with_test(entrypoint: str, test_file: str) -> None:


def add_py_folder(entrypoint: str):
    """Add py-files subfolder and entrypoint directory to sys.path.

    This ensures:
    1. py-files/ is available for additional dependencies
    2. The entrypoint directory is available for local module imports

    Args:
        entrypoint: Path to the entrypoint script. A relative path is joined
            onto the current working directory; an absolute path is used
            as given.
    """
    default_py_folder = "py-files"
    entrypoint_dir = Path.cwd().joinpath(entrypoint).parent
    py_folder = entrypoint_dir.joinpath(default_py_folder)

    # Add py-files folder if it exists
    if py_folder.exists():
        sys.path.insert(0, str(py_folder))

    # Add entrypoint directory to allow local module imports. It is inserted
    # last so it ends up at index 0, ahead of py-files/ in the search order.
    sys.path.insert(0, str(entrypoint_dir))
208 changes: 51 additions & 157 deletions src/datacustomcode/templates/function/payload/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,173 +1,67 @@
import logging
from typing import List
from uuid import uuid4

from datacustomcode.einstein_predictions.types import (
PredictionColumBuilder,
PredictionRequestBuilder,
PredictionType,
)
from utility import extract_citations, split_text_into_chunks

from datacustomcode.function import Runtime
from datacustomcode.llm_gateway.types.generate_text_request_builder import (
GenerateTextRequestBuilder,
from datacustomcode.function.feature_types.chunking import (
ChunkType,
SearchIndexChunkingV1Output,
SearchIndexChunkingV1Request,
SearchIndexChunkingV1Response,
)

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Default max chunk size (can be overridden if contract adds max_characters field)
DEFAULT_MAX_CHUNK_SIZE = 50

def chunk_text(text: str, chunk_size: int = 1000) -> List[str]:
"""
Split text into chunks of approximately chunk_size characters.
Tries to split at sentence boundaries when possible.

def function(
request: SearchIndexChunkingV1Request, runtime: Runtime
) -> SearchIndexChunkingV1Response:
"""Chunk documents into smaller pieces for search indexing.

Args:
request: SearchIndexChunkingV1Request with input documents
runtime: Runtime context (unused but required by contract)

Returns:
SearchIndexChunkingV1Response with chunked output
"""
if not text:
return []
print(f"Received {len(request.input)} documents to chunk")

chunks = []
current_chunk = ""

# Split text into sentences (simple split by period)
sentences = text.split(". ")

for sentence in sentences:
if len(current_chunk) + len(sentence) <= chunk_size:
current_chunk += sentence + ". "
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "

if current_chunk:
chunks.append(current_chunk.strip())

return chunks


def make_einstein_prediction(runtime: Runtime) -> None:
    """Send a sample regression request to Einstein and log the outcome.

    Builds one string column named "col1", wraps it in a regression request
    for the "regressionModel" model, submits it through
    ``runtime.einstein_predictions`` and logs the response's success flag
    and data. Nothing is returned.

    Args:
        runtime: Runtime whose ``einstein_predictions`` client performs the
            prediction call.
    """
    # Assemble the single input column step by step.
    column_builder = PredictionColumBuilder()
    column_builder = column_builder.set_column_name("col1")
    column_builder = column_builder.set_string_values(["str1", "str2"])
    sample_column = column_builder.build()

    # Assemble the regression request around that column.
    request_builder = PredictionRequestBuilder()
    request_builder = request_builder.set_prediction_type(PredictionType.REGRESSION)
    request_builder = request_builder.set_model_api_name("regressionModel")
    request_builder = request_builder.set_prediction_columns([sample_column])
    request = request_builder.build()

    response = runtime.einstein_predictions.predict(request)
    logger.info(
        f"Einstein prediction results - success: [{response.is_success}] "
        f"response data: {response.data}"
    )


def generate_text(runtime: Runtime, prompt: str, model: str = "sfdc_ai__DefaultGPT52"):
    """Generate text for ``prompt`` through the runtime's LLM gateway.

    Args:
        runtime: Runtime whose ``llm_gateway`` client performs the call.
        prompt: Prompt text sent to the model.
        model: API name of the model to use.

    Returns:
        The response object returned by ``runtime.llm_gateway.generate_text``.
        Its ``is_success`` flag and ``data`` payload are also logged.
    """
    builder = GenerateTextRequestBuilder()
    llm_request = builder.set_prompt(prompt).set_model(model).build()
    llm_response = runtime.llm_gateway.generate_text(llm_request)
    logger.info(
        f"LLM Gateway generate text results - success: [{llm_response.is_success}] "
        f"response data: {llm_response.data}"
    )
    # Hand the response back so callers can use the generated text; without
    # this the result was only logged and the call evaluated to None.
    return llm_response


def function(request: dict, runtime: Runtime) -> dict:
logger.info("Inside Function")
logger.info(request)

items = request["input"]
output_chunks = []
current_seq_no = 1 # Start sequence number from 1
seq_no = 1

"""
You can use your AI models configured in Salesforce to generate texts
or predict an outcome. See README.md for how to test locally before
deploying to Data Cloud.
# Use default max chunk size
max_chunk_size = DEFAULT_MAX_CHUNK_SIZE

Example:
# Process each document
for doc_idx, doc in enumerate(request.input):
text = doc.text
metadata = doc.metadata

>>> generated_text = generate_text(runtime, "Generate a greeting message")
... prediction = make_einstein_prediction(runtime)
print(f"Processing document {doc_idx + 1}: {len(text)} characters")

"""
# Split the text using our simple chunking algorithm
text_chunks = split_text_into_chunks(text, max_chunk_size, overlap=20)

# Create chunk outputs
for chunk_text in text_chunks:
citations = extract_citations(metadata)

chunk_output = SearchIndexChunkingV1Output(
chunk_type=ChunkType.TEXT,
text=chunk_text.strip(),
seq_no=seq_no,
citations=citations,
)
chunks.append(chunk_output)

print(f"Chunk {seq_no}: {len(chunk_text)} chars")
seq_no += 1

print(f"Generated {len(chunks)} chunks total")

for item in items:
# Item is DocElement as dict
logger.info(f"Processing item: {item}")

text = item.get("text", "")
metadata = item.get("metadata", {})

# Create chunks from the text
text_chunks = chunk_text(text, chunk_size=100) # Using a larger chunk size

# Create chunk dictionaries for each text chunk
for chunk_content in text_chunks:
chunk_dict = {
"text": chunk_content,
"metadata": metadata,
"seq_no": current_seq_no,
"chunk_type": "text",
"chunk_id": str(uuid4()),
"tag_metadata": {},
"citations": {},
"source_record": item,
}
output_chunks.append(chunk_dict)
current_seq_no += 1 # Increment sequence number for next chunk

logger.info("Completed chunking")
response = {
"output": output_chunks,
"status": {"status_type": "success", "status_message": "Chunking completed"},
}
logger.info(response)
return response


# Test the function
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)

# Create test data with two DocElements
test_request = {
"input": [
{
"text": (
"""This is the first sentence of the first document, which is
intentionally made longer to test chunking. """
"""Here is the second sentence of the first document, which is also
quite long and should ensure that the chunking function splits
this text into two chunks when the chunk size is set to 100."""
),
"metadata": {"source": "test1", "type": "document"},
},
{
"text": (
"""This is the first sentence of the second document, and it is
also extended to be longer than usual for testing purposes. """
"""The second sentence of the second document is similarly lengthy,
so that the chunking function will again create two chunks for
this document."""
),
"metadata": {"source": "test2", "type": "document"},
},
]
}

# Run the function
result = function(test_request, Runtime())

# Print the results in a more readable format
print("\nChunking Results:")
print("----------------")
for chunk in result["output"]:
print(f"\nChunk #{chunk['seq_no']}:")
print(f"Text: {chunk['text'][:100]}...") # Print first 100 chars of each chunk
print(f"Source: {chunk['metadata']['source']}")
print(f"Chunk ID: {chunk['chunk_id']}")
return SearchIndexChunkingV1Response(output=chunks)
Loading
Loading