diff --git a/.gitignore b/.gitignore index f9f3c6f..50c67bc 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,9 @@ SampleProject/Library/* SampleProject/obj/* -SampleProject/ProjectSettings/* - SampleProject/SampleProject.sln SampleProject/Temp/* +nginx-luajit-ws/logs/error.log +nginx-luajit-ws/logs/host.access.log +nginx-luajit-ws/logs/nginx.pid +SampleProject/iOS/* diff --git a/SampleProject/Assets/Editor/Disquuun/Disquuun.cs b/SampleProject/Assets/Editor/Disquuun/Disquuun.cs new file mode 100755 index 0000000..fbb03b1 --- /dev/null +++ b/SampleProject/Assets/Editor/Disquuun/Disquuun.cs @@ -0,0 +1,405 @@ +using System; +using System.Collections.Generic; +using System.Net; + +namespace DisquuunCore { + public enum DisqueCommand { + ADDJOB,// queue_name job [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC] + GETJOB,// [NOHANG] [TIMEOUT ] [COUNT ] [WITHCOUNTERS] FROM queue1 queue2 ... queueN + ACKJOB,// jobid1 jobid2 ... jobidN + FASTACK,// jobid1 jobid2 ... jobidN + WORKING,// jobid + NACK,// ... + INFO, + HELLO, + QLEN,// + QSTAT,// + QPEEK,// + ENQUEUE,// ... + DEQUEUE,// ... + DELJOB,// ... + SHOW,// + QSCAN,// [COUNT ] [BUSYLOOP] [MINLEN ] [MAXLEN ] [IMPORTRATE ] + JSCAN,// [] [COUNT ] [BUSYLOOP] [QUEUE ] [STATE STATE ... STATE ] [REPLY all|id] + PAUSE,// option1 [option2 ... optionN] + } + + /** + data structure for input. + */ + public class DisquuunInput { + public readonly DisqueCommand command; + public readonly byte[] data; + public readonly DisquuunSocketPool socketPool; + + public DisquuunInput (DisqueCommand command, byte[] data, DisquuunSocketPool socketPool) { + this.command = command; + this.data = data; + this.socketPool = socketPool; + } + } + + + /** + data structure for result. + */ + public struct DisquuunResult { + public ArraySegment[] bytesArray; + + public DisquuunResult (params ArraySegment[] bytesArray) { + this.bytesArray = bytesArray; + } + } + + public enum DisquuunExecuteType { + ASYNC, + LOOP, + PIPELINE + } + + public class Disquuun { + public readonly string connectionId; + + public readonly long bufferSize; + public readonly IPEndPoint endPoint; + + public ConnectionState connectionState; + + + private readonly Action ConnectionOpened; + private readonly Action ConnectionFailed; + + private DisquuunSocketPool socketPool; + + public readonly int minConnectionCount; + + private object lockObject = new object(); + + public enum ConnectionState { + OPENING, + OPENED, + OPENED_RECOVERING, + ALLCLOSING, + ALLCLOSED + } + + public Disquuun ( + string host, + int port, + long bufferSize, + int minConnectionCount, + Action ConnectionOpenedAct=null, + Action ConnectionFailedAct=null + ) { + this.connectionId = Guid.NewGuid().ToString(); + + this.bufferSize = bufferSize; + this.endPoint = new IPEndPoint(IPAddress.Parse(host), port); + + this.connectionState = ConnectionState.OPENING; + + /* + ConnectionOpened handler treats all connections are opened. + */ + if (ConnectionOpenedAct != null) this.ConnectionOpened = ConnectionOpenedAct; + else this.ConnectionOpened = conId => {}; + + /* + ConnectionFailed handler only treats connection error. + + other runtime errors will emit in API handler. + */ + if (ConnectionFailedAct != null) this.ConnectionFailed = ConnectionFailedAct; + else this.ConnectionFailed = (info, e) => {}; + + this.minConnectionCount = minConnectionCount; + + this.socketPool = new DisquuunSocketPool(minConnectionCount, this.OnSocketOpened, this.OnSocketConnectionFailed); + + this.socketPool.Connect(endPoint, bufferSize); + } + + public int StackedCommandCount () { + return socketPool.StackedCommandCount(); + } + + private void OnSocketOpened (DisquuunSocket source, string socketId) { + if (connectionState != ConnectionState.OPENING) return; + var availableSocketCount = socketPool.AvailableSocketNum(); + + lock (lockObject) { + if (connectionState != ConnectionState.OPENED && availableSocketCount == minConnectionCount) { + connectionState = ConnectionState.OPENED; + ConnectionOpened(connectionId); + } + } + } + + private void OnSocketConnectionFailed (DisquuunSocket source, string info, Exception e) { + UpdateState(); + if (ConnectionFailed != null) ConnectionFailed("OnSocketConnectionFailed:" + info, e); + } + + private ConnectionState UpdateState () { + + var availableSocketCount = socketPool.AvailableSocketNum(); + + switch (connectionState) { + case ConnectionState.OPENING: { + if (availableSocketCount == minConnectionCount) connectionState = ConnectionState.OPENED; + return connectionState; + } + case ConnectionState.OPENED: { + if (availableSocketCount != minConnectionCount) connectionState = ConnectionState.OPENED_RECOVERING; + return connectionState; + } + default: { + if (availableSocketCount == minConnectionCount) connectionState = ConnectionState.OPENED; + break; + } + } + return connectionState; + } + + + public ConnectionState State () { + return UpdateState(); + } + + public void Disconnect () { + connectionState = ConnectionState.ALLCLOSING; + socketPool.Disconnect(); + } + + public int AvailableSocketNum () { + return socketPool.AvailableSocketNum(); + } + + + + /* + Disque API gateway + */ + public DisquuunInput AddJob (string queueName, byte[] data, int timeout=0, params object[] args) { + var bytes = DisquuunAPI.AddJob(queueName, data, timeout, args); + + return new DisquuunInput(DisqueCommand.ADDJOB, bytes, socketPool); + } + + public DisquuunInput GetJob (string[] queueIds, params object[] args) { + var bytes = DisquuunAPI.GetJob(queueIds, args); + + return new DisquuunInput(DisqueCommand.GETJOB, bytes, socketPool); + } + + public DisquuunInput AckJob (string[] jobIds) { + var bytes = DisquuunAPI.AckJob(jobIds); + + return new DisquuunInput(DisqueCommand.ACKJOB, bytes, socketPool); + } + + public DisquuunInput FastAck (string[] jobIds) { + var bytes = DisquuunAPI.FastAck(jobIds); + + return new DisquuunInput(DisqueCommand.FASTACK, bytes, socketPool); + } + + public DisquuunInput Working (string jobId) { + var bytes = DisquuunAPI.Working(jobId); + + return new DisquuunInput(DisqueCommand.WORKING, bytes, socketPool); + } + + public DisquuunInput Nack (string[] jobIds) { + var bytes = DisquuunAPI.Nack(jobIds); + + return new DisquuunInput(DisqueCommand.NACK, bytes, socketPool); + } + + public DisquuunInput Info () { + var data = DisquuunAPI.Info(); + + return new DisquuunInput(DisqueCommand.INFO, data, socketPool); + } + + public DisquuunInput Hello () { + var bytes = DisquuunAPI.Hello(); + + return new DisquuunInput(DisqueCommand.HELLO, bytes, socketPool); + } + + public DisquuunInput Qlen (string queueId) { + var bytes = DisquuunAPI.Qlen(queueId); + + return new DisquuunInput(DisqueCommand.QLEN, bytes, socketPool); + } + + public DisquuunInput Qstat (string queueId) { + var bytes = DisquuunAPI.Qstat(queueId); + + return new DisquuunInput(DisqueCommand.QSTAT, bytes, socketPool); + } + + public DisquuunInput Qpeek (string queueId, int count) { + var bytes = DisquuunAPI.Qpeek(queueId, count); + + return new DisquuunInput(DisqueCommand.QPEEK, bytes, socketPool); + } + + public DisquuunInput Enqueue (params string[] jobIds) { + var bytes = DisquuunAPI.Enqueue(jobIds); + + return new DisquuunInput(DisqueCommand.ENQUEUE, bytes, socketPool); + } + + public DisquuunInput Dequeue (params string[] jobIds) { + var bytes = DisquuunAPI.Dequeue(jobIds); + + return new DisquuunInput(DisqueCommand.DEQUEUE, bytes, socketPool); + } + + public DisquuunInput DelJob (params string[] jobIds) { + var bytes = DisquuunAPI.DelJob(jobIds); + + return new DisquuunInput(DisqueCommand.DELJOB, bytes, socketPool); + } + + public DisquuunInput Show (string jobId) { + var bytes = DisquuunAPI.Show(jobId); + + return new DisquuunInput(DisqueCommand.SHOW, bytes, socketPool); + } + + public DisquuunInput Qscan (params object[] args) { + var bytes = DisquuunAPI.Qscan(args); + + return new DisquuunInput(DisqueCommand.QSCAN, bytes, socketPool); + } + + public DisquuunInput Jscan (int cursor=0, params object[] args) { + var bytes = DisquuunAPI.Jscan(cursor, args); + + return new DisquuunInput(DisqueCommand.JSCAN, bytes, socketPool); + } + + public DisquuunInput Pause (string queueId, string option1, params string[] options) { + var bytes = DisquuunAPI.Pause(queueId, option1, options); + + return new DisquuunInput(DisqueCommand.PAUSE, bytes, socketPool); + } + + /* + pipelines + */ + private List> pipelineStack = new List>(); + private int currentPipelineIndex = -1; + + public List> Pipeline(params DisquuunInput[] disquuunInput) { + lock (lockObject) { + if (0 < disquuunInput.Length) { + if (pipelineStack.Count == 0) currentPipelineIndex = 0; + + if (pipelineStack.Count < currentPipelineIndex + 1) pipelineStack.Add(new List()); + pipelineStack[currentPipelineIndex].AddRange(disquuunInput); + } + return pipelineStack; + } + } + + public void RevolvePipeline () { + lock (lockObject) { + if (currentPipelineIndex == -1) return; + if (pipelineStack.Count == 0) return; + + if (0 < pipelineStack[currentPipelineIndex].Count) currentPipelineIndex++; + } + } + } + + public static class DisquuunLogger { + public static void Log (string message, bool write=false) { + // TestLogger.Log(message, write); + } + } + + + public class DisquuunSocketPool { + private DisquuunSocket[] sockets; + + private StackSocket stackSocket; + + private object lockObject = new object(); + + public DisquuunSocketPool (int connectionCount, Action OnSocketOpened, Action OnSocketConnectionFailed) { + this.stackSocket = new StackSocket(); + this.sockets = new DisquuunSocket[connectionCount]; + for (var i = 0; i < sockets.Length; i++) this.sockets[i] = new DisquuunSocket(OnSocketOpened, this.OnReloaded, OnSocketConnectionFailed); + } + + public void Connect (IPEndPoint endPoint, long bufferSize) { + for (var i = 0; i < sockets.Length; i++) this.sockets[i].Connect(endPoint, bufferSize); + } + + public void Disconnect () { + lock (lockObject) { + foreach (var socket in sockets) socket.Disconnect(); + } + } + + public StackSocket ChooseAvailableSocket () { + lock (lockObject) { + for (var i = 0; i < sockets.Length; i++) { + var socket = sockets[i]; + if (socket.IsChoosable()) { + socket.SetBusy(); + return socket; + } + } + + return stackSocket; + } + } + + public void OnReloaded (DisquuunSocket reloadedSocket) { + lock (lockObject) { + if (stackSocket.IsQueued()) { + if (reloadedSocket.IsChoosable()) { + reloadedSocket.SetBusy(); + + var commandAndData = stackSocket.Dequeue(); + switch (commandAndData.executeType) { + case DisquuunExecuteType.ASYNC: { + reloadedSocket.Async(commandAndData.commands, commandAndData.data, commandAndData.Callback); + return; + } + case DisquuunExecuteType.LOOP: { + reloadedSocket.Loop(commandAndData.commands, commandAndData.data, commandAndData.Callback); + return; + } + case DisquuunExecuteType.PIPELINE: { + reloadedSocket.Execute(commandAndData.commands, commandAndData.data, commandAndData.Callback); + return; + } + } + } + } + } + } + + public int AvailableSocketNum() { + lock (lockObject) { + var availableSocketCount = 0; + for (var i = 0; i < sockets.Length; i++) { + var socket = sockets[i]; + if (socket == null) continue; + if (socket.IsChoosable()) availableSocketCount++; + } + return availableSocketCount; + } + } + + public int StackedCommandCount() { + lock (lockObject) return stackSocket.QueueCount(); + } + } +} diff --git a/SampleProject/Assets/Editor/Disquuun/DisquuunAPI.cs b/SampleProject/Assets/Editor/Disquuun/DisquuunAPI.cs new file mode 100755 index 0000000..9750565 --- /dev/null +++ b/SampleProject/Assets/Editor/Disquuun/DisquuunAPI.cs @@ -0,0 +1,883 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Text; + +namespace DisquuunCore { + public static class DisquuunAPI { + /* + disque protocol symbols + */ + public enum CommandString { + Error = '-', + Status = '+', + Bulk = '$', + MultiBulk = '*', + Int = ':' + } + + /* + chars + */ + public const char CharError = (char)CommandString.Error; + public const char CharStatus = (char)CommandString.Status; + public const char CharBulk = (char)CommandString.Bulk; + public const char CharMultiBulk = (char)CommandString.MultiBulk; + public const char CharInt = (char)CommandString.Int; + public const string CharEOL = "\r\n"; + + + public const string DISQUE_GETJOB_KEYWORD_FROM = "FROM"; + + /* + bytes + */ + public const byte ByteError = 45; + public const byte ByteStatus = 43; + public const byte ByteBulk = 36; + public const byte ByteMultiBulk = 42; + public const byte ByteInt = 58; + public static readonly byte ByteCR = Convert.ToByte('\r'); + public static readonly byte ByteLF = Convert.ToByte('\n'); + + private static byte[] BytesMultiBulk = new byte[]{ByteMultiBulk}; + private static byte[] BytesCRLF = new byte[]{ByteCR, ByteLF}; + private static byte[] BytesBulk = new byte[]{ByteBulk}; + + + /* + Disque APIs. + */ + public static byte[] AddJob (string queueName, byte[] data, int timeout=0, params object[] args) { + // ADDJOB queue_name job + // [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC] + + var newArgs = new object[1 + args.Length]; + newArgs[0] = timeout; + for (var i = 1; i < newArgs.Length; i++) newArgs[i] = args[i-1]; + + using (var byteBuffer = new MemoryStream()) { + var contentCount = 1;// count of command. + + if (!string.IsNullOrEmpty(queueName)) { + contentCount++; + } + + if (0 < data.Length) { + contentCount++; + } + + if (0 < newArgs.Length) { + contentCount = contentCount + newArgs.Length; + } + + // "*" + contentCount.ToString() + "\r\n" + { + var contentCountBytes = Encoding.UTF8.GetBytes(contentCount.ToString()); + + byteBuffer.Write(BytesMultiBulk, 0, BytesMultiBulk.Length); + byteBuffer.Write(contentCountBytes, 0, contentCountBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + } + + // "$" + cmd.Length + "\r\n" + cmd + "\r\n" + { + var commandBytes = Encoding.UTF8.GetBytes(DisqueCommand.ADDJOB.ToString()); + var commandCountBytes = Encoding.UTF8.GetBytes(DisqueCommand.ADDJOB.ToString().Length.ToString()); + + byteBuffer.Write(BytesBulk, 0, BytesBulk.Length); + byteBuffer.Write(commandCountBytes, 0, commandCountBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + byteBuffer.Write(commandBytes, 0, commandBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + } + + // "$" + queueId.Length + "\r\n" + queueId + "\r\n" + if (!string.IsNullOrEmpty(queueName)) { + var queueIdBytes = Encoding.UTF8.GetBytes(queueName); + var queueIdCountBytes = Encoding.UTF8.GetBytes(queueName.Length.ToString()); + + byteBuffer.Write(BytesBulk, 0, BytesBulk.Length); + byteBuffer.Write(queueIdCountBytes, 0, queueIdCountBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + byteBuffer.Write(queueIdBytes, 0, queueIdBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + } + + // "$" + data.Length + "\r\n" + data + "\r\n" + if (0 < data.Length) { + var dataCountBytes = Encoding.UTF8.GetBytes(data.Length.ToString()); + + byteBuffer.Write(BytesBulk, 0, BytesBulk.Length); + byteBuffer.Write(dataCountBytes, 0, dataCountBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + byteBuffer.Write(data, 0, data.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + } + + // "$" + option.Length + "\r\n" + option + "\r\n" + if (0 < newArgs.Length) { + foreach (var option in newArgs) { + var optionBytes = Encoding.UTF8.GetBytes(option.ToString()); + var optionCountBytes = Encoding.UTF8.GetBytes(optionBytes.Length.ToString()); + + byteBuffer.Write(BytesBulk, 0, BytesBulk.Length); + byteBuffer.Write(optionCountBytes, 0, optionCountBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + byteBuffer.Write(optionBytes, 0, optionBytes.Length); + byteBuffer.Write(BytesCRLF, 0, BytesCRLF.Length); + } + } + + return byteBuffer.ToArray(); + } + } + + public static byte[] GetJob (string[] queueIds, object[] args) { + // [NOHANG] [TIMEOUT ] [COUNT ] [WITHCOUNTERS] + // FROM queue1 queue2 ... queueN + var parameters = new object[args.Length + 1 + queueIds.Length]; + for (var i = 0; i < parameters.Length; i++) { + if (i < args.Length) { + parameters[i] = args[i]; + continue; + } + if (i == args.Length) { + parameters[i] = DISQUE_GETJOB_KEYWORD_FROM; + continue; + } + parameters[i] = queueIds[i - (args.Length + 1)]; + } + // foreach (var i in parameters) { + // Log("i:" + i); + // } + return ToBytes(DisqueCommand.GETJOB, parameters); + } + + public static byte[] AckJob (string[] jobIds) { + // jobid1 jobid2 ... jobidN + return ToBytes(DisqueCommand.ACKJOB, jobIds); + } + + public static byte[] FastAck (string[] jobIds) { + // jobid1 jobid2 ... jobidN + return ToBytes(DisqueCommand.FASTACK, jobIds); + } + + public static byte[] Working (string jobId) { + // jobid + return ToBytes(DisqueCommand.WORKING, jobId); + } + + public static byte[] Nack (string[] jobIds) { + // ... + return ToBytes(DisqueCommand.NACK, jobIds); + } + + public static byte[] Info () { + return ToBytes(DisqueCommand.INFO); + } + + public static byte[] Hello () { + return ToBytes(DisqueCommand.HELLO); + } + + public static byte[] Qlen (string queueId) { + return ToBytes(DisqueCommand.QLEN, queueId); + } + + public static byte[] Qstat (string queueId) { + return ToBytes(DisqueCommand.QSTAT, queueId); + } + + public static byte[] Qpeek (string queueId, int count) { + return ToBytes(DisqueCommand.QPEEK, queueId, count); + } + + public static byte[] Enqueue (string[] jobIds) { + return ToBytes(DisqueCommand.ENQUEUE, jobIds); + } + + public static byte[] Dequeue (string[] jobIds) { + return ToBytes(DisqueCommand.DEQUEUE, jobIds); + } + + public static byte[] DelJob (string[] jobIds) { + return ToBytes(DisqueCommand.DELJOB, jobIds); + } + + public static byte[] Show (string jobId) { + return ToBytes(DisqueCommand.SHOW, jobId); + } + + public static byte[] Qscan (object[] args) { + return ToBytes(DisqueCommand.QSCAN, args); + } + + public static byte[] Jscan (int cursor, object[] args) { + return ToBytes(DisqueCommand.JSCAN, cursor, args); + } + + public static byte[] Pause (string queueId, string option1, string[] options) { + return ToBytes(DisqueCommand.JSCAN, queueId, option1, options); + } + + + private static byte[] ToBytes (DisqueCommand commandEnum, params object[] args) { + int length = 1 + args.Length; + + var command = commandEnum.ToString(); + string strCommand; + + { + StringBuilder sb = new StringBuilder(); + sb.Append(CharMultiBulk).Append(length).Append(CharEOL); + + sb.Append(CharBulk).Append(Encoding.UTF8.GetByteCount(command)).Append(CharEOL).Append(command).Append(CharEOL); + + foreach (var arg in args) { + var str = String.Format(CultureInfo.InvariantCulture, "{0}", arg); + sb.Append(CharBulk) + .Append(Encoding.UTF8.GetByteCount(str)) + .Append(CharEOL) + .Append(str) + .Append(CharEOL); + } + strCommand = sb.ToString(); + } + + byte[] bytes = Encoding.UTF8.GetBytes(strCommand.ToCharArray()); + + return bytes; + } + + public struct ScanResult { + public readonly int cursor; + public readonly bool isDone; + public readonly DisquuunResult[] data; + + public ScanResult (int cursor, bool isDone, DisquuunResult[] data) { + this.cursor = cursor; + this.isDone = isDone; + this.data = data; + } + public ScanResult (bool dummy=false) { + this.cursor = -1; + this.isDone = false; + this.data = null; + } + } + + public static ScanResult ScanBuffer (DisqueCommand command, byte[] sourceBuffer, int fromCursor, long length, string socketId) { + var cursor = fromCursor; + + switch (command) { + case DisqueCommand.ADDJOB: { + switch (sourceBuffer[cursor]) { + // case ByteError: { + // - + // var lineEndCursor = ReadLine(sourceBuffer, cursor); + // cursor = cursor + 1;// add header byte size = 1. + + // if (Failed != null) { + // var errorStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // // Disquuun.Log("errorStr:" + errorStr); + // Failed(currentCommand, errorStr); + // } + + // cursor = lineEndCursor + 2;// CR + LF + // break; + // } + case ByteStatus: { + // + count + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countBuffer = new ArraySegment(sourceBuffer, cursor, lineEndCursor - cursor); + + cursor = lineEndCursor + 2;// CR + LF + + return new ScanResult(cursor, true, new DisquuunResult[]{new DisquuunResult(countBuffer)}); + } + } + break; + } + case DisqueCommand.GETJOB: { + switch (sourceBuffer[cursor]) { + case ByteMultiBulk: { + DisquuunResult[] jobDatas = null; + { + // * count. + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var bulkCountStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // TestLogger.Log("bulkCountStr:" + bulkCountStr); + var bulkCountNum = Convert.ToInt32(bulkCountStr); + + cursor = lineEndCursor + 2;// CR + LF + + + // trigger when GETJOB NOHANG + if (bulkCountNum < 0) return new ScanResult(cursor, true, new DisquuunResult[]{}); + + + jobDatas = new DisquuunResult[bulkCountNum]; + for (var i = 0; i < bulkCountNum; i++) { + var itemCount = 0; + + { + // * count. + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var bulkCountStr2 = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + + itemCount = Convert.ToInt32(bulkCountStr2); + // Disquuun.Log("itemCount:" + itemCount); + + cursor = lineEndCursor2 + 2;// CR + LF + } + + // queueName + { + // $ count. + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1 + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor3 + 2;// CR + LF + + // $ bulk. + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + // var nameStr = Encoding.UTF8.GetString(sourceBuffer, cursor, strNum); + // Disquuun.Log("nameStr:" + nameStr); + + cursor = cursor + strNum + 2;// CR + LF + } + + // jobId + ArraySegment jobIdBytes; + { + // $ count. + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + // Disquuun.Log("id strNum:" + strNum); + + cursor = lineEndCursor3 + 2;// CR + LF + + + // $ bulk. + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + jobIdBytes = new ArraySegment(sourceBuffer, cursor, strNum); + // var jobIdStr = Encoding.UTF8.GetString(jobIdBytes); + // Disquuun.Log("jobIdStr:" + jobIdStr); + + cursor = cursor + strNum + 2;// CR + LF + } + + + // jobData + ArraySegment dataBytes; + { + // $ count. + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor3 + 2;// CR + LF + + + // $ bulk. + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + dataBytes = new ArraySegment(sourceBuffer, cursor, strNum); + + cursor = cursor + strNum + 2;// CR + LF + } + + // no withcounters response. + if (itemCount == 3) { + jobDatas[i] = new DisquuunResult(jobIdBytes, dataBytes); + // cursor = cursor + 2;// CR + LF + continue; + } + + // withcounters response. + if (itemCount == 7) { + ArraySegment nackCountBytes; + { + // $ + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + // Disquuun.Log("data strNum:" + strNum); + + cursor = lineEndCursor3 + 2;// CR + LF + + // ignore params. + + cursor = cursor + strNum + 2;// CR + LF + + // : + var lineEndCursor4 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor4 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + nackCountBytes = new ArraySegment(sourceBuffer, cursor, lineEndCursor4 - cursor); + + cursor = lineEndCursor4 + 2;// CR + LF + } + + ArraySegment additionalDeliveriesCountBytes; + { + // $ + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + // Disquuun.Log("data strNum:" + strNum); + + cursor = lineEndCursor3 + 2;// CR + LF + + // ignore params. + + cursor = cursor + strNum + 2;// CR + LF + + // : + var lineEndCursor4 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor4 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + additionalDeliveriesCountBytes = new ArraySegment(sourceBuffer, cursor, lineEndCursor4 - cursor); + + jobDatas[i] = new DisquuunResult(jobIdBytes, dataBytes, nackCountBytes, additionalDeliveriesCountBytes); + + cursor = lineEndCursor4 + 2;// CR + LF + } + } + } + } + + if (jobDatas != null && 0 < jobDatas.Length) return new ScanResult(cursor, true, jobDatas); + break; + } + // case ByteError: { + // // - + // Disquuun.Log("-"); + // throw new Exception("GetJob error."); + // // var lineEndCursor = ReadLine2(sourceBuffer, cursor, length); + // // cursor = cursor + 1;// add header byte size = 1. + + // // if (Failed != null) { + // // var errorStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // // // Disquuun.Log("errorStr:" + errorStr); + // // Failed(currentCommand, errorStr); + // // } + + // // cursor = lineEndCursor + 2;// CR + LF + // break; + // } + } + break; + } + case DisqueCommand.ACKJOB: + case DisqueCommand.FASTACK: { + switch (sourceBuffer[cursor]) { + case ByteInt: { + // : count. + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + // var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // Disquuun.Log("countStr:" + countStr); + + var countBuffer = new ArraySegment(sourceBuffer, cursor, lineEndCursor - cursor); + + var byteData = new DisquuunResult(countBuffer); + + cursor = lineEndCursor + 2;// CR + LF + return new ScanResult(cursor, true, new DisquuunResult[]{byteData}); + } + // case ByteError: { + // // - + // var lineEndCursor = ReadLine(sourceBuffer, cursor); + // cursor = cursor + 1;// add header byte size = 1. + + // if (Failed != null) { + // var errorStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // // Disquuun.Log("errorStr:" + errorStr); + // Failed(currentCommand, errorStr); + // } + // cursor = lineEndCursor + 2;// CR + LF + // break; + // } + } + break; + } + case DisqueCommand.INFO: { + switch (sourceBuffer[cursor]) { + case ByteBulk: { + + var countNum = 0; + {// readbulk count. + // $ + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + countNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor + 2;// CR + LF + } + + {// readbulk string. + if (ShortageOfReadableLength(sourceBuffer, cursor, countNum)) return new ScanResult(false); + + var newBuffer = new ArraySegment(sourceBuffer, cursor, countNum); + + cursor = cursor + countNum + 2;// CR + LF + + return new ScanResult(cursor, true, new DisquuunResult[]{new DisquuunResult(newBuffer)}); + } + } + } + break; + } + case DisqueCommand.HELLO: { + switch (sourceBuffer[cursor]) { + case ByteMultiBulk: { + ArraySegment version; + ArraySegment thisNodeId; + List> nodeIdsAndInfos = new List>(); + /* + :*3 + :1 version [0][0] + + $40 this node ID [0][1] + 002698920b158ba29ff8d41d3e5303ceaf0e8d45 + + *4 [1~n][0~3] + $40 + 002698920b158ba29ff8d41d3e5303ceaf0e8d45 + + $0 + "" + + $4 + 7711 + + $1 + 1 + */ + + { + // * + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + // var bulkCountStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + // Disquuun.Log("bulkCountStr:" + bulkCountStr); + + cursor = lineEndCursor + 2;// CR + LF + } + + { + // : format version + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + version = new ArraySegment(sourceBuffer, cursor, lineEndCursor - cursor); + // Disquuun.Log(":version:" + version); + + cursor = lineEndCursor + 2;// CR + LF + } + + { + // $ this node id + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + var strNum = Convert.ToInt32(countStr); + // Disquuun.Log("id strNum:" + strNum); + + cursor = lineEndCursor + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + thisNodeId = new ArraySegment(sourceBuffer, cursor, strNum); + // Disquuun.Log("thisNodeId:" + thisNodeId); + + cursor = cursor + strNum + 2;// CR + LF + } + + { + // * node ids + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + cursor = cursor + 1;// add header byte size = 1. + + var bulkCountStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + var bulkCountNum = Convert.ToInt32(bulkCountStr); + // Disquuun.Log("bulkCountNum:" + bulkCountNum); + + cursor = lineEndCursor + 2;// CR + LF + + // nodeId, ip, port, priority. + for (var i = 0; i < bulkCountNum/4; i++) { + ArraySegment idStr; + + // $ nodeId + { + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor2 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + idStr = new ArraySegment(sourceBuffer, cursor, strNum); + nodeIdsAndInfos.Add(idStr); + + cursor = cursor + strNum + 2;// CR + LF + } + + { + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor2 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + var ipStr = new ArraySegment(sourceBuffer, cursor, strNum); + nodeIdsAndInfos.Add(ipStr); + + cursor = cursor + strNum + 2;// CR + LF + } + + { + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor2 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + var portStr = new ArraySegment(sourceBuffer, cursor, strNum); + nodeIdsAndInfos.Add(portStr); + + cursor = cursor + strNum + 2;// CR + LF + } + + { + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor2 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + var priorityStr = new ArraySegment(sourceBuffer, cursor, strNum); + nodeIdsAndInfos.Add(priorityStr); + + cursor = cursor + strNum + 2;// CR + LF + } + } + } + + + var byteDatas = new DisquuunResult[1 + nodeIdsAndInfos.Count/4]; + byteDatas[0] = new DisquuunResult(version, thisNodeId); + + for (var index = 0; index < nodeIdsAndInfos.Count/4; index++) { + var nodeId = nodeIdsAndInfos[index*4 + 0]; + var ip = nodeIdsAndInfos[index*4 + 1]; + var port = nodeIdsAndInfos[index*4 + 2]; + var priority = nodeIdsAndInfos[index*4 + 3]; + + byteDatas[index + 1] = new DisquuunResult(nodeId, ip, port, priority); + } + + return new ScanResult(cursor, true, byteDatas); + } + } + break; + } + case DisqueCommand.QLEN: { + switch (sourceBuffer[cursor]) { + case ByteInt: { + // : format version + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countBuffer = new ArraySegment(sourceBuffer, cursor, lineEndCursor - cursor); + + var byteData = new DisquuunResult(countBuffer); + + cursor = lineEndCursor + 2;// CR + LF + + return new ScanResult(cursor, true, new DisquuunResult[]{byteData}); + } + } + + break; + } + case DisqueCommand.QSTAT: { + // * count of item. + var bulkCountNum = 0; + { + var lineEndCursor = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var bulkCountStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor - cursor); + bulkCountNum = Convert.ToInt32(bulkCountStr); + + cursor = lineEndCursor + 2;// CR + LF + } + + // items are key & value pair(maybe "import-from" will not match..) + var itemCount = bulkCountNum / 2; + + var results = new DisquuunResult[itemCount]; + for (var i = 0; i < itemCount; i++) { + ArraySegment keyBytes; + {// key ($) + var lineEndCursor2 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor2 == -1) return new ScanResult(false); + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor2 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor2 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + keyBytes = new ArraySegment(sourceBuffer, cursor, strNum); + + cursor = cursor + strNum + 2;// CR + LF + } + + {// value ($ or * or :) + ArraySegment valBytes; + + var type = sourceBuffer[cursor]; + /* + check next parameter = value parameter's type. + $ or * or : is expected. + */ + switch (type){ + case ByteBulk: { + // $ have string value. + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = Convert.ToInt32(countStr); + + cursor = lineEndCursor3 + 2;// CR + LF + + if (ShortageOfReadableLength(sourceBuffer, cursor, strNum)) return new ScanResult(false); + valBytes = new ArraySegment(sourceBuffer, cursor, strNum); + + cursor = cursor + strNum + 2;// CR + LF + break; + } + case ByteMultiBulk: + case ByteInt: { + // * or : have number value. + var lineEndCursor3 = ReadLine(sourceBuffer, cursor, length); + if (lineEndCursor3 == -1) return new ScanResult(false); + + cursor = cursor + 1;// add header byte size = 1. + + var countStr = Encoding.UTF8.GetString(sourceBuffer, cursor, lineEndCursor3 - cursor); + var strNum = countStr.Length; + + valBytes = new ArraySegment(sourceBuffer, cursor, strNum); + + cursor = lineEndCursor3 + 2;// CR + LF + break; + } + default: { + throw new Exception("qstat unexpected type:" + type); + } + } + results[i] = new DisquuunResult(keyBytes, valBytes); + } + } + return new ScanResult(cursor, true, results); + } + default: { + throw new Exception("error command:" + command + " unhandled:" + sourceBuffer[cursor] + " data:" + Encoding.UTF8.GetString(sourceBuffer)); + } + } + return new ScanResult(false); + } + + private static bool ShortageOfReadableLength (byte[] source, int cursor, int length) { + if (cursor + length < source.Length) return false; + return true; + } + + public static int ReadLine (byte[] bytes, int cursor, long length) { + while (cursor < length) { + if (bytes[cursor] == ByteLF) return cursor - 1; + cursor++; + } + + // Disquuun.Log("overflow detected."); + return -1; + } + } + + + +} \ No newline at end of file diff --git a/SampleProject/Assets/Editor/Disquuun/DisquuunDeserializer.cs b/SampleProject/Assets/Editor/Disquuun/DisquuunDeserializer.cs new file mode 100755 index 0000000..2fa1187 --- /dev/null +++ b/SampleProject/Assets/Editor/Disquuun/DisquuunDeserializer.cs @@ -0,0 +1,619 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace DisquuunCore.Deserialize { + + public static class DisquuunDeserializer { + public static byte[] ByteArrayFromSegment (ArraySegment arraySegment) { + var buffer = new byte[arraySegment.Count]; + Buffer.BlockCopy(arraySegment.Array, arraySegment.Offset, buffer, 0, arraySegment.Count); + return buffer; + } + + + public static string AddJob (DisquuunResult[] data) { + var idStrBytes = ByteArrayFromSegment(data[0].bytesArray[0]); + return Encoding.UTF8.GetString(idStrBytes); + } + + public struct JobData { + public readonly string jobId; + public readonly byte[] jobData; + + public readonly int nackCount; + public readonly int additionalDeliveriesCount; + + public JobData (DisquuunResult dataSourceBytes) { + this.jobId = Encoding.UTF8.GetString(ByteArrayFromSegment(dataSourceBytes.bytesArray[0])); + this.jobData = ByteArrayFromSegment(dataSourceBytes.bytesArray[1]); + + if (dataSourceBytes.bytesArray.Length < 3) { + nackCount = -1; + additionalDeliveriesCount = -1; + } else {// with "withcounters" option + nackCount = Convert.ToInt32(Encoding.UTF8.GetString(ByteArrayFromSegment(dataSourceBytes.bytesArray[2]))); + additionalDeliveriesCount = Convert.ToInt32(Encoding.UTF8.GetString(ByteArrayFromSegment(dataSourceBytes.bytesArray[3]))); + } + } + } + + public static JobData[] GetJob (DisquuunResult[] data) { + var jobDatas = new JobData[data.Length]; + for (var i = 0; i < data.Length; i++) { + var jobDataSource = data[i]; + jobDatas[i] = new JobData(jobDataSource); + } + return jobDatas; + } + + public static int DeserializeInt (DisquuunResult[] data) { + var valStr = Encoding.UTF8.GetString(ByteArrayFromSegment(data[0].bytesArray[0])); + return Convert.ToInt32(valStr); + } + public static int AckJob (DisquuunResult[] data) { + return DeserializeInt(data); + } + public static int FastAck (DisquuunResult[] data) { + return DeserializeInt(data); + } + public static int Working (DisquuunResult[] data) { + return DeserializeInt(data); + } + + public static int Nack (DisquuunResult[] data) { + return DeserializeInt(data); + } + + + + public class InfoStruct { + public struct HeaderAndValue { + public readonly string header; + public readonly string val; + public HeaderAndValue (string line) { + this.header = line.Split(':')[0]; + this.val = line.Split(':')[1]; + } + } + + public readonly string rawString; + public Server server; + public Clients clients; + public Memory memory; + public Jobs jobs; + public Queues queues; + public Persistence persistence; + public Stats stats; + public CPU cpu; + + public InfoStruct (byte[] sourceData) { + this.rawString = Encoding.UTF8.GetString(sourceData); + var lisesSoucrce = rawString.Replace("\r", string.Empty); + + var lines = lisesSoucrce.Split('\n'); + var lineIndexies = new List{0};// first index is 0. + + for (var i = 0; i < lines.Length; i++) { + var line = lines[i]; + if (string.IsNullOrEmpty(line)) lineIndexies.Add(i+1); + } + + for (var i = 0; i < lineIndexies.Count; i++) { + var firstLineIndex = lineIndexies[i]; + + var nextBlockIndex = -1; + if (i+1 < lineIndexies.Count) nextBlockIndex = lineIndexies[i+1]; + else continue; + + var infoCategolyStr = lines[firstLineIndex]; + var blockHeaderAndValue = lines + .Where((p, index) => firstLineIndex < index && index < nextBlockIndex) + .Where(line => line.Contains(":")) + .Select(line => new HeaderAndValue(line)) + .ToArray(); + + switch (infoCategolyStr) { + case "# Server": { + this.server = new Server(blockHeaderAndValue); + break; + } + case "# Clients": { + this.clients = new Clients(blockHeaderAndValue); + break; + } + case "# Memory": { + this.memory = new Memory(blockHeaderAndValue); + break; + } + case "# Jobs": { + this.jobs = new Jobs(blockHeaderAndValue); + break; + } + case "# Queues": { + this.queues = new Queues(blockHeaderAndValue); + break; + } + case "# Persistence": { + this.persistence = new Persistence(blockHeaderAndValue); + break; + } + case "# Stats": { + this.stats = new Stats(blockHeaderAndValue); + break; + } + case "# CPU": { + this.cpu = new CPU(blockHeaderAndValue); + break; + } + default: { + // unexpected info categoly. + break; + } + } + } + } + + public class Server { + public readonly string disque_version;//:1.0-rc1 + public readonly string disque_git_sha1;//:c95e6dc0 + public readonly string disque_git_dirty;//:1 + public readonly string disque_build_id;//:e95116bc5ef677ba + public readonly string os;//:Darwin 15.4.0 x86_64 + public readonly string arch_bits;//:64 + public readonly string multiplexing_api;//:kqueue + public readonly string gcc_version;//:4.2.1 + public readonly string process_id;//:11899 + public readonly string run_id;//:b184f132a28d37c7967bfa4d8ab990953b8610f2 + public readonly int tcp_port;//:7711 + public readonly string uptime_in_seconds;//:516 + public readonly string uptime_in_days;//:0 + public readonly string hz;//:10 + public readonly string executable;//:/Users/tartetatin/Desktop/RolePlayingChat/Server/./disque/src/disque-server + public readonly string config_file;//: + public Server (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "disque_version": { + this.disque_version = sourceData.val; + break; + } + case "disque_git_sha1": { + this.disque_git_sha1 = sourceData.val; + break; + } + case "disque_git_dirty": { + this.disque_git_dirty = sourceData.val; + break; + } + case "disque_build_id": { + this.disque_build_id = sourceData.val; + break; + } + case "os": { + this.os = sourceData.val; + break; + } + case "arch_bits": { + this.arch_bits = sourceData.val; + break; + } + case "multiplexing_api": { + this.multiplexing_api = sourceData.val; + break; + } + case "gcc_version": { + this.gcc_version = sourceData.val; + break; + } + case "process_id": { + this.process_id = sourceData.val; + break; + } + case "run_id": { + this.run_id = sourceData.val; + break; + } + case "tcp_port": { + this.tcp_port = Convert.ToInt32(sourceData.val); + break; + } + case "uptime_in_seconds": { + this.uptime_in_seconds = sourceData.val; + break; + } + case "uptime_in_days": { + this.uptime_in_days = sourceData.val; + break; + } + case "hz": { + this.hz = sourceData.val; + break; + } + case "executable": { + this.executable = sourceData.val; + break; + } + case "config_file": { + this.config_file = sourceData.val; + break; + } + default: { + break; + } + } + } + + } + } + + public class Clients { + public readonly string connected_clients;//:4 + public readonly string client_longest_output_list;//:0 + public readonly string client_biggest_input_buf;//:1016 + public readonly string blocked_clients;//:1 + public Clients (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "connected_clients": { + this.connected_clients = sourceData.val; + break; + } + case "client_longest_output_list": { + this.client_longest_output_list = sourceData.val; + break; + } + case "client_biggest_input_buf": { + this.client_biggest_input_buf = sourceData.val; + break; + } + case "blocked_clients": { + this.blocked_clients = sourceData.val; + break; + } + } + } + } + } + + public class Memory { + public readonly string used_memory;//:1070080 + public readonly string used_memory_human;//:1.02M + public readonly string used_memory_rss;//:4984832 + public readonly string used_memory_peak;//:1758176 + public readonly string used_memory_peak_human;//:1.68M + public readonly string mem_fragmentation_ratio;//:4.66 + public readonly string mem_allocator;//:libc + public Memory (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "used_memory": { + this.used_memory = sourceData.val; + break; + } + case "used_memory_human": { + this.used_memory_human = sourceData.val; + break; + } + case "used_memory_rss": { + this.used_memory_rss = sourceData.val; + break; + } + case "used_memory_peak": { + this.used_memory_peak = sourceData.val; + break; + } + case "used_memory_peak_human": { + this.used_memory_peak_human = sourceData.val; + break; + } + case "mem_fragmentation_ratio": { + this.mem_fragmentation_ratio = sourceData.val; + break; + } + case "mem_allocator": { + this.mem_allocator = sourceData.val; + break; + } + } + } + } + } + + public class Jobs { + public readonly int registered_jobs;//101 + public Jobs (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "registered_jobs": { + this.registered_jobs = Convert.ToInt32(sourceData.val); + break; + } + } + } + } + } + + public class Queues { + public readonly int registered_queues;//:31 + public Queues (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "registered_queues": { + this.registered_queues = Convert.ToInt32(sourceData.val); + break; + } + } + } + } + } + + public class Persistence { + public readonly string loading;//:0 + public readonly string aof_enabled;//:0 + public readonly string aof_state;//:off + public readonly string aof_rewrite_in_progress;//:0 + public readonly string aof_rewrite_scheduled;//:0 + public readonly string aof_last_rewrite_time_sec;//:-1 + public readonly string aof_current_rewrite_time_sec;//:-1 + public readonly string aof_last_bgrewrite_status;//:ok + public readonly string aof_last_write_status;//:ok + public Persistence (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "loading": { + this.loading = sourceData.val; + break; + } + case "aof_enabled": { + this.aof_enabled = sourceData.val; + break; + } + case "aof_state": { + this.aof_state = sourceData.val; + break; + } + case "aof_rewrite_in_progress": { + this.aof_rewrite_in_progress = sourceData.val; + break; + } + case "aof_rewrite_scheduled": { + this.aof_rewrite_scheduled = sourceData.val; + break; + } + case "aof_last_rewrite_time_sec": { + this.aof_last_rewrite_time_sec = sourceData.val; + break; + } + case "aof_current_rewrite_time_sec": { + this.aof_current_rewrite_time_sec = sourceData.val; + break; + } + case "aof_last_bgrewrite_status": { + this.aof_last_bgrewrite_status = sourceData.val; + break; + } + case "aof_last_write_status": { + this.aof_last_write_status = sourceData.val; + break; + } + } + } + } + } + + public class Stats { + public readonly string total_connections_received;//:262 + public readonly string total_commands_processed;//:856 + public readonly string instantaneous_ops_per_sec;//:55 + public readonly string total_net_input_bytes;//:2402606 + public readonly string total_net_output_bytes;//:1065901 + public readonly string instantaneous_input_kbps;//:4.07 + public readonly string instantaneous_output_kbps;//:73.92 + public readonly string rejected_connections;//:0 + public readonly string latest_fork_usec;//:0 + + public Stats (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "total_connections_received": { + this.total_connections_received = sourceData.val; + break; + } + case "total_commands_processed": { + this.total_commands_processed = sourceData.val; + break; + } + case "instantaneous_ops_per_sec": { + this.instantaneous_ops_per_sec = sourceData.val; + break; + } + case "total_net_input_bytes": { + this.total_net_input_bytes = sourceData.val; + break; + } + case "total_net_output_bytes": { + this.total_net_output_bytes = sourceData.val; + break; + } + case "instantaneous_input_kbps": { + this.instantaneous_input_kbps = sourceData.val; + break; + } + case "instantaneous_output_kbps": { + this.instantaneous_output_kbps = sourceData.val; + break; + } + case "rejected_connections": { + this.rejected_connections = sourceData.val; + break; + } + case "latest_fork_usec": { + this.latest_fork_usec = sourceData.val; + break; + } + } + + } + } + } + + public class CPU { + public readonly string used_cpu_sys;//:4.47 + public readonly string used_cpu_user;//:2.96 + public readonly string used_cpu_sys_children;//:0.00 + public readonly string used_cpu_user_children;//:0.00 + public CPU (HeaderAndValue[] sourceDatas) { + foreach (var sourceData in sourceDatas) { + switch (sourceData.header) { + case "used_cpu_sys": { + this.used_cpu_sys = sourceData.val; + break; + } + case "used_cpu_user": { + this.used_cpu_user = sourceData.val; + break; + } + case "used_cpu_sys_children": { + this.used_cpu_sys_children = sourceData.val; + break; + } + case "used_cpu_user_children": { + this.used_cpu_user_children = sourceData.val; + break; + } + } + } + } + } + } + + + public static InfoStruct Info (DisquuunResult[] data) { + return new InfoStruct(ByteArrayFromSegment(data[0].bytesArray[0])); + } + + public struct HelloData { + public readonly string version; + public readonly string sourceNodeId; + public readonly NodeData[] nodeDatas; + public HelloData (string version, string sourceNodeId, NodeData[] nodeDatas) { + this.version = version; + this.sourceNodeId = sourceNodeId; + this.nodeDatas = nodeDatas; + } + } + public struct NodeData { + public readonly string nodeId; + public readonly string ip; + public readonly int port; + public readonly int priority; + public NodeData (string nodeId, string ip, int port, int priority) { + this.nodeId = nodeId; + this.ip = ip; + this.port = port; + this.priority = priority; + } + } + + public static HelloData Hello (DisquuunResult[] data) { + var version = Encoding.UTF8.GetString(ByteArrayFromSegment(data[0].bytesArray[0])); + var sourceNodeId = Encoding.UTF8.GetString(ByteArrayFromSegment(data[0].bytesArray[1])); + var nodeDatas = new List(); + for (var i = 1; i < data.Length; i++) { + var nodeIdStr = Encoding.UTF8.GetString(ByteArrayFromSegment(data[i].bytesArray[0])); + var ipStr = Encoding.UTF8.GetString(ByteArrayFromSegment(data[i].bytesArray[1])); + var portInt = Convert.ToInt16(Encoding.UTF8.GetString(ByteArrayFromSegment(data[i].bytesArray[2]))); + var priorityInt = Convert.ToInt16(Encoding.UTF8.GetString(ByteArrayFromSegment(data[i].bytesArray[3]))); + nodeDatas.Add(new NodeData(nodeIdStr, ipStr, portInt, priorityInt)); + } + var helloData = new HelloData(version, sourceNodeId, nodeDatas.ToArray()); + return helloData; + } + + public static int Qlen (DisquuunResult[] data) { + var qLenStr = Encoding.UTF8.GetString(ByteArrayFromSegment(data[0].bytesArray[0])); + return Convert.ToInt32(qLenStr); + } + + public class QstatData { + public readonly string name; + public readonly int len; + public readonly int age; + public readonly int idle; + public readonly int blocked; + public readonly int import_from; + public readonly int import_rate; + public readonly int jobs_in; + public readonly int jobs_out; + public readonly string pause; + + public QstatData (DisquuunResult[] data) { + foreach (var keyValue in data) { + var key = Encoding.UTF8.GetString(ByteArrayFromSegment(keyValue.bytesArray[0])); + var strVal = Encoding.UTF8.GetString(ByteArrayFromSegment(keyValue.bytesArray[1])); + switch (key) { + case "name": { + this.name = strVal; + break; + } + case "len": { + this.len = Convert.ToInt32(strVal); + break; + } + case "age": { + this.age = Convert.ToInt32(strVal); + break; + } + case "idle": { + this.idle = Convert.ToInt32(strVal); + break; + } + case "blocked": { + this.blocked = Convert.ToInt32(strVal); + break; + } + case "import_from": { + this.import_from = Convert.ToInt32(strVal); + break; + } + case "import_rate": { + this.import_rate = Convert.ToInt32(strVal); + break; + } + case "jobs_in": { + this.jobs_in = Convert.ToInt32(strVal); + break; + } + case "jobs_out": { + this.jobs_out = Convert.ToInt32(strVal); + break; + } + case "pause": { + this.pause = strVal; + break; + } + } + } + } + } + + public static QstatData Qstat (DisquuunResult[] data) { + return new QstatData(data); + } + +// QPEEK,// +// ENQUEUE,// ... +// DEQUEUE,// ... +// DELJOB,// ... +// SHOW,// +// QSCAN,// [COUNT ] [BUSYLOOP] [MINLEN ] [MAXLEN ] [IMPORTRATE ] +// JSCAN,// [] [COUNT ] [BUSYLOOP] [QUEUE ] [STATE STATE ... STATE ] [REPLY all|id] +// PAUSE,// option1 [option2 ... optionN] + + + } + + +} \ No newline at end of file diff --git a/SampleProject/Assets/Editor/Disquuun/DisquuunSocket.cs b/SampleProject/Assets/Editor/Disquuun/DisquuunSocket.cs new file mode 100755 index 0000000..209eb16 --- /dev/null +++ b/SampleProject/Assets/Editor/Disquuun/DisquuunSocket.cs @@ -0,0 +1,695 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; + +namespace DisquuunCore { + public class DisquuunSocket : StackSocket { + public readonly string socketId; + + private Action SocketOpened; + public Action SocketReloaded; + private Action SocketClosed; + + + private SocketToken socketToken; + + public bool IsChoosable () { + if (socketToken == null) return false; + if (socketToken.socketState == SocketState.OPENED) return true; + return false; + } + + public void SetBusy () { + socketToken.socketState = SocketState.BUSY; + } + + public enum SocketState { + NONE, + OPENING, + OPENED, + BUSY, + + SENDED, + RECEIVED, + + CLOSING, + CLOSED + } + + public class SocketToken { + public SocketState socketState; + + public readonly Socket socket; + + public byte[] receiveBuffer; + public int readableDataLength; + + public readonly SocketAsyncEventArgs connectArgs; + public SocketAsyncEventArgs sendArgs; + public readonly SocketAsyncEventArgs receiveArgs; + + public bool isPipeline; + public bool continuation; + + public Queue currentCommands; + public byte[] currentSendingBytes; + + public Func AsyncCallback; + + public SocketToken () {} + + public SocketToken (Socket socket, long bufferSize, SocketAsyncEventArgs connectArgs, SocketAsyncEventArgs sendArgs, SocketAsyncEventArgs receiveArgs) { + this.socket = socket; + + this.receiveBuffer = new byte[bufferSize]; + + this.connectArgs = connectArgs; + this.sendArgs = sendArgs; + this.receiveArgs = receiveArgs; + + this.connectArgs.UserToken = this; + this.sendArgs.UserToken = this; + this.receiveArgs.UserToken = this; + + this.receiveArgs.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); + } + } + + public DisquuunSocket ( + Action SocketOpenedAct, + Action SocketReloadedAct, + Action SocketClosedAct + ) { + this.socketId = Guid.NewGuid().ToString(); + + this.SocketOpened = SocketOpenedAct; + this.SocketReloaded = SocketReloadedAct; + this.SocketClosed = SocketClosedAct; + } + + public void Connect (IPEndPoint endPoint, long bufferSize) { + try { + var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + clientSocket.NoDelay = true; + + var connectArgs = new SocketAsyncEventArgs(); + connectArgs.RemoteEndPoint = endPoint; + connectArgs.Completed += new EventHandler(OnConnect); + + var sendArgs = new SocketAsyncEventArgs(); + sendArgs.RemoteEndPoint = endPoint; + sendArgs.Completed += new EventHandler(OnSend); + + var receiveArgs = new SocketAsyncEventArgs(); + receiveArgs.RemoteEndPoint = endPoint; + receiveArgs.Completed += new EventHandler(OnReceived); + + socketToken = new SocketToken(clientSocket, bufferSize, connectArgs, sendArgs, receiveArgs); + socketToken.socketState = SocketState.OPENING; + + // start connect. + StartConnectAsync(clientSocket, socketToken.connectArgs); + } catch (Exception e) { + SocketClosed(this, "failed to create new socket.", e); + } + } + + private void StartConnectAsync (Socket clientSocket, SocketAsyncEventArgs connectArgs) { + if (!clientSocket.ConnectAsync(socketToken.connectArgs)) OnConnect(clientSocket, connectArgs); + } + + /* + Core methods of Disquuun. + */ + + /** + method for Sync execution of specific Disque command. + DEPRECATED. only use for testing. + */ + public override DisquuunResult[] DEPRECATED_Sync (DisqueCommand command, byte[] data) { + try { + + socketToken.socket.Send(data); + + var currentLength = 0; + var scanResult = new DisquuunAPI.ScanResult(false); + + while (true) { + // waiting for head of transferring data or rest of data. + socketToken.socket.Receive(socketToken.receiveBuffer, currentLength, 1, SocketFlags.None); + currentLength = currentLength + 1; + + var available = socketToken.socket.Available; + var readableLength = currentLength + available; + { + if (socketToken.receiveBuffer.Length < readableLength) { + // Disquuun.Log("サイズオーバーしてる " + socketToken.receiveBuffer.Length + " vs:" + readableLength); + Array.Resize(ref socketToken.receiveBuffer, readableLength); + } + } + + // read rest. + socketToken.socket.Receive(socketToken.receiveBuffer, currentLength, available, SocketFlags.None); + currentLength = currentLength + available; + + scanResult = DisquuunAPI.ScanBuffer(command, socketToken.receiveBuffer, 0, currentLength, socketId); + if (scanResult.isDone) break; + + // continue reading data from socket. + // if need, prepare for next 1 byte. + if (socketToken.receiveBuffer.Length == readableLength) { + Array.Resize(ref socketToken.receiveBuffer, socketToken.receiveBuffer.Length + 1); + } + } + + socketToken.socketState = SocketState.OPENED; + return scanResult.data; + + } catch (Exception e) { + DisquuunLogger.Log("DEPRECATED_Sync error:" + e, true); + throw e; + } + } + + /** + method for Async execution of specific Disque command. + */ + public override void Async (Queue commands, byte[] data, Func Callback) { + switch (socketToken.socketState) { + case SocketState.BUSY: { + StartReceiveAndSendDataAsync(commands, data, Callback); + break; + } + } + } + + /** + method for start Looping of specific Disque command. + */ + public override void Loop (Queue commands, byte[] data, Func Callback) { + switch (socketToken.socketState) { + case SocketState.BUSY: { + StartReceiveAndSendDataAsync(commands, data, Callback); + break; + } + } + } + + /** + method for execute pipelined commands. + */ + public override void Execute (Queue commands, byte[] wholeData, Func Callback) { + switch (socketToken.socketState) { + case SocketState.BUSY: { + StartReceiveAndSendDataAsync(commands, wholeData, Callback); + break; + } + } + } + + + /* + default pooled socket + disposable socket shared + */ + private void StartReceiveAndSendDataAsync (Queue commands, byte[] data, Func Callback) { + // ready for receive. + socketToken.readableDataLength = 0; + + socketToken.receiveArgs.SetBuffer(socketToken.receiveBuffer, 0, socketToken.receiveBuffer.Length); + if (!socketToken.socket.ReceiveAsync(socketToken.receiveArgs)) OnReceived(socketToken.socket, socketToken.receiveArgs); + + // if multiple commands exist, set as pipeline. + socketToken.isPipeline = false; + if (1 < commands.Count) socketToken.isPipeline = true; + + socketToken.currentCommands = commands; + socketToken.currentSendingBytes = data; + socketToken.AsyncCallback = Callback; + + try { + socketToken.sendArgs.SetBuffer(data, 0, data.Length); + } catch { + // renew. potential error is exists and should avoid this error. + var sendArgs = new SocketAsyncEventArgs(); + sendArgs.RemoteEndPoint = socketToken.receiveArgs.RemoteEndPoint; + sendArgs.Completed += new EventHandler(OnSend); + sendArgs.UserToken = socketToken; + + socketToken.sendArgs = sendArgs; + socketToken.sendArgs.SetBuffer(data, 0, data.Length); + } + + if (!socketToken.socket.SendAsync(socketToken.sendArgs)) OnSend(socketToken.socket, socketToken.sendArgs); + } + + + /* + handlers + */ + private void OnConnect (object unused, SocketAsyncEventArgs args) { + var token = (SocketToken)args.UserToken; + switch (token.socketState) { + case SocketState.OPENING: { + if (args.SocketError != SocketError.Success) { + token.socketState = SocketState.CLOSED; + var error = new Exception("connect error:" + args.SocketError.ToString()); + + SocketClosed(this, "connect failed.", error); + return; + } + // lock (socketLockObject) { + token.socketState = SocketState.OPENED; + SocketOpened(this, socketId); + return; + // } + } + default: { + throw new Exception("socket state does not correct:" + token.socketState); + } + } + } + + private void OnSend (object unused, SocketAsyncEventArgs args) { + switch (args.SocketError) { + case SocketError.Success: { + var token = args.UserToken as SocketToken; + + switch (token.socketState) { + case SocketState.BUSY: { + token.socketState = SocketState.SENDED; + break; + } + case SocketState.RECEIVED: { + if (token.continuation) { + // ready for next loop receive. + token.readableDataLength = 0; + token.receiveArgs.SetBuffer(token.receiveBuffer, 0, token.receiveBuffer.Length); + if (!token.socket.ReceiveAsync(token.receiveArgs)) OnReceived(token.socket, token.receiveArgs); + + try { + token.sendArgs.SetBuffer(token.currentSendingBytes, 0, token.currentSendingBytes.Length); + } catch { + // renew. potential error is exists and should avoid this error. + var sendArgs = new SocketAsyncEventArgs(); + sendArgs.RemoteEndPoint = token.receiveArgs.RemoteEndPoint; + sendArgs.Completed += new EventHandler(OnSend); + sendArgs.UserToken = token; + token.sendArgs = sendArgs; + token.sendArgs.SetBuffer(token.currentSendingBytes, 0, token.currentSendingBytes.Length); + } + if (!token.socket.SendAsync(token.sendArgs)) OnSend(token.socket, token.sendArgs); + return; + } + + token.socketState = SocketState.OPENED; + SocketReloaded(this); + return; + } + } + return; + } + default: { + DisquuunLogger.Log("onsend error, " + args.SocketError, true); + // if (Error != null) { + // var error = new Exception("send error:" + socketError.ToString()); + // Error(error); + // } + return; + } + } + } + + + + private void OnReceived (object unused, SocketAsyncEventArgs args) { + var token = (SocketToken)args.UserToken; + if (args.SocketError != SocketError.Success) { + switch (token.socketState) { + case SocketState.CLOSING: + case SocketState.CLOSED: { + // already closing, ignore. + return; + } + default: { + switch (args.SocketError) { + case SocketError.ConnectionReset: { + DisquuunLogger.Log("ConnectionResetが出てる. " + " token.socketState:" + token.socketState, true); + break; + } + default: { + DisquuunLogger.Log("onReceive default token.socketState:" + token.socketState + " error:" + args.SocketError, true); + break; + } + } + + Disconnect(); + + var e1 = new Exception("receive status is not good."); + SocketClosed(this, "failed to receive.", e1); + return; + } + } + } + + if (args.BytesTransferred == 0) return; + + var bytesAmount = args.BytesTransferred; + + // update token-dataLength as read completed. + token.readableDataLength = token.readableDataLength + bytesAmount; + + if (token.isPipeline) PipelineReceive(token); + else LoopOrAsyncReceive(token); + } + + private void PipelineReceive (SocketToken token) { + var fromCursor = 0; + + /* + read data from receiveBuffer by moving fromCursor. + */ + while (true) { + var currentCommand = token.currentCommands.Peek(); + var result = DisquuunAPI.ScanBuffer(currentCommand, token.receiveBuffer, fromCursor, token.readableDataLength, socketId); + + if (result.isDone) { + token.AsyncCallback(currentCommand, result.data); + + // deque as read done. + token.currentCommands.Dequeue(); + + if (token.currentCommands.Count == 0) { + // pipelining is over. + switch (token.socketState) { + case SocketState.BUSY: { + token.socketState = SocketState.RECEIVED; + break; + } + case SocketState.SENDED: { + token.socketState = SocketState.OPENED; + SocketReloaded(this); + break; + } + default: { + break; + } + } + return; + } + + // commands are still remained. + + // got all data is just consumed. get rest from outside. + if (fromCursor == token.readableDataLength) { + StartContinueReceiving(token, 0); + return; + } + + // rest pipeline commands and received data is exists in buffer. + fromCursor = result.cursor; + continue; + } + + /* + reading is not completed. the fragment of command exists. + */ + + var fragmentDataLength = token.readableDataLength - fromCursor; + + // move fragment data to head of buffer. + Buffer.BlockCopy(token.receiveBuffer, fromCursor, token.receiveBuffer, 0, fragmentDataLength); + + StartContinueReceiving(token, fragmentDataLength); + break; + } + } + + private void LoopOrAsyncReceive (SocketToken token) { + var currentCommand = token.currentCommands.Peek(); + var result = DisquuunAPI.ScanBuffer(currentCommand, token.receiveBuffer, 0, token.readableDataLength, socketId); + + if (result.isDone && result.cursor == token.readableDataLength) { + // update continuation status. + token.continuation = token.AsyncCallback(currentCommand, result.data); + + if (token.continuation) { + switch (token.socketState) { + case SocketState.BUSY: { + token.socketState = SocketState.RECEIVED; + break; + } + case SocketState.SENDED: { + // ready for next loop receive. + token.readableDataLength = 0; + token.receiveArgs.SetBuffer(token.receiveBuffer, 0, token.receiveBuffer.Length); + if (!token.socket.ReceiveAsync(token.receiveArgs)) OnReceived(token.socket, token.receiveArgs); + + try { + token.sendArgs.SetBuffer(token.currentSendingBytes, 0, token.currentSendingBytes.Length); + } catch { + // renew. potential error is exists and should avoid this error. + var sendArgs = new SocketAsyncEventArgs(); + sendArgs.RemoteEndPoint = token.receiveArgs.RemoteEndPoint; + sendArgs.Completed += new EventHandler(OnSend); + sendArgs.UserToken = token; + token.sendArgs = sendArgs; + token.sendArgs.SetBuffer(token.currentSendingBytes, 0, token.currentSendingBytes.Length); + } + + if (!token.socket.SendAsync(token.sendArgs)) OnSend(token.socket, token.sendArgs); + + break; + } + default: { + // closing or other state. should close. + break; + } + } + return; + } + + // end of loop or end of async. + + switch (token.socketState) { + case SocketState.BUSY: { + token.socketState = SocketState.RECEIVED; + break; + } + case SocketState.SENDED: { + token.socketState = SocketState.OPENED; + SocketReloaded(this); + break; + } + default: { + break; + } + } + return; + } + + // not yet received all data. + // continue receiving. + + StartContinueReceiving(token, token.readableDataLength); + } + + private void StartContinueReceiving (SocketToken token, int receiveAfterFragmentIndex) { + // set already got data length to set param. + token.readableDataLength = receiveAfterFragmentIndex; + + /* + get readable size of already received data for next read=OnReceived. + resize if need. + */ + var nextAdditionalBytesLength = token.socket.Available; + if (receiveAfterFragmentIndex == token.receiveBuffer.Length) Array.Resize(ref token.receiveBuffer, token.receiveArgs.Buffer.Length + nextAdditionalBytesLength); + + /* + note that, + + SetBuffer([buffer], offset, count)'s "count" is, actually not count. + + it's "offset" is "offset of receiving-data-window against buffer", + but the "count" is actually "size limitation of next receivable data size". + + this "size" should be smaller than size of current bufferSize - offset && larger than 0. + + e.g. + if buffer is buffer[10], offset can set 0 ~ 8, and, + count should be 9 ~ 1. + + if vaiolate to this rule, ReceiveAsync never receive data. not good behaviour. + + and, the "buffer" is treated as pointer. this API treats the pointer of buffer directly. + this means, when the byteTransferred is reaching to the size of "buffer", then you resize it to proper size, + + you should re-set the buffer's pointer by using SetBuffer API. + + + actually, SetBuffer's parameters are below. + + socket.SetBuffer([bufferAsPointer], additionalDataOffset, receiveSizeLimit) + */ + var receivableCount = token.receiveBuffer.Length - receiveAfterFragmentIndex; + + // should set token.receiveBuffer to receiveArgs. because it was resized or not. + // and of cource this SetBuffer is for setting receivableCount. + token.receiveArgs.SetBuffer(token.receiveBuffer, receiveAfterFragmentIndex, receivableCount); + + if (!token.socket.ReceiveAsync(token.receiveArgs)) OnReceived(token.socket, token.receiveArgs); + } + + public void Disconnect () { + try { + socketToken.socketState = SocketState.CLOSING; + socketToken.socket.Shutdown(SocketShutdown.Both); + // socketToken.socket.Dispose(); + socketToken.socketState = SocketState.CLOSED; + } catch (Exception e) { + DisquuunLogger.Log("Disconnect e:" + e.Message, true); + } + return; + } + + /* + utils + */ + + private static bool IsSocketConnected (Socket s) { + bool part1 = s.Poll(1000, SelectMode.SelectRead); + bool part2 = (s.Available == 0); + + if (part1 && part2) return false; + + return true; + } + } + + public struct StackCommandData { + public readonly DisquuunExecuteType executeType; + public readonly Queue commands; + public readonly byte[] data; + public readonly Func Callback; + + public StackCommandData (DisquuunExecuteType executeType, Queue commands, byte[] dataSource, Func Callback) { + this.executeType = executeType; + this.commands = commands; + this.data = dataSource; + this.Callback = Callback; + } + } + + public class StackSocket { + private object stackLockObject = new object(); + + private Queue stackedDataQueue; + + public int QueueCount () { + lock (stackLockObject) { + return stackedDataQueue.Count; + } + } + + public bool IsQueued () { + lock (stackLockObject) { + if (0 < stackedDataQueue.Count) return true; + return false; + } + } + public StackCommandData Dequeue () { + lock (stackLockObject) { + return stackedDataQueue.Dequeue(); + } + } + + public StackSocket () { + this.stackedDataQueue = new Queue(); + } + + public virtual DisquuunResult[] DEPRECATED_Sync (DisqueCommand command, byte[] data) { + throw new Exception("deprecated & all sockets are using."); + } + + public virtual void Async (Queue commands, byte[] data, Func Callback) { + lock (stackLockObject) this.stackedDataQueue.Enqueue(new StackCommandData(DisquuunExecuteType.ASYNC, commands, data, Callback)); + } + + public virtual void Loop (Queue commands, byte[] data, Func Callback) { + lock (stackLockObject) this.stackedDataQueue.Enqueue(new StackCommandData(DisquuunExecuteType.LOOP, commands, data, Callback)); + } + + public virtual void Execute (Queue commands, byte[] wholeData, Func Callback) { + lock (stackLockObject) this.stackedDataQueue.Enqueue(new StackCommandData(DisquuunExecuteType.PIPELINE, commands, wholeData, Callback)); + } + } + + + /** + extension definition for DisquuunSocket. + */ + public static class DisquuunExtension { + public static DisquuunResult[] DEPRICATED_Sync (this DisquuunInput input) { + var socket = input.socketPool.ChooseAvailableSocket(); + return socket.DEPRECATED_Sync(input.command, input.data); + } + + public static void Async (this DisquuunInput input, Action Callback) { + var socket = input.socketPool.ChooseAvailableSocket(); + var commands = new Queue(); + commands.Enqueue(input.command); + + socket.Async( + commands, + input.data, + (command, resultBytes) => { + Callback(command, resultBytes); + return false; + } + ); + } + + public static void Loop (this DisquuunInput input, Func Callback) { + var socket = input.socketPool.ChooseAvailableSocket(); + var commands = new Queue(); + commands.Enqueue(input.command); + + socket.Loop(commands, input.data, Callback); + } + + public static void Execute (this List> inputs, Action Callback) { + if (!inputs.Any()) return; + if (!inputs[0].Any()) return; + + var socketPool = inputs[0][0].socketPool; + + for (var i = 0; i < inputs.Count; i++) { + var currentSlotInputs = inputs[i]; + + var socket = socketPool.ChooseAvailableSocket(); + + var commands = new Queue(); + foreach (var input in currentSlotInputs) commands.Enqueue(input.command); + + using (var memStream = new MemoryStream()) { + foreach (var input in currentSlotInputs) memStream.Write(input.data, 0, input.data.Length); + var wholeData = memStream.ToArray(); + + socket.Execute( + commands, + wholeData, + (command, resultBytes) => { + Callback(command, resultBytes); + return false; + } + ); + } + } + + inputs.Clear(); + } + } + +} diff --git a/SampleProject/Assets/Editor/DummyServer.cs b/SampleProject/Assets/Editor/DummyServer.cs new file mode 100644 index 0000000..bb58b17 --- /dev/null +++ b/SampleProject/Assets/Editor/DummyServer.cs @@ -0,0 +1,165 @@ +using UnityEditor; +using UnityEngine; +using DisquuunCore; +using System; +using DisquuunCore.Deserialize; +using System.Linq; +using System.Collections.Generic; +using System.Text; + +[InitializeOnLoad] public class DummyServer { + + static DummyServer () { + Debug.Log("ignore."); + return; + + + if (!EditorApplication.isPlaying && EditorApplication.isPlayingOrWillChangePlaymode) { + // pass. + } else { + return; + } + + + Debug.Log("initializing."); + Disquuun disquuun = null; + + // set server handler. + { + EditorApplication.CallbackFunction runningCheck = null; + + runningCheck = () => { + if (!EditorApplication.isPlaying && !EditorApplication.isPlayingOrWillChangePlaymode) { + EditorApplication.update -= runningCheck; + disquuun.Disconnect(); + Debug.Log("server disconnected."); + } + }; + + EditorApplication.update += runningCheck; + } + + disquuun = new Disquuun( + "192.168.11.5", + 7711, + 1024, + 5, + disquuunId => { + Debug.Log("connected to disque."); + + var queueId = "sample_disque_client_context"; + + // getJobのループ + disquuun.GetJob(new string[]{queueId}).Loop( + (command, getJobData) => { + var jobDatas = DisquuunDeserializer.GetJob(getJobData); + + // get jobId from got job data. + var gotJobIds = jobDatas.Select(jobData => jobData.jobId).ToArray(); + + // fastack it. + disquuun.FastAck(gotJobIds).Async( + (fastAckCommand, fastAckData) => { + // fastack succeded or not. + var fastAckedJobCount = DisquuunDeserializer.FastAck(fastAckData); + if (fastAckedJobCount != gotJobIds.Length) { + Debug.LogWarning("shortage of fastAckedJobCount:" + fastAckedJobCount + " expected:" + gotJobIds.Length); + } + // Debug.Log("fastack done."); + } + ); + + var datas = jobDatas.Select(job => job.jobData).ToArray(); + + var echoPool = new Dictionary>(); + + + foreach (var data in datas) { + switch ((char)data[0]) { + /* + STATE_CONNECT = 1 + STATE_STRING_MESSAGE = 2 + STATE_BINARY_MESSAGE = 3 + STATE_DISCONNECT_INTENT = 4 + STATE_DISCONNECT_ACCIDT = 5 + STATE_DISCONNECT_DISQUE_ACKFAILED = 6 + STATE_DISCONNECT_DISQUE_ACCIDT_SENDFAILED = 7 + */ + case '1': { + // connected. do nothing. + continue; + } + case '2': { + // string message. + // pass. + break; + } + case '3': { + // binary message. + // pass. + break; + } + case '4': { + // intentional disconnect. + // do nothing. + continue; + } + case '5': { + // accidentional disconnect. + // do nothing. + continue; + } + default: { + // some kind of error. + continue; + } + } + + // 簡単なエコーなので、そのままデータを送信者に返せればいい。 + var conId = new byte[36]; + Buffer.BlockCopy(data, 1, conId, 0, conId.Length); + + var conIdStr = Encoding.UTF8.GetString(conId); + + var payloadData = new byte[data.Length - 1 - conId.Length]; + if (payloadData.Length == 0) { + // no data found. + continue; + } + + Buffer.BlockCopy(data, 1 + conId.Length, payloadData, 0, payloadData.Length); + + if (!echoPool.ContainsKey(conIdStr)) { + echoPool[conIdStr] = new List(); + } + echoPool[conIdStr].Add(payloadData); + } + + if (!echoPool.Any()) { + return true; + } + + // 受け取ったデータをパイプラインに込めて送付 + var addJobs = new List(); + + foreach (var echoDataByConnection in echoPool) { + var targetQueueId = echoDataByConnection.Key; + var echoDatas = echoDataByConnection.Value; + foreach (var echoData in echoDatas) { + addJobs.Add(disquuun.AddJob(targetQueueId, echoData)); + } + } + + disquuun.Pipeline(addJobs.ToArray()).Execute( + (responseCommand, results) => { + // do nothing. + } + ); + + return true; + } + ); + } + ); + } +} \ No newline at end of file diff --git a/SampleProject/Assets/Samples/Connect/ConnectSample.unity b/SampleProject/Assets/Samples/Connect/ConnectSample.unity index 5479362..5d01655 100644 Binary files a/SampleProject/Assets/Samples/Connect/ConnectSample.unity and b/SampleProject/Assets/Samples/Connect/ConnectSample.unity differ diff --git a/SampleProject/Assets/Samples/Connect/ConnectionSampleScript.cs b/SampleProject/Assets/Samples/Connect/ConnectionSampleScript.cs index 5ea3262..84727fc 100644 --- a/SampleProject/Assets/Samples/Connect/ConnectionSampleScript.cs +++ b/SampleProject/Assets/Samples/Connect/ConnectionSampleScript.cs @@ -1,9 +1,12 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; using System.Text; - +using System.Threading; using UnityEngine; - +using UnityEngine.UI; using WebuSocketCore; @@ -11,16 +14,111 @@ webuSocket connection sample. */ public class ConnectionSampleScript : MonoBehaviour { + public Text times; + public Text achieved2; WebuSocket webSocket; - + private string serverIP = "13.230.48.184"; + // private string serverIP = "127.0.0.1"; + private int portNum = 8080; + + + + + bool opened = false; + public const string userId = "test"; + UdpClient udp; + IPEndPoint remoteEP = null; + + private int udpReceiveCount; + private bool achieved; + + void OnGUI () { + GUILayout.Label("udpReceiveCount:" + udpReceiveCount); + GUILayout.Label("achieved:" + achieved); + } + + private List acts = new List(); + private object lockObj = new object(); + private void Enqueue (Action act) { + lock (lockObj) { + acts.Add(act); + } + } + + private void ThreadMethod () { + + + while(true) + { + try { + Debug.Log("start waiting."); + byte[] data = udp.Receive(ref remoteEP); + remoteEP = null; + string text = Encoding.ASCII.GetString(data); + udpReceiveCount++; + Action act = () => { + times.text += "+1 "; + }; + Enqueue(act); + + if (text.Contains(":")) { + Debug.Log("サーバからudpでのレスポンスは来た"); + var ipAndPort = text.Split(':'); + var currentReceivedIp = ipAndPort[0];// サーバが返してきたクライアントのglobal ip + var currentReceivedPort = ipAndPort[1];// サーバが返してきたクライアントのglobal port + + Connect(currentReceivedIp, currentReceivedPort); + continue; + } + + Debug.Log("udp received:" + text); + Action act2 = () => { + achieved2.text += "true. text:" + text; + }; + Enqueue(act2); + achieved = true; + } catch (Exception e) { + Debug.LogError("e:" + e); + Thread.CurrentThread.Abort(); + } + } + } + + IPAddress localIP; + + void Start () { + + // udpClientでデータを送るために、自分のglobal ipを得る + using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0)) { + socket.Connect("8.8.8.8", 65530); + var endPoint = socket.LocalEndPoint as IPEndPoint; + localIP = endPoint.Address; + Debug.Log("localIP:" + localIP.ToString()); + } + + // udpClientを作成し、epを指定してサーバへとudpを送付する。 + // これで上りの経路がNATに記録される。 + if (true) { + udp = new UdpClient(new IPEndPoint(new IPAddress(localIP.GetAddressBytes()), portNum)); + var ep = new IPEndPoint(IPAddress.Parse(serverIP), portNum); + + // 送るデータはなんでもいい。 + var bytes = Encoding.UTF8.GetBytes("hello!"); + + udp.Send(bytes, bytes.Length, ep); + var thread = new Thread(new ThreadStart(ThreadMethod)); + thread.Start(); + } + } + private void Connect (string udpIp, string udpPort) { webSocket = new WebuSocket( // url. - "wss://echo.websocket.org:443/", + "ws://" + serverIP + ":"+ portNum + "/sample_disque_client", // buffer size. 1024, @@ -29,10 +127,10 @@ void Start () { () => { opened = true; Debug.Log("connected to websocket echo-server. send hello to echo-server"); - webSocket.SendString("hello!"); - webSocket.SendString("wooooo!"); - webSocket.SendString("looks!"); - webSocket.SendString("fine!"); + // webSocket.SendString("hello!"); + // webSocket.SendString("wooooo!"); + // webSocket.SendString("looks!"); + // webSocket.SendString("fine!"); }, // handler for receiving data from server. @@ -54,6 +152,22 @@ these data array will be destroyed soon after leaving this block. Buffer.BlockCopy(data.Array, data.Offset, bytes, 0, data.Count); Debug.Log("message:" + Encoding.UTF8.GetString(bytes)); + + // もし数字がかえっていたら、udpで通信してみる。 + try { + var portNum = Convert.ToUInt16(Encoding.UTF8.GetString(bytes)); + + udp = new UdpClient(new IPEndPoint(new IPAddress(localIP.GetAddressBytes()), portNum)); + var ep = new IPEndPoint(IPAddress.Parse(serverIP), portNum); + var bytes2 = Encoding.UTF8.GetBytes("hello! again."); + + udp.Send(bytes2, bytes2.Length, ep); + Debug.Log("udp sended. target portNum:" + portNum); + + // var bytes3 = Encoding.UTF8.GetBytes(portNum.ToString()); + // webSocket.Send(bytes3); + } catch {} + } }, () => { @@ -66,12 +180,27 @@ these data array will be destroyed soon after leaving this block. Debug.LogError("error, errorEnum:" + errorEnum + " exception:" + exception); }, new Dictionary{ - // set WebSocket connection header parameters here! + // // set WebSocket connection header parameters here! + {"id", userId}, + // // {"debugaddr", udpIp}, + {"debugport", udpPort} } ); } - + void Update () { + lock (lockObj) { + if (acts.Any()) { + acts.ForEach(a => a()); + acts.Clear(); + } + } + } void OnApplicationQuit () { - webSocket.Disconnect(); + if (webSocket != null && webSocket.IsConnected()) { + webSocket.Disconnect(); + } + if (udp != null) { + udp.Close(); + } } } diff --git a/SampleProject/Assets/Samples/WebSocket+Udp/Connections/IP.cs b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/IP.cs new file mode 100644 index 0000000..7c8a60f --- /dev/null +++ b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/IP.cs @@ -0,0 +1,42 @@ +using System; +using System.Net; +using System.Net.Sockets; + +/** + implementation of IP services. +*/ +namespace AutoyaFramework.Connections.IP { + public class IP { + + public static IPAddress LocalIPAddressSync () { + using (Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0)) { + socket.Connect("8.8.8.8", 65530); + var endPoint = socket.LocalEndPoint as IPEndPoint; + UnityEngine.Debug.Log("end:" + endPoint.Address); + return endPoint.Address; + } + } + + public static void LocalIPAddress (Action onDone) { + var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0); + var connectArgs = new SocketAsyncEventArgs(); + connectArgs.AcceptSocket = socket; + connectArgs.RemoteEndPoint = new IPEndPoint(new IPAddress(new byte[]{8, 8, 8, 8}), 65530); + + Action onConnected = (a, b) => { + var endPoint = socket.LocalEndPoint as IPEndPoint; + var localIP = endPoint.Address; + + onDone(localIP); + + socket.Disconnect(false); + }; + + connectArgs.Completed += new EventHandler(onConnected); + + if (!socket.ConnectAsync(connectArgs)) { + onConnected(socket, connectArgs); + } + } + } +} \ No newline at end of file diff --git a/SampleProject/Assets/Samples/WebSocket+Udp/Connections/README.md b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/README.md new file mode 100644 index 0000000..16e3749 --- /dev/null +++ b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/README.md @@ -0,0 +1,8 @@ +# Connections +connection utilities which can connect from Unity to outside, e.g. http. + +## Contains +* http/1.1 + +### Get / Post / Put / Delete + diff --git a/SampleProject/Assets/Samples/WebSocket+Udp/Connections/Udp.cs b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/Udp.cs new file mode 100644 index 0000000..1039116 --- /dev/null +++ b/SampleProject/Assets/Samples/WebSocket+Udp/Connections/Udp.cs @@ -0,0 +1,120 @@ +using System; +using System.Net; +using System.Net.Sockets; +using AutoyaFramework.Connections.IP; +using UnityEngine; + +/** + implementation of Udp send/receive. +*/ +namespace AutoyaFramework.Connections.Udp { + /** + udp receiver feature. + you MUST Close() when you finish using this receiver. + */ + public class UdpReceiver { + private readonly UdpClient udp; + private readonly IPEndPoint remoteEndPoint; + + private readonly object lockObj; + private bool closed; + + public UdpReceiver (IPAddress target, int port, Action receiver, IPEndPoint remoteEndPoint=null) { + var endpoint = new IPEndPoint(target, port); + + if (remoteEndPoint != null) { + this.remoteEndPoint = remoteEndPoint; + } + + udp = new UdpClient(new IPEndPoint(IPAddress.Any, port)); + udp.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + + lockObj = new object(); + + ContinueReceive(receiver, endpoint); + } + + public int Send (byte[] data) { + if (remoteEndPoint != null) { + return udp.Send(data, data.Length, remoteEndPoint); + } + return 0; + } + + private void ContinueReceive (Action receiver, IPEndPoint endpoint) { + udp.BeginReceive( + ar => { + var receivedBytes = udp.EndReceive(ar, ref endpoint); + if (receivedBytes.Length == 0) { + throw new Exception("receivedBytes is 0."); + } + + if (receiver != null) { + receiver(receivedBytes); + } + + if (!closed) { + ContinueReceive(receiver, endpoint); + } + }, + lockObj + ); + } + + public void Close () { + if (closed) { + return; + } + + closed = true; + udp.Close(); + } + } + + /** + udp sender feature. + you MUST Close() when you finish using this sender. + */ + public class UdpSender { + private readonly UdpClient udp; + private readonly object lockObj; + private bool closed; + + public UdpSender (IPAddress target, int port) { + udp = new UdpClient(); + udp.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + udp.Connect(target, port); + // udp.EnableBroadcast = true; + + lockObj = new object(); + } + + public int SendSync (byte[] data) { + return udp.Send(data, data.Length); + } + + public void Send (byte[] data) { + try { + udp.BeginSend( + data, + data.Length, + ar => { + udp.EndSend(ar); + }, + lockObj + ); + } catch (Exception e) { + Debug.Log("どんなエラーでるのこれ:" + e); + } + } + + public void Close () { + if (closed) { + return; + } + + closed = true; + udp.Close(); + } + } +} \ No newline at end of file diff --git a/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket+Udp.unity b/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket+Udp.unity new file mode 100644 index 0000000..1598e6c Binary files /dev/null and b/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket+Udp.unity differ diff --git a/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket_Udp.cs b/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket_Udp.cs new file mode 100644 index 0000000..7672044 --- /dev/null +++ b/SampleProject/Assets/Samples/WebSocket+Udp/WebSocket_Udp.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; +using AutoyaFramework.Connections.Udp; +using UnityEngine; +using WebuSocketCore; + +public class WebSocket_Udp : MonoBehaviour { + + UdpReceiver udpUnit; + int udpReceivePort = 9090; + int wsPort = 8080; + + + // private string serverIP = "13.230.98.195"; + private string serverIP = "127.0.0.1"; + private WebuSocket webuSocket; + + // Use this for initialization + void Start () { + + // udpサーバを立ち上げ、往復 + wsの接続を行う。 + // タイムアウトまでに帰ってこなければ、wsのみの接続を行う。 + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0)) { + socket.Connect("8.8.8.8", 65530); + var endPoint = socket.LocalEndPoint as IPEndPoint; + var localIP = endPoint.Address; + + var first = true; + udpUnit = new UdpReceiver( + localIP, + udpReceivePort, + udpData => { + Debug.Log("received udp data len:" + udpData.Length); + var param = Encoding.UTF8.GetString(udpData); + + if (first) { + first = false; + + Debug.Log("param:" + param); + var ip = param.Split(':')[0]; + var port = param.Split(':')[1]; + ConnectWebSocket("testUser", ip, port); + } else { + // udp data received. + Debug.Log("received udp param:" + param); + } + }, + new IPEndPoint(IPAddress.Parse(serverIP), 8080) + ); + + udpUnit.Send( + Encoding.UTF8.GetBytes("hello first udp from client.") + ); + + Debug.Log("localIP:" + localIP.ToString()); + } + } + + private void ConnectWebSocket (string userId, string udpIp, string udpPort) { + IPAddress localIP; + + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, 0)) { + socket.Connect("8.8.8.8", 65530); + var endPoint = socket.LocalEndPoint as IPEndPoint; + localIP = endPoint.Address; + } + + Debug.Log("ready connect."); + webuSocket = new WebuSocket( + // url. + "ws://" + serverIP + ":"+ wsPort + "/sample_disque_client", + + // buffer size. + 1024, + () => { + Debug.Log("connected to websocket echo-server. send hello to echo-server"); + }, + datas => { + while (0 < datas.Count) { + ArraySegment data = datas.Dequeue(); + Debug.Log("received tcp data len:" + data.Count); + + byte[] bytes = new byte[data.Count]; + Buffer.BlockCopy(data.Array, data.Offset, bytes, 0, data.Count); + + Debug.Log("tcp message:" + Encoding.UTF8.GetString(bytes)); + } + }, + () => { + Debug.Log("received server ping. automatically ponged."); + }, + closeReason => { + Debug.Log("closed, closeReason:" + closeReason); + }, + (errorEnum, exception) => { + Debug.LogError("error, errorEnum:" + errorEnum + " exception:" + exception); + }, + new Dictionary{ + // // set WebSocket connection header parameters here! + {"id", userId}, + {"ip", udpIp}, + {"port", udpPort} + } + ); + } + + // Update is called once per frame + void OnApplicationQuit () { + if (webuSocket != null) { + webuSocket.Disconnect(); + } + if (udpUnit != null) { + udpUnit.Close(); + } + } +} diff --git a/SampleProject/Assets/WebuSocket/WebuSocket.cs b/SampleProject/Assets/WebuSocket/WebuSocket.cs index 07bd4c2..39a0d78 100644 --- a/SampleProject/Assets/WebuSocket/WebuSocket.cs +++ b/SampleProject/Assets/WebuSocket/WebuSocket.cs @@ -606,6 +606,8 @@ ready buffer data. ReadBuffer(token); } return; + } else { + UnityEngine.Debug.LogError("received."); } } diff --git a/SampleProject/ProjectSettings/AudioManager.asset b/SampleProject/ProjectSettings/AudioManager.asset new file mode 100644 index 0000000..8ba3744 Binary files /dev/null and b/SampleProject/ProjectSettings/AudioManager.asset differ diff --git a/SampleProject/ProjectSettings/ClusterInputManager.asset b/SampleProject/ProjectSettings/ClusterInputManager.asset new file mode 100644 index 0000000..e511338 Binary files /dev/null and b/SampleProject/ProjectSettings/ClusterInputManager.asset differ diff --git a/SampleProject/ProjectSettings/DynamicsManager.asset b/SampleProject/ProjectSettings/DynamicsManager.asset new file mode 100644 index 0000000..fc75f7b Binary files /dev/null and b/SampleProject/ProjectSettings/DynamicsManager.asset differ diff --git a/SampleProject/ProjectSettings/EditorBuildSettings.asset b/SampleProject/ProjectSettings/EditorBuildSettings.asset new file mode 100644 index 0000000..81ce24b Binary files /dev/null and b/SampleProject/ProjectSettings/EditorBuildSettings.asset differ diff --git a/SampleProject/ProjectSettings/EditorSettings.asset b/SampleProject/ProjectSettings/EditorSettings.asset new file mode 100644 index 0000000..fe458d2 Binary files /dev/null and b/SampleProject/ProjectSettings/EditorSettings.asset differ diff --git a/SampleProject/ProjectSettings/GraphicsSettings.asset b/SampleProject/ProjectSettings/GraphicsSettings.asset new file mode 100644 index 0000000..fba5be4 Binary files /dev/null and b/SampleProject/ProjectSettings/GraphicsSettings.asset differ diff --git a/SampleProject/ProjectSettings/InputManager.asset b/SampleProject/ProjectSettings/InputManager.asset new file mode 100644 index 0000000..7e270a8 Binary files /dev/null and b/SampleProject/ProjectSettings/InputManager.asset differ diff --git a/SampleProject/ProjectSettings/NavMeshAreas.asset b/SampleProject/ProjectSettings/NavMeshAreas.asset new file mode 100644 index 0000000..312165e Binary files /dev/null and b/SampleProject/ProjectSettings/NavMeshAreas.asset differ diff --git a/SampleProject/ProjectSettings/NetworkManager.asset b/SampleProject/ProjectSettings/NetworkManager.asset new file mode 100644 index 0000000..19bf410 Binary files /dev/null and b/SampleProject/ProjectSettings/NetworkManager.asset differ diff --git a/SampleProject/ProjectSettings/Physics2DSettings.asset b/SampleProject/ProjectSettings/Physics2DSettings.asset new file mode 100644 index 0000000..1bc3ddc Binary files /dev/null and b/SampleProject/ProjectSettings/Physics2DSettings.asset differ diff --git a/SampleProject/ProjectSettings/ProjectSettings.asset b/SampleProject/ProjectSettings/ProjectSettings.asset new file mode 100644 index 0000000..55b9566 Binary files /dev/null and b/SampleProject/ProjectSettings/ProjectSettings.asset differ diff --git a/SampleProject/ProjectSettings/ProjectVersion.txt b/SampleProject/ProjectSettings/ProjectVersion.txt new file mode 100644 index 0000000..d542d5a --- /dev/null +++ b/SampleProject/ProjectSettings/ProjectVersion.txt @@ -0,0 +1 @@ +m_EditorVersion: 5.6.1p4 diff --git a/SampleProject/ProjectSettings/QualitySettings.asset b/SampleProject/ProjectSettings/QualitySettings.asset new file mode 100644 index 0000000..74cc1aa Binary files /dev/null and b/SampleProject/ProjectSettings/QualitySettings.asset differ diff --git a/SampleProject/ProjectSettings/TagManager.asset b/SampleProject/ProjectSettings/TagManager.asset new file mode 100644 index 0000000..f520705 Binary files /dev/null and b/SampleProject/ProjectSettings/TagManager.asset differ diff --git a/SampleProject/ProjectSettings/TimeManager.asset b/SampleProject/ProjectSettings/TimeManager.asset new file mode 100644 index 0000000..2583748 Binary files /dev/null and b/SampleProject/ProjectSettings/TimeManager.asset differ diff --git a/SampleProject/ProjectSettings/UnityConnectSettings.asset b/SampleProject/ProjectSettings/UnityConnectSettings.asset new file mode 100644 index 0000000..5e012e9 Binary files /dev/null and b/SampleProject/ProjectSettings/UnityConnectSettings.asset differ