diff --git a/lib/src/main/java/io/ably/lib/object/LiveObjectsPlugin.java b/lib/src/main/java/io/ably/lib/object/LiveObjectsPlugin.java new file mode 100644 index 000000000..20d78fc1f --- /dev/null +++ b/lib/src/main/java/io/ably/lib/object/LiveObjectsPlugin.java @@ -0,0 +1,109 @@ +package io.ably.lib.object; + +import io.ably.lib.object.adapter.AblyClientAdapter; +import io.ably.lib.object.adapter.Adapter; +import io.ably.lib.objects.RealtimeObjects; +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.ChannelState; +import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.lang.reflect.InvocationTargetException; + +/** + * The LiveObjectsPlugin interface provides a mechanism for managing and interacting with + * live data objects in a real-time environment. It allows for the retrieval, disposal, and + * management of Objects instances associated with specific channel names. + */ +public interface LiveObjectsPlugin { + + /** + * Retrieves an instance of RealtimeObjects associated with the specified channel name. + * This method ensures that a RealtimeObjects instance is available for the given channel, + * creating one if it does not already exist. + * + * @param channelName the name of the channel for which the RealtimeObjects instance is to be retrieved. + * @return the RealtimeObjects instance associated with the specified channel name. + */ + @NotNull + RealtimeObjects getInstance(@NotNull String channelName); + + /** + * Handles a protocol message. + * This method is invoked whenever a protocol message is received, allowing the implementation + * to process the message and take appropriate actions. + * + * @param message the protocol message to handle. + */ + void handle(@NotNull ProtocolMessage message); + + /** + * Handles state changes for a specific channel. + * This method is invoked whenever a channel's state changes, allowing the implementation + * to update the RealtimeObjects instances accordingly based on the new state and presence of objects. + * + * @param channelName the name of the channel whose state has changed. + * @param state the new state of the channel. + * @param hasObjects flag indicates whether the channel has any associated objects. + */ + void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects); + + /** + * Disposes of the RealtimeObjects instance associated with the specified channel name. + * This method removes the RealtimeObjects instance for the given channel, releasing any + * resources associated with it. + * This is invoked when ablyRealtimeClient.channels.release(channelName) is called + * + * @param channelName the name of the channel whose RealtimeObjects instance is to be removed. + */ + void dispose(@NotNull String channelName); + + /** + * Disposes of the plugin instance and all underlying resources. + * This is invoked when ablyRealtimeClient.close() is called + */ + void dispose(); + + /** + * Attempts to initialize the LiveObjects plugin by reflectively loading its implementation + * from the classpath. Returns a new plugin instance on every successful invocation, or + * {@code null} if the LiveObjects plugin is not present in the classpath. + * + * @param ablyRealtime the AblyRealtime client used to build the adapter the plugin runs against. + * @return a new {@link LiveObjectsPlugin} instance, or {@code null} if the plugin is unavailable. + */ + @Nullable + static LiveObjectsPlugin tryInitialize(@NotNull AblyRealtime ablyRealtime) { + return Factory.create(ablyRealtime); + } + + /** + * Reflectively constructs the LiveObjects plugin implementation. Lives in a nested class so the + * implementation-class name stays {@code private} (interface fields are forced {@code public}), + * mirroring {@link io.ably.lib.object.serialization.ObjectSerializer.Holder}. Unlike {@code Holder} + * this is stateless: {@link #create} returns a new instance on every call. + */ + final class Factory { + private static final String TAG = LiveObjectsPlugin.Factory.class.getName(); + private static final String IMPLEMENTATION_CLASS = "io.ably.lib.object.DefaultLiveObjectsPlugin"; + + private Factory() {} + + @Nullable + static LiveObjectsPlugin create(@NotNull AblyRealtime ablyRealtime) { + try { + Class objectsImplementation = Class.forName(IMPLEMENTATION_CLASS); + AblyClientAdapter adapter = new Adapter(ablyRealtime); + return (LiveObjectsPlugin) objectsImplementation + .getDeclaredConstructor(AblyClientAdapter.class) + .newInstance(adapter); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | + InvocationTargetException e) { + Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e); + return null; + } + } + } +} diff --git a/lib/src/main/java/io/ably/lib/object/serialization/ObjectJsonSerializer.java b/lib/src/main/java/io/ably/lib/object/serialization/ObjectJsonSerializer.java new file mode 100644 index 000000000..8c8566490 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/object/serialization/ObjectJsonSerializer.java @@ -0,0 +1,39 @@ +package io.ably.lib.object.serialization; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import io.ably.lib.util.Log; + +import java.lang.reflect.Type; + +public class ObjectJsonSerializer implements JsonSerializer, JsonDeserializer { + private static final String TAG = ObjectJsonSerializer.class.getName(); + + @Override + public Object[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + ObjectSerializer serializer = ObjectSerializer.tryGet(); + if (serializer == null) { + Log.w(TAG, "Skipping 'state' field json deserialization because ObjectSerializer not found."); + return null; + } + if (!json.isJsonArray()) { + throw new JsonParseException("Expected a JSON array for 'state' field, but got: " + json); + } + return serializer.readFromJsonArray(json.getAsJsonArray()); + } + + @Override + public JsonElement serialize(Object[] src, Type typeOfSrc, JsonSerializationContext context) { + ObjectSerializer serializer = ObjectSerializer.tryGet(); + if (serializer == null) { + Log.w(TAG, "Skipping 'state' field json serialization because ObjectSerializer not found."); + return JsonNull.INSTANCE; + } + return serializer.asJsonArray(src); + } +} diff --git a/lib/src/main/java/io/ably/lib/object/serialization/ObjectSerializer.java b/lib/src/main/java/io/ably/lib/object/serialization/ObjectSerializer.java new file mode 100644 index 000000000..78d237104 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/object/serialization/ObjectSerializer.java @@ -0,0 +1,98 @@ +package io.ably.lib.object.serialization; + +import com.google.gson.JsonArray; +import io.ably.lib.util.Log; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +/** + * Serializer interface for converting between objects and their MessagePack or JSON representations. + */ +public interface ObjectSerializer { + + /** + * Reads a MessagePack array from the given unpacker and deserializes it into an Object array. + * + * @param unpacker the MessageUnpacker to read from + * @return the deserialized Object array + * @throws IOException if an I/O error occurs during unpacking + */ + @NotNull + Object[] readMsgpackArray(@NotNull MessageUnpacker unpacker) throws IOException; + + /** + * Serializes the given Object array as a MessagePack array using the provided packer. + * + * @param objects the Object array to serialize + * @param packer the MessagePacker to write to + * @throws IOException if an I/O error occurs during packing + */ + void writeMsgpackArray(@NotNull Object[] objects, @NotNull MessagePacker packer) throws IOException; + + /** + * Reads a JSON array from the given {@link JsonArray} and deserializes it into an Object array. + * + * @param json the {@link JsonArray} representing the array to deserialize + * @return the deserialized Object array + */ + @NotNull + Object[] readFromJsonArray(@NotNull JsonArray json); + + /** + * Serializes the given Object array as a JSON array. + * + * @param objects the Object array to serialize + * @return the resulting JsonArray + */ + @NotNull + JsonArray asJsonArray(@NotNull Object[] objects); + + /** + * Returns the lazily-initialized, process-wide {@link ObjectSerializer} singleton, reflectively + * loaded from the LiveObjects plugin on the classpath. Returns {@code null} if the plugin is not + * present; the lookup is retried on subsequent calls until it succeeds. + * + * @return the shared {@link ObjectSerializer} instance, or {@code null} if the plugin is unavailable. + */ + @Nullable + static ObjectSerializer tryGet() { + return Holder.getSerializer(); + } + + /** + * Holds the lazily-initialized {@link ObjectSerializer} singleton. Interfaces cannot declare + * mutable static fields, so the cache lives here while {@link #tryGet()} delegates to it. + */ + final class Holder { + private static final String TAG = ObjectSerializer.Holder.class.getName(); + private static final String IMPLEMENTATION_CLASS = "io.ably.lib.object.serialization.DefaultObjectsSerializer"; + private static volatile ObjectSerializer objectsSerializer; + + private Holder() {} + + @Nullable + static ObjectSerializer getSerializer() { + if (objectsSerializer == null) { + synchronized (Holder.class) { + if (objectsSerializer == null) { // Double-Checked Locking (DCL) + try { + Class serializerClass = Class.forName(IMPLEMENTATION_CLASS); + objectsSerializer = (ObjectSerializer) serializerClass.getDeclaredConstructor().newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | + NoSuchMethodException | + InvocationTargetException e) { + Log.w(TAG, "Failed to init ObjectSerializer, LiveObjects plugin not included in the classpath", e); + return null; + } + } + } + } + return objectsSerializer; + } + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/object/message/WireObjectMessage.kt b/liveobjects/src/main/kotlin/io/ably/lib/object/message/WireObjectMessage.kt index 6d8ccd785..b6f2f63f4 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/object/message/WireObjectMessage.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/object/message/WireObjectMessage.kt @@ -3,6 +3,9 @@ package io.ably.lib.`object`.message import com.google.gson.Gson import com.google.gson.JsonElement import com.google.gson.JsonObject +import com.google.gson.annotations.JsonAdapter +import com.google.gson.annotations.SerializedName +import io.ably.lib.`object`.serialization.WireObjectDataJsonSerializer import java.nio.charset.StandardCharsets import java.util.Base64 @@ -36,6 +39,7 @@ internal enum class WireObjectsMapSemantics(val code: Int) { } /** Spec: OD1, OD2 - binary carried as base64 string on the wire */ +@JsonAdapter(WireObjectDataJsonSerializer::class) internal data class WireObjectData( val objectId: String? = null, // OD2a val string: String? = null, // OD2f @@ -145,6 +149,7 @@ internal data class WireObjectMessage( val connectionId: String? = null, // OM2c val extras: JsonObject? = null, // OM2d val operation: WireObjectOperation? = null, // OM2f + @SerializedName("object") val objectState: WireObjectState? = null, // OM2g - wire key "object" val serial: String? = null, // OM2h val serialTimestamp: Long? = null, // OM2j diff --git a/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/DefaultSerialization.kt b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/DefaultSerialization.kt new file mode 100644 index 000000000..f410999fd --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/DefaultSerialization.kt @@ -0,0 +1,42 @@ +package io.ably.lib.`object`.serialization + +import com.google.gson.* +import io.ably.lib.`object`.message.WireObjectMessage +import org.msgpack.core.MessagePacker +import org.msgpack.core.MessageUnpacker + +/** + * Default implementation of {@link ObjectSerializer} that handles serialization/deserialization + * of WireObjectMessage arrays for both JSON and MessagePack formats using Gson and MessagePack. + * Dynamically loaded by ObjectSerializer#tryGet() to avoid hard dependencies. + */ +@Suppress("unused") // Used via reflection in ObjectSerializer.Holder +internal class DefaultObjectsSerializer : ObjectSerializer { + + override fun readMsgpackArray(unpacker: MessageUnpacker): Array { + val objectMessagesCount = unpacker.unpackArrayHeader() + return Array(objectMessagesCount) { readObjectMessage(unpacker) } + } + + override fun writeMsgpackArray(objects: Array, packer: MessagePacker) { + val objectMessages = objects.map { it as WireObjectMessage } + packer.packArrayHeader(objectMessages.size) + objectMessages.forEach { it.writeMsgpack(packer) } + } + + override fun readFromJsonArray(json: JsonArray): Array { + return json.map { element -> + if (element.isJsonObject) element.asJsonObject.toObjectMessage() + else throw JsonParseException("Expected JsonObject, but found: $element") + }.toTypedArray() + } + + override fun asJsonArray(objects: Array): JsonArray { + val objectMessages = objects.map { it as WireObjectMessage } + val jsonArray = JsonArray() + for (objectMessage in objectMessages) { + jsonArray.add(objectMessage.toJsonObject()) + } + return jsonArray + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/JsonSerialization.kt b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/JsonSerialization.kt new file mode 100644 index 000000000..cc5098cc5 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/JsonSerialization.kt @@ -0,0 +1,67 @@ +package io.ably.lib.`object`.serialization + +import com.google.gson.* +import io.ably.lib.`object`.message.WireObjectData +import io.ably.lib.`object`.message.WireObjectMessage +import io.ably.lib.`object`.message.WireObjectOperationAction +import io.ably.lib.`object`.message.WireObjectsMapSemantics +import java.lang.reflect.Type +import kotlin.enums.EnumEntries + +// Gson instance for JSON serialization/deserialization +internal val gson = GsonBuilder() + .registerTypeAdapter(WireObjectOperationAction::class.java, EnumCodeTypeAdapter({ it.code }, WireObjectOperationAction.entries)) + .registerTypeAdapter(WireObjectsMapSemantics::class.java, EnumCodeTypeAdapter({ it.code }, WireObjectsMapSemantics.entries)) + .create() + +internal fun WireObjectMessage.toJsonObject(): JsonObject { + return gson.toJsonTree(this).asJsonObject +} + +internal fun JsonObject.toObjectMessage(): WireObjectMessage { + return gson.fromJson(this, WireObjectMessage::class.java) +} + +internal class EnumCodeTypeAdapter>( + private val getCode: (T) -> Int, + private val enumValues: EnumEntries +) : JsonSerializer, JsonDeserializer { + + override fun serialize(src: T, typeOfSrc: Type, context: JsonSerializationContext): JsonElement { + return JsonPrimitive(getCode(src)) + } + + override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): T { + val code = json.asInt + return enumValues.firstOrNull { getCode(it) == code } ?: enumValues.firstOrNull { getCode(it) == -1 } + ?: throw JsonParseException("Unknown enum code: $code and no Unknown fallback found") + } +} + +internal class WireObjectDataJsonSerializer : JsonSerializer, JsonDeserializer { + override fun serialize(src: WireObjectData, typeOfSrc: Type?, context: JsonSerializationContext?): JsonElement { + val obj = JsonObject() + src.objectId?.let { obj.addProperty("objectId", it) } + src.string?.let { obj.addProperty("string", it) } + src.number?.let { obj.addProperty("number", it) } + src.boolean?.let { obj.addProperty("boolean", it) } + src.bytes?.let { obj.addProperty("bytes", it) } + src.json?.let { obj.addProperty("json", it.toString()) } // Spec: OD4c5 + return obj + } + + override fun deserialize(json: JsonElement, typeOfT: Type?, context: JsonDeserializationContext?): WireObjectData { + val obj = if (json.isJsonObject) json.asJsonObject else throw JsonParseException("Expected JsonObject") + val objectId = if (obj.has("objectId")) obj.get("objectId").asString else null + val string = if (obj.has("string")) obj.get("string").asString else null + val number = if (obj.has("number")) obj.get("number").asDouble else null + val boolean = if (obj.has("boolean")) obj.get("boolean").asBoolean else null + val bytes = if (obj.has("bytes")) obj.get("bytes").asString else null + val json = if (obj.has("json")) JsonParser.parseString(obj.get("json").asString) else null + + if (objectId == null && string == null && number == null && boolean == null && bytes == null && json == null) { + throw JsonParseException("Since objectId is not present, at least one of the value fields must be present") + } + return WireObjectData(objectId = objectId, string = string, number = number, boolean = boolean, bytes = bytes, json = json) + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/MsgpackSerialization.kt b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/MsgpackSerialization.kt new file mode 100644 index 000000000..849f41a4e --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/object/serialization/MsgpackSerialization.kt @@ -0,0 +1,916 @@ +package io.ably.lib.`object`.serialization + +import com.google.gson.JsonElement +import com.google.gson.JsonObject +import com.google.gson.JsonParser +import io.ably.lib.`object`.message.WireCounterCreate +import io.ably.lib.`object`.message.WireCounterCreateWithObjectId +import io.ably.lib.`object`.message.WireCounterInc +import io.ably.lib.`object`.message.WireMapClear +import io.ably.lib.`object`.message.WireMapCreate +import io.ably.lib.`object`.message.WireMapCreateWithObjectId +import io.ably.lib.`object`.message.WireMapRemove +import io.ably.lib.`object`.message.WireMapSet +import io.ably.lib.`object`.message.WireObjectData +import io.ably.lib.`object`.message.WireObjectDelete +import io.ably.lib.`object`.message.WireObjectMessage +import io.ably.lib.`object`.message.WireObjectOperation +import io.ably.lib.`object`.message.WireObjectOperationAction +import io.ably.lib.`object`.message.WireObjectState +import io.ably.lib.`object`.message.WireObjectsCounter +import io.ably.lib.`object`.message.WireObjectsMap +import io.ably.lib.`object`.message.WireObjectsMapEntry +import io.ably.lib.`object`.message.WireObjectsMapSemantics +import io.ably.lib.`object`.objectStateError +import io.ably.lib.util.Serialisation +import java.util.Base64 +import org.msgpack.core.MessageFormat +import org.msgpack.core.MessagePacker +import org.msgpack.core.MessageUnpacker + +/** + * Write WireObjectMessage to MessagePacker + */ +internal fun WireObjectMessage.writeMsgpack(packer: MessagePacker) { + var fieldCount = 0 + + if (id != null) fieldCount++ + if (timestamp != null) fieldCount++ + if (clientId != null) fieldCount++ + if (connectionId != null) fieldCount++ + if (extras != null) fieldCount++ + if (operation != null) fieldCount++ + if (objectState != null) fieldCount++ + if (serial != null) fieldCount++ + if (serialTimestamp != null) fieldCount++ + if (siteCode != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + if (id != null) { + packer.packString("id") + packer.packString(id) + } + + if (timestamp != null) { + packer.packString("timestamp") + packer.packLong(timestamp) + } + + if (clientId != null) { + packer.packString("clientId") + packer.packString(clientId) + } + + if (connectionId != null) { + packer.packString("connectionId") + packer.packString(connectionId) + } + + if (extras != null) { + packer.packString("extras") + packer.writePayload(Serialisation.gsonToMsgpack(extras)) + } + + if (operation != null) { + packer.packString("operation") + operation.writeMsgpack(packer) + } + + if (objectState != null) { + packer.packString("object") + objectState.writeMsgpack(packer) + } + + if (serial != null) { + packer.packString("serial") + packer.packString(serial) + } + + if (serialTimestamp != null) { + packer.packString("serialTimestamp") + packer.packLong(serialTimestamp) + } + + if (siteCode != null) { + packer.packString("siteCode") + packer.packString(siteCode) + } +} + +/** + * Read a WireObjectMessage from MessageUnpacker + */ +internal fun readObjectMessage(unpacker: MessageUnpacker): WireObjectMessage { + if (unpacker.nextFormat == MessageFormat.NIL) { + unpacker.unpackNil() + return WireObjectMessage() // default/empty message + } + + val fieldCount = unpacker.unpackMapHeader() + + var id: String? = null + var timestamp: Long? = null + var clientId: String? = null + var connectionId: String? = null + var extras: JsonObject? = null + var operation: WireObjectOperation? = null + var objectState: WireObjectState? = null + var serial: String? = null + var serialTimestamp: Long? = null + var siteCode: String? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "id" -> id = unpacker.unpackString() + "timestamp" -> timestamp = unpacker.unpackLong() + "clientId" -> clientId = unpacker.unpackString() + "connectionId" -> connectionId = unpacker.unpackString() + "extras" -> extras = Serialisation.msgpackToGson(unpacker.unpackValue()) as? JsonObject + "operation" -> operation = readObjectOperation(unpacker) + "object" -> objectState = readObjectState(unpacker) + "serial" -> serial = unpacker.unpackString() + "serialTimestamp" -> serialTimestamp = unpacker.unpackLong() + "siteCode" -> siteCode = unpacker.unpackString() + else -> unpacker.skipValue() + } + } + + return WireObjectMessage( + id = id, + timestamp = timestamp, + clientId = clientId, + connectionId = connectionId, + extras = extras, + operation = operation, + objectState = objectState, + serial = serial, + serialTimestamp = serialTimestamp, + siteCode = siteCode + ) +} + +/** + * Write WireObjectOperation to MessagePacker + */ +private fun WireObjectOperation.writeMsgpack(packer: MessagePacker) { + var fieldCount = 1 // action is always required + require(objectId.isNotEmpty()) { "objectId must be non-empty per Objects protocol" } + fieldCount++ + + if (mapCreate != null) fieldCount++ + if (mapSet != null) fieldCount++ + if (mapRemove != null) fieldCount++ + if (counterCreate != null) fieldCount++ + if (counterInc != null) fieldCount++ + if (objectDelete != null) fieldCount++ + if (mapCreateWithObjectId != null) fieldCount++ + if (counterCreateWithObjectId != null) fieldCount++ + if (mapClear != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + packer.packString("action") + packer.packInt(action.code) + + // Always include objectId as per Objects protocol + packer.packString("objectId") + packer.packString(objectId) + + if (mapCreate != null) { + packer.packString("mapCreate") + mapCreate.writeMsgpack(packer) + } + + if (mapSet != null) { + packer.packString("mapSet") + mapSet.writeMsgpack(packer) + } + + if (mapRemove != null) { + packer.packString("mapRemove") + mapRemove.writeMsgpack(packer) + } + + if (counterCreate != null) { + packer.packString("counterCreate") + counterCreate.writeMsgpack(packer) + } + + if (counterInc != null) { + packer.packString("counterInc") + counterInc.writeMsgpack(packer) + } + + if (objectDelete != null) { + packer.packString("objectDelete") + packer.packMapHeader(0) // empty map + } + + if (mapCreateWithObjectId != null) { + packer.packString("mapCreateWithObjectId") + mapCreateWithObjectId.writeMsgpack(packer) + } + + if (counterCreateWithObjectId != null) { + packer.packString("counterCreateWithObjectId") + counterCreateWithObjectId.writeMsgpack(packer) + } + + if (mapClear != null) { + packer.packString("mapClear") + packer.packMapHeader(0) // empty map, no fields + } + +} + +/** + * Read WireObjectOperation from MessageUnpacker + */ +private fun readObjectOperation(unpacker: MessageUnpacker): WireObjectOperation { + val fieldCount = unpacker.unpackMapHeader() + + var action: WireObjectOperationAction? = null + var objectId: String = "" + var mapCreate: WireMapCreate? = null + var mapSet: WireMapSet? = null + var mapRemove: WireMapRemove? = null + var counterCreate: WireCounterCreate? = null + var counterInc: WireCounterInc? = null + var objectDelete: WireObjectDelete? = null + var mapCreateWithObjectId: WireMapCreateWithObjectId? = null + var counterCreateWithObjectId: WireCounterCreateWithObjectId? = null + var mapClear: WireMapClear? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "action" -> { + val actionCode = unpacker.unpackInt() + action = WireObjectOperationAction.entries.firstOrNull { it.code == actionCode } + ?: WireObjectOperationAction.entries.firstOrNull { it.code == -1 } + ?: throw objectStateError("Unknown WireObjectOperationAction code: $actionCode and no Unknown fallback found") + } + "objectId" -> objectId = unpacker.unpackString() + "mapCreate" -> mapCreate = readMapCreate(unpacker) + "mapSet" -> mapSet = readMapSet(unpacker) + "mapRemove" -> mapRemove = readMapRemove(unpacker) + "counterCreate" -> counterCreate = readCounterCreate(unpacker) + "counterInc" -> counterInc = readCounterInc(unpacker) + "objectDelete" -> { + unpacker.skipValue() // empty map, just consume it + objectDelete = WireObjectDelete + } + "mapCreateWithObjectId" -> mapCreateWithObjectId = readMapCreateWithObjectId(unpacker) + "counterCreateWithObjectId" -> counterCreateWithObjectId = readCounterCreateWithObjectId(unpacker) + "mapClear" -> { + unpacker.skipValue() // empty map, consume it + mapClear = WireMapClear + } + else -> unpacker.skipValue() + } + } + + if (action == null) { + throw objectStateError("Missing required 'action' field in WireObjectOperation") + } + + if (objectId.isEmpty()) { + throw objectStateError("Missing required 'objectId' field in WireObjectOperation") + } + + return WireObjectOperation( + action = action, + objectId = objectId, + mapCreate = mapCreate, + mapSet = mapSet, + mapRemove = mapRemove, + counterCreate = counterCreate, + counterInc = counterInc, + objectDelete = objectDelete, + mapCreateWithObjectId = mapCreateWithObjectId, + counterCreateWithObjectId = counterCreateWithObjectId, + mapClear = mapClear, + ) +} + +/** + * Write WireObjectState to MessagePacker + */ +private fun WireObjectState.writeMsgpack(packer: MessagePacker) { + var fieldCount = 3 // objectId, siteTimeserials, and tombstone are required + + if (createOp != null) fieldCount++ + if (map != null) fieldCount++ + if (counter != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + packer.packString("objectId") + packer.packString(objectId) + + packer.packString("siteTimeserials") + packer.packMapHeader(siteTimeserials.size) + for ((key, value) in siteTimeserials) { + packer.packString(key) + packer.packString(value) + } + + packer.packString("tombstone") + packer.packBoolean(tombstone) + + if (createOp != null) { + packer.packString("createOp") + createOp.writeMsgpack(packer) + } + + if (map != null) { + packer.packString("map") + map.writeMsgpack(packer) + } + + if (counter != null) { + packer.packString("counter") + counter.writeMsgpack(packer) + } +} + +/** + * Read WireObjectState from MessageUnpacker + */ +private fun readObjectState(unpacker: MessageUnpacker): WireObjectState { + val fieldCount = unpacker.unpackMapHeader() + + var objectId = "" + var siteTimeserials = mapOf() + var tombstone = false + var createOp: WireObjectOperation? = null + var map: WireObjectsMap? = null + var counter: WireObjectsCounter? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "objectId" -> objectId = unpacker.unpackString() + "siteTimeserials" -> { + val mapSize = unpacker.unpackMapHeader() + val tempMap = mutableMapOf() + for (j in 0 until mapSize) { + val key = unpacker.unpackString() + val value = unpacker.unpackString() + tempMap[key] = value + } + siteTimeserials = tempMap + } + "tombstone" -> tombstone = unpacker.unpackBoolean() + "createOp" -> createOp = readObjectOperation(unpacker) + "map" -> map = readObjectMap(unpacker) + "counter" -> counter = readObjectCounter(unpacker) + else -> unpacker.skipValue() + } + } + + if (objectId.isEmpty()) { + throw objectStateError("Missing required 'objectId' field in WireObjectState") + } + + return WireObjectState( + objectId = objectId, + siteTimeserials = siteTimeserials, + tombstone = tombstone, + createOp = createOp, + map = map, + counter = counter + ) +} + +/** + * Write WireMapCreate to MessagePacker + */ +private fun WireMapCreate.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(2) + packer.packString("semantics") + packer.packInt(semantics.code) + packer.packString("entries") + packer.packMapHeader(entries.size) + for ((key, value) in entries) { + packer.packString(key) + value.writeMsgpack(packer) + } +} + +/** + * Read WireMapCreate from MessageUnpacker + */ +private fun readMapCreate(unpacker: MessageUnpacker): WireMapCreate { + val fieldCount = unpacker.unpackMapHeader() + var semantics: WireObjectsMapSemantics = WireObjectsMapSemantics.LWW + var entries: Map = emptyMap() + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "semantics" -> { + val code = unpacker.unpackInt() + semantics = WireObjectsMapSemantics.entries.firstOrNull { it.code == code } + ?: WireObjectsMapSemantics.entries.firstOrNull { it.code == -1 } + ?: throw objectStateError("Unknown MapSemantics code: $code and no UNKNOWN fallback found") + } + "entries" -> { + val mapSize = unpacker.unpackMapHeader() + val tempMap = mutableMapOf() + for (j in 0 until mapSize) { + tempMap[unpacker.unpackString()] = readObjectMapEntry(unpacker) + } + entries = tempMap + } + else -> unpacker.skipValue() + } + } + return WireMapCreate(semantics = semantics, entries = entries) +} + +/** + * Write WireMapSet to MessagePacker + */ +private fun WireMapSet.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(2) + packer.packString("key") + packer.packString(key) + packer.packString("value") + value.writeMsgpack(packer) +} + +/** + * Read WireMapSet from MessageUnpacker + */ +private fun readMapSet(unpacker: MessageUnpacker): WireMapSet { + val fieldCount = unpacker.unpackMapHeader() + var key: String? = null + var value: WireObjectData? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "key" -> key = unpacker.unpackString() + "value" -> value = readObjectData(unpacker) + else -> unpacker.skipValue() + } + } + return WireMapSet( + key = key ?: throw objectStateError("Missing 'key' in WireMapSet payload"), + value = value ?: throw objectStateError("Missing 'value' in WireMapSet payload") + ) +} + +/** + * Write WireMapRemove to MessagePacker + */ +private fun WireMapRemove.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(1) + packer.packString("key") + packer.packString(key) +} + +/** + * Read WireMapRemove from MessageUnpacker + */ +private fun readMapRemove(unpacker: MessageUnpacker): WireMapRemove { + val fieldCount = unpacker.unpackMapHeader() + var key: String? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "key" -> key = unpacker.unpackString() + else -> unpacker.skipValue() + } + } + return WireMapRemove(key = key ?: throw objectStateError("Missing 'key' in WireMapRemove payload")) +} + +/** + * Write WireCounterCreate to MessagePacker + */ +private fun WireCounterCreate.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(1) + packer.packString("count") + packer.packDouble(count) +} + +/** + * Read WireCounterCreate from MessageUnpacker + */ +private fun readCounterCreate(unpacker: MessageUnpacker): WireCounterCreate { + val fieldCount = unpacker.unpackMapHeader() + var count: Double? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "count" -> count = unpacker.unpackDouble() + else -> unpacker.skipValue() + } + } + return WireCounterCreate(count = count ?: throw objectStateError("Missing 'count' in WireCounterCreate payload")) +} + +/** + * Write WireCounterInc to MessagePacker + */ +private fun WireCounterInc.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(1) + packer.packString("number") + packer.packDouble(number) +} + +/** + * Read WireCounterInc from MessageUnpacker + */ +private fun readCounterInc(unpacker: MessageUnpacker): WireCounterInc { + val fieldCount = unpacker.unpackMapHeader() + var number: Double? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "number" -> number = unpacker.unpackDouble() + else -> unpacker.skipValue() + } + } + return WireCounterInc(number = number ?: throw objectStateError("Missing 'number' in WireCounterInc payload")) +} + +/** + * Write WireMapCreateWithObjectId to MessagePacker + */ +private fun WireMapCreateWithObjectId.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(2) + packer.packString("initialValue") + packer.packString(initialValue) + packer.packString("nonce") + packer.packString(nonce) +} + +/** + * Read WireMapCreateWithObjectId from MessageUnpacker + */ +private fun readMapCreateWithObjectId(unpacker: MessageUnpacker): WireMapCreateWithObjectId { + val fieldCount = unpacker.unpackMapHeader() + var initialValue: String? = null + var nonce: String? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "initialValue" -> initialValue = unpacker.unpackString() + "nonce" -> nonce = unpacker.unpackString() + else -> unpacker.skipValue() + } + } + return WireMapCreateWithObjectId( + initialValue = initialValue ?: throw objectStateError("Missing 'initialValue' in WireMapCreateWithObjectId payload"), + nonce = nonce ?: throw objectStateError("Missing 'nonce' in WireMapCreateWithObjectId payload") + ) +} + +/** + * Write WireCounterCreateWithObjectId to MessagePacker + */ +private fun WireCounterCreateWithObjectId.writeMsgpack(packer: MessagePacker) { + packer.packMapHeader(2) + packer.packString("initialValue") + packer.packString(initialValue) + packer.packString("nonce") + packer.packString(nonce) +} + +/** + * Read WireCounterCreateWithObjectId from MessageUnpacker + */ +private fun readCounterCreateWithObjectId(unpacker: MessageUnpacker): WireCounterCreateWithObjectId { + val fieldCount = unpacker.unpackMapHeader() + var initialValue: String? = null + var nonce: String? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + if (fieldFormat == MessageFormat.NIL) { unpacker.unpackNil(); continue } + when (fieldName) { + "initialValue" -> initialValue = unpacker.unpackString() + "nonce" -> nonce = unpacker.unpackString() + else -> unpacker.skipValue() + } + } + return WireCounterCreateWithObjectId( + initialValue = initialValue ?: throw objectStateError("Missing 'initialValue' in WireCounterCreateWithObjectId payload"), + nonce = nonce ?: throw objectStateError("Missing 'nonce' in WireCounterCreateWithObjectId payload") + ) +} + +/** + * Write ObjectMap to MessagePacker + */ +private fun WireObjectsMap.writeMsgpack(packer: MessagePacker) { + var fieldCount = 0 + + if (semantics != null) fieldCount++ + if (entries != null) fieldCount++ + if (clearTimeserial != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + if (semantics != null) { + packer.packString("semantics") + packer.packInt(semantics.code) + } + + if (entries != null) { + packer.packString("entries") + packer.packMapHeader(entries.size) + for ((key, value) in entries) { + packer.packString(key) + value.writeMsgpack(packer) + } + } + + if (clearTimeserial != null) { + packer.packString("clearTimeserial") + packer.packString(clearTimeserial) + } +} + +/** + * Read ObjectMap from MessageUnpacker + */ +private fun readObjectMap(unpacker: MessageUnpacker): WireObjectsMap { + val fieldCount = unpacker.unpackMapHeader() + + var semantics: WireObjectsMapSemantics? = null + var entries: Map? = null + var clearTimeserial: String? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "semantics" -> { + val semanticsCode = unpacker.unpackInt() + semantics = WireObjectsMapSemantics.entries.firstOrNull { it.code == semanticsCode } + ?: WireObjectsMapSemantics.entries.firstOrNull { it.code == -1 } + ?: throw objectStateError("Unknown MapSemantics code: $semanticsCode and no UNKNOWN fallback found") + } + "entries" -> { + val mapSize = unpacker.unpackMapHeader() + val tempMap = mutableMapOf() + for (j in 0 until mapSize) { + val key = unpacker.unpackString() + val value = readObjectMapEntry(unpacker) + tempMap[key] = value + } + entries = tempMap + } + "clearTimeserial" -> clearTimeserial = unpacker.unpackString() + else -> unpacker.skipValue() + } + } + + return WireObjectsMap(semantics = semantics, entries = entries, clearTimeserial = clearTimeserial) +} + +/** + * Write ObjectCounter to MessagePacker + */ +private fun WireObjectsCounter.writeMsgpack(packer: MessagePacker) { + var fieldCount = 0 + + if (count != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + if (count != null) { + packer.packString("count") + packer.packDouble(count) + } +} + +/** + * Read ObjectCounter from MessageUnpacker + */ +private fun readObjectCounter(unpacker: MessageUnpacker): WireObjectsCounter { + val fieldCount = unpacker.unpackMapHeader() + + var count: Double? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "count" -> count = unpacker.unpackDouble() + else -> unpacker.skipValue() + } + } + + return WireObjectsCounter(count = count) +} + +/** + * Write ObjectMapEntry to MessagePacker + */ +private fun WireObjectsMapEntry.writeMsgpack(packer: MessagePacker) { + var fieldCount = 0 + + if (tombstone != null) fieldCount++ + if (timeserial != null) fieldCount++ + if (serialTimestamp != null) fieldCount++ + if (data != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + if (tombstone != null) { + packer.packString("tombstone") + packer.packBoolean(tombstone) + } + + if (timeserial != null) { + packer.packString("timeserial") + packer.packString(timeserial) + } + + if (serialTimestamp != null) { + packer.packString("serialTimestamp") + packer.packLong(serialTimestamp) + } + + if (data != null) { + packer.packString("data") + data.writeMsgpack(packer) + } +} + +/** + * Read ObjectMapEntry from MessageUnpacker + */ +private fun readObjectMapEntry(unpacker: MessageUnpacker): WireObjectsMapEntry { + val fieldCount = unpacker.unpackMapHeader() + + var tombstone: Boolean? = null + var timeserial: String? = null + var serialTimestamp: Long? = null + var data: WireObjectData? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "tombstone" -> tombstone = unpacker.unpackBoolean() + "timeserial" -> timeserial = unpacker.unpackString() + "serialTimestamp" -> serialTimestamp = unpacker.unpackLong() + "data" -> data = readObjectData(unpacker) + else -> unpacker.skipValue() + } + } + + return WireObjectsMapEntry(tombstone = tombstone, timeserial = timeserial, serialTimestamp = serialTimestamp, data = data) +} + +/** + * Write WireObjectData to MessagePacker + */ +private fun WireObjectData.writeMsgpack(packer: MessagePacker) { + var fieldCount = 0 + + if (objectId != null) fieldCount++ + if (string != null) fieldCount++ + if (number != null) fieldCount++ + if (boolean != null) fieldCount++ + if (bytes != null) fieldCount++ + if (json != null) fieldCount++ + + packer.packMapHeader(fieldCount) + + if (objectId != null) { + packer.packString("objectId") + packer.packString(objectId) + } + + if (string != null) { + packer.packString("string") + packer.packString(string) + } + + if (number != null) { + packer.packString("number") + packer.packDouble(number) + } + + if (boolean != null) { + packer.packString("boolean") + packer.packBoolean(boolean) + } + + if (bytes != null) { + val rawBytes = Base64.getDecoder().decode(bytes) + packer.packString("bytes") + packer.packBinaryHeader(rawBytes.size) + packer.writePayload(rawBytes) + } + + if (json != null) { + packer.packString("json") + packer.packString(json.toString()) + } +} + +/** + * Read WireObjectData from MessageUnpacker + */ +private fun readObjectData(unpacker: MessageUnpacker): WireObjectData { + val fieldCount = unpacker.unpackMapHeader() + var objectId: String? = null + var string: String? = null + var number: Double? = null + var boolean: Boolean? = null + var bytes: String? = null + var json: JsonElement? = null + + for (i in 0 until fieldCount) { + val fieldName = unpacker.unpackString().intern() + val fieldFormat = unpacker.nextFormat + + if (fieldFormat == MessageFormat.NIL) { + unpacker.unpackNil() + continue + } + + when (fieldName) { + "objectId" -> objectId = unpacker.unpackString() + "string" -> string = unpacker.unpackString() + "number" -> number = unpacker.unpackDouble() + "boolean" -> boolean = unpacker.unpackBoolean() + "bytes" -> { + val size = unpacker.unpackBinaryHeader() + val rawBytes = ByteArray(size) + unpacker.readPayload(rawBytes) + bytes = Base64.getEncoder().encodeToString(rawBytes) + } + "json" -> json = JsonParser.parseString(unpacker.unpackString()) + else -> unpacker.skipValue() + } + } + + return WireObjectData(objectId = objectId, string = string, number = number, boolean = boolean, bytes = bytes, json = json) +}