Anonymizer#
Phone numbers can be anonymized before or after featurization; this notebook walks through both. Phone number formatting (e.g. the expectation that international calling codes have been stripped) can also be checked during anonymization by passing a format_checker to the anonymizer; this is demonstrated as well.
from cider.datastore import DataStore
from cider.anonymizer import Anonymizer
from pandas.api.types import is_numeric_dtype
import os
import sys
import pandas as pd
# Prevents python version mismatches between spark driver and executor
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Set up the configuration file and load some simulated data, including featurization results if present, using the datastore.
# This path should point to your cider installation, where configs and data for this demo are located. In particular, this file must
# contain a
from pathlib import Path
cider_installation_directory = Path('../../cider')
datastore = DataStore(config_file_path_string=cider_installation_directory / 'configs' / 'config_anonymize.yml')
anonymizer = Anonymizer(datastore=datastore)
outputs_path = anonymizer.outputs_path
23/05/24 13:54:44 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.43 instead (on interface en0)
23/05/24 13:54:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/24 13:54:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Loading CDR...
Loading recharges...
SUCCESS!
Loading mobile data...
Loading mobile money...
Anonymize input data#
We can anonymize the five main categories of input data which contain phone numbers:
CDR (calls and texts)
Mobile money transactions
Mobile data transactions
Recharges
Labels (e.g. from a ground-truth survey)
It’s important to anonymize these using the same anonymization salt, to ensure that all of a given subscriber’s data is indexed by the same obfuscated string.
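Conceptually, each phone number is mapped to an obfuscated string with a salted hash. The sketch below illustrates why a shared salt matters; it uses `hashlib` directly and is not cider's actual implementation (the `pseudonymize` helper is hypothetical):

```python
import hashlib

def pseudonymize(phone_number: str, salt: str) -> str:
    # The same (number, salt) pair always yields the same pseudonym,
    # so one subscriber's records still join across datasets.
    return hashlib.sha256((salt + phone_number).encode()).hexdigest()[:16]

# With a shared salt, the same number maps to the same obfuscated string:
assert pseudonymize("1712345678", "salt-A") == pseudonymize("1712345678", "salt-A")
# With different salts, the pseudonyms no longer line up across datasets:
assert pseudonymize("1712345678", "salt-A") != pseudonymize("1712345678", "salt-B")
```

If the CDR were anonymized with one salt and the labels with another, a subscriber's records could no longer be linked, which is exactly the failure mode the shared salt prevents.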
anonymizer.anonymize_cdr()
anonymized_cdr = pd.read_csv(outputs_path / 'outputs' / 'cdr.csv')
anonymized_cdr.head()
| | txn_type | caller_id | recipient_id | timestamp | duration | caller_antenna | recipient_antenna | international |
|---|---|---|---|---|---|---|---|---|
| 0 | call | p7OPQLOrvqqYDvVa | 1Z7rQb4GO2jXzbpG | 2020-01-01 00:00:42 | 253.0 | a101 | a54 | domestic |
| 1 | text | p7OPQLOrVlZYDvVa | AbRlyk74NOBJyqwY | 2020-01-01 00:02:04 | NaN | a44 | a110 | domestic |
| 2 | text | G5vAyA41G9wqQadO | 1xMmQpJkXaVVz9wa | 2020-01-01 00:02:12 | NaN | a145 | a96 | domestic |
| 3 | call | nVLoQg1drok6zx3Z | KlVDmV2g29mA7Qvq | 2020-01-01 00:02:23 | 96.0 | a84 | a36 | domestic |
| 4 | text | bRlykJqR9eAXJQqw | BWvLye4LxnnOQdYG | 2020-01-01 00:03:05 | NaN | a261 | a268 | domestic |
anonymizer.anonymize_mobilemoney()
anonymized_mobilemoney = pd.read_csv(outputs_path / 'outputs' / 'mobilemoney.csv')
anonymized_mobilemoney.head()
| | txn_type | caller_id | recipient_id | timestamp | amount | sender_balance_before | sender_balance_after | recipient_balance_before | recipient_balance_after |
|---|---|---|---|---|---|---|---|---|---|
| 0 | p2p | LxA7zdJNn8enQbVY | 35nJzJZNWkPnz16l | 2020-01-01 00:15:51 | 55.163315 | 324.21875 | 269.05542 | 112.54498 | 167.70830 |
| 1 | cashout | LxA7zdJd7rKPQbVY | NaN | 2020-01-01 00:18:59 | 81.861000 | 185.57620 | 103.71520 | NaN | NaN |
| 2 | cashin | 0vK4yVJnrmaOy71O | NaN | 2020-01-01 00:28:17 | 29.225048 | 98.63829 | 127.86334 | NaN | NaN |
| 3 | p2p | O8xoDrk5vX36QEGL | VpAaQawNqaN6ze9q | 2020-01-01 00:28:29 | 45.561913 | 248.71838 | 203.15646 | 212.01639 | 257.57828 |
| 4 | cashout | NkZlD88rZN0BDG5J | NaN | 2020-01-01 00:31:03 | 55.040770 | 150.88644 | 95.84567 | NaN | NaN |
anonymizer.anonymize_mobiledata()
anonymized_mobiledata = pd.read_csv(outputs_path / 'outputs' / 'mobiledata.csv')
anonymized_mobiledata.head()
| | caller_id | volume | timestamp |
|---|---|---|---|
| 0 | mpLEyMkZWmNdDqvN | 91.38652 | 2020-01-01 00:01:15 |
| 1 | NkZlD88rvpbZDG5J | 118.89835 | 2020-01-01 00:02:27 |
| 2 | BWvLye4EgqAWQdYG | 67.68214 | 2020-01-01 00:16:33 |
| 3 | WvLyem297x9l6ydY | 65.52507 | 2020-01-01 00:23:37 |
| 4 | nVLoQg1drok6zx3Z | 99.46138 | 2020-01-01 00:41:44 |
anonymizer.anonymize_recharges()
anonymized_recharges = pd.read_csv(outputs_path / 'outputs' / 'recharges.csv')
anonymized_recharges.head()
| | caller_id | amount | timestamp |
|---|---|---|---|
| 0 | 6aA9yvnL0o61yJMY | 96.0 | 2020-01-01 00:02:47 |
| 1 | 1xMmQpJ74bMkz9wa | 73.0 | 2020-01-01 00:04:33 |
| 2 | AbRlyk74R1YJyqwY | 98.0 | 2020-01-01 00:08:36 |
| 3 | 1xMmQpJk8ALoz9wa | 7.0 | 2020-01-01 00:14:37 |
| 4 | 1Z7rQb4Nbo1vzbpG | 76.0 | 2020-01-01 00:24:36 |
anonymizer.anonymize_labels()
anonymized_labels = pd.read_csv(outputs_path / 'outputs' / 'labels.csv')
anonymized_labels.head()
| | name | label | weight |
|---|---|---|---|
| 0 | BKGnyZoaegLxz9Za | 18760 | 3.711266 |
| 1 | 6aA9yvnL8aqOyJMY | 21897 | 69.385541 |
| 2 | Bj0nDOWV1VNbDK2O | 20663 | 67.035003 |
| 3 | eALxDXe0WKplQdlp | 19756 | 43.047178 |
| 4 | p7OPQLOYpMKXDvVa | 19978 | 76.778898 |
raw_cdr_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.cdr)
raw_recharges_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.recharges)
raw_mobiledata_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.mobiledata)
raw_mobilemoney_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.mobilemoney)
raw_labels_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.labels)
Anonymize features#
We can also anonymize featurized data. This is not necessary (nor will it work) if the features are computed using already-anonymized data.
anonymizer.anonymize_features()
anonymized_features = pd.read_csv(outputs_path / 'outputs' / 'features.csv')
anonymized_features.head()
| | name | active_days_allweek_allday | active_days_allweek_day | active_days_allweek_night | active_days_weekday_allday | active_days_weekday_day | active_days_weekday_night | active_days_weekend_allday | active_days_weekend_day | active_days_weekend_night | ... | mobilemoney_outgoing_p2p_amount_min | mobilemoney_outgoing_p2p_amount_max | mobilemoney_outgoing_p2p_balance_before_mean | mobilemoney_outgoing_p2p_balance_before_min | mobilemoney_outgoing_p2p_balance_before_max | mobilemoney_outgoing_p2p_balance_after_mean | mobilemoney_outgoing_p2p_balance_after_min | mobilemoney_outgoing_p2p_balance_after_max | mobilemoney_outgoing_p2p_txns | mobilemoney_outgoing_p2p_contacts |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | xMmQpj25daOLRz9w | 54 | 46 | 46 | 38 | 33 | 32 | 16 | 13 | 14 | ... | 30.222473 | 70.803710 | 206.221286 | 180.20338 | 239.67809 | 158.092635 | 109.39968 | 185.19756 | 6.0 | 6.0 |
| 1 | Na6Q4xNdBX5RdQxw | 56 | 51 | 44 | 41 | 36 | 32 | 15 | 15 | 12 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 2 | nGeQjv2jeVrRBQXW | 57 | 45 | 45 | 41 | 34 | 32 | 16 | 11 | 13 | ... | 32.282750 | 66.296750 | 194.259438 | 136.78952 | 285.25427 | 147.203631 | 70.49276 | 252.97151 | 6.0 | 6.0 |
| 3 | 5vAyA0NZP6oZ6Qad | 54 | 47 | 46 | 40 | 35 | 33 | 14 | 12 | 13 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 4 | pLEyMlebKR37oQqv | 53 | 41 | 44 | 39 | 31 | 33 | 14 | 10 | 11 | ... | 43.527973 | 63.519386 | 191.142494 | 132.54959 | 297.36365 | 138.052689 | 69.03021 | 247.31479 | 6.0 | 6.0 |
5 rows × 1122 columns
Use a custom format checker#
It is a good idea to check the format of numbers as you anonymize them to make sure they have been processed according to expectations. Because the anonymizer uses a hash function, similar numbers will not result in similar anonymized strings. For example, the numbers 1234567, 01234567, and 880 1234567 will all result in completely different anonymized strings.
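This avalanche behavior is easy to demonstrate. The snippet below uses SHA-256 directly for illustration, not cider's internal encoder:

```python
import hashlib

# Three representations that a human would read as the same number.
representations = ["1234567", "01234567", "8801234567"]
digests = {r: hashlib.sha256(r.encode()).hexdigest()[:12] for r in representations}

for raw, digest in digests.items():
    print(f"{raw:>10} -> {digest}")

# The digests share no structure, so the hash alone can never
# reconcile differently formatted copies of the same number.
assert len(set(digests.values())) == len(representations)
```

This is why inconsistent formatting must be caught before hashing: afterwards, the damage cannot be undone.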
The Anonymizer object accepts a format checker when it’s constructed. The checker is evaluated on each identifier as a string, and should return True if the format is acceptable and False if not. Anonymization will fail if any number fails this check, so we encourage you to clean your data before passing it through this step.
This demo is based on a simplified version of Bangladesh’s mobile phone number format. We’re assuming that leading “0”, “880”, and “+” are removed, which means all numbers will be ten digits and will not start with “0”.
It’s important to represent phone numbers consistently. For example, if a number is written differently depending on where the call is dialed from, it must be normalized first: perhaps by stripping the country code, the domestic prefix (here “0”), and the operator prefix. The anonymizer does not intelligently map two representations of the same number to the same string. Since cider expects information about whether calls/texts are domestic or international to be provided, users will likely have already inspected number prefixes, and perhaps removed them.
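One possible normalization step for this simplified Bangladeshi format is sketched below; the `normalize_number` helper is hypothetical and not part of cider:

```python
def normalize_number(raw: str) -> str:
    # Strip a leading "+", then the "880" country code, then any
    # leading zeros, so every representation collapses to one form.
    s = raw.strip().lstrip("+")
    if s.startswith("880"):
        s = s[3:]
    return s.lstrip("0")

# Different representations of the same subscriber collapse to one string:
assert normalize_number("+8801712345678") == "1712345678"
assert normalize_number("8801712345678") == "1712345678"
assert normalize_number("01712345678") == "1712345678"
assert normalize_number("1712345678") == "1712345678"
```

Normalizing before anonymization, then enforcing the normalized form with a format checker, gives a belt-and-suspenders guarantee that one subscriber always hashes to one pseudonym.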
def is_valid_mobile_number(x):
    if any(not c.isdigit() for c in x):
        print(x, "has non-numeric characters. Please remove these.")
        return False
    if x[0] == "0":
        print(x, "starts with 0. Please strip leading zeros from numbers.")
        return False
    if len(x) != 10:
        print("Valid numbers have ten digits;", x, "has", len(x))
        return False
    return True
anonymizer_with_check = Anonymizer(datastore=datastore, format_checker=is_valid_mobile_number)
Loading CDR...
Loading recharges...
SUCCESS!
Loading mobile data...
Loading mobile money...
The synthetic data we are working with has not been processed to remove leading “880”s, “0”s, or “+”s, so when we try to anonymize the CDR data, an error is raised notifying the user that phone numbers haven’t been processed as expected.
print(datastore.cdr.toPandas().head())
anonymizer_with_check.anonymize_cdr()
/Users/mlevy/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:248: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead
series = series.astype(t, copy=False)
txn_type caller_id recipient_id timestamp duration \
0 call 00180479117 00186879451 2020-01-01 00:00:42 253.0
1 text 00181616517 00185633871 2020-01-01 00:02:04 NaN
2 text 00189653293 00186624501 2020-01-01 00:02:12 NaN
3 call 00183331706 +8800186773263 2020-01-01 00:02:23 96.0
4 text +8800189721427 00188688509 2020-01-01 00:03:05 NaN
caller_antenna recipient_antenna international day
0 a101 a54 domestic 2020-01-01
1 a44 a110 domestic 2020-01-01
2 a145 a96 domestic 2020-01-01
3 a84 a36 domestic 2020-01-01
4 a261 a268 domestic 2020-01-01
Valid numbers have ten digits; 8800180789080 has 13
00180479117 starts with 0. Please strip leading zeros from numbers.
23/05/24 13:55:20 ERROR Executor: Exception in task 0.0 in stage 49.0 (TID 45)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 142, in <lambda>
lambda raw: Anonymizer._check_identifier_format_and_hash(raw, encoder, format_checker), StringType()
File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 197, in _check_identifier_format_and_hash
raise ValueError(f'Bad input to anonymization: {raw_string} rejected by provided format format_checker.')
ValueError: Bad input to anonymization: 00180479117 rejected by provided format format_checker.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
23/05/24 13:55:20 ERROR TaskSetManager: Task 0 in stage 49.0 failed 1 times; aborting job
---------------------------------------------------------------------------
PythonException Traceback (most recent call last)
Cell In[11], line 2
1 print(datastore.cdr.toPandas().head())
----> 2 anonymizer_with_check.anonymize_cdr()
File ~/Dropbox/professional/givedirectly/cider/cider/anonymizer.py:99, in Anonymizer.anonymize_cdr(self)
98 def anonymize_cdr(self):
---> 99 self._anonymize_dataset('cdr', ['caller_id', 'recipient_id'])
File ~/Dropbox/professional/givedirectly/cider/cider/anonymizer.py:157, in Anonymizer._anonymize_dataset(self, dataset_name, column_names)
154 if 'day' in dataset_with_anonymized_columns.columns:
155 dataset_with_anonymized_columns = dataset_with_anonymized_columns.drop(col('day'))
--> 157 save_df(dataset_with_anonymized_columns, self.outputs_path / 'outputs' / f'{dataset_name}.csv')
File ~/Dropbox/professional/givedirectly/cider/helpers/utils.py:112, in save_df(df, out_file_path, sep)
107 temp_folder = out_file_path.parent / 'temp'
109 # Ask spark to write output there. The repartition(1) call will tell spark to write a single file.
110 # It will name it with some meaningless partition name, but we can find it easily bc it's the only
111 # csv in the temp directory.
--> 112 df.repartition(1).write.csv(path=str(temp_folder), mode="overwrite", header="true", sep=sep)
113 spark_generated_file_name = [
114 fname for fname in os.listdir(temp_folder) if os.path.splitext(fname)[1] == '.csv'
115 ][0]
117 # move the file out of the temporary directory and rename it
File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/readwriter.py:1240, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
1221 self.mode(mode)
1222 self._set_opts(
1223 compression=compression,
1224 sep=sep,
(...)
1238 lineSep=lineSep,
1239 )
-> 1240 self._jwrite.csv(path)
File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 142, in <lambda>
lambda raw: Anonymizer._check_identifier_format_and_hash(raw, encoder, format_checker), StringType()
File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 197, in _check_identifier_format_and_hash
raise ValueError(f'Bad input to anonymization: {raw_string} rejected by provided format format_checker.')
ValueError: Bad input to anonymization: 00180479117 rejected by provided format format_checker.